Executing sequential and concurrent tasks within one DAG

I am new to Airflow and have a few basic questions about how to properly run some tasks concurrently and others sequentially within one DAG.

In my DAG, the basic steps are: refresh data, run 3 separate scripts, deploy. Each of these applications is run in a separate Docker container.

In the example below, everything runs in sequence. My objective, however, is to refresh the data, then run this, that, and the_other_thing in parallel, and then deploy:

refresh >> [this, that, the_other_thing] >> deploy

I would like to deploy only after [this, that, the_other_thing] have all finished, but I can't know in advance which of the three will finish last. What is the best practice for executing this sequence within one DAG? Is it enough to set concurrency=3 and create [this, that, the_other_thing] in a for loop? Any suggestions are appreciated.

from datetime import timedelta, datetime

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.hooks.base_hook import BaseHook

image = 'myserver.com:8080/my_project:latest'

args = {
    'owner': 'Airflow',
    'start_date': datetime(2020, 1, 1),  # no leading zeros: 01 is a syntax error in Python 3
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

conn_foo_db = BaseHook.get_connection('foobar_db')
conn_docker = BaseHook.get_connection('my_registry')

dag = DAG(
    dag_id='analysis',
    default_args=args,
    schedule_interval='0 3 * * *',
    dagrun_timeout=timedelta(minutes=180),
    max_active_runs=1,
    concurrency=1,
    tags=['daily']
)

refresh_data = BashOperator(
    task_id='refresh_data',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=refresh',
    dag=dag,
)

this = BashOperator(
    task_id='run_app_this',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=do_this ',
    dag=dag,
)

that = BashOperator(
    task_id='run_app_that',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=do_that',
    dag=dag,
)

the_other_thing = BashOperator(
    task_id='run_app_the_other_thing',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=do_the_other_thing ',
    dag=dag,
)

deploy = BashOperator(
    task_id='deploy',
    bash_command='docker run '
                 '-i --rm '
                 f"-e DB_PASSWORD='{ conn_foo_db.password }' "
                 f' { image }  '
                 'app=deploy ',
    dag=dag,
)

# Current wiring: every task runs strictly one after another.
refresh_data >> this >> that >> the_other_thing >> deploy

if __name__ == "__main__":
    dag.cli()
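
To make the ordering I'm after concrete, here is a small stand-alone model of it. This is only a sketch and not Airflow code: it uses graphlib from the Python standard library (Python 3.9+), and the dependencies dict and the execution_waves helper are names I made up for illustration. It shows the three apps becoming runnable together once refresh is done, and deploy becoming runnable only after all three have finished, whichever one happens to be last.

```python
from graphlib import TopologicalSorter

# Hypothetical model of the desired ordering (not Airflow code):
# each key maps a task to the set of tasks that must finish before it starts.
dependencies = {
    "this": {"refresh"},
    "that": {"refresh"},
    "the_other_thing": {"refresh"},
    "deploy": {"this", "that", "the_other_thing"},
}


def execution_waves(deps):
    """Group tasks into waves; tasks within one wave could run in parallel."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    waves = []
    while sorter.is_active():
        # get_ready() returns every task whose upstream tasks are all done.
        ready = sorted(sorter.get_ready())
        waves.append(ready)
        # Mark the whole wave as finished before asking for the next one.
        sorter.done(*ready)
    return waves


waves = execution_waves(dependencies)
for number, wave in enumerate(waves, start=1):
    print(number, wave)
```

In the DAG itself I would expect the equivalent wiring to be the list form from above, refresh_data >> [this, that, the_other_thing] >> deploy, with concurrency raised from 1 to at least 3 so that the middle wave is not serialized.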

Source: Docker Questions
