Apache Airflow is a data orchestration tool that allows you to programmatically create, schedule, and monitor workflows. You can connect a Firebolt database into your data pipeline using the Airflow provider package for Firebolt. For example, you can schedule automatic incremental data ingestion into Firebolt. This guide explains how to install the Airflow provider package for Firebolt, set up a connection to Firebolt resources using the Airflow user interface (UI), and create an example Directed Acyclic Graph (DAG) for common Firebolt tasks. The source code for the Airflow provider package for Firebolt is available in the airflow-provider-firebolt repository on GitHub.
Prerequisites
Make sure that you have:
- A Firebolt account. Create a new account.
- A Firebolt database and engine.
- Python version 3.8 or later.
- An installation of Airflow version 2.x. See the Airflow installation guide.
The Firebolt Airflow provider package currently supports Apache Airflow 2.x only. Airflow 3.x is not yet supported.
Install the Airflow provider package for Firebolt
You need to install the Airflow provider package for Firebolt. This package enables Firebolt as a Connection type in the Airflow UI.
- Install the package. Run the following command:
- Upgrade to the latest version. Run the latest version of the provider package. Release history is available on PyPI. Use the following command to upgrade:
- Install a specific version. If a specific version is required, replace `1.0.0` with the desired version:
- Install the provider for AWS Managed Airflow (MWAA). Ensure you are using version 2 of AWS Managed Airflow (MWAA) when working with the Firebolt Airflow provider. Add `airflow-provider-firebolt` to the `requirements.txt` file following the instructions in the MWAA documentation.
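The pip commands referenced in the steps above might look like the following, assuming a standard pip-based Airflow environment (the pinned version `1.0.0` is illustrative; check PyPI for the latest release):

```shell
# Install the provider package
pip install airflow-provider-firebolt

# Upgrade to the latest released version
pip install --upgrade airflow-provider-firebolt

# Pin a specific version (replace 1.0.0 with the version you need)
pip install airflow-provider-firebolt==1.0.0
```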
Connect Airflow to Firebolt
Create a connection object in the Airflow UI to integrate Firebolt with Airflow.

Steps to configure a connection
- Open the Airflow UI and log in.
- Select the Admin menu.
- Choose Connections.
- Select the + button to add a new connection.
- Choose Firebolt from the Connection Type list.
- Provide the details in the following table. These connection parameters correspond to built-in Airflow variables.

  | Parameter | Description | Example value |
  | --- | --- | --- |
  | Connection id | The name of the connection for the UI. | `My_Firebolt_Connection` |
  | Description | Information about the connection. | `Connection to Firebolt database MyDatabase using engine MyFireboltDatabase_general_purpose.` |
  | Database | The name of the Firebolt database to connect to. | `MyFireboltDatabase` |
  | Engine | The name of the engine to run queries. | `MyFireboltEngine` |
  | Client ID | The ID of your service account. | `XyZ83JSuhsua82hs` |
  | Client Secret | The secret for your service account authentication. | `yy7h&993))29&%j` |
  | Account | The name of your account. | `developer` |
  | Extra | Additional properties that you may need to set (optional). | `{"property1": "value1", "property2": "value2"}` |

  Client ID and secret credentials can be obtained by registering a service account.
- Choose Test to verify the connection.
- Once the test succeeds, select Save.
Create a DAG for data processing with Firebolt
A DAG file in Airflow is a Python script that defines tasks and their execution order for a data workflow. The following example DAG performs these tasks:
- Start a Firebolt engine.
- Create an external table linked to an Amazon S3 data source.
- Create a fact table for ingested data. For more information, see Firebolt-managed tables.
- Insert data into the fact table.
- Stop the Firebolt engine. This task is not required if your engine has `AUTO_STOP` configured.
DAG script example
The following DAG script creates a DAG named `firebolt_provider_trip_data`. It uses an Airflow connection to Firebolt named `my_firebolt_connection`. For the contents of the SQL scripts that the DAG runs, see the following SQL script examples. You can run this example with your own database and engine by updating the connection values in Airflow, setting `FIREBOLT_CONN_ID` to match your connection, and creating the necessary custom variables in Airflow.
This DAG showcases various Firebolt tasks as an example and is not intended to represent a typical real-world workflow or pipeline.
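As a minimal sketch of such a DAG, the tasks listed above could be wired together as follows. The operator import paths, the engine start/stop operator names, and the `template_searchpath` value are assumptions based on the airflow-provider-firebolt package and may differ between provider versions:

```python
# Hedged sketch of the example DAG; operator names and import paths are
# assumed from the airflow-provider-firebolt package, not verified here.
from datetime import datetime

from airflow import DAG
from firebolt_provider.operators.firebolt import (
    FireboltOperator,
    FireboltStartEngineOperator,
    FireboltStopEngineOperator,
)

FIREBOLT_CONN_ID = "my_firebolt_connection"

with DAG(
    dag_id="firebolt_provider_trip_data",
    start_date=datetime(2024, 1, 1),
    schedule=None,  # trigger manually for this example
    catchup=False,
    template_searchpath="/path/to/sql_store",  # directory holding the SQL files
) as dag:
    start_engine = FireboltStartEngineOperator(
        task_id="start_engine", firebolt_conn_id=FIREBOLT_CONN_ID
    )
    create_external_table = FireboltOperator(
        task_id="create_external_table",
        firebolt_conn_id=FIREBOLT_CONN_ID,
        sql="trip_data__create_external_table.sql",
    )
    create_table = FireboltOperator(
        task_id="create_table",
        firebolt_conn_id=FIREBOLT_CONN_ID,
        sql="trip_data__create_table.sql",
    )
    process = FireboltOperator(
        task_id="process_data",
        firebolt_conn_id=FIREBOLT_CONN_ID,
        sql="trip_data__process.sql",
    )
    stop_engine = FireboltStopEngineOperator(
        task_id="stop_engine", firebolt_conn_id=FIREBOLT_CONN_ID
    )

    start_engine >> create_external_table >> create_table >> process >> stop_engine
```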
Define Airflow variables
Airflow variables store key-value pairs that DAGs can use during execution. You can create and manage variables through the Airflow user interface (UI) or with JSON documents. For detailed instructions, see Airflow's Variables and Managing Variables documentation pages.

Example variable for SQL files

The DAG example uses the custom variable `firebolt_sql_path` to define the directory within your Airflow home directory where SQL files are stored. The DAG reads these files to execute tasks in Firebolt.
- Key: `firebolt_sql_path`
- Value: The path to the directory containing SQL scripts. For example, `~/airflow/sql_store`.
A Python function in the DAG reads the SQL scripts stored in the directory defined by `firebolt_sql_path`. This allows the DAG to dynamically execute the SQL files as tasks in Firebolt.
The following example demonstrates how the variable is accessed in the DAG script:
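A minimal sketch of that lookup, using only the standard library for file access (the `get_query` helper name is illustrative; the `Variable.get` call requires a running Airflow installation and is shown as a comment):

```python
from pathlib import Path


def get_query(sql_dir: str, query_file: str) -> str:
    """Read a SQL script from the directory holding the DAG's SQL files."""
    return (Path(sql_dir).expanduser() / query_file).read_text()


# Inside the DAG script, the directory comes from the Airflow variable
# (requires a running Airflow installation, so shown as a comment):
#
#   from airflow.models import Variable
#
#   tmpl_search_path = Variable.get("firebolt_sql_path")
#   sql = get_query(tmpl_search_path, "trip_data__create_table.sql")
```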
SQL script examples
Save the following SQL scripts to your `tmpl_search_path`.
trip_data__create_external_table.sql
This example creates the `ex_trip_data` external table, which connects to a public Amazon S3 data store.
trip_data__create_table.sql
This example creates the `my_taxi_trip_data` fact table to receive ingested data.
trip_data__process.sql
An `INSERT INTO` operation ingests data into the `my_taxi_trip_data` fact table using the `ex_trip_data` external table. This example uses the external table metadata column `$source_file_timestamp` to retrieve records exclusively from the latest file.
Example: Working with query results
The `FireboltOperator` is designed to execute SQL queries but does not return query results. To retrieve query results, use the `FireboltHook` class. The following example demonstrates how to use `FireboltHook` to execute a query and log the row count in the `my_taxi_trip_data` table.
Python code example: Retrieving query results
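One way to structure this is a small helper that works with any hook exposing `get_first`, as `DbApiHook` subclasses such as `FireboltHook` do (a sketch; the helper name and usage are illustrative, not part of the provider):

```python
import logging


def log_row_count(hook, table: str) -> int:
    """Run a COUNT(*) query via a hook-like object, log and return the result.

    `hook` is any object exposing get_first(sql), as DbApiHook subclasses
    such as FireboltHook do.
    """
    row = hook.get_first(f"SELECT COUNT(*) FROM {table}")
    count = int(row[0])
    logging.info("Table %s contains %d rows", table, count)
    return count


# Hypothetical usage inside a PythonOperator callable:
#
#   from firebolt_provider.hooks.firebolt import FireboltHook
#
#   hook = FireboltHook(firebolt_conn_id="my_firebolt_connection")
#   log_row_count(hook, "my_taxi_trip_data")
```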
Example: Controlling query execution timeout
The Firebolt provider includes parameters to control query execution time and behavior when a timeout occurs:
- `query_timeout`: Sets the maximum duration (in seconds) that a query can run.
- `fail_on_query_timeout`: If `True`, a timeout raises a `QueryTimeoutError`. If `False`, the query terminates quietly and the task proceeds without raising an error.
Python code example: Using timeout settings
In this example, the `FireboltOperator` task stops execution after one second and proceeds without error. The `PythonOperator` task fetches data from Firebolt with a timeout of 0.5 seconds and raises an error if the query times out.
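A hedged sketch of that pattern follows. The import paths and the placement of `query_timeout`/`fail_on_query_timeout` on the operator and hook are assumptions based on the airflow-provider-firebolt package and may differ between provider versions:

```python
# Sketch only: parameter placement and import paths are assumed from the
# airflow-provider-firebolt package, not verified here.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from firebolt_provider.hooks.firebolt import FireboltHook
from firebolt_provider.operators.firebolt import FireboltOperator

FIREBOLT_CONN_ID = "my_firebolt_connection"


def fetch_with_timeout():
    # fail_on_query_timeout=True: a query running longer than 0.5 seconds
    # raises QueryTimeoutError and fails the task.
    hook = FireboltHook(
        firebolt_conn_id=FIREBOLT_CONN_ID,
        query_timeout=0.5,
        fail_on_query_timeout=True,
    )
    return hook.get_records("SELECT * FROM my_taxi_trip_data LIMIT 10")


with DAG(
    dag_id="firebolt_timeout_example",  # illustrative DAG id
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # Stops after 1 second but continues without raising an error.
    quiet_timeout = FireboltOperator(
        task_id="quiet_timeout",
        firebolt_conn_id=FIREBOLT_CONN_ID,
        sql="SELECT checksum(*) FROM my_taxi_trip_data",
        query_timeout=1,
        fail_on_query_timeout=False,
    )

    strict_timeout = PythonOperator(
        task_id="strict_timeout", python_callable=fetch_with_timeout
    )

    quiet_timeout >> strict_timeout
```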