🌊

Tributary

Read Apache Kafka topics from DuckDB SQL. Scan a topic into a query, snapshot it into a table, JOIN against local data, or inspect cluster metadata — all without a separate Kafka client. Authentication, partitions, and offsets are configured through the underlying librdkafka named-parameter surface.

A successor extension is on the way

Tributary is the right tool today for reading Kafka topics from DuckDB — snapshots, joins, and cluster inspection all work. A more capable successor is in active development that will add producer support, Schema Registry integration, and transactional semantics. Subscribe below and we'll email you the moment it ships, with a migration note.

Install

-- Install the extension
INSTALL tributary FROM community;

-- Load it into your session
LOAD tributary;

-- Scan an entire Kafka topic
SELECT *
FROM tributary_scan_topic('test-topic',
                          "bootstrap.servers" := 'localhost:9092');

Technical Overview

Why Use Tributary?

Read Apache Kafka topics directly from DuckDB SQL. No separate consumer process, no JSON-dump file in the middle — point a SELECT at a topic and treat its messages as rows you can WHERE, JOIN, and CREATE TABLE AS.

🌊 What this extension is for

Tributary makes Kafka topics queryable from inside DuckDB. The path between "messages on a broker" and "rows in a SQL query" collapses to one statement — no consumer code, no intermediate file, no DataFrame round-trip.

  • Snapshot a topic: One CREATE TABLE AS materializes the current contents of a topic for offline analysis.
  • Join Kafka with local data: JOIN topic rows against a local DuckDB table — the typical pattern for enriching events with reference data on the fly.
  • Inspect a cluster: Cluster-metadata queries return broker and topic descriptors as a row — useful for scripted audits and partition-parallel scan planning.
  • Ad-hoc analytics: Count, group, window, or aggregate the messages currently on a topic — without building a Kafka Streams or Flink job for a one-off question.

⚙️ How it works

Tributary is a thin DuckDB-table-function wrapper around librdkafka. Every named parameter you pass is forwarded as a librdkafka configuration key, so any consumer-side option Kafka supports is also a Tributary option — broker discovery, SASL, SSL, offset reset, fetch tuning.

  • Configuration is parameter-passthrough: No Tributary-specific auth or transport settings. Every name in librdkafka's CONFIGURATION reference is accepted as a named SQL parameter.
  • Multi-partition parallelism: Each partition is read concurrently, so cluster scans scale with topic partition count rather than serializing through one consumer.
  • Output shape: Each row is (topic, partition, offset, message). The message is a BLOB — decode it in SQL with DuckDB's JSON functions, Avro/Protobuf extensions, or decode(...).
  • No catalog, no secrets: There is no ATTACH ... (TYPE tributary) and no managed CREATE SECRET TYPE tributary. Every call passes its own credentials through named parameters — source them from getenv() and pass via SQL variables so they never appear inline in queries.

🛡️ Production caveats

What to know before pointing this at a busy production cluster.

  • Reads, not writes: The verified function surface is consumer-side. There is no documented producer / write-back function — for cache-priming or topic-publishing workflows, keep a real Kafka producer.
  • Whole-topic scan by default: A scan reads the topic's current contents end-to-end. Scope with WHERE on partition/offset/timestamp, or set auto.offset.reset := 'latest'.
  • Credentials in named parameters: Pass SASL / SSL credentials via bind variables seeded from getenv() so they don't appear in cached SQL plans.
  • Experimental status: The function surface may change as more librdkafka paths get exposed. Pin a known-good extension version.

🎯 Common Use Cases

Snapshot a topic for offline analysis

One CREATE TABLE AS SELECT against a topic — the simplest path from a Kafka stream to a Parquet-exportable DuckDB table.

Enrich Kafka events with local reference data

Decode JSON messages on the way in, then JOIN against local lookup tables — users, products, geography. Ideal for audits, sampling, and one-off correlation questions.

Cluster discovery and audit

List topics and partition layouts as rows you can WHERE/GROUP BY — easier than parsing kafka-topics.sh output by hand.

Deep Dive

Technical Details

DuckDB ↔ Apache Kafka

SQL access to Apache Kafka topics — consumer-side, partition-parallel, librdkafka-configured.

What you can do with one query

The shortest path from “Kafka topic” to “queryable DuckDB table”:

CREATE TABLE orders_snapshot AS
SELECT
  partition,
  "offset",
  json_extract_string(message::VARCHAR, '$.user_id') AS user_id,
  json_extract_string(message::VARCHAR, '$.amount')  AS amount
FROM tributary_scan_topic('orders',
  "bootstrap.servers" := 'kafka:9092');

tributary_scan_topic reads each partition in parallel, hands DuckDB the message bytes as a BLOB, and lets you decode them inline with whatever SQL fits your serialization. Any librdkafka configuration optionsecurity.protocol, sasl.username, auto.offset.reset, ssl.ca.location — passes through as a named parameter.

Best fit, scoped honestly

This extension is the consumer side of Kafka — read paths, snapshot paths, cluster introspection. It is not a Kafka producer: there is no documented write-back function in the current version. For publishing to topics, keep a real Kafka producer.

Status is experimental; the function surface may change as more librdkafka paths get exposed. The configuration surface is large because Tributary forwards every parameter to librdkafka unchanged — that’s the design, but it means typos surface as librdkafka errors at scan time rather than at SQL parse time.

Architecture

Tributary is a thin DuckDB table-function wrapper around librdkafka, the standard C/C++ Kafka client. Every named parameter you pass to tributary_scan_topic or tributary_metadata is forwarded to librdkafka’s configuration as-is — there’s no Tributary-specific naming layer to learn. If a setting works in the Kafka consumer config reference, it works here.

A topic with N partitions is scanned by N parallel consumers, and DuckDB pipelines the resulting Arrow batches into the rest of the query plan. The output schema is fixed: (topic VARCHAR, partition INTEGER, offset BIGINT, message BLOB). The message column is intentionally a BLOB — Tributary doesn’t try to guess your wire format. Cast it (message::VARCHAR) and use DuckDB’s JSON functions, or pair with the avro / protobuf community extensions, depending on what’s on your topic.

Securing credentials

Authentication parameters are plain SQL values, so anything passed inline appears in the cached query plan and EXPLAIN output. Source secrets from the environment via getenv() and bind them in:

SET VARIABLE k_user = getenv('KAFKA_USER');
SET VARIABLE k_pass = getenv('KAFKA_PASSWORD');

SELECT *
FROM tributary_scan_topic('orders',
  "bootstrap.servers"  := 'broker.example.com:9093',
  "security.protocol"  := 'SASL_SSL',
  "sasl.mechanism"     := 'SCRAM-SHA-512',
  "sasl.username"      := :k_user,
  "sasl.password"      := :k_pass);

getenv runs at parse time, so the literal credential never appears in any cached plan. See Kafka security: SASL and SSL for the upstream guidance on which mechanism fits your cluster.

Scoping the scan

tributary_scan_topic defaults to reading the topic’s current contents — fine for a low-volume topic or a one-off snapshot, but expensive on a high-throughput production topic. Three ways to scope:

  • Filter on partition / offset in a WHERE clause. DuckDB pushes these down so partitions you don’t need aren’t consumed at all.
  • Use auto.offset.reset := 'latest' so a fresh consumer group starts at the tail of the topic instead of the head.
  • Set enable.auto.commit := 'false' for ad-hoc analytics so the read doesn’t advance any production consumer-group offsets you might inadvertently share.

Compared to alternatives

  • Kafka Connect — purpose-built for sustained, structured Kafka↔system pipelines. Reach for that when you need delivery guarantees, schema-registry integration, or recurring sync. Reach for Tributary when you want SQL on demand.
  • kcat + CSV → DuckDB — what people often did before this extension. Tributary collapses the file-in-the-middle and lets you push DuckDB predicates back into the consume.
  • Python confluent-kafka + DataFrames — much heavier setup; you give up DuckDB’s vectorized execution against the streamed data and re-introduce row-format conversion at the DataFrame boundary.

Reference

Extension Contents

Quick reference to all available functions and settings organized by category.

Name Description
Inspection

Inspect cluster shape and the loaded extension build — broker / topic / partition discovery via tributary_metadata, and the version stamp via tributary_version.

