📡

Radio

Two-way real-time messaging from DuckDB SQL. Subscribe to WebSocket and Redis Pub/Sub channels, query received events as ordinary tables, and queue outgoing messages with status tracking. Best fit: in-process pipelines, dashboards, and ad-hoc inspection — buffers are in-memory, scoped to the DuckDB process.

Install

-- Install the extension
INSTALL radio FROM community;

-- Load it into your session
LOAD radio;

-- Subscribe to a WebSocket channel — the URL is the subscription key
CALL radio_subscribe('wss://stream.example.com/events');

-- Drain buffered messages as a normal table
SELECT receive_time, channel, message
FROM radio_subscription_received_messages('wss://stream.example.com/events')
ORDER BY receive_time DESC
LIMIT 50;

-- Send a message — args are (url, channel, payload, max_attempts, retry_delay)
CALL radio_transmit_message(
  'wss://stream.example.com/events',
  'main',
  '{"type":"hello"}'::BLOB,
  3,
  INTERVAL '500 milliseconds'
);

Technical Overview

Why Use Radio?

Subscribe to WebSocket and Redis Pub/Sub channels from inside DuckDB SQL. Incoming messages buffer into queryable tables; outgoing messages queue with delivery status — all in one process, with no separate consumer service to operate.

📡 What this extension is for

Real-time messaging without leaving SQL. The path from "events arriving on a WebSocket" to "rows you can JOIN and WHERE against" collapses to a subscribe call plus a SELECT.

  • Listen and drain incrementally: Subscribe to a channel and incoming messages flow into an in-memory buffer exposed as ordinary DuckDB tables. Each row tracks how many times it has been seen, so re-querying drains only what's new.
  • Queue and broadcast outbound: Hand a message to the extension and it delivers asynchronously, with per-message status visible as a table. Poll for delivery, retry, or audit failures from SQL.
  • Inspect what's connected: Every active subscription shows up as a row with its endpoint, state, and delivery counters — useful for dashboards and runtime debugging.
  • Test pipelines without the wire: Inject canned messages straight into a subscription's inbox and exercise downstream SQL exactly as it would behave on the real channel.

🔌 Supported transports

Two transports today; the table-function surface stays identical regardless of protocol.

  • WebSocket: Standard RFC 6455 client. ws:// or wss:// URLs — fits browser-style real-time feeds, Slack-style event streams, and any custom server-sent API that speaks WS.
  • Redis Pub/Sub: Subscribe to one or more Redis channels by passing a Redis URL plus channel name. Fits application data flows that already use Redis as the bus.
  • Roadmap: Google Pub/Sub, Azure Service Bus, MQTT, and subprocess pipes are planned. Today's transports are WebSocket and Redis Pub/Sub only.

🛡️ Production caveats

What to know before pointing this at anything important.

  • In-memory, process-scoped buffers: Received messages live in-process. When DuckDB exits, subscriptions tear down and unread messages are lost. For at-least-once across restarts, drain to a durable store before processing.
  • Buffers are size-limited: The receive queue has a fixed capacity — older messages are evicted as new ones arrive. Drain frequently enough to keep up with the inbound rate.
  • Best-effort outbound delivery: Messages are queued and delivered asynchronously. Per-subscription success/failure counters and per-message status tables expose progress so you can build retry logic in SQL.
  • Single connection per subscription: Multiple DuckDB connections in the same process don't share subscription state. Open a subscription on the same connection that will drain it.
  • Experimental status: Function shapes may change. Pin a known-good extension version in production.

🎯 Common use cases

Live dashboards in DuckDB

Subscribe to a market-data or activity-event WebSocket and refresh a tile by re-querying on a timer. No separate consumer, no message broker.

SQL-driven event responders

Pull events, run the SQL logic, then publish a response back to a Pub/Sub fanout — all in one query loop.

Ad-hoc traffic inspection

Point at a channel you don't yet understand, drain into a CREATE TABLE AS, and GROUP BY to discover the message shape. Faster than wiring up a custom consumer.

Pipeline testing

Inject canned messages and exercise downstream SQL exactly as it would behave on the real channel.

Deep Dive

Technical Details

