7 minute read

風音屋アドバイザーの渡部徹太郎(@fetarodc) です。 このブログでは、Airflowの基本であるスケジューラ、DAG、DAG_RUNの仕組みを解説した後、タスクの優先順位を決めるpriority_weightの仕組みと注意点を解説します。 また、priority_weightを考慮しなかったことで実際に起こった問題について紹介します。

airflow_logo

はじめに

とある案件でAirflowのタスクが意図通りに実行可能状態にならないという問題が起きました。 DAGの中ではタスクAの後にタスクBがあるにもかかわらず、タスクAが終わっても長い間タスクBが実行可能状態になりませんでした。 調べていくと、priority_weightという値とそれに対するスケジューラの挙動を考慮できていなかったことがわかりました。

Airflowには、タスクの起動の優先順位で決めるためのpriority_weightという値があります。 Airflowのタスクが意図したどおりに実行可能状態にならない場合は、このpriority_weightの存在を見落としている可能性があります。 priority_weightの公式の説明はこちらです。

なお、今回の説明にあたってAirflowのバージョンは2.1.3を前提としていますが、最新の2.6.1でもpriority_weightの仕組みは同じです。

スケジューラ、DAG、DAG_RUNの基本

priority_weightの説明に入る前に、Airflowの基礎である、スケジューラ、DAG、DAG_RUNについて説明してきましょう。 図1ではDAGファイルが、AirflowのスケジューラとAirflowワーカによってどのように処理されるかが図示されています。

図1 図1

まず、DAGファイルですが、DAGファイルそのものはPythonのソースコードです。DAGの中には複数のタスクが定義され、タスクには依存関係があります。 この例では my_dag1 という名前のDAGに4つのタスク task1-a , task1-b, task1-c , task1-d が定義されています。

このDAGファイルは定期的にAirflowのスケジューラによって読み込まれます。読み込まれたDAGは、そのスケジューリングの定義に基づき、実行タイミングごとにDAG_RUNというオブジェクトをに変換されます。 図1では2023年5月14日実行分のDAG_RUNと、2023年5月15日実行分のDAG_RUNとの2つが生成されています。 生成されたDAG_RUNは、Airflowのデータベースの中のDAG_RUNテーブルに格納されます。

DAG_RUNテーブルに格納された各タスクは、Airflowスケジューラーによって読み取られ、実行キューに投入されます。 実行キューに投入されるたタスクは「実行可能状態」になります。

Airflowワーカーに空きができると、実行キューに投入されたタスクから一つタスクを選び実行します。

priority_weightの説明

スケジューラー、DAG、DAG_RUNの基本がわかったところで、priority_weightの説明に移ります。

priority_weightの計算方法

priority_weightは、DAG_RUNの中の各タスクに割り当てられる値です。

priority_weightの計算方法はアルゴリズムによって異なります。 デフォルトではdownstreamというアルゴリズムになります。

downstreamアルゴリズムでは、priority_weightの割当は、タスクの依存関係のなかで、「子孫の数+1」がpriority_weightになります。 例えば、図2の例では task1-a のタスクのpriority_weightは4、 task1-btask1-c のタスクは2、 task1-d のタスク1となります。

図2 図2

downstream アルゴリズムの他にも、子供から親に向かって値を計算していく upstream アルゴリズムや、すべてのタスクに同じ値を割り当てる absolute アルゴリズムがあります。公式ドキュメントに説明があります。

priority_weightによって何が変わるか?

priority_weightの値が大きいタスクほど、スケジューラーが優先的に実行キューにタスクを投入するようになります。 結果として、優先的にタスクがワーカーに渡され、優先的にタスクが起動されるようになります。

この動きは、AirflowスケジューラがデータベースにあるDAG_RUNテーブルから、実行可能なTASKのうちpriority_weightが高い順に検索することで実現されています。 Airflowスケジューラのソースコードでいうと この部分です。

        # Get all task instances associated with scheduled
        # DagRuns which are not backfilled, in the given states,
        # and the dag is not paused
        query = (
            session.query(TI)
            .outerjoin(TI.dag_run)
            .filter(or_(DR.run_id.is_(None), DR.run_type != DagRunType.BACKFILL_JOB))
            .join(TI.dag_model)
            .filter(not_(DM.is_paused))
            .filter(TI.state == State.SCHEDULED)
            .options(selectinload('dag_model'))
            .order_by(-TI.priority_weight, TI.execution_date)
        )

