> ## Documentation Index
> Fetch the complete documentation index at: https://docs.firebolt.io/llms.txt
> Use this file to discover all available pages before exploring further.

> Reference material for READ_STREAM function

# READ_STREAM

A table-valued function (TVF) that reads data from a [Kafka stream](#prerequisites). `READ_STREAM` returns a table with columns defined by the stream's schema, along with metadata pseudo-columns such as `$offset`, `$partition_id`, and `$timestamp`.

`READ_STREAM` works with a managed stream object that tracks consumer offsets. When `consume` is set to `true` (the default), Firebolt advances the stream's offsets after reading, so subsequent reads only return new messages.

## Prerequisites

Before using `READ_STREAM`, you need:

1. A **Kafka location** that stores connection details for your Kafka cluster:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
CREATE LOCATION my_kafka_location WITH (
    SOURCE = KAFKA,
    BROKERS = 'broker1:9092,broker2:9092'
);
```

2. A **stream object** that defines the schema and binds to a Kafka topic:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
CREATE STREAM my_stream (
    order_id INTEGER,
    name TEXT,
    amount NUMERIC(10,2)
)
TOPIC = 'my_topic'
LOCATION = 'my_kafka_location'
TYPE = 'JSON';
```

See [`CREATE STREAM`](/reference-sql/commands/data-definition/create-stream) for the full reference.

## Syntax

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
READ_STREAM (
    STREAM <stream_name>
    [, <consume>]
    [, LIMIT => <limit>]
)
```

## Parameters

| Parameter | Description                                                                                                                                                                                                                                                                            | Supported input types |
| :-------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :-------------------- |
| `STREAM`  | The name of the stream object to read from. Must reference a stream created with `CREATE STREAM`.                                                                                                                                                                                      | Stream reference      |
| `consume` | Whether to advance the stream's consumer offsets after reading. When `true` (default), messages are marked as consumed and subsequent reads return only new data. When `false`, offsets remain unchanged and the same messages can be re-read. Consuming requires a write transaction. | `BOOLEAN`             |
| `LIMIT`   | Maximum number of messages to read per Kafka partition. Default: `1000`.                                                                                                                                                                                                               | `INT`, `BIGINT`       |

<Note>
  Setting `consume` to `true` (the default) makes the query a write operation. Read-only queries, such as standalone `SELECT` statements outside of `CREATE TABLE AS SELECT` or `INSERT INTO ... SELECT`, cannot consume from a stream. Either set `consume => false` or use `READ_STREAM` inside a write statement.
</Note>

## Return type

The result is a table whose columns match the stream's schema, plus the following pseudo-columns:

| Pseudo-column   | Type        | Description                                           |
| :-------------- | :---------- | :---------------------------------------------------- |
| `$offset`       | `BIGINT`    | The Kafka offset of the message within its partition. |
| `$partition_id` | `BIGINT`    | The Kafka partition from which the message was read.  |
| `$timestamp`    | `TIMESTAMP` | The Kafka broker timestamp of the message.            |

Pseudo-columns are not included in `SELECT *` by default — reference them explicitly (for example, `SELECT $offset, $partition_id, * FROM ...`).

## Stream types

The `TYPE` parameter on the stream controls how raw Kafka messages are parsed:

| Type       | Behavior                                                                                                                                                   |
| :--------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `JSON`     | Each Kafka message is parsed as a JSON object. Columns defined in the stream schema are extracted by name from the JSON keys. Missing keys produce `NULL`. |
| `CSV`      | Each Kafka message is treated as a single CSV row. Fields are mapped positionally to the stream's column definitions.                                      |
| `RAW_TEXT` | Each Kafka message is returned as a single `TEXT` column named `data`. Useful for unparsed ingestion where you apply transformations in SQL.               |
| `BINARY`   | Each Kafka message is returned as a single `BYTEA` column named `data`. Useful for binary payloads.                                                        |

## Consumer offsets

Firebolt tracks consumer offsets per partition for each stream object. You can inspect offsets through [`information_schema.streams`](/reference-sql/information-schema/streams):

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
SELECT stream_name, offsets
FROM information_schema.streams
WHERE stream_name = 'my_stream';
```

To manually reposition a stream's offset on a specific partition, use `ALTER STREAM`:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
ALTER STREAM my_stream SET PARTITION 0 OFFSET 100;
```

## Examples

### Read without consuming

Preview the latest unconsumed messages from a stream without advancing offsets:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
SELECT *
FROM READ_STREAM(
    STREAM my_stream,
    consume => false
)
LIMIT 10;
```

### Ingest into a managed table

Consume messages from the stream and insert them into a Firebolt table in a single statement:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
CREATE TABLE orders AS
SELECT *
FROM READ_STREAM(STREAM my_stream);
```

After this statement, the stream's offsets advance past the consumed messages. Running the same query again returns only messages that arrived after the previous read.

### Incremental ingestion

Append new messages to an existing table:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
INSERT INTO orders
SELECT *
FROM READ_STREAM(STREAM my_stream);
```

### Filter and transform

Apply SQL transformations on the stream data before loading. For a `RAW_TEXT` stream, you can parse fields from the raw message:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
SELECT
    STRING_TO_ARRAY(data, ',')[1]::INTEGER AS order_id,
    STRING_TO_ARRAY(data, ',')[2] AS name,
    STRING_TO_ARRAY(data, ',')[3]::INTEGER AS age
FROM READ_STREAM(STREAM my_raw_stream, consume => false)
WHERE STRING_TO_ARRAY(data, ',')[3]::INTEGER > 25;
```

### Use pseudo-columns

Access Kafka metadata alongside your data:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
SELECT
    $offset,
    $partition_id,
    $timestamp,
    order_id,
    name
FROM READ_STREAM(STREAM my_stream, consume => false)
ORDER BY $offset;
```

### Limit messages per partition

Control how many messages are read from each Kafka partition:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
SELECT *
FROM READ_STREAM(
    STREAM my_stream,
    consume => false,
    LIMIT => 500
);
```
