Creates a stream object that binds to a Kafka topic and defines the schema for incoming messages. Once created, a stream can be read using READ_STREAM to consume data from Kafka into Firebolt. A stream tracks consumer offsets per partition so that each call to READ_STREAM with consume => true returns only new messages since the last read. You can inspect offsets through information_schema.streams and reposition them with ALTER STREAM.

Syntax

CREATE STREAM [IF NOT EXISTS] <stream_name> (
    <column_name> <data_type> [NULL | NOT NULL]
    [, <column_name> <data_type> [NULL | NOT NULL] ...]
)
TOPIC = '<topic_name>'
LOCATION = '<location_name>'
TYPE = '<stream_type>'
[DESCRIPTION = '<description>']

Parameters

| Parameter | Description |
| --- | --- |
| IF NOT EXISTS | If a stream with <stream_name> already exists, the statement succeeds without creating a new stream. Without this clause, an error occurs if the stream name is already in use. |
| <stream_name> | An identifier for the stream. Must be unique within the database. |
| <column_name> | The name of a column in the stream schema. |
| <data_type> | The data type of the column. |
| TOPIC | The Kafka topic to bind the stream to. Required. |
| LOCATION | The name of a Kafka location that stores connection details for the Kafka cluster. Required. |
| TYPE | Controls how raw Kafka messages are parsed. Required. Must be one of: JSON, CSV, RAW_TEXT, or BINARY. See Stream types for details on each format. |
| DESCRIPTION | An optional text description for the stream. |
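
As a minimal sketch of the IF NOT EXISTS behavior described above, the statement below can be rerun (for example from a deployment script) without failing once the stream exists. It reuses the clickstream definition and the my_kafka location from the Examples section. Because an existing stream is left as is, a changed column list in a rerun would not be applied.

```sql
-- Safe to rerun: if clickstream already exists, no new stream is created
-- and no error is raised.
CREATE STREAM IF NOT EXISTS clickstream (
    user_id INTEGER,
    event_type TEXT,
    page_url TEXT,
    event_time TIMESTAMP
)
TOPIC = 'web_events'
LOCATION = 'my_kafka'
TYPE = 'JSON';
```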

Stream types

| Type | Behavior |
| --- | --- |
| JSON | Each Kafka message is parsed as a JSON object. Columns are extracted by name from JSON keys. Missing keys produce NULL. |
| CSV | Each Kafka message is treated as a single CSV row. Fields are mapped positionally to the column definitions. |
| RAW_TEXT | Each Kafka message is returned as a single TEXT column named data. |
| BINARY | Each Kafka message is returned as a single BYTEA column named data. |

For RAW_TEXT and BINARY stream types, the stream schema should define a single column named data with the appropriate type (TEXT or BYTEA).
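
The Examples section covers RAW_TEXT but not BINARY, so here is a sketch of a BINARY stream that follows the single-column rule above. The stream name raw_payloads and the topic binary_payloads are hypothetical; my_kafka is the location created in the Examples section.

```sql
-- BINARY streams expose each message as one BYTEA column named data.
CREATE STREAM raw_payloads (
    data BYTEA
)
TOPIC = 'binary_payloads'   -- hypothetical topic name
LOCATION = 'my_kafka'
TYPE = 'BINARY';
```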

Examples

Create a JSON stream

CREATE LOCATION my_kafka WITH (
    SOURCE = KAFKA,
    BROKERS = 'broker1:9092,broker2:9092'
);

CREATE STREAM clickstream (
    user_id INTEGER,
    event_type TEXT,
    page_url TEXT,
    event_time TIMESTAMP
)
TOPIC = 'web_events'
LOCATION = 'my_kafka'
TYPE = 'JSON';
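
Once the stream exists, it can be read with READ_STREAM. The sketch below previews messages with consume => false and then loads new messages with consume => true. Three things here are assumptions, not statements from this page: the target table clickstream_events is hypothetical and assumed to exist with matching columns, READ_STREAM is assumed to be usable as the source of an INSERT ... SELECT, and consume => false is assumed to leave the tracked offsets unchanged.

```sql
-- Preview messages (assumed not to advance the stream's offsets).
SELECT user_id, event_type, page_url, event_time
FROM READ_STREAM(STREAM clickstream, consume => false);

-- Load only the messages that arrived since the last consuming read.
-- clickstream_events is a hypothetical, pre-existing table.
INSERT INTO clickstream_events
SELECT user_id, event_type, page_url, event_time
FROM READ_STREAM(STREAM clickstream, consume => true);
```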

Create a CSV stream

CREATE STREAM sensor_readings (
    sensor_id INTEGER,
    temperature NUMERIC(5,2),
    humidity NUMERIC(5,2)
)
TOPIC = 'sensors'
LOCATION = 'my_kafka'
TYPE = 'CSV';
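
To illustrate the positional mapping used by CSV streams, the sketch below shows a made-up message payload in comments and a preview query over the stream. The payload is hypothetical; the point is that field order in the message must match column order in the stream definition, since no header or key names are involved.

```sql
-- Hypothetical message payload on the 'sensors' topic:
--   17,21.50,48.25
-- Positional mapping: field 1 -> sensor_id, field 2 -> temperature,
-- field 3 -> humidity.
SELECT sensor_id, temperature, humidity
FROM READ_STREAM(STREAM sensor_readings, consume => false);
```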

Create a raw text stream

CREATE STREAM raw_logs (
    data TEXT
)
TOPIC = 'application_logs'
LOCATION = 'my_kafka'
TYPE = 'RAW_TEXT';
You can then parse the raw messages in SQL when reading:
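SELECT
    SPLIT_PART(data, ' ', 1) AS log_level,
    SPLIT_PART(data, ' ', 2) AS timestamp,
    -- Everything after the first two space-separated fields and their two separators
    SUBSTRING(data FROM LENGTH(SPLIT_PART(data, ' ', 1)) + LENGTH(SPLIT_PART(data, ' ', 2)) + 3) AS message
FROM READ_STREAM(STREAM raw_logs, consume => false);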

Create a stream with a description

CREATE STREAM orders (
    order_id INTEGER,
    customer_name TEXT,
    amount NUMERIC(10,2)
)
TOPIC = 'orders_v2'
LOCATION = 'my_kafka'
TYPE = 'JSON'
DESCRIPTION = 'Order events from the payments service';

ALTER STREAM

You can modify a stream after creation using ALTER STREAM.

Set partition offset

Reposition a stream’s consumer offset on a specific Kafka partition:
ALTER STREAM <stream_name> SET PARTITION <partition_id> OFFSET <offset>;

| Parameter | Description |
| --- | --- |
| <stream_name> | The name of the stream to alter. |
| <partition_id> | The Kafka partition number to reposition. |
| <offset> | The new offset position. |

Example:
ALTER STREAM my_stream SET PARTITION 0 OFFSET 100;
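
Because offsets are tracked per partition and the statement takes a single partition number, repositioning a whole stream means one statement per partition. The sketch below assumes a hypothetical topic with three partitions (0 to 2) and uses illustrative offset values. It first inspects the current offsets through information_schema.streams; the column layout of that view is not described on this page, so the query selects everything.

```sql
-- Inspect the offsets currently tracked for each stream.
SELECT * FROM information_schema.streams;

-- Reposition every partition of a hypothetical three-partition topic.
ALTER STREAM my_stream SET PARTITION 0 OFFSET 100;
ALTER STREAM my_stream SET PARTITION 1 OFFSET 100;
ALTER STREAM my_stream SET PARTITION 2 OFFSET 100;
```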

Add column

Add a new column to the stream schema:
ALTER STREAM <stream_name> ADD COLUMN <column_name> <data_type> [NULL | NOT NULL];
Example:
ALTER STREAM clickstream ADD COLUMN device_type TEXT NULL;
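
After adding a column, a preview read is a quick way to check how existing messages are handled. For a JSON stream, missing keys produce NULL, so messages published before producers started sending device_type would be expected to return NULL in that column. That this rule applies to newly added columns is an assumption, which is also why the column is declared NULL here.

```sql
ALTER STREAM clickstream ADD COLUMN device_type TEXT NULL;

-- Preview the new column alongside existing ones without consuming messages.
SELECT user_id, event_type, device_type
FROM READ_STREAM(STREAM clickstream, consume => false);
```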

DROP STREAM

Remove a stream object:
DROP STREAM [IF EXISTS] <stream_name>;
Dropping a stream removes the stream definition and its consumer offset tracking. It does not affect data already ingested into Firebolt tables or the underlying Kafka topic.
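
A short example, using the raw_logs stream from the Examples section. By analogy with IF NOT EXISTS on CREATE STREAM, the IF EXISTS clause is assumed to make the statement succeed when the stream does not exist.

```sql
-- Removes the stream definition and its tracked offsets;
-- the 'application_logs' topic itself is not affected.
DROP STREAM IF EXISTS raw_logs;
```

Since the offset tracking is removed with the stream, a stream recreated under the same name does not inherit the old offsets.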