Radio
Two-way real-time messaging from DuckDB SQL. Subscribe to WebSocket and Redis Pub/Sub channels, query received events as ordinary tables, and queue outgoing messages with status tracking. Best fit: in-process pipelines, dashboards, and ad-hoc inspection — buffers are in-memory, scoped to the DuckDB process.
Install
-- Install the extension
INSTALL radio FROM community;
-- Load it into your session
LOAD radio;
-- Subscribe to a WebSocket channel — the URL is the subscription key
CALL radio_subscribe('wss://stream.example.com/events');
-- Drain buffered messages as a normal table
SELECT receive_time, channel, message
FROM radio_subscription_received_messages('wss://stream.example.com/events')
ORDER BY receive_time DESC
LIMIT 50;
-- Send a message — args are (url, channel, payload, max_attempts, retry_delay)
CALL radio_transmit_message(
'wss://stream.example.com/events',
'main',
'{"type":"hello"}'::BLOB,
3,
INTERVAL '500 milliseconds'
);
Technical Overview
Why Use Radio?
Subscribe to WebSocket and Redis Pub/Sub channels from inside DuckDB SQL. Incoming messages buffer into queryable tables; outgoing messages queue with delivery status — all in one process, with no separate consumer service to operate.
📡 What this extension is for
Real-time messaging without leaving SQL. The path from "events arriving on a WebSocket" to "rows you can JOIN and WHERE against" collapses to a subscribe call plus a SELECT.
- Listen and drain incrementally: Subscribe to a channel and incoming messages flow into an in-memory buffer exposed as ordinary DuckDB tables. Each row tracks how many times it has been seen, so re-querying drains only what's new.
- Queue and broadcast outbound: Hand a message to the extension and it delivers asynchronously, with per-message status visible as a table. Poll for delivery, retry, or audit failures from SQL.
- Inspect what's connected: Every active subscription shows up as a row with its endpoint, state, and delivery counters, which is useful for dashboards and runtime debugging.
- Test pipelines without the wire: Inject canned messages straight into a subscription's inbox and exercise downstream SQL exactly as it would behave on the real channel.
🔌 Supported transports
Two transports today; the table-function surface stays identical regardless of protocol.
- WebSocket: Standard RFC 6455 client for ws:// or wss:// URLs. Fits browser-style real-time feeds, Slack-style event streams, and any custom push API that speaks WebSocket.
- Redis Pub/Sub: Subscribe to one or more Redis channels by passing a Redis URL plus channel name. Fits application data flows that already use Redis as the bus.
- Roadmap: Google Pub/Sub, Azure Service Bus, MQTT, and subprocess pipes are planned. Today's transports are WebSocket and Redis Pub/Sub only.
🛡️ Production caveats
What to know before pointing this at anything important.
- In-memory, process-scoped buffers: Received messages live in-process. When DuckDB exits, subscriptions tear down and unread messages are lost. For at-least-once across restarts, drain to a durable store before processing.
- Buffers are size-limited: The receive queue has a fixed capacity, and older messages are evicted as new ones arrive. Drain frequently enough to keep up with the inbound rate.
- Best-effort outbound delivery: Messages are queued and delivered asynchronously. Per-subscription success/failure counters and per-message status tables expose progress so you can build retry logic in SQL.
- Single connection per subscription: Multiple DuckDB connections in the same process don't share subscription state. Open a subscription on the same connection that will drain it.
- Experimental status: Function shapes may change. Pin a known-good extension version in production.
🎯 Common use cases
Live dashboards in DuckDB
Subscribe to a market-data or activity-event WebSocket and refresh a tile by re-querying on a timer. No separate consumer, no message broker.
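A dashboard tile of this kind could be backed by a query along the following lines. This is a minimal sketch that assumes the example feed URL used elsewhere on this page is already subscribed; the one-minute bucket and the 60-row limit are arbitrary illustrative choices.

```sql
-- Messages per channel per minute, newest buckets first.
-- Re-run this on a timer to refresh the tile.
SELECT
    date_trunc('minute', receive_time::TIMESTAMP) AS bucket,
    channel,
    count(*) AS messages
FROM radio_subscription_received_messages('wss://stream.example.com/events')
GROUP BY ALL
ORDER BY bucket DESC, channel
LIMIT 60;
```

Because the buffer is size-limited, the oldest buckets may be incomplete once eviction starts.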
SQL-driven event responders
Pull events, run the SQL logic, then publish a response back to a Pub/Sub fanout — all in one query loop.
Ad-hoc traffic inspection
Point at a channel you don't yet understand, drain into a CREATE TABLE AS, and GROUP BY to discover the message shape. Faster than wiring up a custom consumer.
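The inspection flow described above might look like this. It is a sketch under two assumptions: the payloads are UTF-8 JSON text, and `feed_sample` is a hypothetical table name. `decode`, `json_valid`, and `json_structure` are DuckDB built-ins (the latter two come from the JSON extension).

```sql
-- Snapshot whatever is currently buffered into a regular table.
-- decode() turns the BLOB payload into VARCHAR and errors on invalid UTF-8.
CREATE OR REPLACE TABLE feed_sample AS
SELECT receive_time, channel, decode(message) AS body
FROM radio_subscription_received_messages('wss://stream.example.com/events');

-- Group by inferred JSON structure to see which message shapes occur.
SELECT json_structure(body) AS shape, count(*) AS messages
FROM feed_sample
WHERE json_valid(body)
GROUP BY shape
ORDER BY messages DESC;
```

Rows that fail `json_valid` are skipped here; counting them separately is a quick way to spot non-JSON traffic.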
Pipeline testing
Inject canned messages and exercise downstream SQL exactly as it would behave on the real channel.
Deep Dive
Technical Details
What you can do with three statements
-- 1. Open a WebSocket subscription. The URL itself is the key.
CALL radio_subscribe('wss://stream.example.com/events');
-- 2. (later, or in another query) drain the inbox
SELECT receive_time, channel, message
FROM radio_subscription_received_messages('wss://stream.example.com/events')
ORDER BY receive_time DESC
LIMIT 100;
-- 3. Reply or broadcast back
CALL radio_transmit_message(
'wss://stream.example.com/events',
'main', -- channel
'{"type":"ack","at":"2026-04-28T12:00:00Z"}'::BLOB,
3, -- max_attempts
INTERVAL '500 milliseconds' -- retry_delay
);
radio_subscribe opens the connection and starts buffering. radio_subscription_received_messages exposes that buffer as an ordinary table with columns (subscription_id, subscription_url, message_id, message_type, receive_time, seen_count, channel, message), so JOIN, WHERE, GROUP BY, and CREATE TABLE AS all work on it. radio_transmit_message queues an outbound message; poll radio_subscriptions for the per-subscription transmit_successes / transmit_failures counters.
Subscriptions, the received-message buffer, and the transmit queue all live in DuckDB process memory. Three things follow:
- Process exit drops everything. When DuckDB shuts down, subscriptions are torn down and unread messages are lost. For at-least-once delivery across restarts, persist drained messages to a durable store before processing.
- The received buffer is size-limited. Older messages are evicted as new ones arrive, so drain with radio_received_messages at a cadence that keeps up with your inbound rate.
- Outbound delivery is asynchronous and best-effort. radio_transmit_message returns once the message is queued; the per-subscription transmit_successes, transmit_failures, transmit_last_success_time, and transmit_last_failure_time columns on radio_subscriptions report progress.
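One way to act on the first two points is to copy buffered messages into a regular table on a schedule. The sketch below assumes a file-backed DuckDB database, a hypothetical table named `radio_archive`, and that the combination of subscription_url, message_id, and receive_time identifies a message; none of this is prescribed by the extension.

```sql
-- Durable archive; it only survives restarts in a file-backed database.
CREATE TABLE IF NOT EXISTS radio_archive (
    subscription_url VARCHAR,
    message_id       UBIGINT,
    receive_time     TIMESTAMP_MS,
    channel          VARCHAR,
    message          BLOB
);

-- Copy over only the buffered messages that are not archived yet.
INSERT INTO radio_archive
SELECT r.subscription_url, r.message_id, r.receive_time, r.channel, r.message
FROM radio_received_messages() AS r
ANTI JOIN radio_archive AS a
    ON  a.subscription_url = r.subscription_url
    AND a.message_id       = r.message_id
    AND a.receive_time     = r.receive_time;
```

Running the INSERT more often than the buffer turns over keeps eviction from dropping messages before they are archived.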
Status is experimental. The function surface may change as more transports get added.
Architecture
Radio runs an asynchronous I/O thread per subscription. For WebSocket endpoints it speaks the standard frame protocol; for Redis Pub/Sub endpoints it opens a Redis connection and SUBSCRIBEs to the channel(s). Inbound messages land in a per-subscription FIFO buffer; outbound messages live in a transmit queue with a status field that the I/O thread updates as delivery completes.
DuckDB SQL never blocks on the wire — radio_received_messages and friends are pure table scans over the in-process buffer. If you want to wait for fresh messages before reading, radio_listen blocks the connection until something arrives or a timeout elapses, and radio_sleep is a convenience for polling loops.
Supported transports today
- WebSocket: ws:// for plaintext, wss:// for TLS. Subscribe with the URL in radio_subscribe. Standard RFC 6455 framing; the extension handles ping/pong and reconnect internally.
- Redis Pub/Sub: point a subscription at a Redis URL plus channel name. Same radio_subscribe / radio_received_messages / radio_transmit_message surface; Radio handles the Redis-specific SUBSCRIBE and PUBLISH under the hood.
The extension’s documentation page lists planned transports — Google Pub/Sub, Azure Service Bus, MQTT, subprocess pipes — but only WebSocket and Redis Pub/Sub are wired up today.
Compared to alternatives
- A standalone consumer service: the typical pattern (Node.js / Python listening to WebSocket → write to a database → DuckDB queries the database). Radio collapses that into one process and keeps the data in DuckDB’s vectorized executor without a serialization hop. The trade is durability: a real consumer service can crash-restart from the bus.
- The shellfs extension piping wscat / websocat output: works for read-only WebSocket streams, but each pipe is a one-shot command without bidirectional support, status tracking, or reconnect.
- Kafka via tributary: a different kind of bus. Tributary is for snapshotting / scanning Kafka topics; Radio is for live channel subscribe-and-send semantics on WebSocket / Redis Pub/Sub.
Reference
Extension Contents
Quick reference to all available functions and settings organized by category.
| Name | Description |
|---|---|
| **Inbox** | Queryable table views of received messages, across all subscriptions or filtered to one. Drain like any DuckDB table; the buffer is in-process and FIFO per subscription. |
| radio_received_messages() | Table function exposing every received message (across subscriptions) as queryable rows |
| radio_subscription_received_message_add() | Inject a message into a subscription's received buffer |
| radio_subscription_received_messages() | Per-subscription view of received messages |
| **Listening** | Open and manage subscriptions to remote event sources (WebSockets, message buses). Each subscription is keyed by its endpoint URL; incoming messages are buffered for SQL to drain. |
| radio_listen() | Block the connection until a message arrives or a timeout elapses, so subsequent SELECTs can drain fresh messages |
| radio_subscribe() | Subscribe to a remote endpoint (WebSocket or Redis Pub/Sub), keyed by its URL |
| radio_subscriptions() | Table function listing every active subscription with its endpoint and state |
| radio_unsubscribe() | Tear down a subscription opened with radio_subscribe |
| **Maintenance** | Buffer flushing, polling sleeps, and version inspection. |
| radio_flush() | Drain all currently-buffered messages so subsequent reads see only fresh data |
| radio_sleep() | Sleep the current connection; a convenience for waiting between polling intervals when scripting |
| radio_version() | Return the loaded Radio extension version |
| **Outbox** | Send messages out, with built-in delivery tracking. Queue, inspect, retract, and garbage-collect, all from SQL. |
| radio_subscription_transmit_message_delete() | Remove a queued outgoing message that hasn't yet been transmitted |
| radio_subscription_transmit_messages() | Table view of pending and sent outgoing messages with delivery status: what's been transmitted, what's queued, what failed |
| radio_subscription_transmit_messages_delete_finished() | Garbage-collect already-delivered outgoing messages from the transmit log |
| radio_transmit_message() | Queue an outgoing message on a subscription |
API Reference
Function Documentation
Detailed documentation for each function including signatures, parameters, and examples.
radio_flush
Signature
Parameters (Positional)
| Parameter | Type | Mode | Description |
|---|---|---|---|
| col0 | INTERVAL | Positional | |
Returns
Description
Drain all currently-buffered messages so subsequent reads see only fresh data.
Examples
Related Functions
radio_listen
Signature
Parameters (Positional)
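| Parameter | Type | Mode | Description |
|---|---|---|---|
| col0 | BOOLEAN | Positional | Whether to return as soon as the first message arrives |
| col1 | INTERVAL | Positional | Maximum time to wait |
Returns
Description
Block the current connection until a message arrives on a subscription or the timeout elapses, so that subsequent SELECTs can drain fresh messages.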
Examples
Related Functions
radio_received_messages
Signature
Parameters
This function takes no parameters.
Returns
Description
Table function exposing every received message (across subscriptions) as queryable rows.
Examples
Related Functions
radio_sleep
Signature
Parameters (Positional)
| Parameter | Type | Mode | Description |
|---|---|---|---|
| col0 | INTERVAL | Positional | Duration to sleep |
Returns
Description
Sleep the current connection — convenience for waiting between polling intervals when scripting.
Examples
radio_subscribe
Signature
Parameters
| Parameter | Type | Mode | Description |
|---|---|---|---|
| col0 | VARCHAR | Positional | Endpoint URL; also the subscription key |
| transmit_retry_max_delay_ms | INTEGER | Named | Transmit retry tuning |
| transmit_retry_multiplier | DOUBLE | Named | Transmit retry tuning |
| transmit_retry_initial_delay_ms | INTEGER | Named | Transmit retry tuning |
| receive_message_capacity | INTEGER | Named | Receive buffer sizing |
Returns
Description
Subscribe to a remote endpoint (WebSocket or Redis Pub/Sub). The endpoint URL serves as the subscription key for the other radio_* functions. Incoming messages flow into the received-messages buffer.
Examples
Related Functions
radio_subscription_received_message_add
Signature
Parameters (Positional)
| Parameter | Type | Mode | Description |
|---|---|---|---|
| col0 | VARCHAR | Positional | Subscription URL |
| col1 | VARCHAR | Positional | Channel name |
| col2 | BLOB | Positional | Message payload |
Returns
Description
Inject a message into a subscription's received buffer. Useful for testing pipelines without a real upstream.
Examples
Related Functions
radio_subscription_received_messages
Signature
Parameters (Positional)
| Parameter | Type | Mode | Description |
|---|---|---|---|
| col0 | VARCHAR | Positional | Subscription URL |
Returns
Description
Per-subscription view of received messages. Pass the URL you subscribed with to drain a single feed.
Examples
Related Functions
radio_subscription_transmit_message_delete
Signature
Parameters (Positional)
| Parameter | Type | Mode | Description |
|---|---|---|---|
| col0 | VARCHAR | Positional | Subscription URL |
| col1 | UBIGINT | Positional | message_id of the queued outgoing message |
Returns
Description
Remove a queued outgoing message that hasn't yet been transmitted.
Examples
Related Functions
radio_subscription_transmit_messages
Signature
Parameters (Positional)
| Parameter | Type | Mode | Description |
|---|---|---|---|
| col0 | VARCHAR | Positional | Subscription URL |
Returns
Description
Table view of pending and sent outgoing messages with delivery status — what's been transmitted, what's queued, what failed.
Examples
Related Functions
radio_subscription_transmit_messages_delete_finished
Signature
Parameters (Positional)
| Parameter | Type | Mode | Description |
|---|---|---|---|
| col0 | VARCHAR | Positional | Subscription URL |
Returns
Description
Garbage-collect already-delivered outgoing messages from the transmit log.
Examples
Related Functions
radio_subscriptions
Signature
Parameters
This function takes no parameters.
Returns
Description
Table function listing every active subscription with its endpoint and state.
Examples
Related Functions
radio_transmit_message
Signature
Parameters (Positional)
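| Parameter | Type | Mode | Description |
|---|---|---|---|
| col0 | VARCHAR | Positional | Subscription URL |
| col1 | VARCHAR | Positional | Channel |
| col2 | BLOB | Positional | Message payload |
| col3 | INTEGER | Positional | Maximum delivery attempts (max_attempts) |
| col4 | INTERVAL | Positional | Delay between retries (retry_delay) |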
Returns
Description
Queue an outgoing message on a subscription. Returns once the message is buffered; delivery is tracked separately.
Examples
Related Functions
radio_unsubscribe
Signature
Parameters (Positional)
| Parameter | Type | Mode | Description |
|---|---|---|---|
| col0 | VARCHAR | Positional | Subscription URL |
Returns
Description
Tear down a subscription opened with radio_subscribe.
Examples
Related Functions
radio_version
Signature
Parameters
This function takes no parameters.
Returns
Description
Return the loaded Radio extension version.
Examples
Practical Examples
Cookbook
Real-world recipes and patterns for common use cases.
Subscribe to a WebSocket channel
-- Open a subscription. The URL itself is the subscription identifier —
-- pass it to the read / send / unsubscribe functions later.
CALL radio_subscribe('wss://stream.example.com/events');
-- Inspect what's subscribed
SELECT * FROM radio_subscriptions();
The URL is what every other radio_* function takes as its subscription key. If you’ll reuse it across many statements, stash it in a SQL variable and read it back with getvariable:
SET VARIABLE feed = 'wss://stream.example.com/events';
CALL radio_subscribe(getvariable('feed'));
SELECT * FROM radio_subscription_received_messages(getvariable('feed'));
See radio_subscribe and radio_subscriptions for the full signatures, including the optional named parameters for buffer sizing and transmit retry tuning.
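As a rough illustration of those named parameters, a tuned subscription might be opened as shown here. The parameter names come from the radio_subscribe reference above; every numeric value is a placeholder, and the comments are a reading of the parameter names, not documented semantics.

```sql
-- All numeric values below are placeholders, not recommended settings.
CALL radio_subscribe(
    'wss://stream.example.com/events',
    receive_message_capacity        = 10000,  -- receive buffer sizing
    transmit_retry_initial_delay_ms = 250,    -- transmit retry tuning
    transmit_retry_multiplier       = 2.0,    -- transmit retry tuning
    transmit_retry_max_delay_ms     = 10000   -- transmit retry tuning
);
```

A larger receive capacity buys more time between drains at the cost of process memory.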
Subscribe to Redis Pub/Sub
Same function, different URL — Radio routes WebSocket and Redis Pub/Sub through the same surface:
CALL radio_subscribe('redis://cache.example.com:6379/0?channel=events');
-- Drain the same way as a WebSocket subscription, keyed by URL
SELECT receive_time, message
FROM radio_subscription_received_messages('redis://cache.example.com:6379/0?channel=events')
ORDER BY receive_time DESC;
Drain received messages
-- Recent messages across all subscriptions
SELECT subscription_url, channel, receive_time, message
FROM radio_received_messages()
ORDER BY receive_time DESC
LIMIT 100;
-- One feed only — pass the same URL you subscribed with
SELECT receive_time, channel, message
FROM radio_subscription_received_messages('wss://stream.example.com/events')
ORDER BY receive_time DESC;
The full row shape from radio_received_messages() / radio_subscription_received_messages(url) is (subscription_id UBIGINT, subscription_url VARCHAR, message_id UBIGINT, message_type, receive_time TIMESTAMP_MS, seen_count UBIGINT, channel VARCHAR, message BLOB). The payload is a BLOB — cast it to VARCHAR for text or use DuckDB’s JSON functions for structured payloads. These are normal DuckDB table functions — JOIN, filter, aggregate, push to Parquet, anything you’d do with a regular table. See radio_received_messages and radio_subscription_received_messages.
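For JSON payloads, decoding and field extraction can happen in the same query. The sketch below assumes UTF-8 JSON text with a top-level "type" field, as in the sample payloads on this page. `decode` is DuckDB's built-in BLOB-to-text function and raises an error on invalid UTF-8, while a plain cast to VARCHAR is enough for ASCII payloads.

```sql
SELECT
    message_id,
    receive_time,
    decode(message) AS body,                                   -- BLOB -> VARCHAR
    json_extract_string(decode(message), '$.type') AS msg_type -- assumed field
FROM radio_subscription_received_messages('wss://stream.example.com/events')
ORDER BY receive_time DESC
LIMIT 20;
```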
Wait for fresh messages before reading
For loops that need to act as messages arrive (rather than polling on a timer), radio_listen blocks the connection until something lands in the inbox or a timeout elapses:
-- Block up to 30 seconds for a message on any subscription.
-- Args: (return_on_first_message, max_wait).
CALL radio_listen(true, INTERVAL '30 seconds');
-- Then drain
SELECT * FROM radio_received_messages() ORDER BY receive_time DESC LIMIT 1;
Pair with radio_sleep when scripting between polls.
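A single polling iteration might look like the following. The way radio_sleep is invoked is an assumption: the reference lists only its INTERVAL argument, so the CALL form below mirrors radio_listen and may need to be SELECT radio_sleep(...) in your build.

```sql
-- One polling iteration: pause, then read whatever is buffered.
-- Assumption: radio_sleep is invoked with CALL, like radio_listen.
CALL radio_sleep(INTERVAL '2 seconds');

SELECT receive_time, channel, message
FROM radio_received_messages()
ORDER BY receive_time DESC
LIMIT 10;
```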
Send a message and track delivery
-- Args: (subscription_url, channel, payload, max_attempts, retry_delay).
CALL radio_transmit_message(
'wss://stream.example.com/events',
/* channel */ 'main',
/* payload */ '{"type":"hello","at":"2026-04-28T12:00:00Z"}'::BLOB,
/* max_attempts */ 3,
/* retry_delay */ INTERVAL '500 milliseconds'
);
-- Inspect transmit queue + delivery state
SELECT message_id, message
FROM radio_subscription_transmit_messages('wss://stream.example.com/events')
ORDER BY message_id DESC;
Per-subscription delivery counters live on radio_subscriptions (transmit_successes, transmit_failures, transmit_last_success_time, transmit_last_failure_time); poll those to confirm delivery progress. See radio_transmit_message and radio_subscription_transmit_messages.
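A polling query over those counters could be as simple as the one below. It uses only the column names listed above and assumes the two counters are numeric; failure_ratio is a derived, illustrative column.

```sql
SELECT
    transmit_successes,
    transmit_failures,
    transmit_last_success_time,
    transmit_last_failure_time,
    -- Share of recorded outcomes that failed; NULL until something is sent.
    transmit_failures / nullif(transmit_successes + transmit_failures, 0) AS failure_ratio
FROM radio_subscriptions();
```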
Garbage-collect the outbox
-- Drop all already-delivered transmit messages
CALL radio_subscription_transmit_messages_delete_finished('wss://stream.example.com/events');
-- Or drop one specific pending outgoing message — message_id is the
-- UBIGINT returned by radio_subscription_transmit_messages.message_id.
CALL radio_subscription_transmit_message_delete('wss://stream.example.com/events', :message_id::UBIGINT);
See radio_subscription_transmit_messages_delete_finished and radio_subscription_transmit_message_delete.
Test pipelines without a real upstream
Inject messages straight into a subscription’s inbox to exercise downstream SQL — no broker, no real WebSocket, no flake:
CALL radio_subscription_received_message_add(
'wss://stream.example.com/events',
'channel-name',
'{"type":"test","value":42}'::BLOB
);
SELECT * FROM radio_subscription_received_messages('wss://stream.example.com/events');
See radio_subscription_received_message_add. Useful for unit-testing pipelines that consume radio_* tables.
Tear down
-- Close the subscription
CALL radio_unsubscribe('wss://stream.example.com/events');
See radio_unsubscribe.
Diagnostics
SELECT radio_version(); -- e.g. '0.4.x'
Use radio_version to confirm what’s loaded when matching against bug reports. radio_flush drains all currently-buffered messages so subsequent reads see only fresh data.
Platform Support
Compatibility
Extension availability may vary by platform and DuckDB version. Check below to ensure this extension supports your environment before installation.
Platforms
Supported platform architectures
Compiled binary sizes
| Platform | Architecture | Size |
|---|---|---|
| Linux | aarch64 | 5.99 MB |
| Linux | x86_64 | 6.10 MB |
| Linux (musl) | x86_64 | 5.31 MB |
| macOS | Apple Silicon | 4.22 MB |
| macOS | Intel | 4.26 MB |
Gzipped download size from the DuckDB community-extensions registry.