A table-valued function (TVF) that reads data from a Kafka stream. READ_STREAM returns a table with columns defined by the stream’s schema, along with metadata pseudo-columns such as $offset, $partition_id, and $timestamp.
READ_STREAM works with a managed stream object that tracks consumer offsets. When consume is set to true (the default), Firebolt advances the stream’s offsets after reading, so subsequent reads only return new messages.
Prerequisites
Before using READ_STREAM, you need:
- A Kafka location that stores connection details for your Kafka cluster:
CREATE LOCATION my_kafka_location WITH (
    SOURCE = KAFKA,
    BROKERS = 'broker1:9092,broker2:9092'
);
- A stream object that defines the schema and binds to a Kafka topic:
CREATE STREAM my_stream (
    order_id INTEGER,
    name TEXT,
    amount NUMERIC(10,2)
)
TOPIC = 'my_topic'
LOCATION = 'my_kafka_location'
TYPE = 'JSON';
See CREATE STREAM for the full reference.
Syntax
READ_STREAM (
    STREAM <stream_name>
    [, consume => <consume>]
    [, LIMIT => <limit>]
)
Parameters
| Parameter | Description | Supported input types |
|---|---|---|
| STREAM | The name of the stream object to read from. Must reference a stream created with CREATE STREAM. | Stream reference |
| consume | Whether to advance the stream’s consumer offsets after reading. When true (default), messages are marked as consumed and subsequent reads return only new data. When false, offsets remain unchanged and the same messages can be re-read. Consuming requires a write transaction. | BOOLEAN |
| LIMIT | Maximum number of messages to read per Kafka partition. Default: 1000. | INT, BIGINT |
Setting consume to true (the default) makes the query a write operation. Read-only queries, such as standalone SELECT statements outside of CREATE TABLE AS SELECT or INSERT INTO ... SELECT, cannot consume from a stream. Either set consume => false or use READ_STREAM inside a write statement.
Return type
The result is a table whose columns match the stream’s schema, plus the following pseudo-columns:
| Pseudo-column | Type | Description |
|---|---|---|
| $offset | BIGINT | The Kafka offset of the message within its partition. |
| $partition_id | BIGINT | The Kafka partition from which the message was read. |
| $timestamp | TIMESTAMP | The Kafka broker timestamp of the message. |
Pseudo-columns are not included in SELECT * by default. To return them, reference them explicitly (for example, SELECT $offset, $partition_id, * FROM ...).
Stream types
The TYPE parameter on the stream controls how raw Kafka messages are parsed:
| Type | Behavior |
|---|---|
| JSON | Each Kafka message is parsed as a JSON object. Columns defined in the stream schema are extracted by name from the JSON keys. Missing keys produce NULL. |
| CSV | Each Kafka message is treated as a single CSV row. Fields are mapped positionally to the stream’s column definitions. |
| RAW_TEXT | Each Kafka message is returned as a single TEXT column named data. Useful for unparsed ingestion where you apply transformations in SQL. |
| BINARY | Each Kafka message is returned as a single BYTEA column named data. Useful for binary payloads. |
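To illustrate positional mapping, the sketch below defines a CSV stream by reusing the CREATE STREAM form from the prerequisites. The stream name my_csv_stream and the topic my_csv_topic are hypothetical, and the sketch assumes TYPE = 'CSV' is declared the same way as TYPE = 'JSON'. Under this definition, a message such as 101,Alice,19.99 would map to order_id, name, and amount in that order.

```sql
-- Hypothetical CSV stream; the stream name and topic are placeholders.
-- Fields are mapped to columns by position, so the column order here
-- must match the field order in each Kafka message.
CREATE STREAM my_csv_stream (
    order_id INTEGER,
    name TEXT,
    amount NUMERIC(10,2)
)
TOPIC = 'my_csv_topic'
LOCATION = 'my_kafka_location'
TYPE = 'CSV';

-- Preview the parsed rows without advancing offsets.
SELECT order_id, name, amount
FROM READ_STREAM(STREAM my_csv_stream, consume => false);
```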
Consumer offsets
Firebolt tracks consumer offsets per partition for each stream object. You can inspect offsets through information_schema.streams:
SELECT stream_name, offsets
FROM information_schema.streams
WHERE stream_name = 'my_stream';
To manually reposition a stream’s offset on a specific partition, use ALTER STREAM:
ALTER STREAM my_stream SET PARTITION 0 OFFSET 100;
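A possible replay workflow can be sketched with only the statements shown above: inspect the stored offsets, reposition one partition, then preview the result without consuming. Partition 0 and offset 100 are placeholder values; check the offsets column first to pick values that exist for your topic.

```sql
-- 1. Inspect the current per-partition offsets.
SELECT stream_name, offsets
FROM information_schema.streams
WHERE stream_name = 'my_stream';

-- 2. Reposition partition 0 (offset 100 is a placeholder value).
ALTER STREAM my_stream SET PARTITION 0 OFFSET 100;

-- 3. Preview what a read on partition 0 now returns, without consuming.
SELECT $partition_id, $offset, order_id
FROM READ_STREAM(STREAM my_stream, consume => false)
WHERE $partition_id = 0
ORDER BY $offset
LIMIT 5;
```

Previewing with consume => false first lets you confirm the new position before a write statement advances the offsets again.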
Examples
Read without consuming
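Preview unconsumed messages from a stream without advancing offsets. The outer LIMIT 10 caps the rows returned by the query and is separate from the per-partition LIMIT parameter of READ_STREAM:
SELECT *
FROM READ_STREAM(
    STREAM my_stream,
    consume => false
)
LIMIT 10;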
Ingest into a managed table
Consume messages from the stream and insert them into a Firebolt table in a single statement:
CREATE TABLE orders AS
SELECT *
FROM READ_STREAM(STREAM my_stream);
After this statement, the stream’s offsets advance past the consumed messages. Running the same query again returns only messages that arrived after the previous read.
Incremental ingestion
Append new messages to an existing table:
INSERT INTO orders
SELECT *
FROM READ_STREAM(STREAM my_stream);
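Because LIMIT caps each read at a number of messages per partition (1000 by default), a single INSERT may not drain a large backlog. One option, sketched here, is to raise LIMIT and rerun the statement until it inserts no rows. The value 10000 is an arbitrary example, and the sketch assumes LIMIT can be passed while consume is left at its default.

```sql
-- Consume up to 10000 messages per partition in one write statement.
-- consume is omitted, so it keeps its default of true and offsets advance.
INSERT INTO orders
SELECT *
FROM READ_STREAM(
    STREAM my_stream,
    LIMIT => 10000
);
```

Each rerun starts from the advanced offsets, so it appends only messages that have not been consumed yet.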
Apply SQL transformations on the stream data before loading. For a RAW_TEXT stream, you can parse fields from the raw message:
SELECT
    STRING_TO_ARRAY(data, ',')[1]::INTEGER AS order_id,
    STRING_TO_ARRAY(data, ',')[2] AS name,
    STRING_TO_ARRAY(data, ',')[3]::INTEGER AS age
FROM READ_STREAM(STREAM my_raw_stream, consume => false)
WHERE STRING_TO_ARRAY(data, ',')[3]::INTEGER > 25;
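The query above repeats the STRING_TO_ARRAY call in every expression. A variant that splits each message once in a subquery might look like the following. It assumes READ_STREAM can be used inside a derived table like any other table-valued function; the aliases parts and split_messages are illustrative.

```sql
SELECT
    parts[1]::INTEGER AS order_id,
    parts[2] AS name,
    parts[3]::INTEGER AS age
FROM (
    -- Split each raw message into its comma-separated fields once.
    SELECT STRING_TO_ARRAY(data, ',') AS parts
    FROM READ_STREAM(STREAM my_raw_stream, consume => false)
) AS split_messages
WHERE parts[3]::INTEGER > 25;
```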
Use pseudo-columns
Access Kafka metadata alongside your data:
SELECT
    $offset,
    $partition_id,
    $timestamp,
    order_id,
    name
FROM READ_STREAM(STREAM my_stream, consume => false)
ORDER BY $partition_id, $offset;
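Since pseudo-columns are returned only when referenced explicitly, an ingestion that uses SELECT * alone does not store them. A sketch of keeping the Kafka metadata alongside the data follows; the table name orders_with_metadata and the kafka_* aliases are hypothetical.

```sql
-- Consume from the stream and store Kafka metadata next to each row.
CREATE TABLE orders_with_metadata AS
SELECT
    $partition_id AS kafka_partition,
    $offset AS kafka_offset,
    $timestamp AS kafka_timestamp,
    *
FROM READ_STREAM(STREAM my_stream);
```

Within a topic, the pair (kafka_partition, kafka_offset) identifies a single Kafka message, which can help trace rows back to their source or spot duplicates after an offset has been moved back with ALTER STREAM.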
Limit messages per partition
Control how many messages are read from each Kafka partition:
SELECT *
FROM READ_STREAM(
    STREAM my_stream,
    consume => false,
    LIMIT => 500
);
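Because the limit applies per partition, the total number of rows can reach LIMIT multiplied by the number of partitions. To see how a read is distributed, one approach is to aggregate by $partition_id. This sketch assumes pseudo-columns can be used in GROUP BY and in aggregates like regular columns; the output aliases are illustrative.

```sql
SELECT
    $partition_id AS kafka_partition,
    COUNT(*) AS messages_read,
    MIN($offset) AS first_offset,
    MAX($offset) AS last_offset
FROM READ_STREAM(
    STREAM my_stream,
    consume => false,
    LIMIT => 500
)
GROUP BY $partition_id
ORDER BY $partition_id;
```

With LIMIT => 500, messages_read should be at most 500 for each partition.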