What you can do with three statements

-- 1. Open a WebSocket subscription. The URL itself is the key.
CALL radio_subscribe('wss://stream.example.com/events');

-- 2. (later, or in another query) drain the inbox
SELECT receive_time, channel, message
FROM radio_subscription_received_messages('wss://stream.example.com/events')
ORDER BY receive_time DESC
LIMIT 100;

-- 3. Reply or broadcast back
CALL radio_transmit_message(
  'wss://stream.example.com/events',
  'main',                                            -- channel
  '{"type":"ack","at":"2026-04-28T12:00:00Z"}'::BLOB,
  3,                                                 -- max_attempts
  INTERVAL '500 milliseconds'                        -- retry_delay
);

radio_subscribe opens the connection and starts buffering. radio_subscription_received_messages exposes that buffer as an ordinary table with columns (subscription_id, subscription_url, message_id, message_type, receive_time, seen_count, channel, message)JOIN, WHERE, GROUP BY, CREATE TABLE AS. radio_transmit_message queues an outbound message; poll radio_subscriptions for the per-subscription transmit_successes / transmit_failures counters.

In-process, in-memory, best-effort

Subscriptions, the received-message buffer, and the transmit queue all live in DuckDB process memory. Three things follow:

  • Process exit drops everything. When DuckDB shuts down, subscriptions are torn down and unread messages are lost. For at-least-once delivery across restarts, persist drained messages to a durable store before processing.
  • The received buffer is size-limited. Older messages are evicted as new ones arrive — drain with radio_received_messages at a cadence that keeps up with your inbound rate.
  • Outbound delivery is asynchronous and best-effort. radio_transmit_message returns once the message is queued; the per-subscription transmit_successes, transmit_failures, transmit_last_success_time, and transmit_last_failure_time columns on radio_subscriptions report progress.

Status is experimental. The function surface may change as more transports get added.

Architecture

Radio runs an asynchronous I/O thread per subscription. For WebSocket endpoints it speaks the standard frame protocol; for Redis Pub/Sub endpoints it opens a Redis connection and SUBSCRIBEs to the channel(s). Inbound messages land in a per-subscription FIFO buffer; outbound messages live in a transmit queue with a status field that the I/O thread updates as delivery completes.

DuckDB SQL never blocks on the wire — radio_received_messages and friends are pure table scans over the in-process buffer. If you want to wait for fresh messages before reading, radio_listen blocks the connection until something arrives or a timeout elapses, and radio_sleep is a convenience for polling loops.

Supported transports today

  • WebSocketws:// for plaintext, wss:// for TLS. Subscribe with the URL in radio_subscribe. Standard RFC 6455 framing; the extension handles ping/pong and reconnect internally.
  • Redis Pub/Sub — point a subscription at a Redis URL plus channel name. Same radio_subscribe / radio_received_messages / radio_transmit_message surface; Radio handles the Redis-specific SUBSCRIBE and PUBLISH under the hood.

The extension’s documentation page lists planned transports — Google Pub/Sub, Azure Service Bus, MQTT, subprocess pipes — but only WebSocket and Redis Pub/Sub are wired up today.

Compared to alternatives

  • A standalone consumer service — the typical pattern (Node.js / Python listening to WebSocket → write to a database → DuckDB queries the database). Radio collapses that into one process and keeps the data in DuckDB’s vectorized executor without a serialization hop. The trade is durability: a real consumer service can crash-restart from the bus.
  • The shellfs extension piping wscat/websocat output — works for read-only WebSocket streams, but each pipe is a one-shot command without bidirectional support, status tracking, or reconnect.
  • Kafka via tributary — different kind of bus. Tributary is for snapshotting / scanning Kafka topics; Radio is for live channel subscribe-and-send semantics on WebSocket / Redis Pub/Sub.

Reference

Extension Contents

Quick reference to all available functions and settings organized by category.

Name Description
Inbox

Queryable table views of received messages — across all subscriptions or filtered to one. Drain like any DuckDB table; the buffer is in-process and FIFO per subscription.

radio_received_messages() Table function exposing every received message (across subscriptions) as queryable rows
radio_subscription_received_message_add() Inject a message into a subscription's received buffer
radio_subscription_received_messages() Per-subscription view of received messages
Listening

