AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and transform data for analytics. AWS Glue automatically discovers your data and stores the associated metadata in the AWS Glue Data Catalog, making your data immediately searchable, queryable, and available for ETL operations.
You can connect AWS Glue to Firebolt using the Firebolt JDBC driver to build powerful data pipelines that can extract data from various sources, transform it using Spark, and load it into your Firebolt database for high-performance analytics.
Prerequisites
Before connecting AWS Glue to Firebolt, ensure you have:
- AWS Account – An active AWS account with appropriate permissions to create and manage AWS Glue resources.
- Firebolt account – An active Firebolt account. If you don’t have one, you can sign up.
- Firebolt database and engine – Access to a Firebolt database and engine. If you need to create these, see Create a database and Create an engine.
- Firebolt service account – A service account for programmatic access with its ID and secret.
- Appropriate permissions – Your service account must be associated with a user that has permission to query the database and operate the engine: at a minimum, USAGE on the database and OPERATE on the engine. More generally, any operation the Glue job performs against Firebolt must be allowed by the permissions granted to that user.
- IAM permissions – AWS IAM permissions to create and manage Glue jobs, connections, and access S3 buckets.
Set up the JDBC connection in AWS Glue
- Download the Firebolt JDBC driver JAR from GitHub.
- Upload the JAR file to an S3 bucket that your AWS Glue job can access.
- Note the S3 path (e.g., s3://your-bucket/jars/firebolt-jdbc-3.x.x.jar).
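If you prefer to script the upload step, the following is a minimal sketch using boto3; the local file name, bucket, and key are placeholders to replace with your own values.

import boto3

# Upload the Firebolt JDBC driver JAR to S3 so Glue jobs can reference it.
# The local file name, bucket, and key below are placeholders.
s3 = boto3.client("s3")
s3.upload_file(
    Filename="firebolt-jdbc-3.x.x.jar",        # local path to the downloaded JAR
    Bucket="your-bucket",                      # S3 bucket accessible to your Glue job
    Key="jars/firebolt-jdbc-3.x.x.jar"         # key referenced later as the dependent JARs path
)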
Create an ETL job with Firebolt
Read data from Firebolt
You can create an AWS Glue job that reads data from Firebolt using the JDBC driver. Below is a sample Glue script that connects to Firebolt to read data and then transforms it.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
# Initialize Glue context
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Firebolt connection details
url = "jdbc:firebolt:<your_database>?engine=<your_engine>&account=<your_account>"
user = "your_client_id"
password = "your_client_secret"
driver = "com.firebolt.FireboltDriver"
source_table = "your_source_table"
# Read data from Firebolt using JDBC
source_df = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .option("dbtable", source_table) \
    .load()
# Apply basic transformations
filtered_df = source_df.filter(source_df.status == "active")
renamed_df = filtered_df.withColumnRenamed("customer_id", "id")
# Use Spark SQL for further filtering and selection
renamed_df.createOrReplaceTempView("temp_view")
processed_df = spark.sql("""
    SELECT
        id,
        name,
        email,
        CURRENT_TIMESTAMP() AS load_timestamp
    FROM temp_view
    WHERE email IS NOT NULL
""")
Write data to Firebolt
You can also write data back to Firebolt using the JDBC driver. Below is an example of how to write the processed data into a Firebolt table.
# Target table to write to in Firebolt
target_table = "your_target_table"
# Write processed data to Firebolt
processed_df.write \
    .format("jdbc") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .option("dbtable", target_table) \
    .mode("overwrite") \
    .save()
# Commit the job
job.commit()
When creating your AWS Glue job, configure these important parameters:
- Job properties:
  - Type: Spark
  - Glue version: Choose the latest version (5.0 recommended)
  - Language: Python 3
- Advanced properties:
  - Dependent JARs path: s3://your-bucket/jars/firebolt-jdbc-3.x.x.jar
  - Job parameters (optional, can be used to pass dynamic values/creds)
- Security configuration: Choose appropriate IAM roles and encryption settings.
- Resource allocation: Configure the number of workers and worker type based on your data volume.
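These settings can also be applied programmatically. The sketch below uses the boto3 Glue client to create a job with the JDBC driver attached through the --extra-jars default argument; the job name, IAM role ARN, and S3 paths are placeholders for your own resources.

import boto3

glue = boto3.client("glue")

# Create the Glue job with the Firebolt JDBC driver attached via --extra-jars.
# Job name, role ARN, and S3 paths are placeholders.
glue.create_job(
    Name="firebolt-etl-job",
    Role="arn:aws:iam::123456789012:role/YourGlueJobRole",
    Command={
        "Name": "glueetl",                                   # Spark job type
        "ScriptLocation": "s3://your-bucket/scripts/firebolt_etl.py",
        "PythonVersion": "3",
    },
    GlueVersion="5.0",
    WorkerType="G.1X",
    NumberOfWorkers=2,
    DefaultArguments={
        "--extra-jars": "s3://your-bucket/jars/firebolt-jdbc-3.x.x.jar",
    },
)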
DMS-S3-Glue-Firebolt data pipeline
A common use case for AWS Glue with Firebolt is building a data pipeline that processes change data capture (CDC) files from AWS Database Migration Service (DMS). This section covers how to set up an automated pipeline in which:
- AWS DMS captures changes from source databases and writes them to S3 as CSV files
- AWS Glue processes these files incrementally and loads them into Firebolt
- Firebolt provides high-performance analytics on the replicated data
Architecture overview
Source → AWS DMS → S3 → AWS Glue → Firebolt
The pipeline handles incremental processing by tracking which files have been processed, ensuring data integrity and preventing duplicate processing.
Prerequisites for DMS integration
In addition to the general prerequisites, you’ll need:
- AWS DMS replication instance configured to write CDC data to S3
- S3 bucket where DMS writes the CSV files
- Glue Data Catalog database and table to track processed files
- EventBridge rules or Glue triggers to start the job when new files arrive
Set up the processed files tracking table
Create a Glue Data Catalog table to track which files have been processed:
- In the AWS Glue console, create a new database (e.g., processed_files_db)
- Create a table with the following schema:
| Column | Type | Description |
|---|---|---|
| file_path | string | S3 path of the processed file |
| processed_at | timestamp | When the file was processed |
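You can also register this tracking table programmatically. The sketch below assumes a Parquet-backed table stored in S3; the database name, table name, and S3 location are placeholders and must match what the job script reads and writes.

import boto3

glue = boto3.client("glue")

# Register the tracking table in the Glue Data Catalog as a Parquet table on S3.
# Database, table, and S3 location are placeholders; they must match the job script.
glue.create_table(
    DatabaseName="processed_files_db",
    TableInput={
        "Name": "processed_files_table",
        "StorageDescriptor": {
            "Columns": [
                {"Name": "file_path", "Type": "string"},
                {"Name": "processed_at", "Type": "timestamp"},
            ],
            "Location": "s3://glue-table-catalog-bucket/processed_files_table/",
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
            },
        },
        "TableType": "EXTERNAL_TABLE",
    },
)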
DMS-Glue integration script
Here’s a complete Glue script that handles DMS CDC files:
import sys
import boto3
from datetime import datetime
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import lit, current_timestamp
# Get job parameters
args = getResolvedOptions(sys.argv, [
    "JOB_NAME", "driver", "url", "user", "password",
    "bucket_name", "staging_table", "merge_query"
])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Parameters from job configuration
driver = args["driver"]
url = args["url"]
user = args["user"]
password = args["password"]
bucket_name = args["bucket_name"]
staging_table = args["staging_table"]
merge_query = args["merge_query"]
# Optional parameters (getResolvedOptions only returns the keys you request,
# so check sys.argv before resolving optional ones)
prefix = getResolvedOptions(sys.argv, ["prefix"])["prefix"] if "--prefix" in sys.argv else ""  # S3 folder prefix
suffix = getResolvedOptions(sys.argv, ["suffix"])["suffix"] if "--suffix" in sys.argv else ".csv"  # File extension filter
# Step 1: Get all files from S3 bucket
s3 = boto3.client('s3')
# list_objects_v2 returns at most 1000 keys per call, so paginate
paginator = s3.get_paginator('list_objects_v2')
all_files = []
for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
    all_files.extend(obj['Key'] for obj in page.get('Contents', []) if obj['Key'].endswith(suffix))
# Step 2: Read already processed files from Glue catalog
try:
    processed_df = glueContext.create_dynamic_frame.from_catalog(
        database="processed_files_db",
        table_name="processed_files_table",
        transformation_ctx="load_processed"
    ).toDF()
    processed_files = [row['file_path'] for row in processed_df.collect()]
except Exception as e:
    print(f"No processed files found or error reading catalog: {e}")
    processed_files = []
# Step 3: Determine files to process
to_process = list(set(all_files) - set(processed_files))
print(f"Found {len(to_process)} files to process")
# Step 4: Process each file
for file_path in to_process:
    try:
        print(f"Processing file: {file_path}")
        s3_path = f"s3://{bucket_name}/{file_path}"

        # Read CSV file from S3
        df = spark.read \
            .option("header", "false") \
            .option("inferSchema", "true") \
            .csv(s3_path)

        # Define column names based on your DMS output format
        # Adjust these column names according to your actual DMS configuration
        column_names = [
            "operation",  # I, U, D for Insert, Update, Delete
            "id",
            "name",
            "email",
            "updated_at",
            # Add more columns as needed
        ]
        df = df.toDF(*column_names)

        # Add metadata columns
        df = df.withColumn("batch_id", lit(file_path)) \
               .withColumn("load_timestamp", current_timestamp())

        # Step 5: Write to Firebolt staging table (adjust batchsize as needed)
        df.write \
            .format("jdbc") \
            .option("url", url) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", driver) \
            .option("dbtable", staging_table) \
            .option("batchsize", 10000) \
            .mode("append") \
            .save()

        # Step 6: Execute merge query via JDBC
        print(f"Executing merge query for batch: {file_path}")
        jvm = spark._sc._jvm
        jvm.java.lang.Class.forName(driver)
        conn = jvm.java.sql.DriverManager.getConnection(url, user, password)
        stmt = conn.createStatement()
        # Replace placeholder with the actual batch_id (if used in the merge query)
        batch_merge_query = merge_query.replace("${batch_id}", file_path)
        stmt.execute(batch_merge_query)
        stmt.close()
        conn.close()

        # Step 7: Record processed file
        schema = StructType([
            StructField("file_path", StringType(), True),
            StructField("processed_at", TimestampType(), True)
        ])
        now = datetime.utcnow()
        row = [(file_path, now)]
        processed_file_df = spark.createDataFrame(row, schema=schema)
        processed_file_dyf = DynamicFrame.fromDF(processed_file_df, glueContext, "new_files")

        # Write to the processed files table's S3 location
        glueContext.write_dynamic_frame.from_options(
            frame=processed_file_dyf,
            connection_type="s3",
            connection_options={"path": "s3://glue-table-catalog-bucket/processed_files_table/"},
            format="parquet"
        )

        print(f"Successfully processed file: {file_path}")

    except Exception as e:
        print(f"Error processing file {file_path}: {str(e)}")
        # Optionally, continue with the next file or fail the job
        continue
job.commit()
When creating your DMS-Glue job, set these job parameters:
| Parameter | Description | Example |
|---|---|---|
| --driver | JDBC driver class | com.firebolt.FireboltDriver |
| --url | Firebolt JDBC URL | jdbc:firebolt:your_db?engine=your_engine&account=your_account |
| --user | Firebolt client ID | your_client_id |
| --password | Firebolt client secret | your_client_secret |
| --bucket_name | S3 bucket with DMS files | your-dms-bucket |
| --staging_table | Staging table in Firebolt | staging.cdc_data |
| --merge_query | SQL merge statement | See merge query example |
| --prefix | S3 folder prefix (optional) | dms-output/ |
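When starting the job programmatically, these parameters are passed as job arguments. A minimal sketch using boto3; the job name and argument values are placeholders.

import boto3

glue = boto3.client("glue")

# Start the DMS-to-Firebolt job, passing the parameters from the table above.
# Job name and argument values are placeholders.
glue.start_job_run(
    JobName="dms-firebolt-cdc-job",
    Arguments={
        "--driver": "com.firebolt.FireboltDriver",
        "--url": "jdbc:firebolt:your_db?engine=your_engine&account=your_account",
        "--user": "your_client_id",
        "--password": "your_client_secret",
        "--bucket_name": "your-dms-bucket",
        "--staging_table": "staging.cdc_data",
        "--merge_query": "MERGE INTO ...",  # full merge statement, see the example below
        "--prefix": "dms-output/",
    },
)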
Merge query example
Create a merge query that handles CDC operations (Insert, Update, Delete):
MERGE INTO {target_table} AS target
USING (SELECT * FROM {staging_table} WHERE batch_id = '${batch_id}') AS source
ON target.{merge_key} = source.{merge_key}
WHEN MATCHED AND source.operation = 'D' THEN DELETE
WHEN MATCHED AND source.operation = 'U' THEN UPDATE SET name = source.name, email = source.email, updated_at = source.updated_at
WHEN NOT MATCHED BY TARGET AND source.operation = 'I' THEN INSERT (id, name, email, updated_at) VALUES (source.id, source.name, source.email, source.updated_at);
Data integrity strategies
To maintain data integrity at scale (50,000+ records per hour), consider these approaches:
Strategy 1: Batch processing with staging
- Configure DMS to write files to S3 at a regular interval (for example, every 3-5 minutes)
- Process multiple files in each Glue job run
- Use batch IDs to group related changes
- Execute MERGE statements after all files in a batch are loaded
This is the approach used in the provided Glue script, where each file is treated as a batch and the merge query runs after that batch is loaded into the staging table.
Strategy 2: Single file processing with controlled frequency
- Configure DMS to produce larger files (every 3+ minutes)
- Process one file at a time to avoid concurrent job conflicts
- Use Glue job queuing to handle multiple triggers
This shifts more work to the DMS side, but it can simplify the Glue job logic and reduce the risk of concurrent processing issues.
Trigger configuration
Set up EventBridge or S3 event notifications to trigger the Glue job:
Option 1: S3 Event Notifications
{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["your_bucket_name"]
    }
  }
}
Use this event pattern with a Glue workflow EventBridge trigger so that the Glue job starts whenever one or more new files are created in the specified S3 bucket. Note that the bucket must have EventBridge notifications enabled for these events to be delivered.
Option 2: Scheduled triggers
Use a scheduled trigger to run the Glue job at regular intervals (e.g., every 5 minutes). This is useful if you expect DMS to produce files at a consistent rate.
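If you manage triggers with boto3, a scheduled trigger can be created as in the sketch below; the trigger name, job name, and cron expression are placeholders to adapt to your setup.

import boto3

glue = boto3.client("glue")

# Run the CDC job every 5 minutes; trigger and job names are placeholders.
glue.create_trigger(
    Name="dms-firebolt-every-5-min",
    Type="SCHEDULED",
    Schedule="cron(0/5 * * * ? *)",
    Actions=[{"JobName": "dms-firebolt-cdc-job"}],
    StartOnCreation=True,
)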
Monitoring and troubleshooting
Key metrics to monitor
- Files processed per hour: Track throughput
- Processing latency: Time from file creation to Firebolt load
- Error rate: Failed file processing percentage
- Data freshness: Age of the oldest unprocessed file
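CloudWatch provides job-level metrics, and you can also derive a simple error rate from the Glue API. The sketch below uses get_job_runs to count failed runs among the most recent ones; the job name is a placeholder.

import boto3

glue = boto3.client("glue")

# Inspect recent runs of the CDC job to derive a simple error rate.
# Job name is a placeholder; get_job_runs returns the most recent runs first.
runs = glue.get_job_runs(JobName="dms-firebolt-cdc-job", MaxResults=50)["JobRuns"]
failed = [r for r in runs if r["JobRunState"] == "FAILED"]
print(f"Failed {len(failed)} of last {len(runs)} runs "
      f"({100.0 * len(failed) / max(len(runs), 1):.1f}% error rate)")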
Common issues
| Issue | Cause | Solution |
|---|---|---|
| Concurrent job execution | Multiple triggers firing simultaneously | Use job queuing or implement file locking |
| Schema evolution | DMS source schema changes | Add schema validation and dynamic column mapping |
Best practices for DMS integration
- File size optimization: Configure DMS to produce files of 10-100MB for optimal processing
- Monitoring: Set up CloudWatch alarms for job failures and processing delays
- Schema management: Handle schema evolution gracefully with dynamic column mapping
- Cost optimization: Use appropriate Glue worker types and auto-scaling
- Data validation: Add data quality checks before and after merge operations
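As a simple example of the last point, you can compare row counts before and after the merge through the same JDBC connection the job uses. A minimal sketch, assuming the url/user/password/driver variables and a target_table name are already defined; the query is illustrative.

# Simple row-count check via JDBC; reuses url/user/password/driver from the job
# and assumes a target_table variable. The query is illustrative.
count_df = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .option("query", f"SELECT COUNT(*) AS row_count FROM {target_table}") \
    .load()
target_count = count_df.collect()[0]["row_count"]
print(f"Target table row count after merge: {target_count}")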
Additional resources