Creates a stream object that binds to a Kafka topic and defines the schema for incoming messages. Once created, a stream can be read using READ_STREAM to consume data from Kafka into Firebolt.
A stream tracks consumer offsets per partition so that each call to READ_STREAM with consume => true returns only new messages since the last read. You can inspect offsets through information_schema.streams and reposition them with ALTER STREAM.
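
The consume-and-inspect cycle described above might look like the following sketch. It assumes a stream named clickstream already exists (the name is illustrative) and uses the READ_STREAM call shape shown later on this page. The columns of information_schema.streams are not listed here, so the query selects everything rather than assuming column names.

```sql
-- Return only messages that arrived since the previous consuming read.
SELECT *
FROM READ_STREAM(STREAM clickstream, consume => true);

-- Inspect stream metadata, including the tracked per-partition offsets.
-- No column names are assumed.
SELECT *
FROM information_schema.streams;
```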
Syntax
CREATE STREAM [IF NOT EXISTS] <stream_name> (
<column_name> <data_type> [NULL | NOT NULL]
[, <column_name> <data_type> [NULL | NOT NULL] ...]
)
TOPIC = '<topic_name>'
LOCATION = '<location_name>'
TYPE = '<stream_type>'
[DESCRIPTION = '<description>']
Parameters
| Parameter | Description |
|---|---|
| IF NOT EXISTS | If a stream with <stream_name> already exists, the statement succeeds without creating a new stream. Without this clause, an error occurs if the stream name is already in use. |
| <stream_name> | An identifier for the stream. Must be unique within the database. |
| <column_name> | The name of a column in the stream schema. |
| <data_type> | The data type of the column. |
| TOPIC | The Kafka topic to bind the stream to. Required. |
| LOCATION | The name of a Kafka location that stores connection details for the Kafka cluster. Required. |
| TYPE | Controls how raw Kafka messages are parsed. Required. Must be one of: JSON, CSV, RAW_TEXT, or BINARY. See stream types for details on each format. |
| DESCRIPTION | An optional text description for the stream. |
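
As a minimal sketch of the optional clauses in the syntax, the statement below combines IF NOT EXISTS, explicit NULL and NOT NULL column constraints, and DESCRIPTION. The stream name, column names, and topic are hypothetical, and a Kafka location named my_kafka is assumed to exist already.

```sql
-- Hypothetical stream; assumes the Kafka location my_kafka already exists.
CREATE STREAM IF NOT EXISTS page_views (
    view_id  INTEGER NOT NULL,  -- explicitly non-nullable
    referrer TEXT NULL          -- explicitly nullable
)
TOPIC = 'page_views'
LOCATION = 'my_kafka'
TYPE = 'JSON'
DESCRIPTION = 'Illustrative stream using the optional clauses';
```

Because of IF NOT EXISTS, running the same statement a second time succeeds without creating another stream.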
Stream types
| Type | Behavior |
|---|---|
| JSON | Each Kafka message is parsed as a JSON object. Columns are extracted by name from JSON keys. Missing keys produce NULL. |
| CSV | Each Kafka message is treated as a single CSV row. Fields are mapped positionally to the column definitions. |
| RAW_TEXT | Each Kafka message is returned as a single TEXT column named data. |
| BINARY | Each Kafka message is returned as a single BYTEA column named data. |
For RAW_TEXT and BINARY stream types, the stream schema should define a single column named data, typed TEXT for RAW_TEXT and BYTEA for BINARY.
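
A BINARY stream following this rule could be declared as in the sketch below. The stream name and topic are hypothetical, and the my_kafka location is assumed to exist.

```sql
-- Hypothetical stream name and topic; my_kafka is assumed to exist.
CREATE STREAM raw_payloads (
    data BYTEA  -- single column named data, as described for BINARY streams
)
TOPIC = 'binary_payloads'
LOCATION = 'my_kafka'
TYPE = 'BINARY';
```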
Examples
Create a JSON stream
CREATE LOCATION my_kafka WITH (
SOURCE = KAFKA,
BROKERS = 'broker1:9092,broker2:9092'
);
CREATE STREAM clickstream (
user_id INTEGER,
event_type TEXT,
page_url TEXT,
event_time TIMESTAMP
)
TOPIC = 'web_events'
LOCATION = 'my_kafka'
TYPE = 'JSON';
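
To illustrate how JSON keys map to columns, the sketch below reads from the stream just created. The sample message in the comments is hypothetical, and the READ_STREAM call shape follows the raw text example on this page.

```sql
-- Hypothetical message on the web_events topic:
--   {"user_id": 42, "event_type": "click"}
-- user_id and event_type are extracted by key name.
-- page_url and event_time are missing from the message, so they are NULL.
SELECT user_id, event_type, page_url, event_time
FROM READ_STREAM(STREAM clickstream, consume => false);
```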
Create a CSV stream
CREATE STREAM sensor_readings (
sensor_id INTEGER,
temperature NUMERIC(5,2),
humidity NUMERIC(5,2)
)
TOPIC = 'sensors'
LOCATION = 'my_kafka'
TYPE = 'CSV';
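
Positional mapping can be pictured with the sketch below. The sample message is hypothetical and assumes comma-separated fields; the READ_STREAM call shape follows the raw text example on this page.

```sql
-- Hypothetical message on the sensors topic (comma-separated, an assumption):
--   17,21.50,48.25
-- Fields map to columns by position, not by name:
--   1st field -> sensor_id, 2nd -> temperature, 3rd -> humidity
SELECT sensor_id, temperature, humidity
FROM READ_STREAM(STREAM sensor_readings, consume => false);
```

Because the mapping is positional, the column order in CREATE STREAM needs to match the field order the producer writes.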
Create a raw text stream
CREATE STREAM raw_logs (
data TEXT
)
TOPIC = 'application_logs'
LOCATION = 'my_kafka'
TYPE = 'RAW_TEXT';
You can then parse the raw messages in SQL when reading:
SELECT
SPLIT_PART(data, ' ', 1) AS log_level,
SPLIT_PART(data, ' ', 2) AS timestamp,
SUBSTRING(data FROM LENGTH(SPLIT_PART(data, ' ', 1)) + LENGTH(SPLIT_PART(data, ' ', 2)) + 3) AS message
FROM READ_STREAM(STREAM raw_logs, consume => false);
Create a stream with a description
CREATE STREAM orders (
order_id INTEGER,
customer_name TEXT,
amount NUMERIC(10,2)
)
TOPIC = 'orders_v2'
LOCATION = 'my_kafka'
TYPE = 'JSON'
DESCRIPTION = 'Order events from the payments service';
ALTER STREAM
You can modify a stream after creation using ALTER STREAM.
Set partition offset
Reposition a stream’s consumer offset on a specific Kafka partition:
ALTER STREAM <stream_name> SET PARTITION <partition_id> OFFSET <offset>;
| Parameter | Description |
|---|---|
| <stream_name> | The name of the stream to alter. |
| <partition_id> | The Kafka partition number to reposition. |
| <offset> | The new offset position. |
Example:
ALTER STREAM my_stream SET PARTITION 0 OFFSET 100;
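
The syntax targets one partition per statement, so repositioning a whole topic takes one statement per partition. The sketch below assumes, hypothetically, that the topic behind my_stream has three partitions numbered 0 to 2, and then checks the result through information_schema.streams without assuming its column names.

```sql
-- Hypothetical: the topic behind my_stream has partitions 0, 1, and 2.
ALTER STREAM my_stream SET PARTITION 0 OFFSET 100;
ALTER STREAM my_stream SET PARTITION 1 OFFSET 100;
ALTER STREAM my_stream SET PARTITION 2 OFFSET 100;

-- Confirm the new positions; no column names are assumed.
SELECT *
FROM information_schema.streams;
```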
Add column
Add a new column to the stream schema:
ALTER STREAM <stream_name> ADD COLUMN <column_name> <data_type> [NULL | NOT NULL];
Example:
ALTER STREAM clickstream ADD COLUMN device_type TEXT NULL;
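
Once the column is added, it can be selected like any other column, as in the sketch below. Since clickstream is a JSON stream and missing keys produce NULL, messages without a device_type key would be expected to return NULL for the new column. That expectation is an inference from the stream types table, not documented behavior for altered streams.

```sql
-- Run after the ALTER STREAM statement above.
-- device_type is expected to be NULL for messages that lack that key (an inference).
SELECT user_id, event_type, device_type
FROM READ_STREAM(STREAM clickstream, consume => false);
```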
DROP STREAM
Remove a stream object:
DROP STREAM [IF EXISTS] <stream_name>;
Dropping a stream removes the stream definition and its consumer offset tracking. It does not affect data already ingested into Firebolt tables or the underlying Kafka topic.
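
Example, as a minimal sketch that reuses the raw_logs stream from the examples above:

```sql
-- IF EXISTS avoids an error when raw_logs has already been dropped.
DROP STREAM IF EXISTS raw_logs;
```

Because the offset tracking is removed together with the definition, a stream recreated later under the same name does not inherit the old offsets.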