Open and manage subscriptions to remote event sources (WebSockets, message buses). Each subscription has a name and an endpoint; incoming messages are buffered for SQL to drain.

radio_listen() Open a listener on a channel — buffers incoming messages so subsequent SELECTs can drain them
radio_subscribe() Subscribe to a remote endpoint (WebSocket / event bus) under a logical subscription name
radio_subscriptions() Table function listing every active subscription with its endpoint and state
radio_unsubscribe() Tear down a subscription opened with radio_subscribe
Maintenance

Buffer flushing, polling sleeps, and version inspection.

radio_flush() Drain all currently-buffered messages so subsequent reads see only fresh data
radio_sleep() Sleep the current connection — convenience for waiting between polling intervals when scripting
radio_version() Return the loaded Radio extension version
Outbox

Send messages out, with built-in delivery tracking. Queue, inspect, retract, and garbage-collect — all from SQL.

radio_subscription_transmit_message_delete() Remove a queued outgoing message that hasn't yet been transmitted
radio_subscription_transmit_messages() Table view of pending and sent outgoing messages with delivery status — what's been transmitted, what's queued, what failed
radio_subscription_transmit_messages_delete_finished() Garbage-collect already-delivered outgoing messages from the transmit log
radio_transmit_message() Queue an outgoing message on a subscription

API Reference

Function Documentation

Detailed documentation for each function including signatures, parameters, and examples.

radio_flush

Table Function Maintenance
Signature
radio_flush(col0: INTERVAL) → None
Parameters (Positional)
Parameter Type Mode Description
col0 INTERVAL Positional
Returns
Description

Drain all currently-buffered messages so subsequent reads see only fresh data.

Examples

radio_listen

Table Function Listening
Signature
radio_listen(col0: BOOLEAN, col1: INTERVAL) → None
Parameters (Positional)
Parameter Type Mode Description
col0 BOOLEAN Positional
col1 INTERVAL Positional
Returns
Description

Open a listener on a channel — buffers incoming messages so subsequent SELECTs can drain them.

Examples

radio_received_messages

Table Function Inbox
Signature
radio_received_messages() → None
Parameters
Parameter Type Mode Description
Returns
Description

Table function exposing every received message (across subscriptions) as queryable rows.

Examples

radio_sleep

Table Function Maintenance
Signature
radio_sleep(col0: INTERVAL) → None
Parameters (Positional)
Parameter Type Mode Description
col0 INTERVAL Positional
Returns
Description

Sleep the current connection — convenience for waiting between polling intervals when scripting.

Examples

radio_subscribe

Table Function Listening
Signature
radio_subscribe(col0: VARCHAR, transmit_retry_max_delay_ms := INTEGER, transmit_retry_multiplier := DOUBLE, transmit_retry_initial_delay_ms := INTEGER, receive_message_capacity := INTEGER) → None
Parameters
Parameter Type Mode Description
col0 VARCHAR Positional
transmit_retry_max_delay_ms INTEGER Named
transmit_retry_multiplier DOUBLE Named
transmit_retry_initial_delay_ms INTEGER Named
receive_message_capacity INTEGER Named
Returns
Description

Subscribe to a remote endpoint (WebSocket / event bus) under a logical subscription name. Incoming messages flow into the received-messages buffer.

Examples

radio_subscription_received_message_add

Table Function Inbox
Signature
radio_subscription_received_message_add(col0: VARCHAR, col1: VARCHAR, col2: BLOB) → None
Parameters (Positional)
Parameter Type Mode Description
col0 VARCHAR Positional
col1 VARCHAR Positional
col2 BLOB Positional
Returns
Description

Inject a message into a subscription's received buffer. Useful for testing pipelines without a real upstream.

Examples

radio_subscription_received_messages

Table Function Inbox
Signature
radio_subscription_received_messages(col0: VARCHAR) → None
Parameters (Positional)
Parameter Type Mode Description
col0 VARCHAR Positional
Returns
Description

Per-subscription view of received messages. Filter by subscription name to drain a single feed.

Examples

radio_subscription_transmit_message_delete

Table Function Outbox
Signature
radio_subscription_transmit_message_delete(col0: VARCHAR, col1: UBIGINT) → None
Parameters (Positional)
Parameter Type Mode Description
col0 VARCHAR Positional
col1 UBIGINT Positional
Returns
Description

Remove a queued outgoing message that hasn't yet been transmitted.

Examples

radio_subscription_transmit_messages

Table Function Outbox
Signature
radio_subscription_transmit_messages(col0: VARCHAR) → None
Parameters (Positional)
Parameter Type Mode Description
col0 VARCHAR Positional
Returns
Description

Table view of pending and sent outgoing messages with delivery status — what's been transmitted, what's queued, what failed.

Examples

radio_subscription_transmit_messages_delete_finished

Table Function Outbox
Signature
radio_subscription_transmit_messages_delete_finished(col0: VARCHAR) → None
Parameters (Positional)
Parameter Type Mode Description
col0 VARCHAR Positional
Returns
Description

Garbage-collect already-delivered outgoing messages from the transmit log.

Examples

radio_subscriptions

Table Function Listening
Signature
radio_subscriptions() → None
Parameters
Parameter Type Mode Description
Returns
Description

Table function listing every active subscription with its endpoint and state.

Examples

radio_transmit_message

Table Function Outbox
Signature
radio_transmit_message(col0: VARCHAR, col1: VARCHAR, col2: BLOB, col3: INTEGER, col4: INTERVAL) → None
Parameters (Positional)
Parameter Type Mode Description
col0 VARCHAR Positional
col1 VARCHAR Positional
col2 BLOB Positional
col3 INTEGER Positional
col4 INTERVAL Positional
Returns
Description

Queue an outgoing message on a subscription. Returns once the message is buffered; delivery is tracked separately.

Examples

radio_unsubscribe

Table Function Listening
Signature
radio_unsubscribe(col0: VARCHAR) → None
Parameters (Positional)
Parameter Type Mode Description
col0 VARCHAR Positional
Returns
Description

Tear down a subscription opened with radio_subscribe.

Examples

radio_version

Scalar Function Maintenance
Signature
radio_version() → VARCHAR
Parameters
Parameter Type Mode Description
Returns
Description

Return the loaded Radio extension version.

Examples

Practical Examples

Cookbook

Real-world recipes and patterns for common use cases.

Subscribe to a WebSocket channel

-- Open a subscription. The URL itself is the subscription identifier —
-- pass it to the read / send / unsubscribe functions later.
CALL radio_subscribe('wss://stream.example.com/events');

-- Inspect what's subscribed
SELECT * FROM radio_subscriptions();

The URL is what every other radio_* function takes as its subscription key. If you’ll reuse it across many statements, stash it in a SQL variable and bind via :feed:

SET VARIABLE feed = 'wss://stream.example.com/events';

CALL radio_subscribe(getvariable('feed'));

SELECT * FROM radio_subscription_received_messages(getvariable('feed'));

See radio_subscribe and radio_subscriptions for the full signatures, including the optional named parameters for buffer sizing and transmit retry tuning.

Subscribe to Redis Pub/Sub

Same function, different URL — Radio routes WebSocket and Redis Pub/Sub through the same surface:

CALL radio_subscribe('redis://cache.example.com:6379/0?channel=events');

-- Drain the same way as a WebSocket subscription, keyed by URL
SELECT receive_time, message
FROM radio_subscription_received_messages('redis://cache.example.com:6379/0?channel=events')
ORDER BY receive_time DESC;

Drain received messages

-- Recent messages across all subscriptions
SELECT subscription_url, channel, receive_time, message
FROM radio_received_messages()
ORDER BY receive_time DESC
LIMIT 100;

-- One feed only — pass the same URL you subscribed with
SELECT receive_time, channel, message
FROM radio_subscription_received_messages('wss://stream.example.com/events')
ORDER BY receive_time DESC;

