
AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and transform data for analytics. AWS Glue automatically discovers your data and stores the associated metadata in the AWS Glue Data Catalog, making your data immediately searchable, queryable, and available for ETL operations.

You can connect AWS Glue to Firebolt using the Firebolt JDBC driver to build powerful data pipelines that can extract data from various sources, transform it using Spark, and load it into your Firebolt database for high-performance analytics.

Prerequisites

Before connecting AWS Glue to Firebolt, ensure you have:

  • AWS Account – An active AWS account with appropriate permissions to create and manage AWS Glue resources.
  • Firebolt account – An active Firebolt account. If you don’t have one, you can sign up.
  • Firebolt database and engine – Access to a Firebolt database and engine. If you need to create these, see Create a database and Create an engine.
  • Firebolt service account – A service account for programmatic access with its ID and secret.
  • Appropriate permissions – Your service account must be associated with a user that has the permissions required to query the database and operate the engine: at minimum, USAGE on the database and OPERATE on the engine (see the example after this list). Any other operation the job performs on the Firebolt database or engine must also be covered by the permissions granted to that user.
  • IAM permissions – AWS IAM permissions to create and manage Glue jobs, connections, and access S3 buckets.
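
For illustration only, the minimum grants for a role assigned to the service account's user might look like the following. The role, database, and engine names are placeholders, and the exact GRANT syntax should be verified against the Firebolt RBAC documentation.

-- Illustrative sketch; your_role, your_database, and your_engine are placeholders
GRANT USAGE ON DATABASE your_database TO your_role;
GRANT OPERATE ON ENGINE your_engine TO your_role;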

Set up the JDBC connection in AWS Glue

  1. Download the Firebolt JDBC driver JAR from GitHub.
  2. Upload the JAR file to an S3 bucket that your AWS Glue job can access.
  3. Note the S3 path (e.g., s3://your-bucket/jars/firebolt-jdbc-3.x.x.jar).
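
If you prefer to script steps 2 and 3, a minimal boto3 sketch could look like this. The bucket name, key, and driver version are placeholders; substitute your own values.

import boto3

# Placeholders: local JAR path, target bucket, and key
jar_local_path = "firebolt-jdbc-3.x.x.jar"
bucket = "your-bucket"
key = "jars/firebolt-jdbc-3.x.x.jar"

# Upload the driver JAR and print the S3 path to use as the dependent JARs path
s3 = boto3.client("s3")
s3.upload_file(jar_local_path, bucket, key)
print(f"Dependent JARs path: s3://{bucket}/{key}")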

Create an ETL job with Firebolt

Read data from Firebolt

You can create an AWS Glue job that reads data from Firebolt using the JDBC driver. Below is a sample Glue script that connects to Firebolt to read data and then transforms it.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Initialize Glue context
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Firebolt connection details
url = "jdbc:firebolt:<your_database>?engine=<your_engine>&account=<your_account>"
user = "your_client_id"
password = "your_client_secret"
driver = "com.firebolt.FireboltDriver"
source_table = "your_source_table"

# Read data from Firebolt using JDBC
source_df = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .option("dbtable", source_table) \
    .load()

# Apply basic transformations
filtered_df = source_df.filter(source_df.status == "active")
renamed_df = filtered_df.withColumnRenamed("customer_id", "id")

# Use Spark SQL for further filtering and selection
renamed_df.createOrReplaceTempView("temp_view")
processed_df = spark.sql("""
    SELECT
        id,
        name,
        email,
        CURRENT_TIMESTAMP() as load_timestamp
    FROM temp_view
    WHERE email IS NOT NULL
""")

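If you only need a subset of rows and columns, you can also push the work down to Firebolt instead of filtering in Spark. Below is a minimal sketch using the Spark JDBC query option (available in recent Spark versions); the query text reuses the placeholder table and columns from the script above.

# Push the filter and projection down to Firebolt rather than loading the full table
pushdown_df = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .option("query", "SELECT customer_id AS id, name, email FROM your_source_table WHERE status = 'active'") \
    .load()
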
Write data to Firebolt

You can also write data back to Firebolt using the JDBC driver. Below is an example of how to write the processed data into a Firebolt table.

# Target table to write to in Firebolt
target_table = "your_target_table"

# Write processed data to Firebolt
processed_df.write \
    .format("jdbc") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .option("dbtable", target_table) \
    .mode("overwrite") \
    .save()

# Commit the job
job.commit()
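
Note that overwrite mode drops and recreates the target table by default. If you want to keep the existing table definition, one hedged variant is to combine overwrite with the Spark JDBC truncate option (assuming the target supports TRUNCATE) and tune the JDBC batch size:

# Sketch: keep the existing table definition and control the JDBC batch size
processed_df.write \
    .format("jdbc") \
    .option("url", url) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", driver) \
    .option("dbtable", target_table) \
    .option("truncate", "true") \
    .option("batchsize", 10000) \
    .mode("overwrite") \
    .save()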

Configure job parameters

When creating your AWS Glue job, configure these important parameters:

  1. Job properties:

    • Type: Spark
    • Glue version: Choose the latest version (5.0 recommended)
    • Language: Python 3
  2. Advanced properties:

    • Dependent JARs path: s3://your-bucket/jars/firebolt-jdbc-3.x.x.jar
    • Job parameters (optional; can be used to pass dynamic values or credentials)
  3. Security configuration: Choose appropriate IAM roles and encryption settings.

  4. Resource allocation: Configure the number of workers and worker type based on your data volume.
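
If you create the job programmatically, a minimal boto3 sketch that wires these settings together might look like the following. The job name, IAM role, script location, and worker sizing are placeholders; adjust them to your environment and data volume.

import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="firebolt-etl-job",                                  # placeholder job name
    Role="arn:aws:iam::123456789012:role/your-glue-role",     # placeholder IAM role
    Command={
        "Name": "glueetl",                                    # Spark job type
        "ScriptLocation": "s3://your-bucket/scripts/firebolt_etl.py",
        "PythonVersion": "3",
    },
    DefaultArguments={
        # Dependent JARs path pointing at the Firebolt JDBC driver
        "--extra-jars": "s3://your-bucket/jars/firebolt-jdbc-3.x.x.jar",
    },
    GlueVersion="5.0",
    WorkerType="G.1X",
    NumberOfWorkers=2,
)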

DMS-S3-Glue-Firebolt data pipeline

A common use case for AWS Glue with Firebolt is building a data pipeline that processes change data capture (CDC) files from AWS Database Migration Service (DMS). This section covers how to set up an automated pipeline in which:

  1. AWS DMS captures changes from source databases and writes them to S3 as CSV files
  2. AWS Glue processes these files incrementally and loads them into Firebolt
  3. Firebolt provides high-performance analytics on the replicated data

Architecture overview

Source → AWS DMS → S3 → AWS Glue → Firebolt

The pipeline handles incremental processing by tracking which files have been processed, ensuring data integrity and preventing duplicate processing.

Prerequisites for DMS integration

In addition to the general prerequisites, you’ll need:

  • AWS DMS replication instance configured to write CDC data to S3
  • S3 bucket where DMS writes the CSV files
  • Glue Data Catalog database and table to track processed files
  • EventBridge rules or Glue triggers to start the job when new files arrive

Set up the processed files tracking table

Create a Glue Data Catalog table to track which files have been processed:

  1. In the AWS Glue console, create a new database (e.g., processed_files_db)
  2. Create a table with the following schema:
Column       | Type      | Description
------------ | --------- | ------------------------------
file_path    | string    | S3 path of the processed file
processed_at | timestamp | When the file was processed
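
You can create this table in the console, or with a DDL statement (for example via Athena) over the same S3 location that the Glue script below writes processed-file records to. The database, table, and bucket names are placeholders; keep them consistent with your script.

-- Illustrative DDL; the LOCATION must match the path used when recording processed files
CREATE EXTERNAL TABLE processed_files_db.processed_files_table (
  file_path    string,
  processed_at timestamp
)
STORED AS PARQUET
LOCATION 's3://glue-table-catalog-bucket/processed_files_table/';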

DMS-Glue integration script

Here’s a complete Glue script that handles DMS CDC files:

import sys
import boto3
from datetime import datetime
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import lit, current_timestamp

# Get job parameters
args = getResolvedOptions(sys.argv, [
    "JOB_NAME", "driver", "url", "user", "password", 
    "bucket_name", "staging_table", "merge_query"
])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Parameters from job configuration
driver = args["driver"]
url = args["url"]
user = args["user"]
password = args["password"]
bucket_name = args["bucket_name"]
staging_table = args["staging_table"]
merge_query = args["merge_query"]

# Optional parameters (getResolvedOptions only returns the keys you request,
# so check sys.argv before resolving optional ones)
prefix = getResolvedOptions(sys.argv, ["prefix"])["prefix"] if "--prefix" in sys.argv else ""  # S3 folder prefix
suffix = getResolvedOptions(sys.argv, ["suffix"])["suffix"] if "--suffix" in sys.argv else ".csv"  # File extension filter

# Step 1: List files in the S3 bucket (paginate; list_objects_v2 returns at most 1,000 keys per call)
s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
all_files = []
for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
    all_files.extend(obj['Key'] for obj in page.get('Contents', []) if obj['Key'].endswith(suffix))

# Step 2: Read already processed files from Glue catalog
try:
    processed_df = glueContext.create_dynamic_frame.from_catalog(
        database="processed_files_db",
        table_name="processed_files_table",
        transformation_ctx="load_processed"
    ).toDF()
    processed_files = [row['file_path'] for row in processed_df.collect()]
except Exception as e:
    print(f"No processed files found or error reading catalog: {e}")
    processed_files = []

# Step 3: Determine files to process
to_process = list(set(all_files) - set(processed_files))
print(f"Found {len(to_process)} files to process")

# Step 4: Process each file
for file_path in to_process:
    try:
        print(f"Processing file: {file_path}")
        s3_path = f"s3://{bucket_name}/{file_path}"
        
        # Read CSV file from S3
        df = spark.read \
            .option("header", "false") \
            .option("inferSchema", "true") \
            .csv(s3_path)
        
        # Define column names based on your DMS output format
        # Adjust these column names according to your actual DMS configuration
        column_names = [
            "operation",      # I, U, D for Insert, Update, Delete
            "id", 
            "name",
            "email",
            "updated_at",
            # Add more columns as needed
        ]
        df = df.toDF(*column_names)
        
        # Add metadata columns
        df = df.withColumn("batch_id", lit(file_path)) \
               .withColumn("load_timestamp", current_timestamp())
        
        # Step 5: Write to Firebolt staging table (adjust batchsize to your data volume)
        df.write \
            .format("jdbc") \
            .option("url", url) \
            .option("user", user) \
            .option("password", password) \
            .option("driver", driver) \
            .option("dbtable", staging_table) \
            .option("batchsize", 10000) \
            .mode("append") \
            .save()
        
        # Step 6: Execute merge query via JDBC
        print(f"Executing merge query for batch: {file_path}")
        jvm = spark._sc._jvm
        jvm.java.lang.Class.forName(driver)
        conn = jvm.java.sql.DriverManager.getConnection(url, user, password)
        stmt = conn.createStatement()

        # Replace placeholder in merge query with actual batch_id
        batch_merge_query = merge_query.replace("${batch_id}", file_path)  # substitute the batch ID if the merge query uses ${batch_id}
        stmt.execute(batch_merge_query)
        
        stmt.close()
        conn.close()
        
        # Step 7: Record processed file
        schema = StructType([
            StructField("file_path", StringType(), True),
            StructField("processed_at", TimestampType(), True)
        ])
        now = datetime.utcnow()
        row = [(file_path, now)]
        processed_file_df = spark.createDataFrame(row, schema=schema)
        processed_file_dyf = DynamicFrame.fromDF(processed_file_df, glueContext, "new_files")
        
        # Append the record to the S3 location that backs the processed_files_table catalog table
        glueContext.write_dynamic_frame.from_options(
            frame=processed_file_dyf,
            connection_type="s3",
            connection_options={"path": "s3://glue-table-catalog-bucket/processed_files_table/"},
            format="parquet"
        )
        
        print(f"Successfully processed file: {file_path}")
        
    except Exception as e:
        print(f"Error processing file {file_path}: {str(e)}")
        # Optionally, continue with next file or fail the job
        continue

job.commit()

Configure DMS job parameters

When creating your DMS-Glue job, set these job parameters:

Parameter       | Description                 | Example
--------------- | --------------------------- | --------------------------------------------------------------
--driver        | JDBC driver class           | com.firebolt.FireboltDriver
--url           | Firebolt JDBC URL           | jdbc:firebolt:your_db?engine=your_engine&account=your_account
--user          | Firebolt client ID          | your_client_id
--password      | Firebolt client secret      | your_client_secret
--bucket_name   | S3 bucket with DMS files    | your-dms-bucket
--staging_table | Staging table in Firebolt   | staging.cdc_data
--merge_query   | SQL merge statement         | See merge query example
--prefix        | S3 folder prefix (optional) | dms-output/
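
These values can be set as default arguments on the job or passed per run. A minimal boto3 sketch for starting a run with these arguments follows; all values are placeholders, and in practice you may want to store the client secret in AWS Secrets Manager rather than passing it as a plain argument.

import boto3

glue = boto3.client("glue")

glue.start_job_run(
    JobName="dms-to-firebolt-job",   # placeholder job name
    Arguments={
        "--driver": "com.firebolt.FireboltDriver",
        "--url": "jdbc:firebolt:your_db?engine=your_engine&account=your_account",
        "--user": "your_client_id",
        "--password": "your_client_secret",
        "--bucket_name": "your-dms-bucket",
        "--staging_table": "staging.cdc_data",
        "--merge_query": "MERGE INTO ...",   # see the merge query example below
        "--prefix": "dms-output/",
    },
)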

Merge query example

Create a merge query that handles CDC operations (Insert, Update, Delete). The ${batch_id} placeholder is substituted at runtime by the Glue script; fill in the target table, staging table, and merge key before passing the statement as the --merge_query job parameter:

MERGE INTO {target_table} AS target
USING (SELECT * FROM {staging_table} WHERE batch_id = '${batch_id}') AS source
ON target.{merge_key} = source.{merge_key}
WHEN MATCHED AND source.operation = 'D' THEN DELETE
WHEN MATCHED AND source.operation = 'U' THEN UPDATE SET name = source.name, email = source.email, updated_at = source.updated_at
WHEN NOT MATCHED BY TARGET AND source.operation = 'I' THEN INSERT (id, name, email, updated_at) VALUES (source.id, source.name, source.email, source.updated_at);

Data integrity strategies

To maintain data integrity at scale (50,000+ records per hour), consider these approaches:

Strategy 1: Batch processing with staging

  • Configure DMS to produce files at a regular cadence (every 3-5 minutes)
  • Process multiple files in each Glue job run
  • Use batch IDs to group related changes
  • Execute MERGE statements after all files in a batch are loaded

This is the approach used in the Glue script above: each file is loaded into the staging table as a batch, and the merge query is executed for that batch once the load completes.

Strategy 2: Single file processing with controlled frequency

  • Configure DMS to produce larger files (every 3+ minutes)
  • Process one file at a time to avoid concurrent job conflicts
  • Use Glue job queuing to handle multiple triggers

This shifts more work to the DMS side, but it can simplify the Glue job logic and reduce the risk of concurrent processing issues.

Trigger configuration

Set up EventBridge or S3 event notifications to trigger the Glue job:

Option 1: S3 Event Notifications

{
  "source": ["aws.s3"],
  "detail-type": ["Object Created"],
  "detail": {
    "bucket": {
      "name": ["your_bucket_name"]
    }
  }
}

This event pattern is attached to an EventBridge rule that starts a Glue workflow, whose EventBridge event trigger then runs the Glue job whenever one or more new files are created in the specified S3 bucket. Note that the bucket must have EventBridge notifications enabled for S3 object events to be delivered.
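
A minimal boto3 sketch for the workflow-side event trigger is shown below. The workflow and job names are placeholders, and the batching values are illustrative; the EventBridge rule carrying the pattern above should target this workflow.

import boto3

glue = boto3.client("glue")

glue.create_trigger(
    Name="start-on-new-dms-files",                 # placeholder trigger name
    WorkflowName="dms-to-firebolt-workflow",       # placeholder workflow name
    Type="EVENT",
    Actions=[{"JobName": "dms-to-firebolt-job"}],  # placeholder job name
    # Wait for up to 100 events or 300 seconds before starting, to batch bursts of files
    EventBatchingCondition={"BatchSize": 100, "BatchWindow": 300},
)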

Option 2: Scheduled triggers

Use a scheduled trigger to run the Glue job at regular intervals (e.g., every 5 minutes). This is useful if you expect DMS to produce files at a consistent rate.
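
A minimal boto3 sketch for a five-minute schedule (trigger and job names are placeholders):

import boto3

glue = boto3.client("glue")

glue.create_trigger(
    Name="dms-to-firebolt-every-5-min",            # placeholder trigger name
    Type="SCHEDULED",
    Schedule="cron(0/5 * * * ? *)",                # every 5 minutes
    Actions=[{"JobName": "dms-to-firebolt-job"}],  # placeholder job name
    StartOnCreation=True,
)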

Monitoring and troubleshooting

Key metrics to monitor

  • Files processed per hour: Track throughput
  • Processing latency: Time from file creation to Firebolt load
  • Error rate: Failed file processing percentage
  • Data freshness: Age of the oldest unprocessed file
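
One lightweight way to track some of these is to publish custom CloudWatch metrics from the Glue job itself. The namespace and metric names below are made up for illustration; adapt them to your monitoring conventions.

import boto3

cloudwatch = boto3.client("cloudwatch")

def publish_pipeline_metrics(files_processed: int, files_failed: int) -> None:
    # Publish simple per-run counters; CloudWatch alarms can then watch these metrics
    cloudwatch.put_metric_data(
        Namespace="DMS-Glue-Firebolt",   # placeholder namespace
        MetricData=[
            {"MetricName": "FilesProcessed", "Value": files_processed, "Unit": "Count"},
            {"MetricName": "FilesFailed", "Value": files_failed, "Unit": "Count"},
        ],
    )

# Example usage at the end of the Glue job:
# publish_pipeline_metrics(files_processed=len(to_process), files_failed=failed_count)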

Common issues

Issue                    | Cause                                   | Solution
------------------------ | --------------------------------------- | -------------------------------------------------
Concurrent job execution | Multiple triggers firing simultaneously | Use job queuing or implement file locking
Schema evolution         | DMS source schema changes               | Add schema validation and dynamic column mapping

Best practices for DMS integration

  1. File size optimization: Configure DMS to produce files of 10-100MB for optimal processing
  2. Monitoring: Set up CloudWatch alarms for job failures and processing delays
  3. Schema management: Handle schema evolution gracefully with dynamic column mapping
  4. Cost optimization: Use appropriate Glue worker types and auto-scaling
  5. Data validation: Add data quality checks before and after merge operations

Additional resources