READ_STREAM is a table-valued function (TVF) that reads data from a Kafka stream. It returns a table with columns defined by the stream’s schema, along with metadata pseudo-columns such as $offset, $partition_id, and $timestamp. READ_STREAM reads through a managed stream object that tracks consumer offsets. When consume is true (the default), Firebolt advances the stream’s offsets after reading, so subsequent reads return only messages that have not yet been consumed.

Prerequisites

Before using READ_STREAM, you need:
  1. A Kafka location that stores connection details for your Kafka cluster:
CREATE LOCATION my_kafka_location WITH (
    SOURCE = KAFKA,
    BROKERS = 'broker1:9092,broker2:9092'
);
  2. A stream object that defines the schema and binds to a Kafka topic:
CREATE STREAM my_stream (
    order_id INTEGER,
    name TEXT,
    amount NUMERIC(10,2)
)
TOPIC = 'my_topic'
LOCATION = 'my_kafka_location'
TYPE = 'JSON';
See CREATE STREAM for the full reference.

Syntax

READ_STREAM (
    STREAM <stream_name>
    [, <consume>]
    [, LIMIT => <limit>]
)

Parameters

| Parameter | Description | Supported input types |
|---|---|---|
| STREAM | The name of the stream object to read from. Must reference a stream created with CREATE STREAM. | Stream reference |
| consume | Whether to advance the stream’s consumer offsets after reading. When true (default), messages are marked as consumed and subsequent reads return only new data. When false, offsets remain unchanged and the same messages can be re-read. Consuming requires a write transaction. | BOOLEAN |
| LIMIT | Maximum number of messages to read per Kafka partition. Default: 1000. | INT, BIGINT |
Setting consume to true (the default) makes the query a write operation. Read-only queries, such as standalone SELECT statements outside of CREATE TABLE AS SELECT or INSERT INTO ... SELECT, cannot consume from a stream. Either set consume => false or use READ_STREAM inside a write statement.
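The following minimal Python model makes the consume semantics concrete. It is a conceptual sketch, not Firebolt's implementation. Several parts are assumptions: the `StreamModel` class, its `read` method, the convention that a stored offset is the next offset to read (the usual Kafka convention), and the idea that a stream with no stored offset starts at offset 0.

```python
from dataclasses import dataclass, field


@dataclass
class StreamModel:
    """Hypothetical in-memory model of a stream: one message log per
    partition plus one consumer offset per partition."""

    # partition id -> messages; a message's list index is its offset
    logs: dict[int, list[str]]
    # partition id -> next offset to read (assumed Kafka-style convention)
    offsets: dict[int, int] = field(default_factory=dict)

    def read(self, consume: bool = True, limit: int = 1000) -> list[tuple[int, int, str]]:
        """Return (partition_id, offset, message) rows, at most `limit` per partition."""
        rows = []
        for pid, log in sorted(self.logs.items()):
            start = self.offsets.get(pid, 0)  # assumption: unset offset means start of log
            batch = log[start:start + limit]
            rows.extend((pid, start + i, msg) for i, msg in enumerate(batch))
            if consume:
                # Mirrors consume => true: move past everything just returned
                self.offsets[pid] = start + len(batch)
        return rows


s = StreamModel({0: ["a", "b"], 1: ["c"]})
preview = s.read(consume=False)             # offsets stay unchanged
assert s.read(consume=False) == preview     # so the same rows come back
s.read()                                    # consume (the default)
s.logs[0].append("d")                       # a new message arrives on partition 0
print(s.read(consume=False))                # [(0, 2, 'd')]
```

Only the message that arrived after the consuming read is returned. Previews with `consume=False` can be repeated freely.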

Return type

The result is a table whose columns match the stream’s schema, plus the following pseudo-columns:
| Pseudo-column | Type | Description |
|---|---|---|
| $offset | BIGINT | The Kafka offset of the message within its partition. |
| $partition_id | BIGINT | The Kafka partition from which the message was read. |
| $timestamp | TIMESTAMP | The Kafka broker timestamp of the message. |
Pseudo-columns are not included in SELECT * by default — reference them explicitly (for example, SELECT $offset, $partition_id, * FROM ...).

Stream types

The TYPE parameter on the stream controls how raw Kafka messages are parsed:
| Type | Behavior |
|---|---|
| JSON | Each Kafka message is parsed as a JSON object. Columns defined in the stream schema are extracted by name from the JSON keys. Missing keys produce NULL. |
| CSV | Each Kafka message is treated as a single CSV row. Fields are mapped positionally to the stream’s column definitions. |
| RAW_TEXT | Each Kafka message is returned as a single TEXT column named data. Useful for unparsed ingestion where you apply transformations in SQL. |
| BINARY | Each Kafka message is returned as a single BYTEA column named data. Useful for binary payloads. |
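The parsing rules above can be approximated in Python. This sketch covers only the documented behavior and is not how Firebolt parses messages internally. The `parse_message` helper is hypothetical, and the stream schema is assumed to be an ordered list of column names. Type casting (for example, to NUMERIC(10,2)) and CSV field-count mismatches are not modeled.

```python
import csv
import io
import json


def parse_message(raw: bytes, stream_type: str, columns: list[str]) -> dict:
    """Hypothetical model of how one Kafka message maps to one row."""
    if stream_type == "JSON":
        obj = json.loads(raw)
        # Extract each schema column by key name; a missing key becomes None (NULL)
        return {col: obj.get(col) for col in columns}
    if stream_type == "CSV":
        # The whole message is one CSV row; fields map to columns by position.
        # Values stay strings here because casting is not modeled.
        fields = next(csv.reader(io.StringIO(raw.decode("utf-8"))))
        return dict(zip(columns, fields))
    if stream_type == "RAW_TEXT":
        return {"data": raw.decode("utf-8")}  # single TEXT column named data
    if stream_type == "BINARY":
        return {"data": raw}                  # single BYTEA column named data
    raise ValueError(f"unknown stream type: {stream_type}")


cols = ["order_id", "name", "amount"]
print(parse_message(b'{"order_id": 1, "name": "pen"}', "JSON", cols))
# {'order_id': 1, 'name': 'pen', 'amount': None}
print(parse_message(b"2,ink,3.50", "CSV", cols))
# {'order_id': '2', 'name': 'ink', 'amount': '3.50'}
```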

Consumer offsets

Firebolt tracks consumer offsets per partition for each stream object. You can inspect offsets through information_schema.streams:
SELECT stream_name, offsets
FROM information_schema.streams
WHERE stream_name = 'my_stream';
To manually reposition a stream’s offset on a specific partition, use ALTER STREAM:
ALTER STREAM my_stream SET PARTITION 0 OFFSET 100;
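Repositioning an offset is what makes replay possible. The sketch below models per-partition offsets as a Python dict and shows what moving one partition back does. It assumes, although this reference does not say so, that the stored offset is the next offset READ_STREAM returns, as with Kafka committed offsets. The `read_from` helper and all offset values are hypothetical.

```python
def read_from(logs: dict[int, list[str]], offsets: dict[int, int]) -> dict[int, list[str]]:
    """Return the unread messages of each partition, starting at its stored offset."""
    return {pid: log[offsets.get(pid, 0):] for pid, log in logs.items()}


logs = {0: [f"m{i}" for i in range(5)], 1: ["x0", "x1"]}
offsets = {0: 5, 1: 2}                 # everything has been consumed
assert read_from(logs, offsets) == {0: [], 1: []}

# Analogous to: ALTER STREAM my_stream SET PARTITION 0 OFFSET 3;
offsets[0] = 3
print(read_from(logs, offsets))        # {0: ['m3', 'm4'], 1: []}
```

Only partition 0 replays its messages. The offset of partition 1 is untouched.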

Examples

Read without consuming

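Preview unconsumed messages from a stream without advancing its offsets. The trailing LIMIT 10 is an ordinary SQL LIMIT on the result. It is not the per-partition LIMIT => parameter of READ_STREAM: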
SELECT *
FROM READ_STREAM(
    STREAM my_stream,
    consume => false
)
LIMIT 10;

Ingest into a managed table

Consume messages from the stream and insert them into a Firebolt table in a single statement:
CREATE TABLE orders AS
SELECT *
FROM READ_STREAM(STREAM my_stream);
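After this statement, the stream’s offsets advance past the consumed messages, so any later consuming read of my_stream returns only messages beyond those offsets. Because LIMIT defaults to 1000 messages per partition, one statement may not consume a large backlog in full.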

Incremental ingestion

Append new messages to an existing table:
INSERT INTO orders
SELECT *
FROM READ_STREAM(STREAM my_stream);
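Each consuming read takes at most LIMIT messages per partition (1000 by default), so draining a backlog can take several INSERT runs. The Python calculation below estimates the number of runs. It assumes no new messages arrive in the meantime, and the per-partition lag figures and the `runs_to_drain` helper are hypothetical.

```python
def runs_to_drain(lag_per_partition: dict[int, int], limit: int = 1000) -> int:
    """Consuming reads needed to catch up, assuming no new messages arrive.

    Each read consumes up to `limit` messages from every partition, so the
    most-lagged partition determines the count (integer ceiling division).
    """
    return max(((lag + limit - 1) // limit for lag in lag_per_partition.values()), default=0)


print(runs_to_drain({0: 2500, 1: 800, 2: 0}))  # 3
```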

Filter and transform

Apply SQL transformations to stream data before loading it. For a RAW_TEXT stream, parse fields out of the raw data column yourself. This example assumes comma-separated messages and previews the result without consuming:
SELECT
    STRING_TO_ARRAY(data, ',')[1]::INTEGER AS order_id,
    STRING_TO_ARRAY(data, ',')[2] AS name,
    STRING_TO_ARRAY(data, ',')[3]::INTEGER AS age
FROM READ_STREAM(STREAM my_raw_stream, consume => false)
WHERE STRING_TO_ARRAY(data, ',')[3]::INTEGER > 25;
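To check the indexing, the following Python function applies the same transformation to a single message. Firebolt arrays are 1-based, so `[1]` in the SQL corresponds to index 0 here. The sample messages and the `transform` helper are hypothetical.

```python
from typing import Optional


def transform(data: str) -> Optional[dict]:
    """Mirror the SQL: split on commas, cast fields, keep rows with age > 25."""
    fields = data.split(",")          # STRING_TO_ARRAY(data, ',')
    row = {
        "order_id": int(fields[0]),   # [1] in 1-based SQL indexing
        "name": fields[1],            # [2]
        "age": int(fields[2]),        # [3]
    }
    return row if row["age"] > 25 else None  # WHERE ...[3]::INTEGER > 25


messages = ["1,Alice,30", "2,Bob,22"]
rows = [r for r in (transform(m) for m in messages) if r is not None]
print(rows)  # [{'order_id': 1, 'name': 'Alice', 'age': 30}]
```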

Use pseudo-columns

Access Kafka metadata alongside your data:
SELECT
    $offset,
    $partition_id,
    $timestamp,
    order_id,
    name
FROM READ_STREAM(STREAM my_stream, consume => false)
ORDER BY $partition_id, $offset;  -- offsets are ordered only within a partition
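Kafka assigns offsets independently within each partition, so offsets from different partitions are not comparable. Sorting by offset alone interleaves partitions, while sorting by partition and then offset (ORDER BY $partition_id, $offset) keeps each partition in log order. Here is a short Python illustration with hypothetical rows:

```python
rows = [
    {"partition_id": 1, "offset": 0, "name": "c"},
    {"partition_id": 0, "offset": 1, "name": "b"},
    {"partition_id": 0, "offset": 0, "name": "a"},
    {"partition_id": 1, "offset": 1, "name": "d"},
]

# Offset alone: both partitions' offset-0 messages come first, interleaved
by_offset = sorted(rows, key=lambda r: r["offset"])
print([r["name"] for r in by_offset])      # ['c', 'a', 'b', 'd']

# Partition, then offset: each partition in its own log order
by_partition = sorted(rows, key=lambda r: (r["partition_id"], r["offset"]))
print([r["name"] for r in by_partition])   # ['a', 'b', 'c', 'd']
```

Python's sort is stable, so tied offsets keep their input order. A SQL ORDER BY makes no such guarantee for ties.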

Limit messages per partition

Control how many messages are read from each Kafka partition:
SELECT *
FROM READ_STREAM(
    STREAM my_stream,
    consume => false,
    LIMIT => 500
);
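The limit applies to each partition separately. The total row count is therefore bounded by the partition count times LIMIT, and a partition with fewer pending messages contributes only what it has. Here is a quick Python check, using hypothetical pending counts for a three-partition topic and a hypothetical `rows_returned` helper:

```python
def rows_returned(pending_per_partition: list[int], limit: int = 1000) -> int:
    """Rows one READ_STREAM call returns: min(pending, limit), summed over partitions."""
    return sum(min(pending, limit) for pending in pending_per_partition)


print(rows_returned([800, 20, 5000], limit=500))  # 500 + 20 + 500 = 1020
```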