Airflow
Learn how to use the Apache Airflow provider package to connect Airflow to Firebolt.
Apache Airflow is a data orchestration tool that allows you to programmatically create, schedule, and monitor workflows. You can connect a Firebolt database to your data pipeline using the Airflow provider package for Firebolt. For example, you can schedule automatic incremental data ingestion into Firebolt.
This guide explains how to install the Airflow provider package for Firebolt, set up a connection to Firebolt resources using the Airflow user interface (UI), and create an example Directed Acyclic Graph (DAG) for common Firebolt tasks. The source code for the Airflow provider package for Firebolt is available in the airflow-provider-firebolt repository on GitHub.
Prerequisites
Make sure that you have:
- A Firebolt account. Create a new account.
- A Firebolt database and engine.
- Python version 3.8 or later.
- An installation of Airflow. See the Airflow installation guide.
Install the Airflow provider package for Firebolt
You need to install the Airflow provider package for Firebolt. This package enables Firebolt as a Connection type in the Airflow UI.
- Install the package. Run the following command to install the package:
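For example, using pip:

```shell
pip install airflow-provider-firebolt
```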
- Upgrade to the latest version. We recommend running the latest version of the provider package. Release history is available on PyPI. Use the following command to upgrade:
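```shell
pip install --upgrade airflow-provider-firebolt
```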
Restart Airflow after upgrading to apply the new changes.
- Install a specific version. If a specific version is required, replace `1.0.0` in the install command with the desired version:
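```shell
pip install airflow-provider-firebolt==1.0.0
```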
- Install the provider for AWS Managed Airflow (MWAA). Ensure you are using version 2 of AWS Managed Airflow (MWAA) when working with the Firebolt Airflow provider. Add `airflow-provider-firebolt` to the `requirements.txt` file following the instructions in the MWAA documentation.
Connect Airflow to Firebolt
Create a connection object in the Airflow UI to integrate Firebolt with Airflow.
Steps to configure a connection
- Open the Airflow UI and log in.
- Select the Admin menu.
- Choose Connections.
- Select the + button to add a new connection.
- Choose Firebolt from the Connection Type list.
- Provide the details in the following table. These connection parameters correspond to built-in Airflow variables.
| Parameter | Description | Example value |
| --- | --- | --- |
| Connection id | The name of the connection for the UI. | My_Firebolt_Connection |
| Description | Information about the connection. | Connection to Firebolt database MyDatabase using engine MyFireboltDatabase_general_purpose. |
| Database | The name of the Firebolt database to connect to. | MyFireboltDatabase |
| Engine | The name of the engine to run queries. | MyFireboltEngine |
| Client ID | The ID of your service account. | XyZ83JSuhsua82hs |
| Client Secret | The secret for your service account authentication. | yy7h&993))29&%j |
| Account | The name of your account. | developer |
| Extra | The additional properties that you may need to set (optional). | {"property1": "value1", "property2": "value2"} |
Client ID and secret credentials can be obtained by registering a service account.
- Choose Test to verify the connection.
- Once the test succeeds, select Save.
Create a DAG for data processing with Firebolt
A DAG file in Airflow is a Python script that defines tasks and their execution order for a data workflow. The following example DAG performs these tasks:
- Start a Firebolt engine.
- Create an external table linked to an Amazon S3 data source.
- Create a fact table for ingested data. For more information, see Firebolt-managed tables.
- Insert data into the fact table.
- Stop the Firebolt engine. This task is not required if your engine has `AUTO_STOP` configured.
DAG script example
The following DAG script creates a DAG named `firebolt_provider_trip_data`. It uses an Airflow connection to Firebolt named `my_firebolt_connection`. For the contents of the SQL scripts that the DAG runs, see the following SQL script examples. You can run this example with your own database and engine by updating the connection values in Airflow, setting the `FIREBOLT_CONN_ID` to match your connection, and creating the necessary custom variables in Airflow.
This DAG showcases various Firebolt tasks as an example and is not intended to represent a typical real-world workflow or pipeline.
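The following is a minimal sketch of such a DAG. The operator names (`FireboltOperator`, `FireboltStartEngineOperator`, `FireboltStopEngineOperator`), their import path, and parameter names such as `firebolt_conn_id` are assumptions to verify against the installed provider version; the DAG name, connection name, custom variable, and SQL file names come from this guide.

```python
import os
from datetime import datetime

from airflow import DAG
from airflow.models import Variable

# Import paths and operator names are assumptions; verify them against the
# installed version of airflow-provider-firebolt.
from firebolt_provider.operators.firebolt import (
    FireboltOperator,
    FireboltStartEngineOperator,
    FireboltStopEngineOperator,
)

FIREBOLT_CONN_ID = "my_firebolt_connection"

# Directory that holds the SQL scripts, defined as the Airflow variable firebolt_sql_path.
tmpl_search_path = Variable.get("firebolt_sql_path")


def read_sql(file_name: str) -> str:
    """Read a SQL script from the template search path."""
    with open(os.path.join(tmpl_search_path, file_name)) as sql_file:
        return sql_file.read()


with DAG(
    dag_id="firebolt_provider_trip_data",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=["firebolt"],
) as dag:
    # Start the Firebolt engine before running any queries.
    start_engine = FireboltStartEngineOperator(
        task_id="start_engine",
        firebolt_conn_id=FIREBOLT_CONN_ID,
    )

    # Create the external table linked to the Amazon S3 data source.
    create_external_table = FireboltOperator(
        task_id="create_external_table",
        firebolt_conn_id=FIREBOLT_CONN_ID,
        sql=read_sql("trip_data__create_external_table.sql"),
    )

    # Create the fact table that receives the ingested data.
    create_table = FireboltOperator(
        task_id="create_table",
        firebolt_conn_id=FIREBOLT_CONN_ID,
        sql=read_sql("trip_data__create_table.sql"),
    )

    # Ingest data from the external table into the fact table.
    process_data = FireboltOperator(
        task_id="process_data",
        firebolt_conn_id=FIREBOLT_CONN_ID,
        sql=read_sql("trip_data__process.sql"),
    )

    # Stop the engine; not needed if the engine has AUTO_STOP configured.
    stop_engine = FireboltStopEngineOperator(
        task_id="stop_engine",
        firebolt_conn_id=FIREBOLT_CONN_ID,
    )

    start_engine >> create_external_table >> create_table >> process_data >> stop_engine
```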
Define Airflow variables
Airflow variables store key-value pairs that DAGs can use during execution. You can create and manage variables through the Airflow user interface (UI) or JSON documents. For detailed instructions, see Airflow's Variables and Managing Variables documentation pages.
Example variable for SQL files
The DAG example uses the custom variable `firebolt_sql_path` to define the directory within your Airflow home directory where SQL files are stored. The DAG reads these files to execute tasks in Firebolt.
- Key: `firebolt_sql_path`
- Value: Path to the directory containing SQL scripts. For example, `~/airflow/sql_store`.
Using the variable in the DAG
A Python function in the DAG reads the SQL scripts stored in the directory defined by `firebolt_sql_path`. This allows the DAG to dynamically execute the SQL files as tasks in Firebolt.
The following example demonstrates how the variable is accessed in the DAG script:
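For example (this mirrors the `read_sql` helper in the DAG sketch above; the helper name is illustrative):

```python
import os

from airflow.models import Variable

# Directory that holds the SQL scripts, defined as the Airflow variable firebolt_sql_path.
tmpl_search_path = Variable.get("firebolt_sql_path")


def read_sql(file_name: str) -> str:
    """Read a SQL script from the directory defined by firebolt_sql_path."""
    with open(os.path.join(tmpl_search_path, file_name)) as sql_file:
        return sql_file.read()
```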
SQL script examples
Save the following SQL scripts to your `tmpl_search_path`.
trip_data__create_external_table.sql
This example creates the `ex_trip_data` external table, which connects to a public Amazon S3 data store.
trip_data__create_table.sql
This example creates the `my_taxi_trip_data` fact table to receive ingested data.
trip_data__process.sql
An `INSERT INTO` operation ingests data into the `my_taxi_trip_data` fact table using the `ex_trip_data` external table. This example uses the external table metadata column `$source_file_timestamp` to retrieve records exclusively from the latest file.
Example: Working with query results
The `FireboltOperator` is designed to execute SQL queries but does not return query results. To retrieve query results, use the `FireboltHook` class. The following example demonstrates how to use `FireboltHook` to execute a query and log the row count in the `my_taxi_trip_data` table.
Python code example: Retrieving query results
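The following is a minimal sketch. It assumes `FireboltHook` is importable from `firebolt_provider.hooks.firebolt` and, as a standard Airflow DB-API hook, provides `get_first`; the constructor arguments shown (`firebolt_conn_id`, `database`, `engine_name`) should be checked against the installed provider version.

```python
import logging

from airflow.operators.python import PythonOperator

# The import path is an assumption; verify it against the installed provider version.
from firebolt_provider.hooks.firebolt import FireboltHook


def log_row_count():
    """Run a COUNT(*) query through FireboltHook and log the result."""
    hook = FireboltHook(
        firebolt_conn_id="my_firebolt_connection",
        database="MyFireboltDatabase",
        engine_name="MyFireboltEngine",
    )
    # get_first returns the first row of the result set as a tuple.
    row = hook.get_first("SELECT COUNT(*) FROM my_taxi_trip_data")
    logging.info("my_taxi_trip_data contains %s rows", row[0])


# Inside a DAG definition:
log_count = PythonOperator(
    task_id="log_row_count",
    python_callable=log_row_count,
)
```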
Example: Controlling query execution timeout
The Firebolt provider includes parameters to control query execution time and behavior when a timeout occurs:
- `query_timeout`: Sets the maximum duration, in seconds, that a query can run.
- `fail_on_query_timeout`: If `True`, a timeout raises a `QueryTimeoutError`. If `False`, the task terminates quietly and proceeds without raising an error.
Python code example: Using timeout settings
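The following is a minimal sketch. It assumes that `FireboltOperator` and `FireboltHook` accept `query_timeout` and `fail_on_query_timeout` directly; check the provider's documentation for the exact signatures. Both tasks are meant to be defined inside a DAG.

```python
from airflow.operators.python import PythonOperator

# Import paths are assumptions; verify them against the installed provider version.
from firebolt_provider.hooks.firebolt import FireboltHook
from firebolt_provider.operators.firebolt import FireboltOperator

# Stops after one second; fail_on_query_timeout=False means the task
# proceeds without raising an error when the timeout is reached.
run_long_query = FireboltOperator(
    task_id="run_long_query",
    firebolt_conn_id="my_firebolt_connection",
    sql="SELECT * FROM my_taxi_trip_data",
    query_timeout=1,
    fail_on_query_timeout=False,
)


def fetch_with_timeout():
    """Fetch rows with a 0.5 second timeout; raises QueryTimeoutError on timeout."""
    hook = FireboltHook(
        firebolt_conn_id="my_firebolt_connection",
        query_timeout=0.5,
        fail_on_query_timeout=True,
    )
    return hook.get_records("SELECT * FROM my_taxi_trip_data LIMIT 10")


fetch_task = PythonOperator(
    task_id="fetch_data_with_timeout",
    python_callable=fetch_with_timeout,
)
```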
In this example, the `FireboltOperator` task stops execution after one second and proceeds without error. The `PythonOperator` task fetches data from Firebolt with a timeout of 0.5 seconds and raises an error if the query times out.
Additional resources
For more information about connecting to Airflow, refer to the following resources: