Tributary DuckDB Extension
The Tributary extension by Query.Farm provides seamless integration between DuckDB and Apache Kafka, enabling real-time querying and analysis of streaming data. With this extension, users can consume messages directly from Kafka topics into DuckDB for immediate processing, as well as write processed data back to Kafka streams.
Features
- Direct Kafka Ingestion: Stream records from Kafka topics directly into DuckDB tables using SQL.
- SQL-Native Interface: Kafka integration is fully accessible via SQL, enabling easy adoption for data engineers and analysts.
Roadmap
- Flexible Topic Consumption: Supports consuming from specific partitions, offsets, or continuously from the latest messages.
- Real-Time Analytics: Perform analytical queries on streaming data as it arrives.
- Kafka Output: Optionally write results or processed data back to Kafka topics.
Some of this functionality is still planned. If you're interested in additional functionality, please contact Query.Farm.
The extension is named Tributary because it is a bridge between the stream of data and the data lake.
DuckDB extensions are modular plugins that enhance the core engine with powerful new capabilities like this.
Getting Started
Tributary is a DuckDB community extension maintained and supported by Query.Farm.
Install Tributary in DuckDB by running:
INSTALL tributary FROM community;
Then load it with:
LOAD tributary;
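To confirm the extension is installed and loaded, you can query DuckDB's built-in duckdb_extensions() table function; a quick check:

-- Shows the install and load status of the tributary extension.
SELECT extension_name, installed, loaded
FROM duckdb_extensions()
WHERE extension_name = 'tributary';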
Functionality
When Should I Use This?
Use the Tributary extension when you want to interact with Apache Kafka.
Here's an example of reading the entire contents of a Kafka topic:
SELECT *
FROM tributary_scan_topic('test-topic',
"bootstrap.servers" := 'localhost:9092'
);
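You can also persist a snapshot of a topic for repeated analysis; a minimal sketch (the raw_events table name is just an example):

-- Snapshot the topic into a table so later queries don't re-read Kafka.
CREATE TABLE raw_events AS
SELECT * FROM tributary_scan_topic('test-topic',
    "bootstrap.servers" := 'localhost:9092'
);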
API
The extension adds these functions:
- tributary_metadata
- tributary_scan_topic
- tributary_version
tributary_metadata
This function returns the metadata of the Kafka server as a table.
Signature:
tributary_metadata([KAFKA CONNECTION OPTIONS...])
Named Parameters
The named parameters of this function configure the parameters used by librdkafka. All parameters are passed as the VARCHAR type. The allowed parameter names are listed under tributary_scan_topic.
Return Value
Returns a table with this schema:
Column Name | Type | Description |
---|---|---|
brokers | STRUCT(id INTEGER, host VARCHAR, port INTEGER)[] | A list of brokers for this Kafka cluster. |
topics | STRUCT(name VARCHAR, error VARCHAR, partitions STRUCT(id INTEGER, leader INTEGER)[])[] | A list of topics that exist in this Kafka cluster. |
Example
SELECT * FROM tributary_metadata(
    "bootstrap.servers" := 'localhost:9092'
);
brokers = [{'id': 1, 'host': localhost, 'port': 9092}]
 topics = [{'name': test-topic2, 'error': NULL, 'partitions': [{'id': 0, 'leader': 1}]}, {'name': test-topic, 'error': NULL, 'partitions': [{'id': 0, 'leader': 1}]}]
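Because brokers and topics come back as nested lists of structs, standard DuckDB unnesting applies; a sketch that lists each topic with its partition count:

-- Unnest the topics list into one row per topic.
SELECT t.name AS topic, len(t.partitions) AS partition_count
FROM (
    SELECT unnest(topics) AS t
    FROM tributary_metadata("bootstrap.servers" := 'localhost:9092')
);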
tributary_scan_topic
This function returns the entire contents of the topic. It uses multiple threads if there are multiple partitions.
Signature:
tributary_scan_topic(VARCHAR, [KAFKA OPTIONS...])
Positional Arguments
Name | Type | Description |
---|---|---|
topic_name | VARCHAR | The name of the topic to scan. |
Named Parameters
The named parameters of this function configure the parameters used by librdkafka. All parameters are passed as the VARCHAR type. A sketch showing a secured-cluster configuration follows the list. The allowed parameter names are:
allow.auto.create.topics
api.version.fallback.ms
api.version.request
api.version.request.timeout.ms
auto.commit.interval.ms
batch.num.messages
batch.size
bootstrap.servers
broker.address.family
broker.address.ttl
broker.version.fallback
builtin.features
check.crcs
client.dns.lookup
client.id
client.rack
client.software.name
compression.codec
connections.max.idle.ms
coordinator.query.interval.ms
debug
delivery.report.only.error
enable.auto.commit
enable.auto.offset.store
enable.gapless.guarantee
enable.idempotence
enable.metrics.push
enable.partition.eof
enable.random.seed
enable.sasl.oauthbearer.unsecure.jwt
enable.sparse.connections
enable.ssl.certificate.verification
enable_sasl_queue
enabled_events
fetch.error.backoff.ms
fetch.max.bytes
fetch.message.max.bytes
fetch.min.bytes
fetch.queue.backoff.ms
fetch.wait.max.ms
group.id
group.protocol
group.protocol.type
heartbeat.interval.ms
internal.termination.signal
isolation.level
log.connection.close
log.queue
log.thread.name
log_cb
log_level
max.in.flight.requests.per.connection
max.poll.interval.ms
message.copy.max.bytes
message.max.bytes
message.send.max.retries
metadata.max.age.ms
metadata.recovery.strategy
metadata.request.timeout.ms
offset.store.method
open_cb
partition.assignment.strategy
queue.buffering.backpressure.threshold
queue.buffering.max.kbytes
queue.buffering.max.messages
queue.buffering.max.ms
queued.max.messages.kbytes
queued.min.messages
receive.message.max.bytes
reconnect.backoff.jitter.ms
reconnect.backoff.max.ms
reconnect.backoff.ms
retry.backoff.max.ms
retry.backoff.ms
sasl.kerberos.kinit.cmd
sasl.kerberos.min.time.before.relogin
sasl.kerberos.principal
sasl.kerberos.service.name
sasl.mechanism
sasl.mechanisms
sasl.oauthbearer.client.id
sasl.oauthbearer.client.secret
sasl.oauthbearer.method
sasl.oauthbearer.token.endpoint.url
sasl.password
sasl.username
schema.registry.basic.auth.user.info
schema.registry.url
security.protocol
session.timeout.ms
socket.blocking.max.ms
socket.connection.setup.timeout.ms
socket.keepalive.enable
socket.max.fails
socket.nagle.disable
socket.receive.buffer.bytes
socket.send.buffer.bytes
socket.timeout.ms
socket_cb
ssl.ca.certificate.stores
ssl.ca.location
ssl.certificate.location
ssl.endpoint.identification.algorithm
ssl.engine.id
ssl.key.location
ssl.key.password
statistics.interval.ms
sticky.partitioning.linger.ms
test.mock.broker.rtt
test.mock.num.brokers
topic.metadata.propagation.max.ms
topic.metadata.refresh.fast.cnt
topic.metadata.refresh.fast.interval.ms
topic.metadata.refresh.interval.ms
topic.metadata.refresh.sparse
transaction.timeout.ms
transactional.id
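For example, reading from a TLS-protected cluster with SASL/PLAIN authentication only requires passing the matching librdkafka options; a sketch with a placeholder broker address and credentials:

-- Hypothetical secured cluster; replace the address and credentials.
SELECT count(*) AS message_count
FROM tributary_scan_topic('events',
    "bootstrap.servers" := 'broker.example.com:9093',
    "security.protocol" := 'sasl_ssl',
    "sasl.mechanisms"   := 'PLAIN',
    "sasl.username"     := 'YOUR_USERNAME',
    "sasl.password"     := 'YOUR_PASSWORD'
);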
Return Value
Returns a table with this schema:
Column Name | Type | Description |
---|---|---|
topic | VARCHAR | The name of the topic being scanned. |
partition | INTEGER | The partition number where the message exists. |
offset | BIGINT | The offset inside of the partition where the message exists. |
message | BLOB | The message read from the topic. |
Example
SELECT * FROM tributary_scan_topic('test-topic',
"bootstrap.servers" := 'localhost:9092'
);
┌────────────┬───────────┬────────┬─────────┐
│   topic    │ partition │ offset │ message │
│  varchar   │   int32   │ int64  │  blob   │
├────────────┼───────────┼────────┼─────────┤
│ test-topic │         0 │      0 │ test1   │
│ test-topic │         0 │      1 │ test2   │
│ test-topic │         0 │      2 │ test3   │
└────────────┴───────────┴────────┴─────────┘
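Since message is a BLOB, decode() converts UTF-8 payloads into VARCHAR (which can then be cast onward, as in the longer example below); a sketch against the same local broker:

-- Decode message payloads into readable text, ordered by offset.
SELECT "offset", decode(message) AS message_text
FROM tributary_scan_topic('test-topic',
    "bootstrap.servers" := 'localhost:9092'
)
ORDER BY "offset";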
tributary_version
Returns the current version of the tributary extension.
Signature: tributary_version()
Example
SELECT tributary_version();
┌─────────────────────┐
│ tributary_version() │
│       varchar       │
├─────────────────────┤
│ 20250612.01         │
└─────────────────────┘
Longer Examples
NASA / IceCube Gravitational Wave Alerts
NASA operates the General Coordinates Network (GCN), which rapidly distributes alerts among space-based and ground-based observatories, physics experiments, and thousands of astronomers worldwide.
In this example, I'll demonstrate how to retrieve the latest IceCube alerts related to gravitational waves detected by LIGO, Virgo, and KAGRA (LVK).
These alerts are published to a Kafka topic. Since the Kafka messages are binary blobs, I'll walk through how to decode them into JSON. There's also some minimal Kafka configuration required to read the topic, but GCN provides helpful instructions if you choose to sign up for access.
CREATE VIEW icecube_json_alerts AS
SELECT * EXCLUDE message, decode(message)::json as message
FROM tributary_scan_topic('gcn.notices.icecube.lvk_nu_track_search',
    "bootstrap.servers" := 'kafka.gcn.nasa.gov',
    "group.id" := 'test123',
    "sasl.mechanisms" := 'OAUTHBEARER',
    "sasl.oauthbearer.client.id" := 'YOUR_CLIENT_ID',
    "sasl.oauthbearer.client.secret" := 'YOUR_SECRET',
    "sasl.oauthbearer.method" := 'oidc',
    "sasl.oauthbearer.token.endpoint.url" := 'https://auth.gcn.nasa.gov/oauth2/token',
    "security.protocol" := 'sasl_ssl'
);
-- Parse out the JSON messages into their proper types.
CREATE VIEW icecube_alerts AS
SELECT
    message ->> '$.ref_ID' as ref_id,
    (message ->> '$.alert_datetime')::timestamp as alert_datetime,
    (message ->> '$.pval_bayesian')::double as pval_bayesian,
    (message ->> '$.n_events_coincident')::integer as n_events_coincident,
    (message ->> '$.neutrino_flux_sensitivity_range.flux_sensitivity')::double[] as flux_sensitivity,
    (message ->> '$.neutrino_flux_sensitivity_range.sensitive_energy_range')::integer[] as sensitive_energy_range
FROM icecube_json_alerts;
-- Now show the latest alerts.
SELECT alert_datetime, ref_id, pval_bayesian, flux_sensitivity, sensitive_energy_range
FROM icecube_alerts ORDER BY alert_datetime;
┌─────────────────────────┬───────────┬───────────────┬──────────────────┬────────────────────────┐
│     alert_datetime      │  ref_id   │ pval_bayesian │ flux_sensitivity │ sensitive_energy_range │
│        timestamp        │  varchar  │    double     │     double[]     │        int32[]         │
├─────────────────────────┼───────────┼───────────────┼──────────────────┼────────────────────────┤
│ 2025-06-11 17:20:31.765 │ S250611b  │        0.9898 │ [0.0277, 1.1333] │ [520, 24600000]        │
│ 2025-06-11 17:20:44.449 │ S250611b  │        0.9898 │ [0.0277, 1.1333] │ [520, 24600000]        │
│ 2025-06-11 17:52:40.219 │ S250611d  │         0.872 │ [0.0277, 0.9687] │ [508, 22900000]        │
│ 2025-06-11 18:04:54.18  │ S250611e  │        0.8576 │ [0.0372, 0.4914] │ [728, 22200000]        │
│ 2025-06-11 18:05:07.742 │ S250611f  │        0.9986 │ [0.0376, 0.35]   │ [755, 20900000]        │
│ 2025-06-11 18:05:20.1   │ S250611g  │        0.9991 │ [0.0384, 0.3367] │ [749, 20900000]        │
│ 2025-06-11 18:05:32.466 │ S250611h  │        0.6717 │ [0.0359, 0.4477] │ [613, 21900000]        │
│ 2025-06-11 18:05:44.753 │ S250611d  │        0.8954 │ [0.0277, 0.9687] │ [508, 22900000]        │
│ 2025-06-11 18:05:58.782 │ S250611e  │        0.8576 │ [0.0372, 0.4914] │ [728, 22200000]        │
│ 2025-06-11 18:41:47.742 │ S250611g  │        0.9991 │ [0.0384, 0.3367] │ [749, 20900000]        │
│ 2025-06-11 18:42:00.333 │ S250611h  │        0.6717 │ [0.0359, 0.4477] │ [613, 21900000]        │
│ 2025-06-11 22:50:48.777 │ S250611u  │        0.7604 │ [0.0277, 0.4666] │ [511, 22000000]        │
│ 2025-06-11 22:51:01.427 │ S250611u  │        0.7604 │ [0.0277, 0.4666] │ [511, 22000000]        │
│ 2025-06-12 01:48:27.681 │ S250612k  │         0.057 │ [0.0277, 1.1333] │ [626, 24100000]        │
│ 2025-06-12 01:48:40.172 │ S250612k  │         0.057 │ [0.0277, 1.1333] │ [626, 24100000]        │
│ 2025-06-12 06:30:23.042 │ S250612af │        0.7603 │ [0.0277, 1.0516] │ [553, 23100000]        │
│ 2025-06-12 06:30:35.553 │ S250612af │         0.751 │ [0.0277, 1.0516] │ [559, 23100000]        │
│ 2025-06-12 09:50:20.802 │ S250612aq │        0.3257 │ [0.0277, 0.6479] │ [716, 22900000]        │
│ 2025-06-12 09:50:34.371 │ S250612aq │        0.3257 │ [0.0277, 0.6479] │ [716, 22900000]        │
├─────────────────────────┴───────────┴───────────────┴──────────────────┴────────────────────────┤
│ 19 rows                                                                                5 columns │
└──────────────────────────────────────────────────────────────────────────────────────────────────┘
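GCN re-publishes updated notices for the same superevent, which is why some ref_id values appear more than once above. If you only want the most recent notice per event, DuckDB's QUALIFY clause handles the deduplication; a sketch:

-- Keep only the latest alert for each ref_id.
SELECT *
FROM icecube_alerts
QUALIFY row_number() OVER (PARTITION BY ref_id ORDER BY alert_datetime DESC) = 1
ORDER BY alert_datetime;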