Tributary DuckDB Extension

The Tributary extension by Query.Farm provides seamless integration between DuckDB and Apache Kafka, enabling real-time querying and analysis of streaming data. With this extension, users can consume messages directly from Kafka topics into DuckDB for immediate processing, as well as write processed data back to Kafka streams.
Features
- Direct Kafka Ingestion: Stream records from Kafka topics directly into DuckDB tables using SQL.
- SQL-Native Interface: Kafka integration is fully accessible via SQL, enabling easy adoption for data engineers and analysts.
Roadmap
- Flexible Topic Consumption: Supports consuming from specific partitions, offsets, or continuously from the latest messages.
- Real-Time Analytics: Perform analytical queries on streaming data as it arrives.
- Kafka Output: Optionally write results or processed data back to Kafka topics.
Some of this functionality is still on the roadmap. If you're interested in additional functionality, please contact Query.Farm.
The extension is named Tributary because, like a tributary feeding a river, it carries streams of data into the data lake.
DuckDB extensions are modular plugins that enhance the core engine with powerful new capabilities like this.
Getting Started
Tributary is a DuckDB community extension maintained and supported by Query.Farm.
Install Tributary in DuckDB by running:
INSTALL tributary FROM community;
Then load it with:
LOAD tributary;
Functionality
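Once the extension is loaded, a quick sanity check that requires no Kafka connection is to ask it for its version:

```sql
-- Verify the extension installed and loaded correctly.
SELECT tributary_version();
```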
When Should I Use This?
Use the Tributary extension when you want to read from or write to Apache Kafka directly from SQL.
Hereโs an example of reading the entire contents of a Kafka topic:
SELECT *
FROM tributary_scan_topic('test-topic',
"bootstrap.servers" := 'localhost:9092'
);
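Because tributary_scan_topic produces an ordinary table, its result composes with the rest of SQL. As a sketch, assuming the same local broker and topic as above, you could summarize the messages per partition:

```sql
-- Count messages and find the offset range in each partition.
-- "offset" is quoted because it is a reserved word in SQL.
SELECT partition,
       count(*)      AS message_count,
       min("offset") AS first_offset,
       max("offset") AS last_offset
FROM tributary_scan_topic('test-topic',
    "bootstrap.servers" := 'localhost:9092'
)
GROUP BY partition
ORDER BY partition;
```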
API
The extension adds these functions:
- Metadata
- Topics
- Version
tributary_metadata
This function returns the metadata of the Kafka server as a table.
Signature:
tributary_metadata([KAFKA CONNECTION OPTIONS...])
Named Parameters
The named parameters of this function are passed through as librdkafka configuration properties. All values are passed as the VARCHAR type. The allowed parameter names are the same as those listed for tributary_scan_topic.
Return Value
Returns a table with this schema:
| Column Name | Type | Description |
|---|---|---|
| brokers | STRUCT(id INTEGER, host VARCHAR, port INTEGER)[] | A list of brokers for this Kafka cluster. |
| topics | STRUCT(name VARCHAR, error VARCHAR, partitions STRUCT(id INTEGER, leader INTEGER)[])[] | A list of topics that exist in this Kafka cluster. |
Example
SELECT * FROM tributary_metadata(
"bootstrap.servers" := 'localhost:9092'
);
brokers = [{'id': 1, 'host': localhost, 'port': 9092}]
topics = [{'name': test-topic2, 'error': NULL, 'partitions': [{'id': 0, 'leader': 1}]}, {'name': test-topic, 'error': NULL, 'partitions': [{'id': 0, 'leader': 1}]}]
tributary_scan_topic
This function returns the entire contents of the topic. It uses multiple threads if there are multiple partitions.
Signature:
tributary_scan_topic(VARCHAR,
    [KAFKA OPTIONS...]
)
Positional Arguments
| Name | Type | Description |
|---|---|---|
| topic_name | VARCHAR | The name of the topic to scan. |
Named Parameters
The named parameters of this function are passed through as librdkafka configuration properties. All values are passed as the VARCHAR type. The allowed parameter names are:
allow.auto.create.topics, api.version.fallback.ms, api.version.request, api.version.request.timeout.ms, auto.commit.interval.ms, batch.num.messages, batch.size, bootstrap.servers, broker.address.family, broker.address.ttl, broker.version.fallback, builtin.features, check.crcs, client.dns.lookup, client.id, client.rack, client.software.name, compression.codec, connections.max.idle.ms, coordinator.query.interval.ms, debug, delivery.report.only.error, enable.auto.commit, enable.auto.offset.store, enable.gapless.guarantee, enable.idempotence, enable.metrics.push, enable.partition.eof, enable.random.seed, enable.sasl.oauthbearer.unsecure.jwt, enable.sparse.connections, enable.ssl.certificate.verification, enable_sasl_queue, enabled_events, fetch.error.backoff.ms, fetch.max.bytes, fetch.message.max.bytes, fetch.min.bytes, fetch.queue.backoff.ms, fetch.wait.max.ms, group.id, group.protocol, group.protocol.type, heartbeat.interval.ms, internal.termination.signal, isolation.level, log.connection.close, log.queue, log.thread.name, log_cb, log_level, max.in.flight.requests.per.connection, max.poll.interval.ms, message.copy.max.bytes, message.max.bytes, message.send.max.retries, metadata.max.age.ms, metadata.recovery.strategy, metadata.request.timeout.ms, offset.store.method, open_cb, partition.assignment.strategy, queue.buffering.backpressure.threshold, queue.buffering.max.kbytes, queue.buffering.max.messages, queue.buffering.max.ms, queued.max.messages.kbytes, queued.min.messages, receive.message.max.bytes, reconnect.backoff.jitter.ms, reconnect.backoff.max.ms, reconnect.backoff.ms, retry.backoff.max.ms, retry.backoff.ms, sasl.kerberos.kinit.cmd, sasl.kerberos.min.time.before.relogin, sasl.kerberos.principal, sasl.kerberos.service.name, sasl.mechanism, sasl.mechanisms, sasl.oauthbearer.client.id, sasl.oauthbearer.client.secret, sasl.oauthbearer.method, sasl.oauthbearer.token.endpoint.url, sasl.password, sasl.username, schema.registry.basic.auth.user.info, schema.registry.url, security.protocol, session.timeout.ms, socket.blocking.max.ms, socket.connection.setup.timeout.ms, socket.keepalive.enable, socket.max.fails, socket.nagle.disable, socket.receive.buffer.bytes, socket.send.buffer.bytes, socket.timeout.ms, socket_cb, ssl.ca.certificate.stores, ssl.ca.location, ssl.certificate.location, ssl.endpoint.identification.algorithm, ssl.engine.id, ssl.key.location, ssl.key.password, statistics.interval.ms, sticky.partitioning.linger.ms, test.mock.broker.rtt, test.mock.num.brokers, topic.metadata.propagation.max.ms, topic.metadata.refresh.fast.cnt, topic.metadata.refresh.fast.interval.ms, topic.metadata.refresh.interval.ms, topic.metadata.refresh.sparse, transaction.timeout.ms, transactional.id
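Each option uses DuckDB's named-argument syntax: the librdkafka property name is double-quoted (the dots require quoting it as an identifier), and the value is a single-quoted VARCHAR. As an illustrative sketch only (the broker address, topic name, and credentials below are placeholders, not part of the extension), a SASL/PLAIN connection might look like:

```sql
-- Hypothetical secured broker; replace the placeholders with real values.
SELECT count(*)
FROM tributary_scan_topic('events',
    "bootstrap.servers" := 'broker.example.com:9093',
    "security.protocol" := 'sasl_ssl',
    "sasl.mechanisms"   := 'PLAIN',
    "sasl.username"     := 'YOUR_USERNAME',
    "sasl.password"     := 'YOUR_PASSWORD'
);
```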
Return Value
Returns a table with this schema:
| Column Name | Type | Description |
|---|---|---|
| topic | VARCHAR | The name of the topic being scanned. |
| partition | INTEGER | The partition number where the message exists. |
| offset | BIGINT | The offset inside of the partition where the message exists. |
| message | BLOB | The message read from the topic. |
Example
SELECT * FROM tributary_scan_topic('test-topic',
"bootstrap.servers" := 'localhost:9092'
);
┌────────────┬───────────┬────────┬─────────┐
│   topic    │ partition │ offset │ message │
│  varchar   │   int32   │ int64  │  blob   │
├────────────┼───────────┼────────┼─────────┤
│ test-topic │         0 │      0 │ test1   │
│ test-topic │         0 │      1 │ test2   │
│ test-topic │         0 │      2 │ test3   │
└────────────┴───────────┴────────┴─────────┘
tributary_version
Returns the current version of the tributary extension.
Signature: tributary_version()
Example
SELECT tributary_version();
┌─────────────────────┐
│ tributary_version() │
│       varchar       │
├─────────────────────┤
│ 20250612.01         │
└─────────────────────┘
Longer Examples
NASA / IceCube Gravitational Wave Alerts
NASA operates the General Coordinates Network (GCN), which rapidly distributes alerts among space-based and ground-based observatories, physics experiments, and thousands of astronomers worldwide.
In this example, I'll demonstrate how to retrieve the latest IceCube alerts related to gravitational waves detected by LIGO, Virgo, and KAGRA (LVK).
These alerts are published to a Kafka topic. Since the Kafka messages are binary blobs, I'll walk through how to decode them into JSON. There's also some minimal Kafka configuration required to read the topic, but GCN provides helpful instructions if you choose to sign up for access.
CREATE VIEW icecube_json_alerts AS
SELECT * EXCLUDE message, decode(message)::json as message FROM
tributary_scan_topic('gcn.notices.icecube.lvk_nu_track_search',
    "bootstrap.servers" := 'kafka.gcn.nasa.gov',
    "group.id" := 'test123',
    "sasl.mechanisms" := 'OAUTHBEARER',
    "sasl.oauthbearer.client.id" := 'YOUR_CLIENT_ID',
    "sasl.oauthbearer.client.secret" := 'YOUR_SECRET',
    "sasl.oauthbearer.method" := 'oidc',
    "sasl.oauthbearer.token.endpoint.url" := 'https://auth.gcn.nasa.gov/oauth2/token',
    "security.protocol" := 'sasl_ssl'
);
-- Parse out the JSON messages into their proper types.
CREATE VIEW icecube_alerts AS
SELECT
message ->> '$.ref_ID' as ref_id,
(message ->> '$.alert_datetime')::timestamp as alert_datetime,
(message ->> '$.pval_bayesian')::double as pval_bayesian,
(message ->> '$.n_events_coincident')::integer as n_events_coincident,
(message ->> '$.neutrino_flux_sensitivity_range.flux_sensitivity')::double[] as flux_sensitivity,
(message ->> '$.neutrino_flux_sensitivity_range.sensitive_energy_range')::integer[] as sensitive_energy_range
from icecube_json_alerts;
-- Now show the latest alerts.
SELECT alert_datetime, ref_id, pval_bayesian, flux_sensitivity, sensitive_energy_range
FROM icecube_alerts ORDER BY alert_datetime;
┌─────────────────────────┬───────────┬───────────────┬──────────────────┬────────────────────────┐
│     alert_datetime      │  ref_id   │ pval_bayesian │ flux_sensitivity │ sensitive_energy_range │
│        timestamp        │  varchar  │    double     │     double[]     │        int32[]         │
├─────────────────────────┼───────────┼───────────────┼──────────────────┼────────────────────────┤
│ 2025-06-11 17:20:31.765 │ S250611b  │        0.9898 │ [0.0277, 1.1333] │ [520, 24600000]        │
│ 2025-06-11 17:20:44.449 │ S250611b  │        0.9898 │ [0.0277, 1.1333] │ [520, 24600000]        │
│ 2025-06-11 17:52:40.219 │ S250611d  │         0.872 │ [0.0277, 0.9687] │ [508, 22900000]        │
│ 2025-06-11 18:04:54.18  │ S250611e  │        0.8576 │ [0.0372, 0.4914] │ [728, 22200000]        │
│ 2025-06-11 18:05:07.742 │ S250611f  │        0.9986 │ [0.0376, 0.35]   │ [755, 20900000]        │
│ 2025-06-11 18:05:20.1   │ S250611g  │        0.9991 │ [0.0384, 0.3367] │ [749, 20900000]        │
│ 2025-06-11 18:05:32.466 │ S250611h  │        0.6717 │ [0.0359, 0.4477] │ [613, 21900000]        │
│ 2025-06-11 18:05:44.753 │ S250611d  │        0.8954 │ [0.0277, 0.9687] │ [508, 22900000]        │
│ 2025-06-11 18:05:58.782 │ S250611e  │        0.8576 │ [0.0372, 0.4914] │ [728, 22200000]        │
│ 2025-06-11 18:41:47.742 │ S250611g  │        0.9991 │ [0.0384, 0.3367] │ [749, 20900000]        │
│ 2025-06-11 18:42:00.333 │ S250611h  │        0.6717 │ [0.0359, 0.4477] │ [613, 21900000]        │
│ 2025-06-11 22:50:48.777 │ S250611u  │        0.7604 │ [0.0277, 0.4666] │ [511, 22000000]        │
│ 2025-06-11 22:51:01.427 │ S250611u  │        0.7604 │ [0.0277, 0.4666] │ [511, 22000000]        │
│ 2025-06-12 01:48:27.681 │ S250612k  │         0.057 │ [0.0277, 1.1333] │ [626, 24100000]        │
│ 2025-06-12 01:48:40.172 │ S250612k  │         0.057 │ [0.0277, 1.1333] │ [626, 24100000]        │
│ 2025-06-12 06:30:23.042 │ S250612af │        0.7603 │ [0.0277, 1.0516] │ [553, 23100000]        │
│ 2025-06-12 06:30:35.553 │ S250612af │         0.751 │ [0.0277, 1.0516] │ [559, 23100000]        │
│ 2025-06-12 09:50:20.802 │ S250612aq │        0.3257 │ [0.0277, 0.6479] │ [716, 22900000]        │
│ 2025-06-12 09:50:34.371 │ S250612aq │        0.3257 │ [0.0277, 0.6479] │ [716, 22900000]        │
├─────────────────────────┴───────────┴───────────────┴──────────────────┴────────────────────────┤
│ 19 rows                                                                               5 columns │
└─────────────────────────────────────────────────────────────────────────────────────────────────┘