Integrate with Debezium
Debezium is a distributed platform that captures changes from your databases so that your analytics stay consistent across systems. Debezium uses Change Data Capture (CDC) to record every row-level change in each database table as a change event. Applications read these events to see the changes in the same order in which they occurred. Firebolt integrates with Debezium via its JDBC driver, allowing you to ingest CDC events directly into Firebolt for further analysis.
Debezium uses Apache Kafka as the backbone for handling and delivering CDC events. Kafka Connect allows for data transfer between Kafka and other systems. Kafka topics organize and store captured events. In Firebolt’s integration with Debezium, any database supported by Debezium can act as a source for capturing database changes, while Firebolt serves as the sink for storing and analyzing these changes via Debezium’s JDBC sink connector. If a native Debezium source connector is unavailable for a specific database, as long as the messages conform to Debezium’s message format, they can still be consumed by the sink connector and loaded into Firebolt.
Topics:
- Prerequisites – Set up your environment to use Debezium.
- Quickstart – Learn how to quickly set up a CDC pipeline with Kafka and Kafka Connect.
- Firebolt connector configuration – Configure the Firebolt JDBC driver and define the necessary connection properties.
- Using Docker Compose – Deploy the setup using Docker Compose.
- Manual installation – Set up Kafka and Kafka Connect manually.
- Sample Event – Learn how to send a sample JSON message through Kafka’s console producer and write the event to a Firebolt database.
- Configuration – Configure Firebolt sink for your specific use case.
- Limitations – Firebolt does not support some operations in data loading.
- Additional resources – Learn more about Debezium source connectors, custom configurations for Kafka Connect, and Debezium sink connectors.
Prerequisites
You must have the following prerequisites before connecting Firebolt to Debezium:
- Firebolt account – You need an active Firebolt account. If you do not have one, you can sign up for one.
- Firebolt service account – You must have access to an active Firebolt service account, along with its ID and secret, which facilitates programmatic access to Firebolt.
- Firebolt user – You need a separate user associated with your service account. The user must have USAGE permission to query your database and OPERATE permission to start and stop an engine if it is not already running. It must also have at least USAGE, SELECT, and INSERT permissions on the schema you plan to use.
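The exact statements depend on how you manage roles, but granting these permissions to the role assigned to that user looks roughly like the following. This is a hedged sketch: `cdc_role`, `my_database`, `my_engine`, and `public` are placeholder names, and you should confirm the syntax against Firebolt's RBAC documentation.

```sql
-- Placeholder role and object names; adjust to your environment
GRANT USAGE ON DATABASE my_database TO cdc_role;   -- query the database
GRANT OPERATE ON ENGINE my_engine TO cdc_role;     -- start/stop the engine
GRANT USAGE ON SCHEMA public TO cdc_role;          -- access the target schema
GRANT SELECT ON SCHEMA public TO cdc_role;
GRANT INSERT ON SCHEMA public TO cdc_role;
```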
Quickstart
There are two ways to deploy this setup: using Docker Compose or starting each service manually on the local machine. Docker Compose simplifies deployment by managing all required services in containers, while manual deployment provides more flexibility for configuring individual components.
Firebolt connector configuration
Before using Kafka Connect to register the Firebolt connector, you need to configure it properly. This step involves setting up Debezium’s Kafka Connect so it can communicate with Firebolt. Specifically, you must configure the Firebolt JDBC driver and define the necessary connection properties. These settings establish the connection and will be used later when Kafka Connect is set up.
- Create a `sink-connector.json` file in a directory of your choice.
- Copy and paste the following JSON into this file, replacing the placeholders as described below:

  ```json
  {
    "name": "firebolt-sink-connector",
    "config": {
      "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
      "topics": "cdc.public.demo",
      "connection.url": "jdbc:firebolt:database?account=<account_name>&engine=<engine_name>&merge_prepared_statement_batches=true",
      "connection.username": "<client_id>",
      "connection.password": "<client_secret>",
      "hibernate.dialect": "org.hibernate.dialect.PostgreSQLDialect",
      "primary.key.mode": "none",
      "primary.key.fields": "",
      "insert.mode": "insert",
      "schema.evolution": "basic",
      "delete.enabled": "false",
      "quote.identifiers": true
    }
  }
  ```
Use the following configuration parameters for your sink connector:
| Parameter | Description |
|---|---|
| `name` | A name for this connector. |
| `connector.class` | Set the connector class to Debezium's sink connector `io.debezium.connector.jdbc.JdbcSinkConnector`. |
| `topics` | The list of Kafka topics this connector listens to, separated by commas. If a Debezium source connector is used to create events, the topics follow a specific naming scheme. For this example you can use `cdc.public.demo` (see the `topic.prefix` and `table.include.list` fields in Create a Postgres connector configuration for reference). |
| `connection.url` | The Firebolt JDBC connection string. The `merge_prepared_statement_batches` parameter is important for insert performance; see the note below for more information. |
| `connection.username` | The client ID of your Firebolt service account. |
| `connection.password` | The secret of your Firebolt service account. |
| `hibernate.dialect` | Specifies the SQL dialect for the target database. Set to `org.hibernate.dialect.PostgreSQLDialect`. |
| `insert.mode` | A Kafka Connect property that defines how data is written to the target system. Use `insert` for new rows or `update` to modify existing rows in your Firebolt database. `upsert` combines both insert and update operations. |
| `primary.key.mode` | Specifies how the connector resolves the primary key, which is required when `"insert.mode"` is `"update"` or when `"delete.enabled"` is set to `true`. Otherwise, set it to `"none"`. |
| `primary.key.fields` | The fields to use as primary key columns, separated by commas. This parameter is only used when `insert.mode` is set to `update`. |
| `schema.evolution` | Controls how the connector handles schema changes. Use `basic` for basic schema evolution (adding a column when a new or renamed one is encountered) or `none` to disable schema evolution entirely. For more information, see Schema evolution in Debezium's documentation. |
| `delete.enabled` | A boolean value that determines whether delete operations from the source database are propagated to Firebolt. Set to `true` to delete rows in Firebolt when they are deleted in the source database. Requires `primary.key.mode` to be set to a value other than `none`. |
| `quote.identifiers` | A boolean value that determines whether to quote database identifiers (for example, table and column names). Set to `true` to ensure compatibility with case-sensitive identifiers or reserved keywords in Firebolt. |

For more information, see Apache Kafka's sink connector configs and Debezium's connector for JDBC.
The `merge_prepared_statement_batches` JDBC connection flag is designed to improve performance when multiple rows are updated in the source database either at the same time or in quick succession. It optimizes INSERT queries by allowing the JDBC driver to combine multiple rows into a single statement that is executed in one request to Firebolt, eliminating additional network round trips to the server.
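Conceptually, instead of issuing one statement per row, the driver can merge a batch into a single multi-row INSERT. The SQL below is purely illustrative (it uses the demo table from this guide and is not the exact statement the driver generates):

```sql
-- Without merging: one round trip per row
INSERT INTO cdc_public_demo (id, data) VALUES (1, 'first');
INSERT INTO cdc_public_demo (id, data) VALUES (2, 'second');

-- With merge_prepared_statement_batches=true: one round trip for the whole batch
INSERT INTO cdc_public_demo (id, data) VALUES (1, 'first'), (2, 'second');
```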
Using Docker Compose
This tutorial shows how to quickly create a CDC pipeline with Kafka, a Postgres database, and a Kafka Connect cluster, each running in its own container. To follow it you need Docker installed; modern Docker versions include `docker compose` by default.
We'll be using Debezium's Docker images to simplify the installation. You can use your own images or non-Debezium images, but some additional setup might be required. Refer to the Docker Hub READMEs for more information.
Download and set up the JDBC driver
- Download the latest Firebolt JDBC driver (.jar) from Firebolt's JDBC repository.
- Place it in a directory that will be mounted in Docker and note its path.
Create a Docker Compose file and start the services
- Create a `docker-compose.yml` file with the following contents:

  ```yaml
  services:
    zookeeper:
      image: quay.io/debezium/zookeeper:3.0
      environment:
        ZOOKEEPER_CLIENT_PORT: 2181
        ZOOKEEPER_TICK_TIME: 2000
      ports:
        - "2181:2181"
        - "2888:2888"
        - "3888:3888"
      networks:
        - kafka-net
    kafka:
      image: quay.io/debezium/kafka:3.0
      depends_on:
        - zookeeper
      environment:
        ZOOKEEPER_CONNECT: zookeeper:2181
      ports:
        - "9092:9092"
        - "29092:29092"
      networks:
        - kafka-net
    postgres:
      image: quay.io/debezium/postgres:16
      environment:
        POSTGRES_USER: <pg_user>
        POSTGRES_PASSWORD: <pg_password>
        POSTGRES_DB: postgres
      ports:
        - "5432:5432"
      networks:
        - kafka-net
    connect:
      image: quay.io/debezium/connect:3.0
      depends_on:
        - kafka
        - postgres
      environment:
        BOOTSTRAP_SERVERS: kafka:9092
        GROUP_ID: 1
        CONFIG_STORAGE_TOPIC: my_connect_configs
        OFFSET_STORAGE_TOPIC: my_connect_offsets
        KEY_CONVERTER_SCHEMAS_ENABLE: "true"
        VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
        KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
        VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
        CONNECT_CONSUMER_MAX_POLL_RECORDS: 2000 # increase to improve throughput
      volumes:
        - /path/to/your/jdbc/directory/firebolt-jdbc-<version>.jar:/kafka/libs/firebolt-jdbc-<version>.jar
      ports:
        - "8083:8083"
      networks:
        - kafka-net

  networks:
    kafka-net:
      driver: bridge
  ```
- Replace the placeholder `/path/to/your/jdbc/directory/` with the directory where you placed the JDBC driver, and `firebolt-jdbc-<version>.jar` with the name of the driver in that directory.
- Replace the placeholders `<pg_user>` and `<pg_password>` with credentials of your choice. Note them down for later.
- Run `docker compose up` in the directory containing the `docker-compose.yml` file.
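Once the containers are up, you can optionally verify that Kafka Connect is reachable and that the Debezium JDBC sink plugin was loaded before registering any connectors. These checks use the standard Kafka Connect REST API exposed on port 8083:

```bash
# All four services should be listed as running
docker compose ps

# Kafka Connect's REST API should respond with version information
curl http://localhost:8083/

# The installed plugins should include io.debezium.connector.jdbc.JdbcSinkConnector
curl http://localhost:8083/connector-plugins
```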
Create a Postgres connector configuration
- Create a Postgres connector settings file `source-connector.json` in a directory of your choice.
- Copy and paste the following JSON into this file, replacing the placeholders as described below:

  ```json
  {
    "name": "postgres-source-connector",
    "config": {
      "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
      "database.hostname": "postgres",
      "database.port": "5432",
      "database.user": "<pg_user>",
      "database.password": "<pg_password>",
      "database.dbname": "postgres",
      "topic.prefix": "cdc",
      "table.include.list": "public.demo"
    }
  }
  ```
| Parameter | Description |
|---|---|
| `name` | A name for this connector. |
| `connector.class` | The connector to use. For Postgres it should be `io.debezium.connector.postgresql.PostgresConnector`. |
| `database.hostname` | The host name where your database is set up. For this demo, `postgres` refers to the Postgres Docker Compose service. |
| `database.port` | The port used to access the database. The standard Postgres port is `5432`. |
| `database.user` | The Postgres username specified earlier in `docker-compose.yml`. |
| `database.password` | The Postgres password specified earlier in `docker-compose.yml`. |
| `database.dbname` | The database name specified earlier. For this example, use `postgres`. |
| `topic.prefix` | A unique prefix for the topics the connector creates. It should be unique across connectors. |
| `table.include.list` | The list of Postgres tables to monitor for changes. |
Use the Connect API to add source and sink connectors
- Make a POST request to set up the source connector with the `source-connector.json` config file created earlier:

  ```bash
  curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @source-connector.json
  ```
- Make a POST request to set up the sink connector with the `sink-connector.json` config file created earlier:

  ```bash
  curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @sink-connector.json
  ```
- Verify the status of the connectors:

  ```bash
  curl -X GET http://localhost:8083/connectors/postgres-source-connector/status
  curl -X GET http://localhost:8083/connectors/firebolt-sink-connector/status
  ```
If something goes wrong, fetch the Kafka Connect logs for more information via `docker logs kafka-connect-1`.
Write data and see it propagated
If everything succeeded, you're now ready to send CDC data through the Kafka pipeline.
- Connect to Postgres (or use the connection established before): `docker compose exec postgres psql -U <pg_user> -d postgres`. Use the `pg_user` and `pg_password` specified earlier in `docker-compose.yml`.
- Create a demo table:

  ```sql
  CREATE TABLE demo (id INT, data VARCHAR(255));
  ```

- Write data to the table that's being monitored:

  ```sql
  INSERT INTO demo VALUES (1, 'my data');
  ```

- You should shortly see a table created in Firebolt with the name `<topic_prefix>_<postgres_schema>_<postgres_table_name>` (for our demo it will be `cdc_public_demo`) and the above data written to it.
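To confirm that the change event arrived, you can query the new table from your Firebolt SQL workspace. This assumes the default names used in this demo:

```sql
-- Table name derived from topic.prefix, schema, and table: cdc_public_demo
SELECT * FROM cdc_public_demo;
```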
Manual installation
Connect to Apache Kafka
To connect to Apache Kafka, you must download and start a Kafka service, create a Kafka topic, and configure and start Kafka Connect. This guide walks you through setting up Kafka locally. If you're setting it up in production or in distributed mode, refer to the Kafka documentation.
- Download the latest Kafka release and extract it:

  ```bash
  $ tar -xzf kafka_2.13-3.9.0.tgz
  $ cd kafka_2.13-3.9.0
  ```
- Start Kafka

  a. Start ZooKeeper, a centralized service that coordinates distributed systems and manages Kafka's metadata, before starting Kafka to ensure proper coordination and cluster integrity, as follows:

  ```bash
  $ bin/zookeeper-server-start.sh config/zookeeper.properties
  ```

  b. Next, start a Kafka broker service using the configuration settings in `config/server.properties` as follows:

  ```bash
  $ bin/kafka-server-start.sh config/server.properties
  ```

  The configuration in `config/server.properties` contains the default settings that ship with the Kafka installation package. Once all services have launched successfully, you will have a basic Kafka environment running and ready to use.
- Create a Kafka topic

  Kafka topics store captured events. The following example creates a topic named `firebolt-cdc-events`:

  ```bash
  $ bin/kafka-topics.sh --create --topic firebolt-cdc-events --bootstrap-server localhost:9092
  ```

  In the previous example, `firebolt-cdc-events` is the desired topic name and `localhost:9092` is the URL of the default Kafka server. If you're running on a different URL or port, replace `localhost:9092`.

  (Optional) Use the `--describe` option to verify that the Kafka topic has been created:

  ```bash
  $ bin/kafka-topics.sh --describe --topic firebolt-cdc-events --bootstrap-server localhost:9092
  ```
- Configure and start Kafka Connect

  Kafka Connect is a framework that helps integrate Kafka with systems external to Kafka. To use Kafka Connect with Firebolt, you need to modify the Kafka Connect properties, install the Debezium and Firebolt JDBC drivers, and create a configuration for the Debezium sink connector.

  - Modify connector properties

    Kafka Connect runs either in standalone mode, as a single process, or in distributed mode. The following shows how to set up Kafka Connect in standalone mode. For information about distributed mode, see Apache's documentation for Kafka Connect. Set the following properties in the `config/connect-standalone.properties` file to use JSON converters and specify the plugin path:

    ```properties
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    plugin.path=/usr/local/share/kafka/plugins
    ```

    If your event messages use a different format than JSON, refer to the Connect configuration API.

    In the previous example, ensure that `plugin.path` points to the location that will contain the Debezium plugin, which you'll download in the next step.

  - Install the Debezium and Firebolt JDBC drivers

    Extract the following files into your Kafka Connect environment:
    - Download the Debezium JDBC connector.
    - Copy the driver to the directory you specified in `plugin.path` in the previous step.
    - Download the latest Firebolt JDBC driver (.jar) from Firebolt's JDBC repository.
    - Copy the JDBC driver into the `libs` folder within your Kafka installation.

  - Configure a sink connector

    Use the `sink-connector.json` configuration file for the sink connector created earlier (saved in this example as `sink-connector-config.json`). Copy it into the `config` directory of your Kafka installation or note its path for the next step.

  - Start Kafka Connect

    Start Kafka Connect in standalone mode by running the following command from your Kafka installation folder:

    ```bash
    $ bin/connect-standalone.sh config/connect-standalone.properties config/sink-connector-config.json
    ```

    - The `connect-standalone.properties` file contains Kafka Connect properties, including `plugin.path`.
    - The `sink-connector-config.json` file contains the Debezium sink configuration from the previous step.
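To confirm that the standalone worker loaded the connector, you can query Kafka Connect's REST API, which is enabled by default on port 8083:

```bash
# List connectors loaded by the standalone worker
curl http://localhost:8083/connectors

# Check the sink connector's status; the name matches the "name" field in the config file
curl http://localhost:8083/connectors/firebolt-sink-connector/status
```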
Sample Event
This section shows you how to send custom events through a Kafka console producer, which is a command-line tool that allows users to manually send messages to a Kafka topic. If you want to set up a connector for capturing database events, use Debezium’s source connector, which continuously monitors a database for changes. If you have a database that’s not supported by Debezium, you can use any method that conforms to the following structure, and Firebolt’s sink connector will be able to consume it. For more information about the expected Kafka message structure, see Debezium’s Data change events documentation.
The following code example starts a Kafka console producer that sends messages to the firebolt-cdc-events topic on a Kafka broker running at localhost:9092:
$ bin/kafka-console-producer.sh --topic firebolt-cdc-events --bootstrap-server localhost:9092
After running this command, the console producer will display a prompt where you can paste a payload or message. Paste the following example of a Kafka message that defines a single integer field “id” with a value 1.
{
"schema": {
"type": "struct",
"name": "connector-name.database-name.table-name.Key",
"optional": false,
"fields": [
{
"name": "id",
"index": "0",
"schema": {
"type": "INT32",
"optional": "false"
}
}
]
},
"payload": {
"id": "1"
}
}
You must paste the message without newlines as a single line as follows:
{"schema":{"type":"struct","name":"connector-name.database-name.table-name.Key","optional":false,"fields":[{"name":"id","index":"0","schema":{"type":"INT32","optional":"false"}}]},"payload":{"id":"1"}}
Sending this message to Kafka routes it to the Debezium sink connector, which creates a table called `firebolt_cdc_events`, derived from the topic name by replacing dashes with underscores. The connector creates the table with a single column `id` of type `INT`. You can customize table names in your Debezium configuration. Debezium writes the row from the message into this newly created table, and all subsequent messages to this topic must conform to the same schema to be written to the same table.
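You can then inspect the result in Firebolt. The query below assumes the default table name derived from the `firebolt-cdc-events` topic:

```sql
SELECT * FROM firebolt_cdc_events;
```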
Configuration
Change Data Capture (CDC)
Debezium is highly effective at capturing and processing CDC (Change Data Capture) events from source databases. When configuring the Firebolt sink connector, you can tailor the setup to handle specific CDC operation types based on your use case.
Append-Only Ingestion
This configuration is ideal for scenarios where you only need to append new records to your Firebolt table. No updates or deletions from the source database will be processed.
Configuration: Add or modify the following keys in the sink configuration (a complete example follows the use cases below):
"insert.mode": "insert",
"primary.key.mode": "none",
"schema.evolution": "basic"
How it works:
- Processes only `INSERT` operations from the source database.
- Ignores `UPDATE` and `DELETE` operations.
- Automatically adapts to new columns in the source schema (basic schema evolution).
- Does not require a primary key.
Use cases:
- Historical logging or audit trails.
- Time-series data ingestion.
- Event streams where updates and deletions are not needed.
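Putting it together, an append-only sink configuration could look like the following. This is a sketch that reuses the connector from the Firebolt connector configuration section, with the same placeholder values:

```json
{
  "name": "firebolt-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "topics": "cdc.public.demo",
    "connection.url": "jdbc:firebolt:database?account=<account_name>&engine=<engine_name>&merge_prepared_statement_batches=true",
    "connection.username": "<client_id>",
    "connection.password": "<client_secret>",
    "hibernate.dialect": "org.hibernate.dialect.PostgreSQLDialect",
    "insert.mode": "insert",
    "primary.key.mode": "none",
    "schema.evolution": "basic",
    "delete.enabled": "false",
    "quote.identifiers": true
  }
}
```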
Append-Only Ingestion with Deletes
This configuration appends new records to Firebolt tables and also propagates deletions from the source database.
Configuration: Add or modify the following keys in the sink configuration:
"insert.mode": "insert",
"primary.key.mode": "record_key",
"primary.key.fields": "<pk_col>",
"schema.evolution": "basic",
"delete.enabled": "true"
How it works:
- Processes `INSERT` and `DELETE` operations.
- Ignores `UPDATE` operations.
- Automatically adapts to new columns in the source schema (basic schema evolution).
- Requires a primary key to identify records for deletion. This should be set in the source database.
Use cases:
- Event streams where deletions must be reflected in the analytics layer.
- Data warehousing scenarios requiring clean datasets by removing obsolete records.
Append + Update Scenarios
Note: This scenario is currently not supported.
This configuration is used when both new records and updates to existing records need to be processed.
Configuration: Add or modify the following keys in the sink configuration:
"insert.mode": "upsert",
"primary.key.mode": "record_key",
"primary.key.fields": "<pk_col>"
How it works:
- Updates existing records based on the primary key.
- Inserts new records if no matching primary key exists.
- Requires a primary key to uniquely identify records.
Use cases:
- Customer records that need to reflect the latest state.
- Inventory systems requiring updates and new product additions.
Complete CDC Pipeline with Deletes
Note: This scenario is currently not supported.
This configuration handles full CDC operations, including inserts, updates, and deletions.
Configuration: Add or modify the following keys in the sink configuration:
"insert.mode": "upsert",
"primary.key.mode": "record_key",
"primary.key.fields": "<pk_col>",
"delete.enabled": "true"
How it works:
- Processes `INSERT`, `UPDATE`, and `DELETE` operations.
- Requires a primary key to uniquely identify records.
- Deletes records in Firebolt when they are deleted in the source database.
Use cases:
- Full data synchronization between systems.
- Master data management requiring referential integrity.
Throughput Optimization
For high-throughput use cases, such as IoT metrics ingestion, you can optimize the connector configuration to improve performance.
Configuration: Add or modify the following key in the sink configuration:
"batch.size": 5000
Additional Kafka Connect settings: To align with the batch size, adjust the Kafka Connect worker's `consumer.max.poll.records` property. For example, set it to `5000`:
- In Docker Compose: Override the `CONNECT_CONSUMER_MAX_POLL_RECORDS` environment variable.
- In manual installations: Update the `consumer.max.poll.records` property in the `connect-standalone.properties` file.
Note: Ensure other Kafka settings, such as `fetch.max.bytes` and `max.partition.fetch.bytes`, are configured to handle the expected payload size.
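For a standalone worker, the relevant overrides might look like this. The values are illustrative only; tune them for your message sizes and throughput (the `consumer.` prefix forwards a setting to the worker's internal consumer):

```properties
# connect-standalone.properties
consumer.max.poll.records=5000
# Example fetch sizes; adjust to the expected payload volume
consumer.fetch.max.bytes=52428800
consumer.max.partition.fetch.bytes=10485760
```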
For more details, refer to Debezium’s batch support optimization guide.
Limitations
The following limitations apply when integrating Firebolt with Apache Kafka:
- You can only insert or update data in your table using either `insert` or `update` mode. The `"insert.mode": "upsert"` setting is not yet supported.
- Append + Update and complete CDC scenarios are not yet supported due to the limitation above.
Additional resources
- Learn how to set up a database source you can use with Debezium's source connectors.
- Learn more about Kafka Connect.
- Learn how to customize a Kafka Connect configuration.
- Learn how a Debezium sink works and about its limitations.