
ksqlDB is a purpose-built streaming database that sits on top of Apache Kafka. Under the hood every ksqlDB query compiles down to a Kafka Streams topology, so you get the same fault-tolerance, scalability, and exactly-once guarantees — expressed entirely in SQL rather than Java.

Push Queries vs Pull Queries
| Query type | Syntax marker | Behaviour | Typical use-case |
|---|---|---|---|
| Push query | `EMIT CHANGES` | Keeps the connection open and streams new results as events arrive | Live dashboards, alerting, event-driven microservices |
| Pull query | No `EMIT CHANGES` | Returns current materialized state immediately and closes | On-demand lookups, REST endpoints, ad-hoc exploration |
Streams vs Tables
| Abstraction | Semantics | Kafka analogy |
|---|---|---|
| `STREAM` | An unbounded sequence of immutable events; every message is a new fact | Append-only topic; retains all messages |
| `TABLE` | A mutable, keyed view; each new message for a key is an upsert, and the latest value wins | Compacted topic; one record per key |
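The upsert distinction can be sketched in a few lines of plain Python (illustrative only, not ksqlDB code): the same message sequence is retained in full as a stream but collapses to one row per key as a table.

```python
# The same Kafka messages viewed as a STREAM (append-only facts)
# vs a TABLE (upsert per key, compacted-topic semantics).
messages = [
    ("alice", {"tier": "SILVER"}),
    ("bob",   {"tier": "GOLD"}),
    ("alice", {"tier": "PLATINUM"}),  # a new fact for an existing key
]

# STREAM view: every message is retained as an immutable event
stream_view = list(messages)

# TABLE view: the latest value per key wins
table_view = {}
for key, value in messages:
    table_view[key] = value  # upsert

print(len(stream_view))     # 3
print(table_view["alice"])  # {'tier': 'PLATINUM'}
```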
ksqlDB vs Kafka Streams API — Code vs SQL tradeoff
| | ksqlDB | Kafka Streams API |
|---|---|---|
| Language | SQL | Java / Scala |
| Deployment | Standalone server; queries sent over REST/CLI | Embedded in your application JVM |
| State management | Managed automatically by the ksqlDB server | Full control via state stores and interactive queries |
| Flexibility | Limited to what SQL can express | Arbitrary business logic, custom serdes, Processor API |
| Best for | Analytics, filtering, aggregation, dashboards | Complex event-driven workflows, custom serialization, ML pipelines |
Both can coexist in the same Kafka cluster. ksqlDB is ideal for analytics and rapid iteration; the Kafka Streams API gives you full programmatic control when SQL is not expressive enough.

The quickest way to get ksqlDB running locally is Docker Compose. The stack below starts a single-node Kafka broker (KRaft mode), a ksqlDB server, and the interactive ksqlDB CLI.

```yaml
version: "3.8"
services:
  broker:
    image: confluentinc/cp-kafka:7.6.0
    hostname: broker
    container_name: broker
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
      KAFKA_LISTENERS: PLAINTEXT://broker:9092,CONTROLLER://broker:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9093
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qg"

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.29.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.29.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
```

Start the stack and connect to the CLI:

```bash
# Start all services
docker compose up -d

# Connect the ksqlDB CLI to the server
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
```

Once inside the CLI you will see the ksqlDB prompt:

```
===========================================
=       _              _ ____  ____       =
=      | | _____  __ _| |  _ \| __ )      =
=      | |/ / __|/ _` | | | | |  _ \      =
=      | <\__ \ (_| | | |_| | |_) |       =
=      |_|\_\___/\__, |_|____/|____/      =
=                   |_|                   =
=        The Database purpose-built       =
=        for stream processing apps       =
===========================================
ksql>
```

Set the `auto.offset.reset` property inside the CLI so push queries read from the beginning of a topic during development:

```sql
SET 'auto.offset.reset' = 'earliest';
```

Before you can query data in ksqlDB you must register a STREAM or TABLE on top of an existing Kafka topic. The statement does not move or copy data — it only creates metadata describing the topic's schema and format.

CREATE STREAM — order events
```sql
-- Register a stream on the raw orders topic
CREATE STREAM orders_stream (
    order_id    VARCHAR KEY,
    customer_id VARCHAR,
    region      VARCHAR,
    product     VARCHAR,
    amount      DOUBLE,
    order_ts    BIGINT
) WITH (
    KAFKA_TOPIC  = 'orders',
    VALUE_FORMAT = 'JSON',
    TIMESTAMP    = 'order_ts'
);
```
CREATE TABLE — customer lookup
```sql
-- Register a table on the customers compacted topic
-- Each customer_id maps to the latest customer record
CREATE TABLE customers_table (
    customer_id VARCHAR PRIMARY KEY,
    full_name   VARCHAR,
    email       VARCHAR,
    tier        VARCHAR
) WITH (
    KAFKA_TOPIC  = 'customers',
    VALUE_FORMAT = 'JSON'
);
```
Inspecting Streams and Tables
```sql
-- List all registered streams
SHOW STREAMS;

-- List all registered tables
SHOW TABLES;

-- Show the schema and properties of a stream
DESCRIBE orders_stream;

-- Show the schema plus runtime statistics
DESCRIBE EXTENDED orders_stream;
```

The `VALUE_FORMAT` can be `JSON`, `AVRO`, `PROTOBUF`, or `KAFKA` (raw bytes). For Avro and Protobuf, point ksqlDB at your Schema Registry by setting `KSQL_KSQL_SCHEMA_REGISTRY_URL` in the server config.
Push Query — stream results as they arrive

Add EMIT CHANGES to turn any SELECT into a push query. The connection stays open and ksqlDB pushes new rows to the client every time a matching event arrives on the topic.

```sql
-- Continuously stream all incoming orders
SELECT order_id, customer_id, region, amount
FROM orders_stream
EMIT CHANGES;

-- Push query with a filter
SELECT order_id, region, amount
FROM orders_stream
WHERE region = 'WEST'
EMIT CHANGES;
```
Pull Query — point-in-time snapshot from a materialized table

Pull queries execute against a materialized table (one created by a persistent aggregation query). They return immediately with the current state — no EMIT CHANGES.

```sql
-- Pull the current order count for the EAST region
-- (assumes the orders_by_region table has been materialized; see the Aggregations section)
SELECT region, order_count, total_revenue
FROM orders_by_region
WHERE region = 'EAST';
```
PRINT — inspect a Kafka topic directly
```sql
-- Print all messages from the beginning of a topic
PRINT 'orders' FROM BEGINNING;

-- Print with a row limit
PRINT 'orders' FROM BEGINNING LIMIT 10;

-- Print only new messages arriving from now
PRINT 'orders';
```

`PRINT` bypasses stream/table registration and reads the raw Kafka topic. It is useful for inspecting topics that have not yet been registered as a stream or table.

ksqlDB supports a rich set of scalar functions for transforming events inline. The result of a CREATE STREAM ... AS SELECT is a new persistent stream backed by a new Kafka topic.

Basic filtering and projection
```sql
-- Create a derived stream containing only high-value orders from the WEST region
CREATE STREAM west_high_value_orders AS
    SELECT order_id,
           customer_id,
           region,
           UCASE(product) AS product_upper,
           amount,
           order_ts
    FROM orders_stream
    WHERE region = 'WEST'
      AND amount > 500.0
    EMIT CHANGES;
```
Type casting and CASE WHEN
```sql
-- Classify orders into tiers using CASE WHEN
CREATE STREAM orders_with_tier AS
    SELECT order_id,
           customer_id,
           region,
           amount,
           CAST(amount AS INTEGER) AS amount_int,
           CASE
               WHEN amount >= 1000 THEN 'PLATINUM'
               WHEN amount >= 500  THEN 'GOLD'
               WHEN amount >= 100  THEN 'SILVER'
               ELSE 'BRONZE'
           END AS order_tier
    FROM orders_stream
    EMIT CHANGES;
```
String functions — LCASE, UCASE, SUBSTRING
```sql
-- Normalize region to lowercase, extract first 3 chars of order_id as prefix
CREATE STREAM orders_normalized AS
    SELECT order_id,
           LCASE(region) AS region_lower,
           SUBSTRING(order_id, 1, 3) AS order_prefix,
           UCASE(product) AS product_name
    FROM orders_stream
    EMIT CHANGES;
```
Extracting fields from nested JSON — EXTRACTJSONFIELD
```sql
-- Assume the 'metadata' field contains a JSON string like:
-- {"source":"web","campaign":"spring_sale"}
CREATE STREAM orders_with_source AS
    SELECT order_id,
           amount,
           EXTRACTJSONFIELD(metadata, '$.source')   AS order_source,
           EXTRACTJSONFIELD(metadata, '$.campaign') AS campaign
    FROM orders_raw_stream
    EMIT CHANGES;
```
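For intuition, here is a rough Python analogue of what `EXTRACTJSONFIELD` does for simple `$.field` paths (`extract_json_field` is a hypothetical helper written for this sketch, not part of any ksqlDB client):

```python
import json

def extract_json_field(json_str, path):
    """Rough analogue of ksqlDB's EXTRACTJSONFIELD for simple '$.a.b' paths
    (no array indexing). Returns None for missing fields, as ksqlDB returns NULL."""
    if not path.startswith("$."):
        raise ValueError("expected a path like '$.field'")
    obj = json.loads(json_str)
    for part in path[2:].split("."):
        if not isinstance(obj, dict) or part not in obj:
            return None
        obj = obj[part]
    return obj

metadata = '{"source":"web","campaign":"spring_sale"}'
print(extract_json_field(metadata, "$.source"))   # web
print(extract_json_field(metadata, "$.missing"))  # None
```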
AS_VALUE — promote a key column into the value
```sql
-- order_id is the message KEY; the key column stays in the projection,
-- and AS_VALUE copies it into the value payload under a new name
CREATE STREAM orders_with_id_in_value AS
    SELECT order_id,
           AS_VALUE(order_id) AS order_id_copy,
           customer_id,
           region,
           amount
    FROM orders_stream
    EMIT CHANGES;
```

Columns declared as `KEY` are part of the Kafka message key, not the value. Use `AS_VALUE()` when a downstream consumer needs the key available inside the JSON value payload.

Aggregating a stream with GROUP BY always produces a TABLE — ksqlDB materializes the rolling aggregate into a state store. The table is continuously updated as new events arrive.

Non-windowed aggregation — lifetime totals per region
```sql
-- Lifetime order count and revenue per region (unbounded aggregation)
CREATE TABLE region_totals AS
    SELECT region,
           COUNT(*) AS order_count,
           SUM(amount) AS total_revenue,
           LATEST_BY_OFFSET(order_id) AS latest_order_id
    FROM orders_stream
    GROUP BY region
    EMIT CHANGES;
```
WINDOW TUMBLING — 5-minute non-overlapping windows

This is the core of the real-time order analytics pipeline: aggregate order events by region in 5-minute tumbling windows.

```sql
-- Orders aggregated by region in 5-minute tumbling windows
CREATE TABLE orders_by_region AS
    SELECT region,
           COUNT(*) AS order_count,
           SUM(amount) AS total_revenue,
           WINDOWSTART AS window_start,
           WINDOWEND AS window_end
    FROM orders_stream
    WINDOW TUMBLING (SIZE 5 MINUTES)
    GROUP BY region
    EMIT CHANGES;
```
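The window arithmetic behind `WINDOWSTART` and `WINDOWEND` is simple enough to sketch in Python: a tumbling window assigns each event to exactly one window by rounding its timestamp down to a multiple of the window size (a sketch of the semantics, not ksqlDB internals):

```python
SIZE_MS = 5 * 60 * 1000  # SIZE 5 MINUTES, in milliseconds

def tumbling_window(ts_ms, size_ms=SIZE_MS):
    """Return the (WINDOWSTART, WINDOWEND) pair for an event timestamp."""
    window_start = ts_ms - (ts_ms % size_ms)
    return window_start, window_start + size_ms

# An event at 00:07:30 lands in the [00:05:00, 00:10:00) window
start, end = tumbling_window(7 * 60_000 + 30_000)
print(start, end)  # 300000 600000
```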
WINDOW HOPPING — overlapping windows
```sql
-- 10-minute hopping window that advances every 2 minutes
-- Each order will appear in multiple windows (10 / 2 = 5 windows)
CREATE TABLE orders_hopping AS
    SELECT region,
           COUNT(*) AS order_count,
           SUM(amount) AS total_revenue
    FROM orders_stream
    WINDOW HOPPING (SIZE 10 MINUTES, ADVANCE BY 2 MINUTES)
    GROUP BY region
    EMIT CHANGES;
```
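To see why each event lands in size/advance windows, here is a small Python sketch (semantics only, not ksqlDB internals) that enumerates every hopping window containing a given timestamp:

```python
def hopping_windows(ts_ms, size_ms, advance_ms):
    """All hopping windows [start, start + size) that contain ts_ms.
    Window starts are multiples of advance_ms."""
    windows = []
    start = (ts_ms // advance_ms) * advance_ms  # latest window starting at/before ts
    while start >= 0 and start + size_ms > ts_ms:
        windows.append((start, start + size_ms))
        start -= advance_ms
    return sorted(windows)

# SIZE 10 MINUTES, ADVANCE BY 2 MINUTES: every event is in 10 / 2 = 5 windows
wins = hopping_windows(20 * 60_000, size_ms=10 * 60_000, advance_ms=2 * 60_000)
print(len(wins))  # 5
```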
WINDOW SESSION — activity-based windows
```sql
-- Session window: group orders from the same customer
-- if they arrive within 30 minutes of each other
CREATE TABLE customer_sessions AS
    SELECT customer_id,
           COUNT(*) AS orders_in_session,
           SUM(amount) AS session_revenue
    FROM orders_stream
    WINDOW SESSION (30 MINUTES)
    GROUP BY customer_id
    EMIT CHANGES;
```
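Session windows have no fixed boundaries; a new session starts whenever the gap to the previous event exceeds the configured gap. A minimal Python sketch of that grouping rule (illustrative only):

```python
def session_windows(timestamps_ms, gap_ms):
    """Group event timestamps into sessions: a new session starts
    whenever the gap since the previous event exceeds gap_ms."""
    sessions = []
    for ts in sorted(timestamps_ms):
        if sessions and ts - sessions[-1][-1] <= gap_ms:
            sessions[-1].append(ts)  # within the gap: extend the current session
        else:
            sessions.append([ts])    # gap exceeded: start a new session
    return sessions

GAP_MS = 30 * 60 * 1000  # WINDOW SESSION (30 MINUTES)
# Orders at minutes 0, 10, 25, then 70: the 45-minute gap splits the sessions
events = [0, 10 * 60_000, 25 * 60_000, 70 * 60_000]
print(len(session_windows(events, GAP_MS)))  # 2
```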
Windowing comparison
| Window type | Overlapping? | Boundaries | Use-case |
|---|---|---|---|
| TUMBLING | No | Fixed, non-overlapping | 5-min metrics, billing periods |
| HOPPING | Yes | Fixed size, configurable advance | Rolling averages, sliding metrics |
| SESSION | No | Activity-based, gap-driven | User sessions, clickstream grouping |
Use WINDOWSTART and WINDOWEND pseudo-columns to include the window boundaries in your output. They return millisecond epoch timestamps.

A common enrichment pattern is joining a stream of events against a lookup table to add dimension data. In ksqlDB, a stream-table join produces a new enriched stream — each event from the stream is looked up against the latest value in the table at the time of the event.

Enrich orders with customer data
```sql
-- Join the order stream with the customers table on customer_id
-- The result is a new stream where each order row is enriched with customer info
CREATE STREAM orders_enriched AS
    SELECT o.order_id,
           o.customer_id,
           c.full_name AS customer_name,
           c.email     AS customer_email,
           c.tier      AS customer_tier,
           o.region,
           o.product,
           o.amount,
           o.order_ts
    FROM orders_stream o
    LEFT JOIN customers_table c
      ON o.customer_id = c.customer_id
    EMIT CHANGES;
```
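Conceptually, the stream side drives the join while the table side is just keyed state: each order is looked up against the latest customer record at processing time. A plain-Python sketch of these semantics (names like `on_order` are illustrative, not a ksqlDB API):

```python
# Table side: customer_id -> latest record (upserts, like a compacted topic)
customers = {}

def on_customer_update(customer_id, record):
    customers[customer_id] = record

def on_order(order):
    """Enrich one stream event with the table's current value for its key."""
    cust = customers.get(order["customer_id"])  # LEFT JOIN: may be absent
    return {
        **order,
        "customer_name": cust["full_name"] if cust else None,
        "customer_tier": cust["tier"] if cust else None,
    }

on_customer_update("c1", {"full_name": "Ada Lovelace", "tier": "GOLD"})
enriched = on_order({"order_id": "o1", "customer_id": "c1", "amount": 250.0})
unmatched = on_order({"order_id": "o2", "customer_id": "c9", "amount": 10.0})
print(enriched["customer_tier"])   # GOLD
print(unmatched["customer_name"])  # None
```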
Aggregate the enriched stream — orders per region per tier
```sql
-- After enrichment, aggregate by region AND customer tier in 5-min tumbling windows
CREATE TABLE orders_region_tier AS
    SELECT region,
           customer_tier,
           COUNT(*) AS order_count,
           SUM(amount) AS total_revenue,
           WINDOWSTART AS window_start,
           WINDOWEND AS window_end
    FROM orders_enriched
    WINDOW TUMBLING (SIZE 5 MINUTES)
    GROUP BY region, customer_tier
    EMIT CHANGES;
```
Foreign-key table-table join
```sql
-- Join two tables on a non-primary-key column of the left table (foreign-key join)
-- Supported since ksqlDB 0.19; assumes an orders_table keyed by order_id,
-- with customer_id as a regular (non-key) column
CREATE TABLE order_summary AS
    SELECT o.order_id,
           o.amount,
           o.region,
           c.full_name AS customer_name,
           c.tier      AS customer_tier
    FROM orders_table o
    JOIN customers_table c
      ON o.customer_id = c.customer_id;
```

For a stream-table join to work, both sides must be co-partitioned: the stream and the table must have the same number of partitions and the same partitioning key (`customer_id` in the enrichment example above). Use `PARTITION BY` to repartition a stream before joining if needed.

Every CREATE STREAM ... AS SELECT or CREATE TABLE ... AS SELECT statement launches a persistent query — a continuously running Kafka Streams topology managed by the ksqlDB server. These queries survive restarts and run until explicitly terminated.

Managing persistent queries
```sql
-- List all running persistent queries
SHOW QUERIES;

-- Show details of a specific query
EXPLAIN CSAS_ORDERS_ENRICHED_0;

-- Pause a query without deleting it
PAUSE CSAS_ORDERS_ENRICHED_0;

-- Resume a paused query
RESUME CSAS_ORDERS_ENRICHED_0;

-- Terminate (stop and delete) a persistent query
TERMINATE CSAS_ORDERS_ENRICHED_0;

-- Drop the stream or table AND its underlying Kafka topic
DROP STREAM IF EXISTS orders_enriched DELETE TOPIC;
DROP TABLE IF EXISTS orders_by_region DELETE TOPIC;
```
Source Connector — ingest from a database into Kafka
```sql
-- Create a JDBC source connector to read from a PostgreSQL orders table
CREATE SOURCE CONNECTOR jdbc_orders_source WITH (
    'connector.class'          = 'io.confluent.connect.jdbc.JdbcSourceConnector',
    'connection.url'           = 'jdbc:postgresql://db-host:5432/sales',
    'connection.user'          = 'kafka_user',
    'connection.password'      = 'secret',
    'table.whitelist'          = 'orders',
    'mode'                     = 'timestamp+incrementing',
    'timestamp.column.name'    = 'updated_at',
    'incrementing.column.name' = 'id',
    'topic.prefix'             = 'pg_',
    'key.converter'            = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'          = 'org.apache.kafka.connect.json.JsonConverter'
);
```
Sink Connector — write ksqlDB results to Elasticsearch
```sql
-- Write the orders_by_region materialized table to Elasticsearch
CREATE SINK CONNECTOR es_orders_sink WITH (
    'connector.class'                = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'connection.url'                 = 'http://elasticsearch:9200',
    'topics'                         = 'ORDERS_BY_REGION',
    'type.name'                      = '_doc',
    'key.ignore'                     = 'false',
    'schema.ignore'                  = 'true',
    'value.converter'                = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false'
);
```
Listing and dropping connectors
```sql
SHOW CONNECTORS;

DROP CONNECTOR jdbc_orders_source;
DROP CONNECTOR es_orders_sink;
```

ksqlDB manages connectors through the embedded Kafka Connect worker. You must include Kafka Connect on the ksqlDB server classpath (or use the Confluent Platform image) to use `CREATE SOURCE/SINK CONNECTOR`.

ksqlDB exposes a REST API on port 8088; the newer /query-stream endpoint supports HTTP/2 streaming. All CLI operations are available programmatically, making it easy to integrate ksqlDB into CI pipelines, microservices, or dashboards.

POST /ksql — run DDL and management statements
```bash
# Create a stream via REST
curl -s -X POST http://localhost:8088/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d '{
    "ksql": "CREATE STREAM orders_stream (order_id VARCHAR KEY, customer_id VARCHAR, region VARCHAR, amount DOUBLE) WITH (KAFKA_TOPIC='\''orders'\'', VALUE_FORMAT='\''JSON'\'');",
    "streamsProperties": { "ksql.streams.auto.offset.reset": "earliest" }
  }'
```
POST /query — run a pull query
```bash
# Pull query: returns a JSON array immediately
curl -s -X POST http://localhost:8088/query \
  -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
  -d '{
    "ksql": "SELECT region, order_count, total_revenue FROM orders_by_region WHERE region = '\''EAST'\'';",
    "streamsProperties": {}
  }'
```
POST /query-stream — run a push query (HTTP/2 streaming)
```bash
# Push query over HTTP/2: keep the connection open,
# stream results as newline-delimited JSON
curl -s -X POST http://localhost:8088/query-stream \
  -H "Content-Type: application/vnd.ksqlapi.data.v1+json" \
  -d '{
    "sql": "SELECT order_id, region, amount FROM orders_stream EMIT CHANGES;",
    "properties": { "auto.offset.reset": "latest" }
  }'
```
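A client consuming /query-stream sees newline-delimited JSON: a header object with the column names first, then one JSON array per row. A small Python sketch of parsing that framing (the sample body below is illustrative, not captured server output):

```python
import json

# Illustrative /query-stream response body: header line, then one array per row
sample_response = "\n".join([
    '{"queryId":"q1","columnNames":["ORDER_ID","REGION","AMOUNT"],'
    '"columnTypes":["STRING","STRING","DOUBLE"]}',
    '["o-1001","WEST",899.5]',
    '["o-1002","EAST",120.0]',
])

def parse_query_stream(body):
    """Turn the newline-delimited response into a list of column->value dicts."""
    lines = [line for line in body.splitlines() if line.strip()]
    header = json.loads(lines[0])
    cols = header["columnNames"]
    return [dict(zip(cols, json.loads(line))) for line in lines[1:]]

rows = parse_query_stream(sample_response)
print(rows[0])  # {'ORDER_ID': 'o-1001', 'REGION': 'WEST', 'AMOUNT': 899.5}
```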
Java client — io.confluent.ksql:ksqldb-api-client
```xml
<!-- Maven dependency -->
<dependency>
  <groupId>io.confluent.ksql</groupId>
  <artifactId>ksqldb-api-client</artifactId>
  <version>7.6.0</version>
</dependency>
```

```java
import io.confluent.ksql.api.client.*;
import java.util.List;
// BaseSubscriber comes from Project Reactor (reactor-core)
import reactor.core.publisher.BaseSubscriber;

ClientOptions options = ClientOptions.create()
    .setHost("localhost")
    .setPort(8088);
Client client = Client.create(options);

// Pull query (blocking): executeQuery returns a BatchedQueryResult,
// a CompletableFuture that completes with the full row list
List<Row> rows = client
    .executeQuery("SELECT region, order_count FROM orders_by_region WHERE region = 'EAST';")
    .get();
for (Row row : rows) {
    System.out.printf("Region: %s, Count: %d%n",
        row.getString("REGION"), row.getLong("ORDER_COUNT"));
}

// Push query (reactive): StreamedQueryResult is a Reactive Streams Publisher<Row>
client.streamQuery("SELECT order_id, amount FROM orders_stream EMIT CHANGES;")
    .thenAccept(streamedQueryResult ->
        streamedQueryResult.subscribe(new BaseSubscriber<Row>() {
            @Override
            protected void hookOnNext(Row row) {
                System.out.println("New order: " + row.getString("ORDER_ID")
                    + " ($" + row.getDouble("AMOUNT") + ")");
            }
        }));
```

The Java client uses the HTTP/2 protocol. Push query results are delivered as a reactive stream using the Reactive Streams API, so you can integrate with Project Reactor or RxJava.

ksqlDB and the Kafka Streams API are complementary, not competing. ksqlDB compiles every SQL statement into a Kafka Streams topology internally — so you get identical guarantees with far less code for common patterns. The tradeoff is flexibility: when SQL cannot express your logic, reach for the API.

| Dimension | ksqlDB | Kafka Streams API |
|---|---|---|
| Language | SQL (+ REST/CLI/Java client) | Java or Scala |
| Deployment unit | Separate ksqlDB server process; queries submitted at runtime | JVM library embedded in your application |
| Schema management | Declared inline in DDL; Schema Registry optional | Custom Serdes; Schema Registry optional but common |
| State store access | Queryable via pull queries or REST | Full control via Interactive Queries API; expose via any server |
| Windowing | TUMBLING, HOPPING, SESSION via SQL keywords | TimeWindows, SlidingWindows, SessionWindows via DSL |
| Custom logic | User-defined functions (UDF/UDAF) via Java plugins | Arbitrary Java; Processor API for full control |
| Testing | ksqlDB testing framework; REST integration tests | TopologyTestDriver for unit tests; fast and hermetic |
| Best for | Analytics, ETL pipelines, real-time dashboards, rapid prototyping | Complex business workflows, custom serialization, ML inference, microservices |
Decision guide

- Reach for ksqlDB first when the job can be expressed as filtering, joins, aggregations, or windows in SQL.
- Drop down to the Kafka Streams API when you need arbitrary business logic, custom serdes, or the Processor API.
- Mixing both in one Kafka cluster is common: ksqlDB for analytics and rapid iteration, Kafka Streams for core application workflows.
End-to-end order analytics pipeline recap
```sql
-- Step 1: Register the raw events stream
CREATE STREAM orders_stream ( ... )
    WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='JSON');

-- Step 2: Register the customer dimension table
CREATE TABLE customers_table ( ... )
    WITH (KAFKA_TOPIC='customers', VALUE_FORMAT='JSON');

-- Step 3: Enrich orders with customer info (persistent query, new topic)
CREATE STREAM orders_enriched AS
    SELECT o.*, c.full_name, c.tier
    FROM orders_stream o
    LEFT JOIN customers_table c ON o.customer_id = c.customer_id
    EMIT CHANGES;

-- Step 4: Aggregate by region in 5-minute tumbling windows
-- (persistent query, materialized table)
CREATE TABLE orders_by_region AS
    SELECT region,
           COUNT(*) AS order_count,
           SUM(amount) AS total_revenue,
           WINDOWSTART AS window_start,
           WINDOWEND AS window_end
    FROM orders_enriched
    WINDOW TUMBLING (SIZE 5 MINUTES)
    GROUP BY region
    EMIT CHANGES;

-- Step 5: Dashboard or service pulls current window state on demand
SELECT region, order_count, total_revenue
FROM orders_by_region
WHERE region = 'WEST';
```

The persistent queries in this pipeline (steps 3 and 4) each run as independent Kafka Streams topologies inside the ksqlDB server. You can scale ksqlDB horizontally by adding more server instances; Kafka will redistribute partition assignments automatically.