Tributary DuckDB Extension

[Image: a duck with a radio]

The Tributary extension by Query.Farm provides seamless integration between DuckDB and Apache Kafka, enabling real-time querying and analysis of streaming data. With this extension, users can consume messages directly from Kafka topics into DuckDB for immediate processing; writing processed data back to Kafka streams is planned on the roadmap below.

Features

  • Direct Kafka Ingestion: Stream records from Kafka topics directly into DuckDB tables using SQL (see the sketch after this list).
  • SQL-Native Interface: Kafka integration is fully accessible via SQL, enabling easy adoption for data engineers and analysts.
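
For example, here is a minimal sketch of direct ingestion, assuming a local broker at localhost:9092 and the test-topic topic used in the examples below; the table name kafka_events is only an illustration:

-- Snapshot the current contents of a Kafka topic into a DuckDB table.
CREATE TABLE kafka_events AS
SELECT * FROM tributary_scan_topic('test-topic',
  "bootstrap.servers" := 'localhost:9092'
);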

Roadmap

  • Flexible Topic Consumption: Supports consuming from specific partitions, offsets, or continuously from the latest messages.
  • Real-Time Analytics: Perform analytical queries on streaming data as it arrives.
  • Kafka Output: Optionally write results or processed data back to Kafka topics.
Tip

Some of this functionality is still on the roadmap and not yet implemented. If you’re interested in additional functionality, please contact Query.Farm.

The extension is named Tributary because, like a tributary stream feeding a larger body of water, it carries streaming data into the data lake.

Note

DuckDB extensions are modular plugins that enhance the core engine with powerful new capabilities like this one.

Getting Started

Tributary is a DuckDB community extension maintained and supported by Query.Farm.

Install Tributary in DuckDB by running:

INSTALL tributary FROM community;

Then load it with:

LOAD tributary;

Functionality

When Should I Use This?

Use the Tributary extension when you want to read data from Apache Kafka topics into DuckDB using plain SQL.

Here’s an example of reading the entire contents of a Kafka topic:

SELECT *
FROM tributary_scan_topic('test-topic',
  "bootstrap.servers" := 'localhost:9092'
);
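
The message column comes back as a BLOB. If your payloads are UTF-8 text, one way to make them readable (a sketch using DuckDB’s built-in decode() function) is:

-- Convert each message BLOB to VARCHAR; assumes UTF-8 text payloads.
SELECT topic, "partition", "offset", decode(message) AS message_text
FROM tributary_scan_topic('test-topic',
  "bootstrap.servers" := 'localhost:9092'
);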

API

The extension adds these functions:


tributary_metadata

This function returns metadata about the Kafka cluster as a table, listing its brokers and topics.

Signature:

tributary_metadata([KAFKA CONNECTION OPTIONS...])

Named Parameters

The named parameters of this function configure the parameters used by librdkafka. All parameters are passed as the VARCHAR type. The allowed parameter names are listed under tributary_scan_topic.

Return Value

Returns a table with this schema:

| Column Name | Type | Description |
|-------------|------|-------------|
| brokers | STRUCT(id INTEGER, host VARCHAR, port INTEGER)[] | A list of brokers for this Kafka cluster. |
| topics | STRUCT(name VARCHAR, error VARCHAR, partitions STRUCT(id INTEGER, leader INTEGER)[])[] | A list of topics that exist in this Kafka cluster. |

Example

SELECT * FROM tributary_metadata(
  "bootstrap.servers" := 'localhost:9092'
);
brokers = [{'id': 1, 'host': localhost, 'port': 9092}]
 topics = [{'name': test-topic2, 'error': NULL, 'partitions': [{'id': 0, 'leader': 1}]}, {'name': test-topic, 'error': NULL, 'partitions': [{'id': 0, 'leader': 1}]}]
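
Because the result is an ordinary DuckDB table, the nested columns can be unnested with standard SQL. A sketch (assuming the same local broker) that lists each topic with its partition count:

-- Explode the topics list and count partitions per topic.
SELECT t.name AS topic_name, len(t.partitions) AS partition_count
FROM (
  SELECT unnest(topics) AS t
  FROM tributary_metadata("bootstrap.servers" := 'localhost:9092')
) AS sub;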

tributary_scan_topic

This function returns the entire contents of a topic. When the topic has multiple partitions, it scans them in parallel using multiple threads.

Signature:

tributary_scan_topic(VARCHAR,
  [KAFKA OPTIONS...]
)

Positional Arguments

| Name | Type | Description |
|------|------|-------------|
| topic_name | VARCHAR | The name of the topic to scan. |

Named Parameters

The named parameters of this function configure the parameters used by librdkafka. All parameters are passed as the VARCHAR type. The allowed parameter names are:

  • allow.auto.create.topics
  • api.version.fallback.ms
  • api.version.request
  • api.version.request.timeout.ms
  • auto.commit.interval.ms
  • batch.num.messages
  • batch.size
  • bootstrap.servers
  • broker.address.family
  • broker.address.ttl
  • broker.version.fallback
  • builtin.features
  • check.crcs
  • client.dns.lookup
  • client.id
  • client.rack
  • client.software.name
  • compression.codec
  • connections.max.idle.ms
  • coordinator.query.interval.ms
  • debug
  • delivery.report.only.error
  • enable.auto.commit
  • enable.auto.offset.store
  • enable.gapless.guarantee
  • enable.idempotence
  • enable.metrics.push
  • enable.partition.eof
  • enable.random.seed
  • enable.sasl.oauthbearer.unsecure.jwt
  • enable.sparse.connections
  • enable.ssl.certificate.verification
  • enable_sasl_queue
  • enabled_events
  • fetch.error.backoff.ms
  • fetch.max.bytes
  • fetch.message.max.bytes
  • fetch.min.bytes
  • fetch.queue.backoff.ms
  • fetch.wait.max.ms
  • group.id
  • group.protocol
  • group.protocol.type
  • heartbeat.interval.ms
  • internal.termination.signal
  • isolation.level
  • log.connection.close
  • log.queue
  • log.thread.name
  • log_cb
  • log_level
  • max.in.flight.requests.per.connection
  • max.poll.interval.ms
  • message.copy.max.bytes
  • message.max.bytes
  • message.send.max.retries
  • metadata.max.age.ms
  • metadata.recovery.strategy
  • metadata.request.timeout.ms
  • offset.store.method
  • open_cb
  • partition.assignment.strategy
  • queue.buffering.backpressure.threshold
  • queue.buffering.max.kbytes
  • queue.buffering.max.messages
  • queue.buffering.max.ms
  • queued.max.messages.kbytes
  • queued.min.messages
  • receive.message.max.bytes
  • reconnect.backoff.jitter.ms
  • reconnect.backoff.max.ms
  • reconnect.backoff.ms
  • retry.backoff.max.ms
  • retry.backoff.ms
  • sasl.kerberos.kinit.cmd
  • sasl.kerberos.min.time.before.relogin
  • sasl.kerberos.principal
  • sasl.kerberos.service.name
  • sasl.mechanism
  • sasl.mechanisms
  • sasl.oauthbearer.client.id
  • sasl.oauthbearer.client.secret
  • sasl.oauthbearer.method
  • sasl.oauthbearer.token.endpoint.url
  • sasl.password
  • sasl.username
  • schema.registry.basic.auth.user.info
  • schema.registry.url
  • security.protocol
  • session.timeout.ms
  • socket.blocking.max.ms
  • socket.connection.setup.timeout.ms
  • socket.keepalive.enable
  • socket.max.fails
  • socket.nagle.disable
  • socket.receive.buffer.bytes
  • socket.send.buffer.bytes
  • socket.timeout.ms
  • socket_cb
  • ssl.ca.certificate.stores
  • ssl.ca.location
  • ssl.certificate.location
  • ssl.endpoint.identification.algorithm
  • ssl.engine.id
  • ssl.key.location
  • ssl.key.password
  • statistics.interval.ms
  • sticky.partitioning.linger.ms
  • test.mock.broker.rtt
  • test.mock.num.brokers
  • topic.metadata.propagation.max.ms
  • topic.metadata.refresh.fast.cnt
  • topic.metadata.refresh.fast.interval.ms
  • topic.metadata.refresh.interval.ms
  • topic.metadata.refresh.sparse
  • transaction.timeout.ms
  • transactional.id
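
Most of these options only matter for secured or tuned deployments. As a hedged sketch (the broker address and credentials below are placeholders, not a real endpoint), a scan against a SASL/SSL-protected cluster might look like this:

-- Hypothetical SASL/SSL connection; the broker address, username, and
-- password are placeholders you must replace with your own values.
SELECT count(*) AS message_count
FROM tributary_scan_topic('events',
  "bootstrap.servers" := 'broker.example.com:9093',
  "security.protocol" := 'sasl_ssl',
  "sasl.mechanisms" := 'PLAIN',
  "sasl.username" := 'YOUR_USERNAME',
  "sasl.password" := 'YOUR_PASSWORD'
);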

Return Value

Returns a table with this schema:

| Column Name | Type | Description |
|-------------|------|-------------|
| topic | VARCHAR | The name of the topic being scanned. |
| partition | INTEGER | The partition number where the message exists. |
| offset | BIGINT | The offset inside the partition where the message exists. |
| message | BLOB | The message read from the topic. |

Example

SELECT * FROM tributary_scan_topic('test-topic',
  "bootstrap.servers" := 'localhost:9092'
);
┌────────────┬───────────┬────────┬─────────┐
│   topic    │ partition │ offset │ message │
│  varchar   │   int32   │ int64  │  blob   │
├────────────┼───────────┼────────┼─────────┤
│ test-topic │         0 │      0 │ test1   │
│ test-topic │         0 │      1 │ test2   │
│ test-topic │         0 │      2 │ test3   │
└────────────┴───────────┴────────┴─────────┘
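
Because tributary_scan_topic is a regular table function, ordinary SQL predicates apply to its output. A sketch (assuming the same local broker) that narrows the result to one partition past a given offset:

-- Filter the scanned messages with plain SQL; the predicate is applied by
-- DuckDB to the scan output (a sketch, not a documented pushdown feature).
SELECT "offset", decode(message) AS payload
FROM tributary_scan_topic('test-topic',
  "bootstrap.servers" := 'localhost:9092'
)
WHERE "partition" = 0 AND "offset" >= 1;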

tributary_version

Returns the current version of the Tributary extension.

Signature: tributary_version()

Example

SELECT tributary_version();
┌─────────────────────┐
│ tributary_version() │
│       varchar       │
├─────────────────────┤
│ 20250612.01         │
└─────────────────────┘

Longer Examples

NASA / IceCube Gravitational Wave Alerts

NASA operates the General Coordinates Network (GCN), which rapidly distributes alerts among space-based and ground-based observatories, physics experiments, and thousands of astronomers worldwide.

In this example, I’ll demonstrate how to retrieve the latest IceCube alerts related to gravitational waves detected by LIGO, Virgo, and KAGRA (LVK).

These alerts are published to a Kafka topic. Since the Kafka messages are binary blobs, I’ll walk through how to decode them into JSON. There’s also some minimal Kafka configuration required to read the topic, but GCN provides helpful instructions if you choose to sign up for access.

CREATE VIEW icecube_json_alerts AS
SELECT * EXCLUDE (message), decode(message)::json AS message
FROM tributary_scan_topic('gcn.notices.icecube.lvk_nu_track_search',
  "bootstrap.servers" := 'kafka.gcn.nasa.gov',
  "group.id" := 'test123',
  "sasl.mechanisms" := 'OAUTHBEARER',
  "sasl.oauthbearer.client.id" := 'YOUR_CLIENT_ID',
  "sasl.oauthbearer.client.secret" := 'YOUR_SECRET',
  "sasl.oauthbearer.method" := 'oidc',
  "sasl.oauthbearer.token.endpoint.url" := 'https://auth.gcn.nasa.gov/oauth2/token',
  "security.protocol" := 'sasl_ssl'
);

-- Parse the JSON messages into properly typed columns.
CREATE VIEW icecube_alerts AS
SELECT
  message ->> '$.ref_ID' AS ref_id,
  (message ->> '$.alert_datetime')::timestamp AS alert_datetime,
  (message ->> '$.pval_bayesian')::double AS pval_bayesian,
  (message ->> '$.n_events_coincident')::integer AS n_events_coincident,
  (message ->> '$.neutrino_flux_sensitivity_range.flux_sensitivity')::double[] AS flux_sensitivity,
  (message ->> '$.neutrino_flux_sensitivity_range.sensitive_energy_range')::integer[] AS sensitive_energy_range
FROM icecube_json_alerts;

-- Now show the latest alerts.
SELECT alert_datetime, ref_id, pval_bayesian, flux_sensitivity, sensitive_energy_range
FROM icecube_alerts ORDER BY alert_datetime;
┌─────────────────────────┬───────────┬───────────────┬──────────────────┬────────────────────────┐
│     alert_datetime      │  ref_id   │ pval_bayesian │ flux_sensitivity │ sensitive_energy_range │
│        timestamp        │  varchar  │    double     │     double[]     │        int32[]         │
├─────────────────────────┼───────────┼───────────────┼──────────────────┼────────────────────────┤
│ 2025-06-11 17:20:31.765 │ S250611b  │        0.9898 │ [0.0277, 1.1333] │ [520, 24600000]        │
│ 2025-06-11 17:20:44.449 │ S250611b  │        0.9898 │ [0.0277, 1.1333] │ [520, 24600000]        │
│ 2025-06-11 17:52:40.219 │ S250611d  │         0.872 │ [0.0277, 0.9687] │ [508, 22900000]        │
│ 2025-06-11 18:04:54.18  │ S250611e  │        0.8576 │ [0.0372, 0.4914] │ [728, 22200000]        │
│ 2025-06-11 18:05:07.742 │ S250611f  │        0.9986 │ [0.0376, 0.35]   │ [755, 20900000]        │
│ 2025-06-11 18:05:20.1   │ S250611g  │        0.9991 │ [0.0384, 0.3367] │ [749, 20900000]        │
│ 2025-06-11 18:05:32.466 │ S250611h  │        0.6717 │ [0.0359, 0.4477] │ [613, 21900000]        │
│ 2025-06-11 18:05:44.753 │ S250611d  │        0.8954 │ [0.0277, 0.9687] │ [508, 22900000]        │
│ 2025-06-11 18:05:58.782 │ S250611e  │        0.8576 │ [0.0372, 0.4914] │ [728, 22200000]        │
│ 2025-06-11 18:41:47.742 │ S250611g  │        0.9991 │ [0.0384, 0.3367] │ [749, 20900000]        │
│ 2025-06-11 18:42:00.333 │ S250611h  │        0.6717 │ [0.0359, 0.4477] │ [613, 21900000]        │
│ 2025-06-11 22:50:48.777 │ S250611u  │        0.7604 │ [0.0277, 0.4666] │ [511, 22000000]        │
│ 2025-06-11 22:51:01.427 │ S250611u  │        0.7604 │ [0.0277, 0.4666] │ [511, 22000000]        │
│ 2025-06-12 01:48:27.681 │ S250612k  │         0.057 │ [0.0277, 1.1333] │ [626, 24100000]        │
│ 2025-06-12 01:48:40.172 │ S250612k  │         0.057 │ [0.0277, 1.1333] │ [626, 24100000]        │
│ 2025-06-12 06:30:23.042 │ S250612af │        0.7603 │ [0.0277, 1.0516] │ [553, 23100000]        │
│ 2025-06-12 06:30:35.553 │ S250612af │         0.751 │ [0.0277, 1.0516] │ [559, 23100000]        │
│ 2025-06-12 09:50:20.802 │ S250612aq │        0.3257 │ [0.0277, 0.6479] │ [716, 22900000]        │
│ 2025-06-12 09:50:34.371 │ S250612aq │        0.3257 │ [0.0277, 0.6479] │ [716, 22900000]        │
├─────────────────────────┴───────────┴───────────────┴──────────────────┴────────────────────────┤
│ 19 rows                                                                               5 columns │
└──────────────────────────────────────────────────────────────────────────────────────────────────┘
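
Because the alerts are now ordinary DuckDB rows, standard SQL analytics apply. For example, a small follow-up sketch that finds the most significant (lowest Bayesian p-value) alert per LVK event:

-- Lowest p-value observed for each gravitational-wave event ID.
SELECT ref_id, min(pval_bayesian) AS best_pval
FROM icecube_alerts
GROUP BY ref_id
ORDER BY best_pval;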