Connecting to Airflow
Apache Airflow is a data orchestration tool that allows you to programmatically author, schedule, and monitor workflows. You can use the Airflow provider package for Firebolt to integrate a Firebolt database into your data pipeline—for example, you can set up a schedule for automatic incremental data ingestion into Firebolt.
This topic covers installing the Airflow provider package for Firebolt (published as airflow-provider-firebolt on PyPI) using pip. It then provides steps for setting up a connection to Firebolt resources from the Airflow UI. Finally, a sample Airflow Directed Acyclic Graph (DAG) Python script demonstrates common Firebolt tasks.
The source code for the Airflow provider package for Firebolt is available in the airflow-provider-firebolt repository on GitHub.
- Prerequisites
- Installing the Airflow provider package for Firebolt
- Connecting Airflow to Firebolt
- Example—creating a DAG for data processing with Firebolt
Prerequisites
- A Firebolt account. For more information, see Creating a new account.
- A Firebolt database and engine.
- Python version 3.8 or later.
- An installation of Airflow. For more information, see Installation in Airflow documentation.
Installing the Airflow provider package for Firebolt
These instructions are based on using pip to install the Airflow core package as demonstrated in the Airflow Quick Start for Running Airflow locally. After you install Airflow, install the latest version of the Airflow provider package for Firebolt as shown below.
pip install airflow-provider-firebolt
After you install the Firebolt provider, Firebolt is available as a Connection type selection in the Airflow UI.
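Optionally, you can confirm that the package is installed and that Airflow registers the provider. The commands below are a quick sanity check, not a required step; output varies by environment.

pip show airflow-provider-firebolt
airflow providers list | grep -i firebolt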
Upgrading to the latest version
We recommend running the latest version of the provider package. Release history is available on PyPI. To upgrade to the most recent version, use the command below, then restart Airflow for the changes to take effect.
pip install airflow-provider-firebolt --upgrade
Installing a specific version
If your application requires a specific version of the package, use the command below, replacing 1.0.0 with the version you need.
pip install airflow-provider-firebolt==1.0.0
Installing the provider for AWS Managed Airflow (MWAA)
Using the Firebolt Airflow provider with AWS Managed Airflow (MWAA) requires MWAA version 2.
To install the provider, add airflow-provider-firebolt to the requirements.txt file configured as described in the MWAA documentation, for example as shown below.
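For example, a requirements.txt entry for MWAA might pin the provider to a specific version (the version shown is illustrative):

airflow-provider-firebolt==1.0.0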
Connecting Airflow to Firebolt
To configure a connection to Firebolt, you create an Airflow connection object. For more information, see Managing Connections in Airflow documentation.
The instructions below demonstrate how to use the Airflow UI to create a connection to a Firebolt database and engine.
To configure an Airflow connection to Firebolt
- Start the Airflow UI and log in.
- From the Admin menu, choose Connections.
- Choose the + button to add a new connection.
- From the Connection Type list, choose Firebolt.
- Provide connection parameters for Firebolt according to the following guidelines. These parameters correspond to fields of the Airflow connection object.
| Parameter | Description | Example value |
| --- | --- | --- |
| Connection id | The name of the connection. This appears in the list of connections in the Airflow UI. | My_Firebolt_Connection |
| Description | A description of the connection. | Connection to Firebolt database MyFireboltDatabase using engine MyFireboltEngine. |
| Database | The name of the Firebolt database to connect to. | MyFireboltDatabase |
| Engine | The name of the engine that runs your queries. | MyFireboltEngine |
| Client ID | The ID of your service account. | XyZ83JSuhsua82hs |
| Client Secret | The secret for your service account authentication. | yy7h&993))29&%j |
| Account | The name of your Firebolt account. | developer |
| Extra | Additional connection properties that you may need to set. Most users do not need any. | {"property1": "value1", "property2": "value2"} |
You can obtain the client ID and client secret by registering a service account.
- Choose Test to verify that the connection parameters are correct. The engine that you specify must be running for the test to complete successfully.
- After the test is successful, choose Save. You can also verify the saved connection from the Airflow CLI, as shown below.
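The command below lists the connections defined in your Airflow environment so you can confirm that the new Firebolt connection appears. It is an optional check.

airflow connections list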
Example—creating a DAG for data processing with Firebolt
A DAG file in Airflow is a Python script that defines the tasks and execution order for a data workflow. The DAG example in this section performs the following tasks in succession.
This DAG performs a variety of tasks in Firebolt for demonstration purposes only. It does not demonstrate a workflow or pipeline that would be typical of real-world applications.
- Start an engine to perform subsequent tasks. For more information, see Understanding engine fundamentals. This task fails if the engine is running.
- Run SQL to create an external table in Firebolt connected to a data source in Amazon S3. For more information, see Working with external tables.
- Run SQL to create a fact table to receive ingested data. For more information, see Working with tables.
- Run SQL that ingests the data using an INSERT command.
- Stop the engine when processing is complete.
In this example, the DAG is set up to run only when triggered (schedule_interval=None).
Airflow variables
The DAG example uses a custom variable defined in Airflow. You can define key-value pairs for variables using the Airflow UI, JSON documents, or the Airflow CLI, as shown in the example below. For more information, see Variables and Managing Variables in Airflow documentation.
- The variable key firebolt_sql_path has a value that specifies a subdirectory of your Airflow home directory where SQL files are stored, for example ~/airflow/sql_store. The example DAG uses a Python function to read the SQL files that the DAG tasks run as scripts in Firebolt.
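For example, a single CLI command sets the variable. The path below is illustrative; use the path to the directory where you store the SQL scripts.

airflow variables set firebolt_sql_path /home/your_user/airflow/sql_store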
DAG script example
The DAG script below creates a DAG named firebolt_provider_trip_data. It uses an Airflow connection to Firebolt named my_firebolt_connection. For the contents of the SQL scripts that the DAG runs, see SQL script examples below. To run this example with your own database and engine, create the connection and the custom variable in Airflow, and set the FIREBOLT_CONN_ID value in the script to the name of your connection.
import time
import airflow
from airflow.models import DAG
from airflow.models import Variable
from firebolt_provider.operators.firebolt \
import FireboltOperator, FireboltStartEngineOperator, FireboltStopEngineOperator
default_args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(1)
}
### Function to read a parameter from the Firebolt connection.
def connection_params(conn_id, field):
    connector = FireboltOperator(
        firebolt_conn_id=conn_id, sql="", task_id="CONNECT")
    return connector.get_db_hook()._get_conn_params()[field]
### Change the value of FIREBOLT_CONN_ID to match the name of your connection.
FIREBOLT_CONN_ID = 'my_firebolt_connection'
FIREBOLT_ENGINE_NAME = connection_params(FIREBOLT_CONN_ID, 'engine_name')
tmpl_search_path = Variable.get("firebolt_sql_path")
### Function to open query files saved locally.
def get_query(query_file):
    with open(query_file, "r") as f:
        return f.read()
### Define a variable based on an Airflow DAG class.
### For class parameters, see https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG.
dag = DAG('firebolt_provider_trip_data',
default_args=default_args,
template_searchpath=tmpl_search_path,
schedule_interval=None,
catchup=False,
tags=["firebolt"])
### Define DAG tasks and task sequence.
### Where necessary, read local sql files using the Airflow variable.
task_start_engine = FireboltStartEngineOperator(
dag=dag,
task_id="START_ENGINE",
firebolt_conn_id=FIREBOLT_CONN_ID,
engine_name=FIREBOLT_ENGINE_NAME)
task_trip_data__external_table = FireboltOperator(
dag=dag,
task_id="task_trip_data__external_table",
sql=get_query(f'{tmpl_search_path}/trip_data__create_external_table.sql'),
firebolt_conn_id=FIREBOLT_CONN_ID
)
task_trip_data__create_table = FireboltOperator(
dag=dag,
task_id="task_trip_data__create_table",
sql=get_query(f'{tmpl_search_path}/trip_data__create_table.sql'),
firebolt_conn_id=FIREBOLT_CONN_ID
)
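### Pause for 10 seconds after this task completes before the next task starts.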
task_trip_data__create_table.post_execute = lambda **x: time.sleep(10)
task_trip_data__process_data = FireboltOperator(
dag=dag,
task_id="task_trip_data__process_data",
sql=get_query(f'{tmpl_search_path}/trip_data__process.sql'),
firebolt_conn_id=FIREBOLT_CONN_ID
)
task_stop_engine = FireboltStopEngineOperator(
dag=dag,
task_id="STOP_ENGINE",
firebolt_conn_id=FIREBOLT_CONN_ID,
engine_name=FIREBOLT_ENGINE_NAME)
(task_start_engine >> task_trip_data__external_table >>
task_trip_data__create_table >> task_trip_data__process_data >> task_stop_engine)
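Because schedule_interval is set to None, the DAG runs only when you trigger it, either from the Airflow UI or the CLI. The commands below are a minimal example; they assume the DAG file is in your DAGs folder and has been parsed by the scheduler.

airflow dags unpause firebolt_provider_trip_data
airflow dags trigger firebolt_provider_trip_data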
SQL script examples
The contents of the SQL scripts that DAG tasks run are shown below.
trip_data__create_external_table.sql
Creates an external table, ex_trip_data, to connect to a public Amazon S3 data store.
CREATE EXTERNAL TABLE IF NOT EXISTS ex_trip_data(
vendorid INTEGER,
lpep_pickup_datetime TIMESTAMP,
lpep_dropoff_datetime TIMESTAMP,
passenger_count INTEGER,
trip_distance REAL,
ratecodeid INTEGER,
store_and_fwd_flag TEXT,
pu_location_id INTEGER,
do_location_id INTEGER,
payment_type INTEGER,
fare_amount REAL,
extra REAL,
mta_tax REAL,
tip_amount REAL,
tolls_amount REAL,
improvement_surcharge REAL,
total_amount REAL,
congestion_surcharge REAL
)
url = 's3://firebolt-publishing-public/samples/taxi/'
object_pattern = '*yellow*2020*.csv'
type = (CSV SKIP_HEADER_ROWS = 1);
trip_data__create_table.sql
Creates a fact table, my_taxi_trip_data, to receive ingested data.
DROP TABLE IF EXISTS my_taxi_trip_data;
CREATE FACT TABLE IF NOT EXISTS my_taxi_trip_data(
vendorid INTEGER,
lpep_pickup_datetime TIMESTAMP,
lpep_dropoff_datetime TIMESTAMP,
passenger_count INTEGER,
trip_distance REAL,
ratecodeid INTEGER,
store_and_fwd_flag TEXT,
pu_location_id INTEGER,
do_location_id INTEGER,
payment_type INTEGER,
fare_amount REAL,
extra REAL,
mta_tax REAL,
tip_amount REAL,
tolls_amount REAL,
improvement_surcharge REAL,
total_amount REAL,
    congestion_surcharge REAL,
SOURCE_FILE_NAME TEXT,
SOURCE_FILE_TIMESTAMP TIMESTAMP
) PRIMARY INDEX vendorid;
trip_data__process.sql
An INSERT INTO operation that ingests data into the my_taxi_trip_data fact table using the ex_trip_data external table. The example uses the external table metadata column, $source_file_timestamp, to retrieve records only from source files newer than those already ingested.
INSERT INTO my_taxi_trip_data
SELECT
vendorid,
lpep_pickup_datetime,
lpep_dropoff_datetime,
passenger_count,
trip_distance,
ratecodeid,
store_and_fwd_flag,
pu_location_id,
do_location_id,
payment_type,
fare_amount,
extra,
mta_tax,
tip_amount,
tolls_amount,
improvement_surcharge,
total_amount,
congestion_surcharge,
$source_file_name,
$source_file_timestamp
FROM ex_trip_data
WHERE $source_file_timestamp > (SELECT MAX(source_file_timestamp) FROM my_taxi_trip_data);