Tributary
Read Apache Kafka topics from DuckDB SQL. Scan a topic into a query, snapshot it into a table, JOIN against local data, or inspect cluster metadata — all without a separate Kafka client. Authentication, partitions, and offsets are configured through the underlying librdkafka named-parameter surface.
Tributary is the right tool today for reading Kafka topics from DuckDB: snapshots, joins, and cluster inspection all work. A more capable successor that adds producer support, Schema Registry integration, and transactional semantics is in active development.
Install
-- Install the extension
INSTALL tributary FROM community;
-- Load it into your session
LOAD tributary;
-- Scan an entire Kafka topic
SELECT *
FROM tributary_scan_topic('test-topic',
"bootstrap.servers" := 'localhost:9092');
Technical Overview
Why Use Tributary?
Read Apache Kafka topics directly from DuckDB SQL. No separate consumer process, no JSON-dump file in the middle — point a SELECT at a topic and treat its messages as rows you can WHERE, JOIN, and CREATE TABLE AS.
🌊 What this extension is for
Tributary makes Kafka topics queryable from inside DuckDB. The path between "messages on a broker" and "rows in a SQL query" collapses to one statement — no consumer code, no intermediate file, no DataFrame round-trip.
- Snapshot a topic: One CREATE TABLE AS materializes the current contents of a topic for offline analysis.
- Join Kafka with local data: JOIN topic rows against a local DuckDB table. This is the typical pattern for enriching events with reference data on the fly.
- Inspect a cluster: Cluster-metadata queries return broker and topic descriptors as a row, which is useful for scripted audits and partition-parallel scan planning.
- Ad-hoc analytics: Count, group, window, or aggregate the messages currently on a topic without building a Kafka Streams or Flink job for a one-off question.
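The query below illustrates the ad-hoc analytics case. It reports how many messages each partition currently holds and which offset range they span. It uses only the documented output columns. The topic name 'orders' and the broker address are placeholders.

```sql
-- Per-partition message counts and offset ranges for one topic.
-- 'orders' and 'kafka:9092' are placeholder values.
SELECT
    partition,
    count(*)      AS messages,
    min("offset") AS first_offset,
    max("offset") AS last_offset
FROM tributary_scan_topic('orders',
    "bootstrap.servers" := 'kafka:9092')
GROUP BY partition
ORDER BY partition;
```

This query still reads the whole topic, so on a busy topic you would combine it with the scoping options described further down.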
⚙️ How it works
Tributary is a thin DuckDB-table-function wrapper around librdkafka. Every named parameter you pass is forwarded as a librdkafka configuration key, so any consumer-side option Kafka supports is also a Tributary option — broker discovery, SASL, SSL, offset reset, fetch tuning.
- Configuration is parameter passthrough: There are no Tributary-specific auth or transport settings. Every property in librdkafka's CONFIGURATION reference is accepted as a named SQL parameter.
- Multi-partition parallelism: Partitions are read concurrently, so topic scans scale with partition count rather than serializing through one consumer.
- Output shape: Each row is (topic, partition, offset, message). The message is a BLOB. Decode it in SQL with DuckDB's JSON functions, the Avro/Protobuf extensions, or decode(...).
- No catalog, no secrets: There is no ATTACH ... (TYPE tributary) and no managed CREATE SECRET TYPE tributary. Every call passes its own credentials through named parameters. Source them from getenv() and pass them via SQL variables so they never appear inline in queries.
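The sketch below shows the passthrough model. It assumes, as stated above, that every extra named parameter is forwarded verbatim to librdkafka. client.id and auto.offset.reset are standard librdkafka consumer properties, and the values shown are illustrative. decode() is DuckDB's built-in BLOB-to-VARCHAR conversion, and it fails on payloads that are not valid UTF-8.

```sql
-- Extra named parameters are forwarded to librdkafka unchanged.
SELECT
    topic,
    partition,
    "offset",
    decode(message) AS payload              -- BLOB -> UTF-8 VARCHAR
FROM tributary_scan_topic('events',
    "bootstrap.servers" := 'kafka:9092',
    "client.id"         := 'duckdb-adhoc',  -- illustrative value
    "auto.offset.reset" := 'earliest')
LIMIT 10;
```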
🛡️ Production caveats
What to know before pointing this at a busy production cluster.
- Reads, not writes: The verified function surface is consumer-side. There is no documented producer or write-back function, so for cache-priming or topic-publishing workflows, keep a real Kafka producer.
- Whole-topic scan by default: A scan reads the topic's current contents end to end. You can scope it with WHERE on partition or offset, or set auto.offset.reset := 'latest'. The output has no timestamp column, so to filter by time you must decode the timestamp from the message body.
- Credentials in named parameters: Pass SASL / SSL credentials via SQL variables seeded from getenv() so they don't appear as literals in your SQL.
- Experimental status: The function surface may change as more librdkafka paths get exposed. Pin a known-good extension version.
🎯 Common Use Cases
Snapshot a topic for offline analysis
One CREATE TABLE AS SELECT against a topic — the simplest path from a Kafka stream to a Parquet-exportable DuckDB table.
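The Parquet step can also skip the intermediate table. The sketch below uses DuckDB's standard COPY ... TO with FORMAT parquet. The topic, broker address, and output file name are placeholders.

```sql
-- Write the topic's current contents straight to a Parquet file.
-- 'orders', 'kafka:9092', and 'orders.parquet' are placeholders.
COPY (
    SELECT *
    FROM tributary_scan_topic('orders',
        "bootstrap.servers" := 'kafka:9092')
) TO 'orders.parquet' (FORMAT parquet);
```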
Enrich Kafka events with local reference data
Decode JSON messages on the way in, then JOIN against local lookup tables — users, products, geography. Ideal for audits, sampling, and one-off correlation questions.
Cluster discovery and audit
List topics and partition layouts as rows you can WHERE/GROUP BY — easier than parsing kafka-topics.sh output by hand.
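tributary_metadata returns topics as an array of structs, so a GROUP BY over the unnested array can answer layout questions directly. The sketch below counts how many topics share each partition count. The broker address is a placeholder.

```sql
-- How many topics share each partition count?
SELECT
    array_length(t.partitions) AS partitions,
    count(*)                   AS topics
FROM (
    SELECT unnest(topics) AS t
    FROM tributary_metadata("bootstrap.servers" := 'kafka:9092')
)
GROUP BY ALL
ORDER BY partitions;
```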
Deep Dive
Technical Details
DuckDB ↔ Apache Kafka
SQL access to Apache Kafka topics — consumer-side, partition-parallel, librdkafka-configured.
What you can do with one query
The shortest path from “Kafka topic” to “queryable DuckDB table”:
CREATE TABLE orders_snapshot AS
SELECT
partition,
"offset",
json_extract_string(message::VARCHAR, '$.user_id') AS user_id,
json_extract_string(message::VARCHAR, '$.amount') AS amount
FROM tributary_scan_topic('orders',
"bootstrap.servers" := 'kafka:9092');
tributary_scan_topic reads each partition in parallel, hands DuckDB the message bytes as a BLOB, and lets you decode them inline with whatever SQL fits your serialization. Any librdkafka configuration option — security.protocol, sasl.username, auto.offset.reset, ssl.ca.location — passes through as a named parameter.
This extension is the consumer side of Kafka — read paths, snapshot paths, cluster introspection. It is not a Kafka producer: there is no documented write-back function in the current version. For publishing to topics, keep a real Kafka producer.
Status is experimental; the function surface may change as more librdkafka paths get exposed. The configuration surface is large because Tributary forwards every parameter to librdkafka unchanged — that’s the design, but it means typos surface as librdkafka errors at scan time rather than at SQL parse time.
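Bad keys only fail once librdkafka runs. One option is to check connection settings with a cheap metadata call before starting a large scan. The sketch below assumes tributary_metadata accepts the same security parameters as tributary_scan_topic, which its reference entry describes as the same configuration surface. The broker address and SASL mechanism are placeholders.

```sql
-- Assumed pre-flight check: run a metadata call with the same connection
-- settings the scan will use. If bootstrap or auth settings are wrong,
-- this is expected to fail before any topic data is read.
SET VARIABLE k_user = getenv('KAFKA_USER');
SET VARIABLE k_pass = getenv('KAFKA_PASSWORD');

SELECT
    len(brokers) AS broker_count,
    len(topics)  AS topic_count
FROM tributary_metadata(
    "bootstrap.servers" := 'broker.example.com:9093',
    "security.protocol" := 'SASL_SSL',
    "sasl.mechanism"    := 'SCRAM-SHA-512',
    "sasl.username"     := getvariable('k_user'),
    "sasl.password"     := getvariable('k_pass'));
```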
Architecture
Tributary is a thin DuckDB table-function wrapper around librdkafka, the standard C/C++ Kafka client. Every named parameter you pass to tributary_scan_topic or tributary_metadata is forwarded to librdkafka’s configuration as-is — there’s no Tributary-specific naming layer to learn. If a setting works in the Kafka consumer config reference, it works here.
A topic with N partitions is scanned by N parallel consumers, and DuckDB pipelines the resulting batches into the rest of the query plan. The output schema is fixed: (topic VARCHAR, partition INTEGER, offset BIGINT, message BLOB). The message column is intentionally a BLOB, because Tributary doesn't try to guess your wire format. Cast it (message::VARCHAR) and use DuckDB's JSON functions, or pair it with the avro / protobuf community extensions, depending on what's on your topic.
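The payload is an opaque BLOB, so some questions can be answered without knowing the wire format at all. The sketch below profiles payload sizes per partition with DuckDB's octet_length(), which returns the byte length of a BLOB. The topic and broker address are placeholders.

```sql
-- Payload size profile per partition, independent of serialization.
SELECT
    partition,
    count(*)                   AS messages,
    avg(octet_length(message)) AS avg_bytes,
    max(octet_length(message)) AS max_bytes
FROM tributary_scan_topic('orders',
    "bootstrap.servers" := 'kafka:9092')
GROUP BY partition
ORDER BY partition;
```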
Securing credentials
Authentication parameters are plain SQL values, so anything passed inline appears in the cached query plan and EXPLAIN output. Instead, source secrets from the environment via getenv(), store them in session variables, and read them back with getvariable():
SET VARIABLE k_user = getenv('KAFKA_USER');
SET VARIABLE k_pass = getenv('KAFKA_PASSWORD');
SELECT *
FROM tributary_scan_topic('orders',
"bootstrap.servers" := 'broker.example.com:9093',
"security.protocol" := 'SASL_SSL',
"sasl.mechanism" := 'SCRAM-SHA-512',
"sasl.username" := getvariable('k_user'),
"sasl.password" := getvariable('k_pass'));
Because the values come from the environment, the credentials never appear as literals in the SQL you write, save, or share. See Kafka security: SASL and SSL for upstream guidance on which mechanism fits your cluster.
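If you don't need the values elsewhere in the session, you may be able to call getenv() directly in the named parameters and skip the variables. This sketch assumes Tributary accepts constant expressions, not only literals, for its named parameters. That assumption is not confirmed by this page.

```sql
SELECT *
FROM tributary_scan_topic('orders',
    "bootstrap.servers" := 'broker.example.com:9093',
    "security.protocol" := 'SASL_SSL',
    "sasl.mechanism"    := 'SCRAM-SHA-512',
    -- Read directly from the environment; no session variables needed.
    "sasl.username"     := getenv('KAFKA_USER'),
    "sasl.password"     := getenv('KAFKA_PASSWORD'));
```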
Scoping the scan
tributary_scan_topic defaults to reading the topic's current contents. That is fine for a low-volume topic or a one-off snapshot, but expensive on a high-throughput production topic. Three ways to keep a scan in check:
- Filter on partition / offset in a WHERE clause. DuckDB pushes these down so partitions you don't need aren't consumed at all.
- Use auto.offset.reset := 'latest' so a fresh consumer group starts at the tail of the topic instead of the head.
- Set enable.auto.commit := 'false' for ad-hoc analytics so the read doesn't advance any production consumer-group offsets you might inadvertently share.
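The sketch below combines a WHERE bound with a non-committing consumer. The group id 'duckdb-adhoc-orders' is a hypothetical name, chosen so the read doesn't share a consumer group with production consumers. The partition and offset bounds are placeholders.

```sql
SELECT *
FROM tributary_scan_topic('orders',
    "bootstrap.servers"  := 'kafka:9092',
    "group.id"           := 'duckdb-adhoc-orders',  -- hypothetical, dedicated group
    "enable.auto.commit" := 'false')                -- don't move committed offsets
WHERE partition IN (0, 1)
  AND "offset" >= 500000;
```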
Compared to alternatives
- Kafka Connect — purpose-built for sustained, structured Kafka↔system pipelines. Reach for that when you need delivery guarantees, schema-registry integration, or recurring sync. Reach for Tributary when you want SQL on demand.
- kcat + CSV → DuckDB: This is what people often did before this extension. Tributary removes the file in the middle and lets you push DuckDB predicates back into the consume.
- Python confluent-kafka + DataFrames: This is a much heavier setup. You give up DuckDB's vectorized execution against the streamed data and re-introduce row-format conversion at the DataFrame boundary.
Reference
Extension Contents
Quick reference to all available functions and settings organized by category.
Inspection: Inspect cluster shape and the loaded extension build (broker / topic / partition discovery via tributary_metadata()).

| Name | Description |
|---|---|
| tributary_metadata() | Inspect a Kafka cluster without consuming from a specific topic. |
| tributary_version() | Returns the loaded Tributary extension version as a date-stamped build string (e.g. '20250612.01'). |

Reading: Stream messages from Kafka topics into DuckDB tables. Multi-partition topics are scanned in parallel.

| Name | Description |
|---|---|
| tributary_scan_topic() | Read messages from a Kafka topic as DuckDB rows. |
API Reference
Function Documentation
Detailed documentation for each function including signatures, parameters, and examples.
tributary_metadata
Signature
Parameters (Named)
| Parameter | Type | Mode | Description |
|---|---|---|---|
| bootstrap.servers | VARCHAR | Named | Comma-separated host:port list of brokers to bootstrap from. |
| ... | VARCHAR | Named (varargs) | Any other librdkafka configuration name. Same surface as tributary_scan_topic. |
Returns
A table with the following columns:
| Column | Type | Description |
|---|---|---|
| brokers | STRUCT[] | Array of broker descriptors: id, host, port. |
| topics | STRUCT[] | Array of topic descriptors: name, error, partitions[]. Each partition includes its leader, replicas, and ISR set. |
Description
Inspect a Kafka cluster without consuming from a specific topic. Returns one row containing two arrays: brokers and topics. Useful for discovering what a cluster exposes, sizing partition-parallel scans, or scripting topic-list audits.
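The brokers array can be flattened the same way as topics. This sketch assumes the struct fields are named id, host, and port, as listed in the Returns table. The broker address is a placeholder.

```sql
-- One row per broker (field names as documented in the Returns table).
SELECT
    b.id,
    b.host,
    b.port
FROM (
    SELECT unnest(brokers) AS b
    FROM tributary_metadata("bootstrap.servers" := 'kafka:9092')
)
ORDER BY b.id;
```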
Examples
List every topic on a cluster
SELECT unnest(topics).name AS topic_name
FROM tributary_metadata("bootstrap.servers" := 'kafka:9092');
Count partitions per topic
SELECT
t.name AS topic,
array_length(t.partitions) AS partitions
FROM (
SELECT unnest(topics) AS t
FROM tributary_metadata("bootstrap.servers" := 'kafka:9092')
)
ORDER BY partitions DESC;
Related Functions
tributary_scan_topic
Signature
Parameters
| Parameter | Type | Mode | Description |
|---|---|---|---|
| topic_name | VARCHAR | Positional | Kafka topic to read. |
| bootstrap.servers | VARCHAR | Named | Comma-separated host:port list of brokers to bootstrap from. Required. |
| group.id | VARCHAR | Named | Consumer group id. See Kafka consumer configs. |
| security.protocol | VARCHAR | Named | PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. See Kafka security. |
| sasl.mechanism | VARCHAR | Named | SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, etc. See Kafka SASL. |
| sasl.username | VARCHAR | Named | SASL username (PLAIN / SCRAM). |
| sasl.password | VARCHAR | Named | SASL password (PLAIN / SCRAM). Pass it via getenv() and a SQL variable rather than inlining it. |
| ... | VARCHAR | Named (varargs) | Any other librdkafka configuration name (auto.offset.reset, enable.auto.commit, ssl.ca.location, etc.) passes through unchanged. Tributary doesn't validate keys, so misspellings surface as librdkafka errors at scan time. |
Returns
A table with the following columns:
| Column | Type | Description |
|---|---|---|
topic | VARCHAR | Topic the message was read from. |
partition | INTEGER | Kafka partition id the message lives in. |
offset | BIGINT | Per-partition offset of the message. |
message | BLOB | Raw message payload as a BLOB. Cast or parse it with DuckDB's JSON / Avro / decode utilities depending on your topic's serialization. |
Description
Read messages from a Kafka topic as DuckDB rows. Multi-partition topics are scanned in parallel by partition. Each row is (topic, partition, offset, message); the message is a BLOB you decode in SQL based on your serialization (JSON, Avro, Protobuf, etc.).
For a long-running topic, this scans the entire current contents. For finite slices, pair it with a WHERE clause on partition / offset or on a timestamp decoded from the message body (the output has no timestamp column). Alternatively, call tributary_metadata first to plan the scan.
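The sketch below plans a scan in two steps: look up the partition count first, then read a subset of partitions. It assumes topic names in tributary_metadata match the names passed to tributary_scan_topic. The partition cutoff is a placeholder choice.

```sql
-- Step 1: how many partitions does 'orders' have?
SELECT array_length(t.partitions) AS partitions
FROM (
    SELECT unnest(topics) AS t
    FROM tributary_metadata("bootstrap.servers" := 'kafka:9092')
)
WHERE t.name = 'orders';

-- Step 2: read only the first two partitions (placeholder choice).
SELECT partition, count(*) AS messages
FROM tributary_scan_topic('orders',
    "bootstrap.servers" := 'kafka:9092')
WHERE partition < 2
GROUP BY partition;
```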
Examples
Snapshot a topic into a local table
CREATE TABLE orders_snapshot AS
SELECT *
FROM tributary_scan_topic('orders',
"bootstrap.servers" := 'kafka:9092');
Decode JSON messages on the way in
SELECT
partition,
"offset",
json_extract_string(message::VARCHAR, '$.user_id') AS user_id,
json_extract_string(message::VARCHAR, '$.event') AS event
FROM tributary_scan_topic('events',
"bootstrap.servers" := 'kafka:9092');
SASL-authenticated consume against a managed Kafka cluster
SET VARIABLE k_user = getenv('KAFKA_USER');
SET VARIABLE k_pass = getenv('KAFKA_PASSWORD');
SELECT *
FROM tributary_scan_topic('orders',
"bootstrap.servers" := 'broker.example.com:9093',
"security.protocol" := 'SASL_SSL',
"sasl.mechanism" := 'SCRAM-SHA-512',
"sasl.username" := getvariable('k_user'),
"sasl.password" := getvariable('k_pass'));
Related Functions
tributary_version
Signature
Parameters
This function takes no parameters.
Returns
Description
Returns the loaded Tributary extension version as a date-stamped build string (e.g. '20250612.01'). Use to confirm what's loaded when matching against bug reports or release notes.
Examples
Print the loaded version
SELECT tributary_version();
Practical Examples
Cookbook
Real-world recipes and patterns for common use cases.
Recipes for Apache Kafka read paths, JSON decoding, joins, and discovery.
Read a Kafka topic
The simplest pattern — point a query at a topic and treat it as a table:
SELECT *
FROM tributary_scan_topic('orders',
"bootstrap.servers" := 'kafka:9092');
Each row corresponds to one Kafka message, with metadata columns (topic, partition, offset) plus the raw payload as a BLOB. See tributary_scan_topic for the full signature and parameter table.
Decode JSON messages on the way in
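Most topics carry JSON. Cast the message column to VARCHAR and use DuckDB's JSON functions. The cast is fine for ASCII payloads. For UTF-8 payloads with non-ASCII characters, use decode(message) instead, because casting a BLOB to VARCHAR escapes non-ASCII bytes: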
SELECT
partition,
"offset",
json_extract_string(message::VARCHAR, '$.user_id') AS user_id,
json_extract_string(message::VARCHAR, '$.event') AS event
FROM tributary_scan_topic('events',
"bootstrap.servers" := 'kafka:9092');
offset is a reserved keyword in DuckDB (used by LIMIT … OFFSET …), so quote it as "offset" whenever you reference the column directly.
For Avro / Protobuf payloads, pair with DuckDB’s avro / protobuf community extensions and decode the BLOB column the same way.
Filter while consuming
DuckDB pushes predicates down into the partition-parallel scan — partitions you don’t need aren’t consumed at all:
SELECT *
FROM tributary_scan_topic('orders',
"bootstrap.servers" := 'kafka:9092')
WHERE partition = 0
AND "offset" >= 1000000;
For time-bounded scans, decode the timestamp from the message body and filter there, or use auto.offset.reset := 'latest' so a fresh consumer group starts at the tail.
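The sketch below shows the decoded-timestamp approach. It assumes each JSON message carries an ISO-8601 timestamp in a field named ts, which is a hypothetical name. This predicate applies to a decoded expression rather than a scan column, so it most likely does not reduce how much of the topic is read. TRY_CAST drops rows whose timestamp can't be parsed instead of failing the query.

```sql
SELECT
    partition,
    "offset",
    json_extract_string(decode(message), '$.ts') AS ts
FROM tributary_scan_topic('events',
    "bootstrap.servers" := 'kafka:9092')
-- '$.ts' is a hypothetical field; adjust it to your payload schema.
WHERE TRY_CAST(json_extract_string(decode(message), '$.ts') AS TIMESTAMP)
      >= TIMESTAMP '2025-01-01 00:00:00';
```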
Snapshot a topic into a local table
CREATE TABLE orders_snapshot AS
SELECT *
FROM tributary_scan_topic('orders',
"bootstrap.servers" := 'kafka:9092');
Useful for ad-hoc analytics over a finite slice of the stream — once it’s a DuckDB table, the rest of your query plan runs at full vectorized speed.
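To refresh a snapshot instead of rebuilding it, you can append only messages past the highest offset already stored for each partition. This sketch assumes orders_snapshot was created with SELECT * as above, so its columns match the scan output. The anti-join runs inside DuckDB, so the full topic is still read.

```sql
-- Append messages newer than what the snapshot already holds.
INSERT INTO orders_snapshot
SELECT s.*
FROM tributary_scan_topic('orders',
    "bootstrap.servers" := 'kafka:9092') s
LEFT JOIN (
    SELECT partition, max("offset") AS max_offset
    FROM orders_snapshot
    GROUP BY partition
) m ON s.partition = m.partition
WHERE m.max_offset IS NULL          -- partition not seen before
   OR s."offset" > m.max_offset;    -- or newer than the stored max
```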
Join Kafka with local data
SELECT k.partition, k."offset", u.name, msg.event
FROM tributary_scan_topic('events',
"bootstrap.servers" := 'kafka:9092') k
CROSS JOIN LATERAL (
SELECT
json_extract_string(k.message::VARCHAR, '$.user_id') AS user_id,
json_extract_string(k.message::VARCHAR, '$.event') AS event
) msg
JOIN users u ON u.user_id = msg.user_id::BIGINT;
The classic enrichment pattern — Kafka stream on one side, reference data on the other, joined in a single SQL statement.
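The same shape also works as a data-quality check. This sketch lists user ids that appear on the topic but have no match locally. It assumes the same users table with a BIGINT user_id column. TRY_CAST turns non-numeric ids into NULL instead of failing, so those ids show up as unmatched too.

```sql
SELECT msg.user_id, count(*) AS events
FROM tributary_scan_topic('events',
    "bootstrap.servers" := 'kafka:9092') k
CROSS JOIN LATERAL (
    SELECT json_extract_string(k.message::VARCHAR, '$.user_id') AS user_id
) msg
LEFT JOIN users u ON u.user_id = TRY_CAST(msg.user_id AS BIGINT)
WHERE u.user_id IS NULL        -- no matching reference row
GROUP BY msg.user_id
ORDER BY events DESC;
```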
Authenticate against managed Kafka
Source credentials from the environment so they never appear as literals in your SQL:
SET VARIABLE k_user = getenv('KAFKA_USER');
SET VARIABLE k_pass = getenv('KAFKA_PASSWORD');
SELECT *
FROM tributary_scan_topic('orders',
"bootstrap.servers" := 'broker.example.com:9093',
"security.protocol" := 'SASL_SSL',
"sasl.mechanism" := 'SCRAM-SHA-512',
"sasl.username" := getvariable('k_user'),
"sasl.password" := getvariable('k_pass'));
Same pattern for SSL-only clusters: pass security.protocol := 'SSL' plus the relevant ssl.ca.location / ssl.certificate.location parameters. See Kafka SASL and SSL for the upstream config matrix.
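The sketch below targets an SSL-only cluster that uses client certificates. ssl.key.location and ssl.key.password are standard librdkafka properties, used alongside the ones named above. The file paths are placeholders. The key password comes from KAFKA_KEY_PASSWORD, a hypothetical environment variable.

```sql
SET VARIABLE key_pass = getenv('KAFKA_KEY_PASSWORD');  -- hypothetical env var

SELECT *
FROM tributary_scan_topic('orders',
    "bootstrap.servers"        := 'broker.example.com:9093',
    "security.protocol"        := 'SSL',
    "ssl.ca.location"          := '/etc/kafka/ca.pem',      -- placeholder path
    "ssl.certificate.location" := '/etc/kafka/client.pem',  -- placeholder path
    "ssl.key.location"         := '/etc/kafka/client.key',  -- placeholder path
    "ssl.key.password"         := getvariable('key_pass'));
```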
Discover what’s on a cluster
-- List every topic the cluster exposes
SELECT unnest(topics).name AS topic_name
FROM tributary_metadata("bootstrap.servers" := 'kafka:9092');
-- Count partitions per topic
SELECT
t.name AS topic,
array_length(t.partitions) AS partitions
FROM (
SELECT unnest(topics) AS t
FROM tributary_metadata("bootstrap.servers" := 'kafka:9092')
)
ORDER BY partitions DESC;
See tributary_metadata for the full output shape.
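Clusters usually also expose internal topics such as __consumer_offsets. The sketch below hides any topic whose name starts with a double underscore. That prefix is a naming convention, not a guarantee.

```sql
SELECT t.name AS topic
FROM (
    SELECT unnest(topics) AS t
    FROM tributary_metadata("bootstrap.servers" := 'kafka:9092')
)
WHERE NOT starts_with(t.name, '__')   -- skip internal topics like __consumer_offsets
ORDER BY topic;
```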
Diagnostics
SELECT tributary_version(); -- e.g. '20250612.01'
Use tributary_version when you need to confirm the loaded build against bug reports or release notes.
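DuckDB's own catalog offers a second view of what is installed and loaded, through the built-in duckdb_extensions() table function. This page does not say whether extension_version for a community extension matches the tributary_version() string, so treat the comparison as a cross-check only.

```sql
SELECT extension_name, installed, loaded, extension_version
FROM duckdb_extensions()
WHERE extension_name = 'tributary';
```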
Platform Support
Compatibility
Extension availability may vary by platform and DuckDB version. Confirm that this extension supports your environment before installing it.