Contents
- What is ksqlDB
- Running ksqlDB
- Streams & Tables
- Querying Data
- Filtering & Transforming
- Aggregations & Windowing
- Stream-Table Joins
- Persistent Queries & Connectors
- REST API
- ksqlDB vs Kafka Streams
ksqlDB is a purpose-built streaming database that sits on top of Apache Kafka. Under the hood every ksqlDB query compiles down to a
Kafka Streams topology, so you get the same fault-tolerance, scalability, and exactly-once guarantees — expressed
entirely in SQL rather than Java.
- Reads from and writes to Kafka topics, treating them as tables or event streams.
- Supports two query modes: push queries that stream results continuously, and pull queries that return a point-in-time snapshot.
- Materializes aggregation results in local state stores (backed by RocksDB + Kafka changelog topics) that are directly queryable.
- Ships with built-in source and sink connectors via Kafka Connect integration.
- Exposes a REST API and a Java/Scala client in addition to the interactive CLI.
Push Queries vs Pull Queries
| Query Type | Syntax marker | Behaviour | Typical use-case |
| --- | --- | --- | --- |
| Push query | EMIT CHANGES | Keeps the connection open and streams new results as events arrive | Live dashboards, alerting, event-driven microservices |
| Pull query | No EMIT CHANGES | Returns current materialized state immediately and closes | On-demand lookups, REST endpoints, ad-hoc exploration |
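Concretely, using the orders stream and the materialized orders_by_region table defined later in this guide, the two modes look like this:

```sql
-- Push query: the connection stays open and a row is emitted
-- for every new matching event
SELECT order_id, amount FROM orders_stream EMIT CHANGES;

-- Pull query: returns the current state for one key, then closes
SELECT region, order_count FROM orders_by_region WHERE region = 'EAST';
```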
Streams vs Tables
| Abstraction | Semantics | Kafka analogy |
| --- | --- | --- |
| STREAM | An unbounded sequence of immutable events — every message is a new fact | Append-only topic; retains all messages |
| TABLE | A mutable, keyed view — each new message for a key is an upsert; the latest value wins | Compacted topic; one record per key |
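The stream/table duality can be expressed directly in SQL. As a sketch, assume a hypothetical customer_updates topic of tier-change events: registered as a stream it keeps every event, while aggregating it with LATEST_BY_OFFSET collapses it into a table where the latest value per key wins.

```sql
-- Every tier change is an immutable fact in the stream
CREATE STREAM customer_updates (
  customer_id VARCHAR KEY,
  tier VARCHAR
) WITH (KAFKA_TOPIC = 'customer_updates', VALUE_FORMAT = 'JSON');

-- Collapsing the stream into a table: one row per key,
-- holding only the most recent tier
CREATE TABLE customer_current_tier AS
  SELECT customer_id, LATEST_BY_OFFSET(tier) AS tier
  FROM customer_updates
  GROUP BY customer_id
  EMIT CHANGES;
```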
ksqlDB vs Kafka Streams API — Code vs SQL tradeoff
| Dimension | ksqlDB | Kafka Streams API |
| --- | --- | --- |
| Language | SQL | Java / Scala |
| Deployment | Standalone server; queries sent over REST/CLI | Embedded in your application JVM |
| State management | Managed automatically by the ksqlDB server | Full control via state stores and interactive queries |
| Flexibility | Limited to what SQL can express | Arbitrary business logic, custom serdes, Processor API |
| Best for | Analytics, filtering, aggregation, dashboards | Complex event-driven workflows, custom serialization, ML pipelines |
Both can coexist in the same Kafka cluster. ksqlDB is ideal for analytics and rapid iteration; the Kafka Streams API gives you full programmatic control when SQL is not expressive enough.
The quickest way to get ksqlDB running locally is Docker Compose. The stack below starts a single-node Kafka broker (KRaft mode),
a ksqlDB server, and the interactive ksqlDB CLI.
version: "3.8"
services:
  broker:
    image: confluentinc/cp-kafka:7.6.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9093
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg"
  ksqldb-server:
    image: confluentinc/ksqldb-server:0.29.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.29.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
Start the stack and connect to the CLI:
# Start all services
docker compose up -d
# Connect the ksqlDB CLI to the server
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Once inside the CLI you will see the ksqlDB prompt:
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= The Database purpose-built =
= for stream processing apps =
===========================================
ksql>
Set the auto-offset-reset property inside the CLI so push queries read from the beginning of a topic during development: SET 'auto.offset.reset' = 'earliest';
Before you can query data in ksqlDB you must register a STREAM or TABLE on top of an existing Kafka topic.
The statement does not move or copy data — it only creates metadata describing the topic's schema and format.
CREATE STREAM — order events
-- Register a stream on the raw orders topic
CREATE STREAM orders_stream (
order_id VARCHAR KEY,
customer_id VARCHAR,
region VARCHAR,
product VARCHAR,
amount DOUBLE,
order_ts BIGINT
) WITH (
KAFKA_TOPIC = 'orders',
VALUE_FORMAT = 'JSON',
TIMESTAMP = 'order_ts'
);
CREATE TABLE — customer lookup
-- Register a table on the customers compacted topic
-- Each customer_id maps to the latest customer record
CREATE TABLE customers_table (
customer_id VARCHAR PRIMARY KEY,
full_name VARCHAR,
email VARCHAR,
tier VARCHAR
) WITH (
KAFKA_TOPIC = 'customers',
VALUE_FORMAT = 'JSON'
);
Inspecting Streams and Tables
-- List all registered streams
SHOW STREAMS;
-- List all registered tables
SHOW TABLES;
-- Show the schema and properties of a stream
DESCRIBE orders_stream;
-- Show the schema plus runtime statistics
DESCRIBE EXTENDED orders_stream;
The VALUE_FORMAT can be JSON, AVRO, PROTOBUF, or KAFKA (raw bytes). For Avro and Protobuf, point ksqlDB at your Schema Registry by setting KSQL_KSQL_SCHEMA_REGISTRY_URL in the server config.
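With AVRO or PROTOBUF plus Schema Registry, the column list can be omitted entirely and ksqlDB infers the schema from the registry. A sketch, assuming the orders topic already has an Avro schema registered:

```sql
-- No column list needed: the schema is pulled from Schema Registry
CREATE STREAM orders_avro WITH (
  KAFKA_TOPIC = 'orders',
  VALUE_FORMAT = 'AVRO'
);
```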
Push Query — stream results as they arrive
Add EMIT CHANGES to turn any SELECT into a push query. The connection stays open and ksqlDB pushes
new rows to the client every time a matching event arrives on the topic.
-- Continuously stream all incoming orders
SELECT order_id, customer_id, region, amount
FROM orders_stream
EMIT CHANGES;
-- Push query with a filter
SELECT order_id, region, amount
FROM orders_stream
WHERE region = 'WEST'
EMIT CHANGES;
Pull Query — point-in-time snapshot from a materialized table
Pull queries execute against a materialized table (one created by a persistent aggregation query).
They return immediately with the current state — no EMIT CHANGES.
-- Pull the current order count for the EAST region
-- (assumes orders_by_region table has been materialized — see Aggregations section)
SELECT region, order_count, total_revenue
FROM orders_by_region
WHERE region = 'EAST';
PRINT — inspect a Kafka topic directly
-- Print all messages from the beginning of a topic
PRINT 'orders' FROM BEGINNING;
-- Print with a row limit
PRINT 'orders' FROM BEGINNING LIMIT 10;
-- Print only new messages arriving from now
PRINT 'orders';
PRINT bypasses stream/table registration and reads the raw Kafka topic. It is useful for inspecting topics that have not yet been registered as a stream or table.
ksqlDB supports a rich set of scalar functions for transforming events inline.
The result of a CREATE STREAM ... AS SELECT is a new persistent stream backed by a new Kafka topic.
Basic filtering and projection
-- Create a derived stream containing only high-value orders from the WEST region
CREATE STREAM west_high_value_orders AS
SELECT
order_id,
customer_id,
region,
UCASE(product) AS product_upper,
amount,
order_ts
FROM orders_stream
WHERE region = 'WEST'
AND amount > 500.0
EMIT CHANGES;
Type casting and CASE WHEN
-- Classify orders into tiers using CASE WHEN
CREATE STREAM orders_with_tier AS
SELECT
order_id,
customer_id,
region,
amount,
CAST(amount AS INTEGER) AS amount_int,
CASE
WHEN amount >= 1000 THEN 'PLATINUM'
WHEN amount >= 500 THEN 'GOLD'
WHEN amount >= 100 THEN 'SILVER'
ELSE 'BRONZE'
END AS order_tier
FROM orders_stream
EMIT CHANGES;
String functions — LCASE, UCASE, SUBSTRING
-- Normalize region to lowercase, extract first 3 chars of order_id as prefix
CREATE STREAM orders_normalized AS
SELECT
order_id,
LCASE(region) AS region_lower,
SUBSTRING(order_id, 1, 3) AS order_prefix,
UCASE(product) AS product_name
FROM orders_stream
EMIT CHANGES;
Extracting fields from nested JSON — EXTRACTJSONFIELD
-- Assume the 'metadata' field contains a JSON string like:
-- {"source":"web","campaign":"spring_sale"}
CREATE STREAM orders_with_source AS
SELECT
order_id,
amount,
EXTRACTJSONFIELD(metadata, '$.source') AS order_source,
EXTRACTJSONFIELD(metadata, '$.campaign') AS campaign
FROM orders_raw_stream
EMIT CHANGES;
AS_VALUE — promote a key column into the value
-- order_id is the message KEY; AS_VALUE copies it into the value payload
CREATE STREAM orders_with_id_in_value AS
SELECT
order_id,                             -- remains the message key
AS_VALUE(order_id) AS order_id_copy,  -- copy placed in the value payload
customer_id,
region,
amount
FROM orders_stream
EMIT CHANGES;
Columns declared as KEY are part of the Kafka message key, not the value. Use AS_VALUE() when a downstream consumer needs the key available inside the JSON value payload.
Aggregating a stream with GROUP BY always produces a TABLE — ksqlDB materializes the
rolling aggregate into a state store. The table is continuously updated as new events arrive.
Non-windowed aggregation — lifetime totals per region
-- Lifetime order count and revenue per region (unbounded aggregation)
CREATE TABLE region_totals AS
SELECT
region,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue,
LATEST_BY_OFFSET(order_id) AS latest_order_id
FROM orders_stream
GROUP BY region
EMIT CHANGES;
WINDOW TUMBLING — 5-minute non-overlapping windows
This is the core of the real-time order analytics pipeline: aggregate order events by region in
5-minute tumbling windows.
-- Orders aggregated by region in 5-minute tumbling windows
CREATE TABLE orders_by_region AS
SELECT
region AS region,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue,
WINDOWSTART AS window_start,
WINDOWEND AS window_end
FROM orders_stream
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY region
EMIT CHANGES;
WINDOW HOPPING — overlapping windows
-- 10-minute hopping window that advances every 2 minutes
-- Each order will appear in multiple windows (10/2 = 5 windows)
CREATE TABLE orders_hopping AS
SELECT
region,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue
FROM orders_stream
WINDOW HOPPING (SIZE 10 MINUTES, ADVANCE BY 2 MINUTES)
GROUP BY region
EMIT CHANGES;
WINDOW SESSION — activity-based windows
-- Session window: group orders from the same customer
-- if they arrive within 30 minutes of each other
CREATE TABLE customer_sessions AS
SELECT
customer_id,
COUNT(*) AS orders_in_session,
SUM(amount) AS session_revenue
FROM orders_stream
WINDOW SESSION (30 MINUTES)
GROUP BY customer_id
EMIT CHANGES;
Windowing comparison
| Window type | Overlapping? | Boundaries | Use-case |
| --- | --- | --- | --- |
| TUMBLING | No | Fixed, non-overlapping | 5-min metrics, billing periods |
| HOPPING | Yes | Fixed size, configurable advance | Rolling averages, sliding metrics |
| SESSION | No | Activity-based, gap-driven | User sessions, clickstream grouping |
Use WINDOWSTART and WINDOWEND pseudo-columns to include the window boundaries in your output. They return millisecond epoch timestamps.
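Since they are epoch milliseconds, it is common to format them with TIMESTAMPTOSTRING for human-readable output. A sketch variant of the tumbling-window table (the _fmt names are illustrative):

```sql
CREATE TABLE orders_by_region_fmt AS
  SELECT
    region,
    COUNT(*) AS order_count,
    TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss', 'UTC') AS window_start_fmt,
    TIMESTAMPTOSTRING(WINDOWEND,   'yyyy-MM-dd HH:mm:ss', 'UTC') AS window_end_fmt
  FROM orders_stream
  WINDOW TUMBLING (SIZE 5 MINUTES)
  GROUP BY region
  EMIT CHANGES;
```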
A common enrichment pattern is joining a stream of events against a lookup table to add dimension data.
In ksqlDB, a stream-table join produces a new enriched stream — each event from the stream is looked up
against the latest value in the table at the time of the event.
Enrich orders with customer data
-- Join the order stream with the customers table on customer_id
-- The result is a new stream where each order row is enriched with customer info
CREATE STREAM orders_enriched AS
SELECT
o.order_id,
o.customer_id,
c.full_name AS customer_name,
c.email AS customer_email,
c.tier AS customer_tier,
o.region,
o.product,
o.amount,
o.order_ts
FROM orders_stream o
LEFT JOIN customers_table c
ON o.customer_id = c.customer_id
EMIT CHANGES;
Aggregate the enriched stream — orders per region per tier
-- After enrichment, aggregate by region AND customer tier in 5-min tumbling windows
CREATE TABLE orders_region_tier AS
SELECT
region,
customer_tier,
COUNT(*) AS order_count,
SUM(amount) AS total_revenue,
WINDOWSTART AS window_start,
WINDOWEND AS window_end
FROM orders_enriched
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY region, customer_tier
EMIT CHANGES;
Foreign-key table-table join
-- Join two tables on a non-primary-key column of the left side
-- (foreign-key table-table joins are supported since ksqlDB 0.19)
-- Assumes a hypothetical orders_table keyed by order_id,
-- with customer_id as an ordinary value column
CREATE TABLE order_summary AS
SELECT
o.order_id,
o.amount,
o.region,
c.full_name AS customer_name,
c.tier AS customer_tier
FROM orders_table o
JOIN customers_table c
ON o.customer_id = c.customer_id;
For a stream-table join to work, both sides must be co-partitioned — the stream and the table must have the same number of partitions and the same partitioning key (customer_id in the example above). Use PARTITION BY to repartition a stream before joining if needed.
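As a sketch: orders_stream above is keyed by order_id, so an explicit repartitioning step like the following (the orders_rekeyed name is illustrative) produces a stream keyed by customer_id that co-partitions with customers_table:

```sql
-- Re-key the stream on the join column; ksqlDB writes the
-- repartitioned events to a new backing topic
CREATE STREAM orders_rekeyed AS
  SELECT *
  FROM orders_stream
  PARTITION BY customer_id
  EMIT CHANGES;
```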
Every CREATE STREAM ... AS SELECT or CREATE TABLE ... AS SELECT statement launches a
persistent query — a continuously running Kafka Streams topology managed by the ksqlDB server.
These queries survive restarts and run until explicitly terminated.
Managing persistent queries
-- List all running persistent queries
SHOW QUERIES;
-- Show details of a specific query
EXPLAIN CSAS_ORDERS_ENRICHED_0;
-- Pause a query without deleting it
PAUSE CSAS_ORDERS_ENRICHED_0;
-- Resume a paused query
RESUME CSAS_ORDERS_ENRICHED_0;
-- Terminate (stop and delete) a persistent query
TERMINATE CSAS_ORDERS_ENRICHED_0;
-- Drop the stream or table AND its underlying Kafka topic
DROP STREAM IF EXISTS orders_enriched DELETE TOPIC;
DROP TABLE IF EXISTS orders_by_region DELETE TOPIC;
Source Connector — ingest from a database into Kafka
-- Create a JDBC source connector to read from a PostgreSQL orders table
CREATE SOURCE CONNECTOR jdbc_orders_source WITH (
'connector.class' = 'io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url' = 'jdbc:postgresql://db-host:5432/sales',
'connection.user' = 'kafka_user',
'connection.password' = 'secret',
'table.whitelist' = 'orders',
'mode' = 'timestamp+incrementing',
'timestamp.column.name' = 'updated_at',
'incrementing.column.name' = 'id',
'topic.prefix' = 'pg_',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter'
);
Sink Connector — write ksqlDB results to Elasticsearch
-- Write the orders_by_region materialized table to Elasticsearch
CREATE SINK CONNECTOR es_orders_sink WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://elasticsearch:9200',
'topics' = 'ORDERS_BY_REGION',
'type.name' = '_doc',
'key.ignore' = 'false',
'schema.ignore' = 'true',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false'
);
Listing and dropping connectors
SHOW CONNECTORS;
DROP CONNECTOR jdbc_orders_source;
DROP CONNECTOR es_orders_sink;
ksqlDB manages connectors through Kafka Connect: either run an embedded Connect worker inside the ksqlDB server (set ksql.connect.worker.config) or point the server at an external Connect cluster (set ksql.connect.url). One of the two must be configured before CREATE SOURCE/SINK CONNECTOR will work.
ksqlDB exposes a REST API on port 8088; the newer /query-stream endpoint is designed for HTTP/2. All CLI operations are available programmatically,
making it easy to integrate ksqlDB into CI pipelines, microservices, or dashboards.
POST /ksql — run DDL and management statements
# Create a stream via REST
curl -s -X POST http://localhost:8088/ksql \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{
"ksql": "CREATE STREAM orders_stream (order_id VARCHAR KEY, customer_id VARCHAR, region VARCHAR, amount DOUBLE) WITH (KAFKA_TOPIC='\''orders'\'', VALUE_FORMAT='\''JSON'\'');",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
}'
POST /query — run a pull query
# Pull query — returns JSON array immediately
curl -s -X POST http://localhost:8088/query \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d '{
"ksql": "SELECT region, order_count, total_revenue FROM orders_by_region WHERE region = '\''EAST'\'';",
"streamsProperties": {}
}'
POST /query-stream — run a push query (HTTP/2 streaming)
# Push query — keeps the connection open, streaming results as newline-delimited JSON
# (add --http2 to curl to use HTTP/2 framing)
curl -s -X POST http://localhost:8088/query-stream \
-H "Content-Type: application/vnd.ksqlapi.data.v1+json" \
-d '{
"sql": "SELECT order_id, region, amount FROM orders_stream EMIT CHANGES;",
"properties": {
"auto.offset.reset": "latest"
}
}'
Java client — io.confluent.ksql:ksqldb-api-client
<!-- Maven dependency -->
<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-api-client</artifactId>
<version>7.6.0</version>
</dependency>
import io.confluent.ksql.api.client.*;
ClientOptions options = ClientOptions.create()
.setHost("localhost")
.setPort(8088);
Client client = Client.create(options);
// Pull query — blocking
BatchedQueryResult result = client
.executeQuery("SELECT region, order_count FROM orders_by_region WHERE region = 'EAST';");
// BatchedQueryResult extends CompletableFuture<List<Row>>; get() blocks for the rows
for (Row row : result.get()) {
System.out.printf("Region: %s, Count: %d%n",
row.getString("REGION"),
row.getLong("ORDER_COUNT"));
}
// Push query — reactive; StreamedQueryResult is a Reactive Streams Publisher
// (BaseSubscriber below comes from Project Reactor, io.projectreactor:reactor-core)
client.streamQuery("SELECT order_id, amount FROM orders_stream EMIT CHANGES;")
.thenAccept(streamedQueryResult -> {
streamedQueryResult.subscribe(new BaseSubscriber<Row>() {
@Override
protected void hookOnNext(Row row) {
System.out.println("New order: " + row.getString("ORDER_ID")
+ " — $" + row.getDouble("AMOUNT"));
}
});
});
The Java client uses the HTTP/2 protocol. Push query results are delivered as a reactive stream using the Reactive Streams API, so you can integrate with Project Reactor or RxJava.
ksqlDB and the Kafka Streams API are complementary, not competing. ksqlDB compiles every SQL statement
into a Kafka Streams topology internally — so you get identical guarantees with far less code for
common patterns. The tradeoff is flexibility: when SQL cannot express your logic, reach for the API.
| Dimension | ksqlDB | Kafka Streams API |
| --- | --- | --- |
| Language | SQL (+ REST/CLI/Java client) | Java or Scala |
| Deployment unit | Separate ksqlDB server process; queries submitted at runtime | JVM library embedded in your application |
| Schema management | Declared inline in DDL; Schema Registry optional | Custom Serdes; Schema Registry optional but common |
| State store access | Queryable via pull queries or REST | Full control via Interactive Queries API; expose via any server |
| Windowing | TUMBLING, HOPPING, SESSION via SQL keywords | TimeWindows, SlidingWindows, SessionWindows via DSL |
| Custom logic | User-defined functions (UDF/UDAF) via Java plugins | Arbitrary Java; Processor API for full control |
| Testing | ksqlDB testing framework; REST integration tests | TopologyTestDriver for unit tests; fast and hermetic |
| Best for | Analytics, ETL pipelines, real-time dashboards, rapid prototyping | Complex business workflows, custom serialization, ML inference, microservices |
Decision guide
- Use ksqlDB when the transformation can be expressed in SQL: filtering, aggregation, enrichment, windowing, fan-out to multiple topics.
- Use Kafka Streams API when you need custom Serdes, fine-grained state store management, complex branching logic, or tight integration with application code.
- Use both together: let ksqlDB handle the analytics layer (dashboards, alerting) while Kafka Streams handles the transactional processing layer (order fulfillment, fraud detection) — they share the same Kafka cluster transparently.
- ksqlDB queries can read from topics produced by a Kafka Streams application, and vice versa — they interoperate naturally via Kafka topics.
End-to-end order analytics pipeline recap
-- Step 1: Register the raw events stream
CREATE STREAM orders_stream ( ... ) WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');
-- Step 2: Register the customer dimension table
CREATE TABLE customers_table ( ... ) WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON');
-- Step 3: Enrich orders with customer info (persistent query → new topic)
CREATE STREAM orders_enriched AS
SELECT o.*, c.full_name, c.tier
FROM orders_stream o
LEFT JOIN customers_table c ON o.customer_id = c.customer_id
EMIT CHANGES;
-- Step 4: Aggregate by region in 5-minute tumbling windows (persistent query → materialized table)
CREATE TABLE orders_by_region AS
SELECT region, COUNT(*) AS order_count, SUM(amount) AS total_revenue,
WINDOWSTART AS window_start, WINDOWEND AS window_end
FROM orders_enriched
WINDOW TUMBLING (SIZE 5 MINUTES)
GROUP BY region
EMIT CHANGES;
-- Step 5: Dashboard or service pulls current window state on demand
SELECT region, order_count, total_revenue
FROM orders_by_region
WHERE region = 'WEST';
The three persistent queries in this pipeline each run as independent Kafka Streams topologies inside the ksqlDB server. You can scale ksqlDB horizontally by adding more server instances — Kafka will redistribute partition assignments automatically.