> Reference and syntax for the CREATE STREAM command.

# CREATE STREAM

Creates a stream object that binds to a Kafka topic and defines the schema for incoming messages. Once created, a stream can be read using [`READ_STREAM`](/reference-sql/functions-reference/table-valued/read_stream) to consume data from Kafka into Firebolt.

A stream tracks consumer offsets per partition, so each call to `READ_STREAM` with `consume => true` returns only messages that arrived since the previous consuming read. You can inspect offsets through [`information_schema.streams`](/reference-sql/information-schema/streams) and reposition them with [`ALTER STREAM`](#alter-stream).
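
A minimal sketch of the two read modes follows, using the `clickstream` stream defined in the examples below. It assumes that `consume => false` reads without advancing offsets, as in the raw-text example on this page. The exact columns of `information_schema.streams` are not listed here, so the last query selects all of them.

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
-- Peek: read messages without moving the stream's consumer offsets.
SELECT * FROM READ_STREAM(STREAM clickstream, consume => false);

-- Consume: return messages not yet consumed and advance the offsets,
-- so running this again returns only messages that arrived afterwards.
SELECT * FROM READ_STREAM(STREAM clickstream, consume => true);

-- Inspect the per-partition offsets the stream is tracking.
SELECT * FROM information_schema.streams;
```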

## Syntax

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
CREATE STREAM [IF NOT EXISTS] <stream_name> (
    <column_name> <data_type> [NULL | NOT NULL]
    [, <column_name> <data_type> [NULL | NOT NULL] ...]
)
TOPIC = '<topic_name>'
LOCATION = '<location_name>'
TYPE = '<stream_type>'
[DESCRIPTION = '<description>']
```

## Parameters

| Parameter       | Description                                                                                                                                                                                                                            |
| :-------------- | :------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `IF NOT EXISTS` | If a stream with `<stream_name>` already exists, the statement succeeds without creating a new stream. Without this clause, an error occurs if the stream name is already in use.                                                      |
| `<stream_name>` | An identifier for the stream. Must be unique within the database.                                                                                                                                                                      |
| `<column_name>` | The name of a column in the stream schema.                                                                                                                                                                                             |
| `<data_type>`   | The [data type](/reference-sql/data-types) of the column.                                                                                                                                                                              |
| `TOPIC`         | The Kafka topic to bind the stream to. Required.                                                                                                                                                                                       |
| `LOCATION`      | The name of a Kafka [location](/reference-sql/commands/data-definition/create-location) that stores connection details for the Kafka cluster. Required.                                                                                |
| `TYPE`          | Controls how raw Kafka messages are parsed. Required. Must be one of: `JSON`, `CSV`, `RAW_TEXT`, or `BINARY`. See [stream types](/reference-sql/functions-reference/table-valued/read_stream#stream-types) for details on each format. |
| `DESCRIPTION`   | An optional text description for the stream.                                                                                                                                                                                           |
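
As a sketch of `IF NOT EXISTS`, the following statement can be rerun safely, for example from a deployment script. It reuses the `clickstream` definition and the `my_kafka` location from the examples below, and adds the optional `NULL` markers and a description. The description text is illustrative.

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
-- Safe to run repeatedly: if `clickstream` already exists, the statement
-- succeeds without creating a new stream.
CREATE STREAM IF NOT EXISTS clickstream (
    user_id INTEGER NULL,
    event_type TEXT NULL,
    page_url TEXT NULL,
    event_time TIMESTAMP NULL
)
TOPIC = 'web_events'
LOCATION = 'my_kafka'
TYPE = 'JSON'
DESCRIPTION = 'Clickstream events from the web frontend';  -- illustrative text
```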

## Stream types

| Type       | Behavior                                                                                                                  |
| :--------- | :------------------------------------------------------------------------------------------------------------------------ |
| `JSON`     | Each Kafka message is parsed as a JSON object. Columns are extracted by name from JSON keys. Missing keys produce `NULL`. |
| `CSV`      | Each Kafka message is treated as a single CSV row. Fields are mapped positionally to the column definitions.              |
| `RAW_TEXT` | Each Kafka message is returned as a single `TEXT` column named `data`.                                                    |
| `BINARY`   | Each Kafka message is returned as a single `BYTEA` column named `data`.                                                   |

<Note>
  For `RAW_TEXT` and `BINARY` stream types, the stream schema should define a single column named `data` with the appropriate type (`TEXT` or `BYTEA`).
</Note>
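
A `BINARY` stream follows the same rule. In this sketch, the stream name `raw_payloads` and the topic `device_payloads` are hypothetical, and `my_kafka` is the location created in the examples below. The read uses `consume => false`, so it does not advance the stream's offsets.

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
-- A BINARY stream: the schema is a single BYTEA column named `data`.
CREATE STREAM raw_payloads (
    data BYTEA
)
TOPIC = 'device_payloads'  -- hypothetical topic name
LOCATION = 'my_kafka'
TYPE = 'BINARY';

-- Peek at a few raw payloads without consuming them.
SELECT data
FROM READ_STREAM(STREAM raw_payloads, consume => false)
LIMIT 10;
```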

## Examples

### Create a JSON stream

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
CREATE LOCATION my_kafka WITH (
    SOURCE = KAFKA,
    BROKERS = 'broker1:9092,broker2:9092'
);

CREATE STREAM clickstream (
    user_id INTEGER,
    event_type TEXT,
    page_url TEXT,
    event_time TIMESTAMP
)
TOPIC = 'web_events'
LOCATION = 'my_kafka'
TYPE = 'JSON';
```
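
To illustrate how missing JSON keys are handled, suppose a hypothetical message on `web_events` omits `page_url` and `event_time`. Those columns are returned as `NULL`, so a peek query can find such messages without consuming them:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
-- Hypothetical message on `web_events`:
--   {"user_id": 42, "event_type": "click"}
-- The missing `page_url` and `event_time` keys come back as NULL.
SELECT user_id, event_type
FROM READ_STREAM(STREAM clickstream, consume => false)
WHERE event_time IS NULL;
```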

### Create a CSV stream

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
CREATE STREAM sensor_readings (
    sensor_id INTEGER,
    temperature NUMERIC(5,2),
    humidity NUMERIC(5,2)
)
TOPIC = 'sensors'
LOCATION = 'my_kafka'
TYPE = 'CSV';
```
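
Because `CSV` fields are mapped by position, the column order in the stream definition must match the field order in each message. The sample message in the comments below is hypothetical, and the query peeks at readings without advancing offsets.

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
-- Fields map by position, not by name. A hypothetical message
--   17,21.50,40.25
-- yields sensor_id = 17, temperature = 21.50, humidity = 40.25.
SELECT sensor_id, temperature, humidity
FROM READ_STREAM(STREAM sensor_readings, consume => false)
WHERE temperature > 30;
```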

### Create a raw text stream

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
CREATE STREAM raw_logs (
    data TEXT
)
TOPIC = 'application_logs'
LOCATION = 'my_kafka'
TYPE = 'RAW_TEXT';
```

You can then parse the raw messages in SQL when reading. This example assumes each log line has the form `<level> <timestamp> <message>`:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
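SELECT
    SPLIT_PART(data, ' ', 1) AS log_level,
    SPLIT_PART(data, ' ', 2) AS log_time,
    -- The message starts after the first two fields and the two spaces that follow them.
    SUBSTRING(data FROM LENGTH(SPLIT_PART(data, ' ', 1)) + LENGTH(SPLIT_PART(data, ' ', 2)) + 3) AS message
FROM READ_STREAM(STREAM raw_logs, consume => false);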
```

### Create a stream with a description

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
CREATE STREAM orders (
    order_id INTEGER,
    customer_name TEXT,
    amount NUMERIC(10,2)
)
TOPIC = 'orders_v2'
LOCATION = 'my_kafka'
TYPE = 'JSON'
DESCRIPTION = 'Order events from the payments service';
```
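
One way to land consumed messages in a table is to combine `READ_STREAM` with `INSERT INTO ... SELECT`. This is a sketch: the target table `orders_history` is hypothetical and simply mirrors the stream's columns. Because it uses `consume => true`, each run advances the stream's offsets, so rerunning it inserts only messages that arrived since the previous consuming read.

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
-- Hypothetical target table whose columns mirror the `orders` stream.
CREATE TABLE IF NOT EXISTS orders_history (
    order_id INTEGER,
    customer_name TEXT,
    amount NUMERIC(10,2)
);

-- Consume new order events and persist them; the stream's offsets advance.
INSERT INTO orders_history
SELECT order_id, customer_name, amount
FROM READ_STREAM(STREAM orders, consume => true);
```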

## Related

* [`READ_STREAM`](/reference-sql/functions-reference/table-valued/read_stream) — Read data from a stream.
* [`ALTER STREAM`](#alter-stream) — Modify stream offsets or schema.
* [`information_schema.streams`](/reference-sql/information-schema/streams) — View stream metadata and consumer offsets.
* [`CREATE LOCATION`](/reference-sql/commands/data-definition/create-location) — Create a location object for Kafka connection details.

<h2 id="alter-stream">
  ALTER STREAM
</h2>

You can modify a stream after creation using `ALTER STREAM`.

### Set partition offset

Reposition a stream's consumer offset on a specific Kafka partition:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
ALTER STREAM <stream_name> SET PARTITION <partition_id> OFFSET <offset>;
```

| Parameter        | Description                               |
| :--------------- | :---------------------------------------- |
| `<stream_name>`  | The name of the stream to alter.          |
| `<partition_id>` | The Kafka partition number to reposition. |
| `<offset>`       | The new offset position.                  |

**Example:**

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
ALTER STREAM my_stream SET PARTITION 0 OFFSET 100;
```
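
A common reason to reposition offsets is to replay a topic. This sketch assumes the topic behind `clickstream` has exactly two partitions (`0` and `1`), that offset `0` is still retained by Kafka, and that the offset you set is the position of the next message the stream reads, as with Kafka consumer offsets. Adjust the partition list and offsets to match your topic.

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
-- Assumes a two-partition topic; rewind both partitions to offset 0.
ALTER STREAM clickstream SET PARTITION 0 OFFSET 0;
ALTER STREAM clickstream SET PARTITION 1 OFFSET 0;

-- Check the new positions, then consume from them.
SELECT * FROM information_schema.streams;
SELECT * FROM READ_STREAM(STREAM clickstream, consume => true);
```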

### Add column

Add a new column to the stream schema:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
ALTER STREAM <stream_name> ADD COLUMN <column_name> <data_type> [NULL | NOT NULL];
```

**Example:**

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
ALTER STREAM clickstream ADD COLUMN device_type TEXT NULL;
```
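
After the column is added, reads include it. For a `JSON` stream such as `clickstream`, `device_type` is extracted from the matching key, and messages without that key (for example, events produced before the key was introduced) return `NULL`, so a nullable column suits topics where older messages lack the key.

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
-- Peek at the new column; messages without a `device_type` key return NULL.
SELECT user_id, event_type, device_type
FROM READ_STREAM(STREAM clickstream, consume => false);
```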

## DROP STREAM

Remove a stream object:

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
DROP STREAM [IF EXISTS] <stream_name>;
```

Dropping a stream removes the stream definition and its consumer offset tracking. It does not affect data already ingested into Firebolt tables or the underlying Kafka topic.
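
The `ALTER STREAM` forms on this page cover offsets and adding columns. To bind an existing stream name to a different topic or type, one approach is to drop and recreate it, sketched below with a hypothetical topic `application_logs_v2`. Because dropping removes offset tracking, the new stream does not inherit the old offsets. Where it starts reading is not specified on this page, so check `information_schema.streams` or set offsets explicitly with `ALTER STREAM ... SET PARTITION`.

```sql theme={"theme":{"light":"css-variables","dark":"css-variables"}}
-- Remove the old definition (no error if it is already gone).
DROP STREAM IF EXISTS raw_logs;

-- Recreate it bound to a different, hypothetical topic.
CREATE STREAM raw_logs (
    data TEXT
)
TOPIC = 'application_logs_v2'
LOCATION = 'my_kafka'
TYPE = 'RAW_TEXT';
```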