tributary_metadata() Inspect a Kafka cluster without consuming from a specific topic
tributary_version() Returns the loaded Tributary extension version as a date-stamped build string (e
Reading

Stream messages from Kafka topics into DuckDB tables. Multi-partition topics are scanned in parallel; the result is (topic, partition, offset, message) rows you decode in SQL.

tributary_scan_topic() Read messages from a Kafka topic as DuckDB rows

API Reference

Function Documentation

Detailed documentation for each function including signatures, parameters, and examples.

tributary_metadata

Table Function Inspection
Signature
tributary_metadata(bootstrap.servers := VARCHAR, ... := VARCHAR) → TABLE
Parameters (Named)
Parameter Type Mode Description
bootstrap.servers VARCHAR Named Comma-separated host:port list of brokers to bootstrap from.
... VARCHAR Named Varargs Any other librdkafka configuration name — same surface as tributary_scan_topic.
Returns

A table with the following columns:

Column Type Description
brokers STRUCT[] Array of broker descriptors — id, host, port.
topics STRUCT[] Array of topic descriptors — name, error, partitions[]. Each partition includes its leader, replicas, and ISR set.
Description

Inspect a Kafka cluster without consuming from a specific topic. Returns one row containing two arrays: brokers and topics. Useful for discovering what a cluster exposes, sizing partition-parallel scans, or scripting topic-list audits.

Examples
1

List every topic on a cluster

SELECT unnest(topics).name AS topic_name
FROM tributary_metadata("bootstrap.servers" := 'kafka:9092');
2

Count partitions per topic

SELECT
  t.name                            AS topic,
  array_length(t.partitions)         AS partitions
FROM (
  SELECT unnest(topics) AS t
  FROM tributary_metadata("bootstrap.servers" := 'kafka:9092')
)
ORDER BY partitions DESC;

tributary_scan_topic

Table Function Reading
Signature
tributary_scan_topic(topic_name: VARCHAR, bootstrap.servers := VARCHAR, group.id := VARCHAR, security.protocol := VARCHAR, sasl.mechanism := VARCHAR, sasl.username := VARCHAR, sasl.password := VARCHAR, ... := VARCHAR) → TABLE
Parameters
Parameter Type Mode Description
topic_name VARCHAR Positional Kafka topic to read.
bootstrap.servers VARCHAR Named Comma-separated host:port list of brokers to bootstrap from. Required.
group.id VARCHAR Named Consumer group id. See Kafka consumer configs.
security.protocol VARCHAR Named PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. See Kafka security.
sasl.mechanism VARCHAR Named SASL mechanism — PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, etc. See Kafka SASL.
sasl.username VARCHAR Named SASL username (PLAIN / SCRAM).
sasl.password VARCHAR Named SASL password (PLAIN / SCRAM). Pass via getenv() and a SQL variable rather than inlining.
... VARCHAR Named Varargs Any other librdkafka configuration name — auto.offset.reset, enable.auto.commit, ssl.ca.location, etc. — passes through unchanged. Tributary doesn't validate keys; misspellings surface as librdkafka errors at scan time.
Returns

A table with the following columns:

Column Type Description
topic VARCHAR Topic the message was read from.
partition INTEGER Kafka partition id the message lives in.
offset BIGINT Per-partition offset of the message.
message BLOB Raw message payload as a BLOB. Cast or parse it with DuckDB's JSON / Avro / decode utilities depending on your topic's serialization.
Description

Read messages from a Kafka topic as DuckDB rows. Multi-partition topics are scanned in parallel by partition. Each row is (topic, partition, offset, message); the message is a BLOB you decode in SQL based on your serialization (JSON, Avro, Protobuf, etc.).

For a long-running topic this scans the entire current contents — pair with a WHERE clause on timestamp / partition / offset for finite slices, or with tributary_metadata first to plan the scan.

Examples
1

Snapshot a topic into a local table

CREATE TABLE orders_snapshot AS
SELECT *
FROM tributary_scan_topic('orders',
                          "bootstrap.servers" := 'kafka:9092');
2

Decode JSON messages on the way in

SELECT
  partition,
  "offset",
  json_extract_string(message::VARCHAR, '$.user_id') AS user_id,
  json_extract_string(message::VARCHAR, '$.event')   AS event
FROM tributary_scan_topic('events',
                          "bootstrap.servers" := 'kafka:9092');
3

SASL-authenticated consume against a managed Kafka

SET VARIABLE k_user = getenv('KAFKA_USER');
SET VARIABLE k_pass = getenv('KAFKA_PASSWORD');

SELECT *
FROM tributary_scan_topic('orders',
  "bootstrap.servers"  := 'broker.example.com:9093',
  "security.protocol"  := 'SASL_SSL',
  "sasl.mechanism"     := 'SCRAM-SHA-512',
  "sasl.username"      := :k_user,
  "sasl.password"      := :k_pass);

tributary_version

Scalar Function Inspection
Signature
tributary_version() → VARCHAR
Parameters
Parameter Type Mode Description
Returns
Description

Returns the loaded Tributary extension version as a date-stamped build string (e.g. '20250612.01'). Use to confirm what's loaded when matching against bug reports or release notes.

Examples
1

Print the loaded version

SELECT tributary_version();

Practical Examples

Cookbook

Real-world recipes and patterns for common use cases.

Recipes for Apache Kafka read paths, JSON decoding, joins, and discovery.

Read a Kafka topic

The simplest pattern — point a query at a topic and treat it as a table:

SELECT *
FROM tributary_scan_topic('orders',
                          "bootstrap.servers" := 'kafka:9092');

Each row corresponds to one Kafka message, with metadata columns (topic, partition, offset) plus the raw payload as a BLOB. See tributary_scan_topic for the full signature and parameter table.

Decode JSON messages on the way in

Most topics carry JSON. Cast the message column to VARCHAR and use DuckDB’s JSON functions:

SELECT
  partition,
  "offset",
  json_extract_string(message::VARCHAR, '$.user_id') AS user_id,
  json_extract_string(message::VARCHAR, '$.event')   AS event
FROM tributary_scan_topic('events',
                          "bootstrap.servers" := 'kafka:9092');

offset is a reserved keyword in DuckDB (used by LIMIT … OFFSET …), so quote it as "offset" whenever you reference the column directly.

For Avro / Protobuf payloads, pair with DuckDB’s avro / protobuf community extensions and decode the BLOB column the same way.

Filter while consuming

DuckDB pushes predicates down into the partition-parallel scan — partitions you don’t need aren’t consumed at all:

SELECT *
FROM tributary_scan_topic('orders',
                          "bootstrap.servers" := 'kafka:9092')
WHERE partition = 0
  AND "offset" >= 1000000;

For time-bounded scans, decode the timestamp from the message body and filter there, or use auto.offset.reset := 'latest' so a fresh consumer group starts at the tail.

Snapshot a topic into a local table

CREATE TABLE orders_snapshot AS
SELECT *
FROM tributary_scan_topic('orders',
                          "bootstrap.servers" := 'kafka:9092');

Useful for ad-hoc analytics over a finite slice of the stream — once it’s a DuckDB table, the rest of your query plan runs at full vectorized speed.

Join Kafka with local data

SELECT k.partition, k."offset", u.name, msg.event
FROM tributary_scan_topic('events',
                          "bootstrap.servers" := 'kafka:9092') k
CROSS JOIN LATERAL (
  SELECT
    json_extract_string(k.message::VARCHAR, '$.user_id') AS user_id,
    json_extract_string(k.message::VARCHAR, '$.event')   AS event
) msg
JOIN users u ON u.user_id = msg.user_id::BIGINT;

The classic enrichment pattern — Kafka stream on one side, reference data on the other, joined in a single SQL statement.

Authenticate against managed Kafka

Source credentials from the environment so they don’t appear in cached query plans:

SET VARIABLE k_user = getenv('KAFKA_USER');
SET VARIABLE k_pass = getenv('KAFKA_PASSWORD');

SELECT *
FROM tributary_scan_topic('orders',
  "bootstrap.servers"  := 'broker.example.com:9093',
  "security.protocol"  := 'SASL_SSL',
  "sasl.mechanism"     := 'SCRAM-SHA-512',
  "sasl.username"      := :k_user,
  "sasl.password"      := :k_pass);

Same pattern for SSL-only clusters: pass security.protocol := 'SSL' plus the relevant ssl.ca.location / ssl.certificate.location parameters. See Kafka SASL and SSL for the upstream config matrix.

Discover what’s on a cluster

-- List every topic the cluster exposes
SELECT unnest(topics).name AS topic_name
FROM tributary_metadata("bootstrap.servers" := 'kafka:9092');

-- Count partitions per topic
SELECT
  t.name                      AS topic,
  array_length(t.partitions)  AS partitions
FROM (
  SELECT unnest(topics) AS t
  FROM tributary_metadata("bootstrap.servers" := 'kafka:9092')
)
ORDER BY partitions DESC;

See tributary_metadata for the full output shape.

Diagnostics

SELECT tributary_version();   -- e.g. '20250612.01'

Use tributary_version when you need to confirm the loaded build against bug reports or release notes.

Platform Support

Compatibility

Extension availability may vary by platform and DuckDB version. Check below to ensure this extension supports your environment before installation.

Quick Facts

Software License MIT
Pricing Free
Written In C++
Source Available Yes
View on GitHub
Usage
20,424+
loads in last 30 days

Platforms

Linux
Linux (musl)
macOS
Windows
WASM
Supported platform architectures
Linux: x86_64, aarch64
Linux (musl): Not available
macOS: Apple Silicon, Intel
Windows: x86_64
WASM: Browser-based

DuckDB Versions

Release calendar
Supported
v1.1.0 v1.1.1 v1.1.2 v1.1.3

Kafka topics as DuckDB tables

Install Tributary to point SELECT queries directly at Kafka topics — snapshot, join, and analyze without writing a consumer.