Connecting to Airflow
Apache Airflow is a data orchestration tool that allows you to programmatically create, schedule, and monitor workflows. You can connect a Firebolt database to your data pipeline using the Airflow provider package for Firebolt. For example, you can schedule automatic incremental data ingestion into Firebolt.
This guide explains how to install the Airflow provider package for Firebolt, set up a connection to Firebolt resources using the Airflow user interface (UI), and create an example Directed Acyclic Graph (DAG) for common Firebolt tasks. The source code for the Airflow provider package for Firebolt is available in the airflow-provider-firebolt repository on GitHub.
- Prerequisites
- Install the Airflow provider package for Firebolt
- Connect Airflow to Firebolt
- Create a DAG for data processing with Firebolt
- Example: Working with query results
- Example: Controlling query execution timeout
- Additional resources
Prerequisites
Make sure that you have:
- A Firebolt account. Create a new account here.
- A Firebolt database and engine.
- Python version 3.8 or later.
- An installation of Airflow. See the Airflow installation guide.
Install the Airflow provider package for Firebolt
You need to install the Airflow provider package for Firebolt. This package enables Firebolt as a Connection type in the Airflow UI.
- Install the package. Run the following command:

  pip install airflow-provider-firebolt

- Upgrade to the latest version. Use the latest version of the provider package; release history is available on PyPI. Run the following command to upgrade:

  pip install airflow-provider-firebolt --upgrade

  Restart Airflow after upgrading to apply the changes.

- Install a specific version. If a specific version is required, replace 1.0.0 with the desired version:

  pip install airflow-provider-firebolt==1.0.0

- Install the provider for AWS Managed Airflow (MWAA). Ensure you are using version 2 of AWS Managed Airflow (MWAA) when working with the Firebolt Airflow provider. Add airflow-provider-firebolt to the requirements.txt file following the instructions in the MWAA documentation.
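To confirm that the provider is available to the Python environment that runs Airflow, you can query the installed package metadata. The following is a minimal check that uses only the Python standard library and the package name shown above.

from importlib.metadata import PackageNotFoundError, version

# Print the installed provider version, or report that the package is missing.
try:
    print(version("airflow-provider-firebolt"))
except PackageNotFoundError:
    print("airflow-provider-firebolt is not installed in this environment")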
Connect Airflow to Firebolt
Create a connection object in the Airflow UI to integrate Firebolt with Airflow.
Steps to configure a connection
- Open the Airflow UI and log in.
- Select the Admin menu.
- Choose Connections.
- Select the + button to add a new connection.
- Choose Firebolt from the Connection Type list.
- Provide the details in the following table. These connection parameters correspond to built-in Airflow variables.
| Parameter | Description | Example value |
| --- | --- | --- |
| Connection id | The name of the connection for the UI. | My_Firebolt_Connection |
| Description | Information about the connection. | Connection to Firebolt database MyDatabase using engine MyFireboltDatabase_general_purpose. |
| Database | The name of the Firebolt database to connect to. | MyFireboltDatabase |
| Engine | The name of the engine that runs queries. | MyFireboltEngine |
| Client ID | The ID of your service account. | XyZ83JSuhsua82hs |
| Client Secret | The secret for your service account authentication. | yy7h&993))29&%j |
| Account | The name of your account. | developer |
| Extra | Additional properties that you may need to set (optional). | {"property1": "value1", "property2": "value2"} |
Client ID and secret credentials can be obtained by registering a service account.
- Choose Test to verify the connection.
- Once the test succeeds, select Save.
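If you prefer to verify the connection from code rather than the Test button, a quick check with the provider's FireboltHook (the same class used in the examples later in this guide) could look like the following sketch. The connection ID shown is the example value from the table above; replace it with your own.

from firebolt_provider.hooks.firebolt import FireboltHook

# Minimal connectivity check; assumes the connection above was saved as "My_Firebolt_Connection".
hook = FireboltHook(firebolt_conn_id="My_Firebolt_Connection")
hook.run("SELECT 1")  # Raises an exception if the connection details are wrong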
Create a DAG for data processing with Firebolt
A DAG file in Airflow is a Python script that defines tasks and their execution order for a data workflow. The following example DAG performs these tasks:
- Start a Firebolt engine.
- Create an external table linked to an Amazon S3 data source.
- Create a fact table for ingested data. For more information, see Working with tables.
- Insert data into the fact table.
- Stop the Firebolt engine. This task is not required if your engine has AUTO_STOP configured.
DAG script example
The following DAG script creates a DAG named firebolt_provider_startstop_trip_data. It uses the Airflow connection to Firebolt specified by the firebolt_conn_id variable. For the contents of the SQL scripts that the DAG runs, see the following SQL script examples. You can run this example with your own database and engine by updating the connection values in Airflow, setting firebolt_conn_id to match your connection, and creating the necessary custom variables in Airflow.
import time

import airflow
from airflow.models import DAG, Variable
from firebolt_provider.operators.firebolt import (
    FireboltOperator, FireboltStartEngineOperator, FireboltStopEngineOperator)


# Define a function that reads Firebolt connection parameters from the Airflow connection.
def connection_params(conn_opp, field):
    connector = FireboltOperator(
        firebolt_conn_id=conn_opp, sql="", task_id="CONNECT")
    conn_parameters = connector.get_db_hook()._get_conn_params()
    return getattr(conn_parameters, field)


# Set up the Firebolt connection ID
firebolt_conn_id = 'firebolt'
firebolt_engine_name = connection_params(firebolt_conn_id, 'engine_name')
tmpl_search_path = Variable.get("firebolt_sql_path")

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1)
}


# Function to open query files saved locally.
def get_query(query_file):
    with open(query_file, "r") as file:
        return file.read()


# Define the DAG.
# For class parameters, see https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html#airflow.models.dag.DAG.
with DAG('firebolt_provider_startstop_trip_data',
         default_args=default_args,
         template_searchpath=tmpl_search_path,
         schedule_interval=None,
         catchup=False,
         tags=["firebolt"]) as dag:

    # Define DAG tasks and the task sequence.
    # Where necessary, read local SQL files from the directory set by the Airflow variable.
    task_start_engine = FireboltStartEngineOperator(
        dag=dag,
        task_id="START_ENGINE",
        firebolt_conn_id=firebolt_conn_id,
        engine_name=firebolt_engine_name)

    task_trip_data__external_table = FireboltOperator(
        dag=dag,
        task_id="task_trip_data__external_table",
        sql=get_query(f'{tmpl_search_path}/trip_data__create_external_table.sql'),
        firebolt_conn_id=firebolt_conn_id
    )

    task_trip_data__create_table = FireboltOperator(
        dag=dag,
        task_id="task_trip_data__create_table",
        sql=get_query(f'{tmpl_search_path}/trip_data__create_table.sql'),
        firebolt_conn_id=firebolt_conn_id
    )
    # Pause briefly after the table is created before the next task runs.
    task_trip_data__create_table.post_execute = lambda **x: time.sleep(10)

    task_trip_data__process_data = FireboltOperator(
        dag=dag,
        task_id="task_trip_data__process_data",
        sql=get_query(f'{tmpl_search_path}/trip_data__process.sql'),
        firebolt_conn_id=firebolt_conn_id
    )

    task_stop_engine = FireboltStopEngineOperator(
        dag=dag,
        task_id="STOP_ENGINE",
        firebolt_conn_id=firebolt_conn_id,
        engine_name=firebolt_engine_name)

    (task_start_engine >> task_trip_data__external_table >>
     task_trip_data__create_table >> task_trip_data__process_data >> task_stop_engine)
This DAG showcases various Firebolt tasks as an example and is not intended to represent a typical real-world workflow or pipeline.
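If you want to run this DAG end to end from the command line without a running scheduler, Airflow 2.5 and later support a small test hook appended to the bottom of the DAG file. The dag.test() call and the version requirement are assumptions about your Airflow installation, not part of the Firebolt provider.

# A minimal sketch, assuming Airflow 2.5 or later: run the DAG in a single process for debugging.
if __name__ == "__main__":
    dag.test()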
Define Airflow variables
Airflow variables store key-value pairs that DAGs can use during execution. You can create and manage variables through the Airflow user interface (UI) or JSON documents. For detailed instructions, see Airflow's Variables and Managing Variables documentation pages.
Example variable for SQL files
The DAG example uses the custom variable firebolt_sql_path to define the directory within your Airflow home directory where SQL files are stored. The DAG reads these files to execute tasks in Firebolt.
- Key: firebolt_sql_path
- Value: Path to the directory containing SQL scripts. For example, ~/airflow/sql_store.
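Besides the UI, you can create this variable programmatically with Airflow's Variable API, as in the following sketch. Run it where Airflow's metadata database is reachable, and adjust the path to your environment.

import os

from airflow.models import Variable

# Store the directory that holds the SQL scripts. expanduser resolves "~" to an
# absolute path, because open() in the DAG does not expand it.
Variable.set("firebolt_sql_path", os.path.expanduser("~/airflow/sql_store"))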
Using the variable in the DAG
A Python function in the DAG reads the SQL scripts stored in the directory defined by firebolt_sql_path. This allows the DAG to dynamically execute the SQL files as tasks in Firebolt.
The following example demonstrates how the variable is accessed in the DAG script:
tmpl_search_path = Variable.get("firebolt_sql_path")

def get_query(query_file):
    with open(query_file, "r") as file:
        return file.read()
SQL script examples
Save the following SQL scripts to the directory specified by tmpl_search_path.
trip_data__create_external_table.sql
This example creates the ex_trip_data external table, which connects to a public Amazon S3 data store.
CREATE EXTERNAL TABLE IF NOT EXISTS ex_trip_data(
vendorid INTEGER,
lpep_pickup_datetime TIMESTAMP,
lpep_dropoff_datetime TIMESTAMP,
passenger_count INTEGER,
trip_distance REAL,
ratecodeid INTEGER,
store_and_fwd_flag TEXT,
pu_location_id INTEGER,
do_location_id INTEGER,
payment_type INTEGER,
fare_amount REAL,
extra REAL,
mta_tax REAL,
tip_amount REAL,
tolls_amount REAL,
improvement_surcharge REAL,
total_amount REAL,
congestion_surcharge REAL
)
url = 's3://firebolt-publishing-public/samples/taxi/'
object_pattern = '*yellow*2020*.csv'
type = (CSV SKIP_HEADER_ROWS = true);
trip_data__create_table.sql
This example creates the my_taxi_trip_data fact table to receive the ingested data.
DROP TABLE IF EXISTS my_taxi_trip_data;
CREATE FACT TABLE IF NOT EXISTS my_taxi_trip_data(
vendorid INTEGER,
lpep_pickup_datetime TIMESTAMP,
lpep_dropoff_datetime TIMESTAMP,
passenger_count INTEGER,
trip_distance REAL,
ratecodeid INTEGER,
store_and_fwd_flag TEXT,
pu_location_id INTEGER,
do_location_id INTEGER,
payment_type INTEGER,
fare_amount REAL,
extra REAL,
mta_tax REAL,
tip_amount REAL,
tolls_amount REAL,
improvement_surcharge REAL,
total_amount REAL,
congestion_surcharge REAL,
SOURCE_FILE_NAME TEXT,
SOURCE_FILE_TIMESTAMP TIMESTAMP
) PRIMARY INDEX vendorid;
trip_data__process.sql
An INSERT INTO operation ingests data into the my_taxi_trip_data fact table using the ex_trip_data external table. This example uses the external table metadata column $source_file_timestamp to retrieve records only from files newer than those already ingested; the coalesce makes the filter pass on the first run, when the target table is empty.
INSERT INTO my_taxi_trip_data
SELECT
vendorid,
lpep_pickup_datetime,
lpep_dropoff_datetime,
passenger_count,
trip_distance,
ratecodeid,
store_and_fwd_flag,
pu_location_id,
do_location_id,
payment_type,
fare_amount,
extra,
mta_tax,
tip_amount,
tolls_amount,
improvement_surcharge,
total_amount,
congestion_surcharge,
$source_file_name,
$source_file_timestamp
FROM ex_trip_data
WHERE coalesce($source_file_timestamp > (SELECT MAX(source_file_timestamp) FROM my_taxi_trip_data), true);
Example: Working with query results
The FireboltOperator is designed to execute SQL queries but does not return query results. To retrieve query results, use the FireboltHook class. The following example demonstrates how to use FireboltHook to execute a query and log the row count in the my_taxi_trip_data table.
Python code example: Retrieving query results
import logging

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.hooks.sql import fetch_one_handler
from firebolt_provider.hooks.firebolt import FireboltHook

# Set up the Firebolt connection ID
firebolt_conn_id = 'firebolt'

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1)
}


# Function to notify the team about the data
def notify(message: str):
    logging.info(message)


# Function to fetch data from Firebolt and notify the team
def fetch_firebolt_data():
    hook = FireboltHook(firebolt_conn_id=firebolt_conn_id)
    results = hook.run(
        "SELECT count(*) FROM my_taxi_trip_data",
        handler=fetch_one_handler
    )
    count = results[0]
    notify("Amount of data in Firebolt: " + str(count))


with DAG(
    'return_result_dag',
    default_args=default_args,
    schedule_interval=None,  # Run manually
    catchup=False
) as dag:
    # Define a Python operator to fetch data from Firebolt and notify the team
    monitor_firebolt_data = PythonOperator(
        task_id='monitor_firebolt_data',
        python_callable=fetch_firebolt_data
    )

    monitor_firebolt_data
Example: Controlling query execution timeout
The Firebolt provider includes parameters to control query execution time and behavior when a timeout occurs:
- query_timeout: Sets the maximum duration, in seconds, that a query can run.
- fail_on_query_timeout: If True, a timeout raises a QueryTimeoutError. If False, the query is terminated quietly and the task proceeds without raising an error.
Python code example: Using timeout settings
In this example, the FireboltOperator task stops its query after one second and proceeds without error. The PythonOperator task fetches data from Firebolt with a timeout of 0.5 seconds and raises an error if the query times out.
import airflow
from airflow.models import DAG, Variable
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.hooks.sql import fetch_one_handler
from firebolt_provider.hooks.firebolt import FireboltHook
from firebolt_provider.operators.firebolt import FireboltOperator

# Set up the Firebolt connection ID
firebolt_conn_id = 'firebolt'

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1)
}

tmpl_search_path = Variable.get("firebolt_sql_path")


# Function to open query files saved locally.
def get_query(query_file):
    with open(query_file, "r") as file:
        return file.read()


# Function to fetch data with a timeout
def fetch_with_timeout():
    hook = FireboltHook(
        firebolt_conn_id=firebolt_conn_id,
        query_timeout=0.5,
        fail_on_query_timeout=True,
    )
    results = hook.run(
        "SELECT count(*) FROM my_taxi_trip_data",
        handler=fetch_one_handler,
    )
    print(f"Results: {results}")


# Define the DAG
with DAG(
    'timeout_dag',
    default_args=default_args,
    schedule_interval=None,  # Run manually
    catchup=False
) as dag:
    # Firebolt operator with a timeout
    firebolt_operator_with_timeout = FireboltOperator(
        dag=dag,
        task_id="insert_with_timeout",
        sql=get_query(f'{tmpl_search_path}/trip_data__process.sql'),
        firebolt_conn_id=firebolt_conn_id,
        query_timeout=1,
        # The task will not fail if the query times out, and will proceed to the next task
        fail_on_query_timeout=False,
    )

    # Python operator to fetch data with a timeout
    operator_with_hook_timeout = PythonOperator(
        dag=dag,
        task_id='select_with_hook_timeout',
        python_callable=fetch_with_timeout,
    )

    firebolt_operator_with_timeout >> operator_with_hook_timeout
Additional resources
For more information about connecting to Airflow, refer to the following resources: