Scheduling Queries in Airflow
- bigquery-etl has tooling to automatically generate Airflow DAGs for scheduling queries
- To be scheduled, a query must be assigned to a DAG that is specified in `dags.yaml`
  - New DAGs can be configured in `dags.yaml`, e.g., by adding the following:

    ```yaml
    bqetl_ssl_ratios:                  # name of the DAG; must start with bqetl_
      schedule_interval: 0 2 * * *     # query schedule
      description: The DAG schedules SSL ratios queries.
      default_args:
        owner: example@mozilla.com
        start_date: "2020-04-05"       # YYYY-MM-DD
        email: ["example@mozilla.com"]
        retries: 2                     # number of retries if the query execution fails
        retry_delay: 30m
    ```
    - All DAG names need to have `bqetl_` as prefix.
    - `schedule_interval` is either defined as a CRON expression or alternatively as one of the following CRON presets: `once`, `hourly`, `daily`, `weekly`, `monthly` (see the sketch at the end of this list)
    - `start_date` defines the first date for which the query should be executed
      - Airflow will not automatically backfill older dates if `start_date` is set in the past; backfilling can be done via the Airflow web interface
    - `email` lists email addresses alerts should be sent to in case of failures when running the query
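    - As a sketch, a minimal `dags.yaml` entry using the `daily` CRON preset instead of a CRON expression could look like the following (the DAG name is a hypothetical placeholder, not an existing DAG):

      ```yaml
      bqetl_example_daily:           # hypothetical DAG name; must still start with bqetl_
        schedule_interval: daily     # CRON preset instead of a CRON expression
        description: Example DAG scheduled with a CRON preset.
        default_args:
          owner: example@mozilla.com
          start_date: "2020-04-05"
          email: ["example@mozilla.com"]
          retries: 2
          retry_delay: 30m
      ```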
  - Alternatively, new DAGs can also be created via the `bqetl` CLI by running:

    ```bash
    bqetl dag create bqetl_ssl_ratios --schedule_interval='0 2 * * *' --owner="example@mozilla.com" --start_date="2020-04-05" --description="This DAG generates SSL ratios."
    ```
- To schedule a specific query, add a `metadata.yaml` file that includes a `scheduling` section, for example:

  ```yaml
  friendly_name: SSL ratios
  # ... more metadata, see Query Metadata section above
  scheduling:
    dag_name: bqetl_ssl_ratios
  ```
  - Additional scheduling options (a combined example follows this list):
    - `depends_on_past` keeps the query from getting executed if the previous schedule for the query hasn't succeeded
    - `date_partition_parameter` - by default set to `submission_date`; can be set to `null` if the query doesn't write to a partitioned table
    - `parameters` specifies a list of query parameters, e.g. `["n_clients:INT64:500"]`
    - `arguments` - a list of arguments passed when running the query, for example: `["--append_table"]`
    - `referenced_tables` - manually curated list of tables a Python or BigQuery script depends on; for `query.sql` files, dependencies will get determined automatically and should only be overwritten manually if really necessary
    - `multipart` indicates whether a query is split over multiple files `part1.sql`, `part2.sql`, ...
    - `depends_on` defines external dependencies in telemetry-airflow that are not detected automatically:

      ```yaml
      depends_on:
        - task_id: external_task
          dag_name: external_dag
          execution_delta: 1h
      ```
      - `task_id`: name of the task the query depends on
      - `dag_name`: name of the DAG the external task is part of
      - `execution_delta`: time difference between the `schedule_interval`s of the external DAG and the DAG the query is part of (e.g., if the external DAG runs at 01:00 and this DAG runs at 02:00, the `execution_delta` is `1h`)
    - `depends_on_tables_existing` defines tables that the ETL will await the existence of via an Airflow sensor before running:

      ```yaml
      depends_on_tables_existing:
        - task_id: wait_for_foo_bar_baz
          table_id: 'foo.bar.baz_{{ ds_nodash }}'
          poke_interval: 30m
          timeout: 12h
          retries: 1
          retry_delay: 10m
      ```
      - `task_id`: ID to use for the generated Airflow sensor task.
      - `table_id`: Fully qualified ID of the table to wait for, including the project and dataset.
      - `poke_interval`: Time that the sensor should wait in between each check, formatted as a timedelta string like "2h" or "30m". This parameter is optional (the default poke interval is 5 minutes).
      - `timeout`: Time allowed before the sensor times out and fails, formatted as a timedelta string like "2h" or "30m". This parameter is optional (the default timeout is 8 hours).
      - `retries`: The number of retries that should be performed if the sensor times out or otherwise fails. This parameter is optional (the default depends on how the DAG is configured).
      - `retry_delay`: Time delay between retries, formatted as a timedelta string like "2h" or "30m". This parameter is optional (the default depends on how the DAG is configured).
    - `depends_on_table_partitions_existing` defines table partitions that the ETL will await the existence of via an Airflow sensor before running:

      ```yaml
      depends_on_table_partitions_existing:
        - task_id: wait_for_foo_bar_baz
          table_id: foo.bar.baz
          partition_id: '{{ ds_nodash }}'
          poke_interval: 30m
          timeout: 12h
          retries: 1
          retry_delay: 10m
      ```
      - `task_id`: ID to use for the generated Airflow sensor task.
      - `table_id`: Fully qualified ID of the table to check, including the project and dataset. Note that the service account `airflow-access@moz-fx-data-shared-prod.iam.gserviceaccount.com` will need to have the BigQuery Job User role on the project and read access to the dataset.
      - `partition_id`: ID of the partition to wait for.
      - `poke_interval`: Time that the sensor should wait in between each check, formatted as a timedelta string like "2h" or "30m". This parameter is optional (the default poke interval is 5 minutes).
      - `timeout`: Time allowed before the sensor times out and fails, formatted as a timedelta string like "2h" or "30m". This parameter is optional (the default timeout is 8 hours).
      - `retries`: The number of retries that should be performed if the sensor times out or otherwise fails. This parameter is optional (the default depends on how the DAG is configured).
      - `retry_delay`: Time delay between retries, formatted as a timedelta string like "2h" or "30m". This parameter is optional (the default depends on how the DAG is configured).
    - `trigger_rule`: The rule that determines when the Airflow task that runs this query should run. The default is `all_success` ("trigger this task when all directly upstream tasks have succeeded"); other rules can allow a task to run even if not all preceding tasks have succeeded. See the Airflow docs for the list of trigger rule options.
    - `destination_table`: The table to write to. If unspecified, defaults to the query destination; if `None`, no destination table is used (the query is simply run as-is). Note that if no destination table is specified, you will need to specify the `submission_date` parameter manually.
    - `external_downstream_tasks` defines external downstream dependencies for which `ExternalTaskMarker`s will be added to the generated DAG. These task markers ensure that when the task is cleared for triggering a rerun, all downstream tasks are automatically cleared as well.

      ```yaml
      external_downstream_tasks:
        - task_id: external_downstream_task
          dag_name: external_dag
          execution_delta: 1h
      ```
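  - As a sketch, a `metadata.yaml` `scheduling` section combining several of the options above might look like the following; the values and external task names are hypothetical placeholders, not recommendations:

    ```yaml
    friendly_name: SSL ratios
    # ... more metadata, see Query Metadata section above
    scheduling:
      dag_name: bqetl_ssl_ratios
      depends_on_past: false                      # don't block on the previous run having succeeded
      date_partition_parameter: submission_date   # the default; set to null for non-partitioned destinations
      parameters: ["n_clients:INT64:500"]         # query parameters
      arguments: ["--append_table"]               # extra arguments passed when running the query
      depends_on:
        - task_id: external_task                  # external task in telemetry-airflow
          dag_name: external_dag
          execution_delta: 1h
    ```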
- Queries can also be scheduled using the `bqetl` CLI:

  ```bash
  ./bqetl query schedule path/to/query_v1 --dag bqetl_ssl_ratios
  ```
- To generate all Airflow DAGs, run:

  ```bash
  ./bqetl dag generate
  ```

  - Generated DAGs are located in the `dags/` directory
  - Dependencies between queries scheduled in bigquery-etl and dependencies to stable tables are detected automatically
- Specific DAGs can be generated by running `./bqetl dag generate bqetl_ssl_ratios`
- Generated DAGs do not need to be checked into `main`. CI automatically generates DAGs and writes them to the telemetry-airflow-dags repo, from where Airflow will pick them up
- Generated DAGs will be automatically detected and scheduled by Airflow
  - It might take up to 10 minutes for new DAGs and updates to show up in the Airflow UI
- The DAG can be found at `https://workflow.telemetry.mozilla.org/dags/{YOUR_DAG_NAME}/grid`
- NOTE - New DAGs will not be started automatically. You must enable them manually via the Airflow UI. To enable the DAG, toggle the switch next to your DAG name in the upper left corner.
- To generate tasks for importing data from Fivetran that an ETL task depends on, add the following (a sketch showing this in context follows below):

  ```yaml
  depends_on_fivetran:
    - task_id: fivetran_import_1
    - task_id: another_fivetran_import
  ```

  - The Fivetran connector ID needs to be set as a variable `<task_id>_connector_id` in the Airflow admin interface for each import task (e.g., `fivetran_import_1_connector_id` for the first task above)
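  - As a sketch, a hypothetical `metadata.yaml` `scheduling` section with Fivetran imports might look like the following; the DAG name is a placeholder, and placing `depends_on_fivetran` under `scheduling` is assumed to follow the same pattern as the other scheduling options above:

    ```yaml
    scheduling:
      dag_name: bqetl_example_imports        # hypothetical DAG name
      depends_on_fivetran:
        - task_id: fivetran_import_1         # requires Airflow variable fivetran_import_1_connector_id
        - task_id: another_fivetran_import   # requires Airflow variable another_fivetran_import_connector_id
    ```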