The full row shape from radio_received_messages() / radio_subscription_received_messages(url) is (subscription_id UBIGINT, subscription_url VARCHAR, message_id UBIGINT, message_type, receive_time TIMESTAMP_MS, seen_count UBIGINT, channel VARCHAR, message BLOB). The payload is a BLOB — cast it to VARCHAR for text or use DuckDB’s JSON functions for structured payloads. These are normal DuckDB table functions — JOIN, filter, aggregate, push to Parquet, anything you’d do with a regular table. See radio_received_messages and radio_subscription_received_messages.

Wait for fresh messages before reading

For loops that need to act as messages arrive (rather than polling on a timer), radio_listen blocks the connection until something lands in the inbox or a timeout elapses:

-- Block up to 30 seconds for a message on any subscription.
-- Args: (return_on_first_message, max_wait).
CALL radio_listen(true, INTERVAL '30 seconds');

-- Then drain
SELECT * FROM radio_received_messages() ORDER BY receive_time DESC LIMIT 1;

Pair with radio_sleep when scripting between polls.

Send a message and track delivery

-- Args: (subscription_url, channel, payload, max_attempts, retry_delay).
CALL radio_transmit_message(
  'wss://stream.example.com/events',
  /* channel       */ 'main',
  /* payload       */ '{"type":"hello","at":"2026-04-28T12:00:00Z"}'::BLOB,
  /* max_attempts  */ 3,
  /* retry_delay   */ INTERVAL '500 milliseconds'
);

-- Inspect transmit queue + delivery state
SELECT message_id, message
FROM radio_subscription_transmit_messages('wss://stream.example.com/events')
ORDER BY message_id DESC;

Per-subscription delivery counters live on radio_subscriptions (transmit_successes, transmit_failures, transmit_last_success_time, transmit_last_failure_time); poll those to confirm delivery progress. See radio_transmit_message and radio_subscription_transmit_messages.

Garbage-collect the outbox

-- Drop all already-delivered transmit messages
CALL radio_subscription_transmit_messages_delete_finished('wss://stream.example.com/events');

-- Or drop one specific pending outgoing message — message_id is the
-- UBIGINT returned by radio_subscription_transmit_messages.message_id.
CALL radio_subscription_transmit_message_delete('wss://stream.example.com/events', :message_id::UBIGINT);

See radio_subscription_transmit_messages_delete_finished and radio_subscription_transmit_message_delete.

Test pipelines without a real upstream

Inject messages straight into a subscription’s inbox to exercise downstream SQL — no broker, no real WebSocket, no flake:

CALL radio_subscription_received_message_add(
  'wss://stream.example.com/events',
  'channel-name',
  '{"type":"test","value":42}'::BLOB
);

SELECT * FROM radio_subscription_received_messages('wss://stream.example.com/events');

See radio_subscription_received_message_add. Useful for unit-testing pipelines that consume radio_* tables.

Tear down

-- Close the subscription
CALL radio_unsubscribe('wss://stream.example.com/events');

See radio_unsubscribe.

Diagnostics

SELECT radio_version();   -- e.g. '0.4.x'

Use radio_version to confirm what’s loaded when matching against bug reports. radio_flush drains all currently-buffered messages so subsequent reads see only fresh data.

Platform Support

Compatibility

Extension availability may vary by platform and DuckDB version. Check below to ensure this extension supports your environment before installation.

Quick Facts

Software License MIT
Pricing Free
Written In C++
Source Available Yes
View on GitHub
Usage
13,741+
loads in last 30 days

Platforms

Linux
Linux (musl)
macOS
Windows
WASM
Supported platform architectures
Linux: aarch64, x86_64
Linux (musl): x86_64
macOS: Intel, Apple Silicon
Windows: Not available
WASM: Not available
Compiled binary sizes
Platform Architecture Size
Linux aarch64 5.99 MB
Linux x86_64 6.10 MB
Linux (musl) x86_64 5.31 MB
macOS Apple Silicon 4.22 MB
macOS Intel 4.26 MB

Gzipped download size from the DuckDB community-extensions registry.

DuckDB Versions

Release calendar
Supported
v1.4.4 v1.5.2

Real-time messaging in SQL

Install Radio to subscribe to WebSocket / Redis Pub/Sub channels, drain incoming events as queryable rows, and queue outgoing messages — all from SQL.