CDC events with Kafka and Debezium
Create a `sink-connector.json` file in a directory of your choice.
Parameter | Description |
---|---|
`name` | A name for this connector. |
`connector.class` | Set the connector class to Debezium's sink connector, `io.debezium.connector.jdbc.JdbcSinkConnector`. |
`topics` | A comma-separated list of the Kafka topics this connector listens to. If the Debezium source connector is used to create events, the topics follow a specific naming scheme. For this example you can use `cdc.public.demo` (see the `topic.prefix` and `table.include.list` fields in Create a Postgres connector configuration for reference). |
`connection.url` | The Firebolt JDBC connection string. The `merge_prepared_statement_batches` flag is important here to improve insert performance; see the note below for more information. |
`connection.username` | The client ID of your Firebolt service account. |
`connection.password` | The secret of your Firebolt service account. |
`hibernate.dialect` | Specifies the SQL dialect for the target database. Set to `org.hibernate.dialect.PostgreSQLDialect`. |
`insert.mode` | A Kafka Connect property that defines how data is written to the target system. Use `insert` for new rows, or `update` to modify existing rows in your Firebolt database. `upsert` combines both insert and update operations. |
`primary.key.mode` | Specifies how the connector resolves the primary key, which is required when `"insert.mode"` is `"update"` or when `"delete.enabled"` is set to `true`. Otherwise, set to `"none"`. |
`primary.key.fields` | A comma-separated list of the fields to use as primary key columns. This parameter is only used when `insert.mode` is set to `update`. |
`schema.evolution` | Controls how the connector handles schema changes. Use `basic` for basic schema evolution (adding a column when a new one is encountered or when a name changes) or `none` to disable schema evolution entirely. For more information, see Schema evolution in Debezium's documentation. |
`delete.enabled` | A boolean value that determines whether delete operations in the source database are propagated to Firebolt. Set to `true` to delete rows in Firebolt when they are deleted in the source database. Requires `primary.key.mode` to be set to a value other than `none`. |
`quote.identifiers` | A boolean value that determines whether to quote database identifiers (e.g., table and column names). Set to `true` to ensure compatibility with case-sensitive names or reserved keywords in Firebolt. |
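Putting the parameters above together, a complete `sink-connector.json` could look like the following sketch. The connector name, connection URL, and credentials are placeholders; the exact Firebolt JDBC URL format depends on your account and engine setup:

```json
{
  "name": "firebolt-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "topics": "cdc.public.demo",
    "connection.url": "jdbc:firebolt:my_database?engine=my_engine&merge_prepared_statement_batches=true",
    "connection.username": "<service_account_client_id>",
    "connection.password": "<service_account_secret>",
    "hibernate.dialect": "org.hibernate.dialect.PostgreSQLDialect",
    "insert.mode": "insert",
    "primary.key.mode": "none",
    "schema.evolution": "basic",
    "delete.enabled": "false",
    "quote.identifiers": "true"
  }
}
```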
The `merge_prepared_statement_batches` JDBC connection flag is designed to improve performance when multiple rows are updated in the source database at the same time or in quick succession. It optimizes the `INSERT` queries by allowing the JDBC driver to combine multiple rows into a single statement that's executed in one request to Firebolt, eliminating additional network round trips to the server.
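For example, the flag can be passed as a query parameter in the JDBC URL (the database and engine names here are placeholders):

```
jdbc:firebolt:my_database?engine=my_engine&merge_prepared_statement_batches=true
```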
docker compose
by default.
We'll be using Debezium's Docker images in order to simplify the installation. You can use your own images or non-Debezium images, but some additional setup might be required. Refer to the Docker Hub READMEs for more information.
Create a `docker-compose.yml` file with the following contents:
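A minimal sketch of such a compose file, assuming Debezium's ZooKeeper, Kafka, and Connect images alongside a Postgres container (image tags, service names, and ports are illustrative):

```yaml
version: "3"
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.7
  kafka:
    image: quay.io/debezium/kafka:2.7
    depends_on: [zookeeper]
    environment:
      ZOOKEEPER_CONNECT: zookeeper:2181
  postgres:
    image: postgres:16
    environment:
      POSTGRES_USER: <pg_user>
      POSTGRES_PASSWORD: <pg_password>
    # Logical decoding is required by Debezium's Postgres connector
    command: ["postgres", "-c", "wal_level=logical"]
  connect:
    image: quay.io/debezium/connect:2.7
    depends_on: [kafka, postgres]
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: "1"
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
    volumes:
      # Mount the Firebolt JDBC driver into the Connect plugin path
      - /path/to/your/jdbc/directory/:/kafka/connect/firebolt-jdbc/
```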
Replace `/path/to/your/jdbc/directory/` with the directory where you have placed the JDBC driver, and `firebolt-jdbc-<version>.jar` with the name of the driver in that directory.
Replace `<pg_user>` and `<pg_password>` with the credentials of your choice. Note them down for later.
Run `docker compose up` in the directory containing the `docker-compose.yml` file.
Create a `source-connector.json` file in a directory of your choice.
Parameter | Description |
---|---|
`name` | A name for this connector. |
`connector.class` | The connector to use; for Postgres it should be `io.debezium.connector.postgresql.PostgresConnector`. |
`database.hostname` | The host name where your database is set up. For this demo, `postgres` refers to the Postgres docker compose service we are using for the connection. |
`database.port` | The port used to access the database. The standard Postgres port is `5432`. |
`database.user` | The Postgres username specified earlier in `docker-compose.yml`. |
`database.password` | The Postgres password specified earlier in `docker-compose.yml`. |
`database.dbname` | The database name specified earlier. For this example we can use `postgres`. |
`topic.prefix` | A unique prefix for the topics created. Should be unique across connectors. |
`table.include.list` | A comma-separated list of Postgres tables to monitor for changes. |
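Combined, a `source-connector.json` along these lines would monitor the demo table; the `topic.prefix` and `table.include.list` values produce the `cdc.public.demo` topic used by the sink. Depending on your Postgres setup you may also need to set `"plugin.name": "pgoutput"`:

```json
{
  "name": "postgres-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "<pg_user>",
    "database.password": "<pg_password>",
    "database.dbname": "postgres",
    "topic.prefix": "cdc",
    "table.include.list": "public.demo"
  }
}
```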
`source-connector.json` set up earlier:
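Connectors are typically registered through Kafka Connect's REST API; a sketch using `curl`, assuming Connect is exposed on port 8083:

```shell
curl -i -X POST \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:8083/connectors/ \
  -d @source-connector.json
```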
`sink-connector.json` set up earlier:
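Similarly, the sink connector can be registered via the same REST API (again assuming Connect on port 8083):

```shell
curl -i -X POST \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  http://localhost:8083/connectors/ \
  -d @sink-connector.json
```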
`docker logs kafka-connect-1`.
`docker compose exec postgres psql -U <pg_user> -d postgres`. Use the `pg_user` and `pg_password` specified earlier in `docker-compose.yml`.
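Inside `psql`, any data written to the monitored table is picked up by the source connector. For example, assuming a hypothetical `demo` table with this shape:

```sql
-- Create the monitored table (the schema here is illustrative)
CREATE TABLE public.demo (
    id INT PRIMARY KEY,
    name TEXT
);

-- Rows inserted here are captured as CDC events
INSERT INTO public.demo (id, name) VALUES (1, 'hello'), (2, 'world');
```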
`<topic_name>_<postgres_schema>_<postgres_table_name>` (for our demo it will be `cdc_public_demo`) and the above data written.
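A topic can be created with Kafka's command-line tools, for example (the `bin/` path assumes a local Kafka installation):

```shell
bin/kafka-topics.sh --create --topic firebolt-cdc-events --bootstrap-server localhost:9092
```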
`firebolt-cdc-events` is the desired topic name and `localhost:9092` is the URL for the default Kafka server. If you're running on a different URL or port, replace `localhost:9092`.
(Optional) Use the `--describe` option to verify that the Kafka topic has been created, as follows:
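A sketch of the verification command, assuming the same local installation and topic name as above:

```shell
bin/kafka-topics.sh --describe --topic firebolt-cdc-events --bootstrap-server localhost:9092
```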
Edit the `config/connect-standalone.properties` file to use JSON converters and specify the plugin path:
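The relevant properties would look roughly like this; the plugin directory is a placeholder for a path of your choosing:

```properties
bootstrap.servers=localhost:9092

# JSON converters with schemas enabled, so the sink can infer table columns
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Standalone mode stores offsets in a local file
offset.storage.file.filename=/tmp/connect.offsets

# Directory that will hold the Debezium JDBC sink plugin (downloaded in the next step)
plugin.path=/path/to/plugins
```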
`plugin.path` points to the location that will contain the Debezium plugin, which we'll download in the next step.
`plugin.path` in the previous step.
`libs` folder within your Kafka installation.
`config` directory of your Kafka installation, or note its path for the next step.
The `connect-standalone.properties` file contains Kafka Connect properties, including `plugin.path`. The `sink-connector-config.json` file contains the Debezium sink configuration from the previous step.

Debezium derives the table name `firebolt_cdc_events` from the topic name after replacing dashes with underscores. The connector creates a table with a single column `id` of type `INT`. You can customize table names in your Debezium configuration. Debezium writes the row from the message into this newly created table, and all subsequent messages to this topic must conform to the same schema to be written to the same table.
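For illustration, a message carrying an inline JSON schema that would produce that single-column table might look like this (the payload value is hypothetical):

```json
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "int32", "optional": false, "field": "id" }
    ],
    "optional": false
  },
  "payload": { "id": 1 }
}
```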
- `insert` mode only propagates `INSERT` operations from the source database; it ignores `UPDATE` and `DELETE` operations.
- `update` mode ignores `INSERT` and `DELETE` operations and only propagates `UPDATE` operations.
- `upsert` mode propagates `INSERT`, `UPDATE`, and `DELETE` operations.

`consumer.max.poll.records` property. For example, set it to `5000`:
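In the standalone properties file this is a single line:

```properties
# Maximum number of records returned in a single consumer poll
consumer.max.poll.records=5000
```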
- When running Kafka Connect in a container, set the `CONNECT_CONSUMER_MAX_POLL_RECORDS` environment variable.
- When running Kafka Connect standalone, set the `consumer.max.poll.records` property in the `connect-standalone.properties` file.
- Ensure that related properties, such as `fetch.max.bytes` and `max.partition.fetch.bytes`, are configured to handle the expected payload size.
For more details, refer to Debezium’s batch support optimization guide.
`insert` or `update` mode. The `"insert.mode":"upsert"` setting is not yet supported.