なお、このソースコードのバージョンはAirflow2.1.3ですが、執筆時点で最新のAirflow2.6.1でもほぼ同様です。

このコードでは、pythonのORMであるSQL Alchemyを用いてDBに対して検索を実行していますが、 .order_by(-TI.priority_weight, TI.execution_date) の部分は、 priority_weightの値をソートの第一キー、実行時間(exeution_date)を第二キーとして、 降順にソートして結果を取得することを示しています。このクエリで取得されたタスクが、その順番で実行キューに投入されます。

注意:DAG_RUNの起動順とタスクの起動順は一致しない

このDBに対する検索は、複数のDAG_RUNをまたがって行われるため、とあるDAG_RUNが先に実行開始されたからと言って、その中のタスクが優先的に実行キューに投入されわけではないことになります。

図3 図3

例えば、図3の例では、 my_dag1 のDAG_RUNの task1-d タスクは、 my_dat2 のDAG_RUNの task2-b のタスクよりもpriority_weightが低いため、実行キューに投入されるのは後になります。 これは my_dag1my_dag2 よりも先に起動していても関係ありません。 つまり、DAG_RUNが先に起動しているからと言って、その中のタスクが優先的に実行されるわけではないのです。

priority_weightの考慮不足で起きた問題

このpriority_weightを考慮しなかったために実際に起こった問題を紹介します。

状況:複数のDAGが動いているシステム

その環境ではDAGの開始処理と終了処理をシステム全体で共通化していました。DAGの先頭にはstartというタスクを設定し、 その後に処理したいメインのタスクを middleとして続け、最後にendというタスクを設定していました。全体像を図示したものが図4です。

図4 図4

DAGごとにmiddleの数は大きく異なります。図4のように、dag_1ではmiddleの数は2、dag_2ではmiddleの数は45といったように、大きな差がありました。

課題:DAG1の後に起動したDAG2が先に処理されてしまう

このシステムにおいて、dag_1の終了処理(end)がなかなか実行状態にならないという問題が起きました。

dag_1のmiddle1とmiddle2の処理は終わっているのに、それから10分たっても終了処理であるendが実行可能状態になりません。待たされている10分の間は、dag_1の後に起動したはずのdag_2に属するタスクが実行されていました。

システム管理者の期待としては、起動時間の早いdag_1の処理をすべて終わらせてから、起動時間の遅いdag_2の処理を開始してほしかったのですが、そうはなりませんでした。

原因:priority_weight = downstream

この事象はpriority_weightの仕組みを知っていると説明できます。priority_weightの計算アルゴリズムはdownstreamでした。

dag_1とdag_2の実行時間が重なると、Airflowのスケジューラーはdag_1とdag_2のタスクの両方をpriority_weightの降順で取得し、実行キューに投入しようとします。 この計算により、dag_1のendはpriority_weightが1と計算されるため、他の47個のタスクがキューに投入されるまで待たされることになります。 そして、スケジューラが高負荷状態になると、実行キューにタスクを投入するにも時間がかかるようになります。 この環境では一つのタスクを実行キューに投入するのに16秒かかっていました。 そのため、dag_1のendが実行キューに投入されるのは752秒後となっていました。

このように、スケジューラが高負荷で実行キューにタスクを投入するのに時間がかかる場合、 priority_weightが低いDAGの中の下流のタスクは、実行可能状態になるまで時間がかかります。

解決策:priority_weight = absolute

結局この問題は、priority_weightの計算アルゴリズムをabsoluteに変更することで、解決しました。

これにより、priority_weightはすべて同じ値となり、タスクが実行キューに投入される時間は、タスクの実行時間順になったためです。

まとめ

このブログでは、Airflowの基礎である、スケジューラ、DAG、DAG_RUNについて説明したのち、タスクを実行可能状態にする際の優先度を決めるpriority_weightについてソースコードを交え解説しました。 また、このpriority_weightを考慮しなかったために、実際に起こった問題も解説しました。

みなさんも、Airflowを使う際には、priority_weightを考慮して、DAGの構造を設計してみてください。

Tags:

Posted:

Author: fetarodc

渡部徹太郎(@fetarodc)。 風音屋アドバイザー。 著書に『実践的データ基盤への処方箋』『図解即戦力 ビッグデータ分析のシステムと開発がこれ1冊でしっかりわかる教科書』『RDB技術者のためのNoSQLガイド』。