Radio DuckDB Extension
The Radio extension by Query.Farm enables DuckDB to interact with real-time event systems such as WebSocket and Redis Publish/Subscribe servers. It allows DuckDB to both receive and send events: incoming messages are buffered and queryable with SQL, while outgoing events are buffered and support delivery tracking.
The extension is named Radio because it effectively equips DuckDB with a two-way radio, allowing it to listen for and broadcast messages across event-driven systems.
DuckDB extensions are modular plugins that enhance the core engine with powerful new capabilities like this.
Getting Started
Radio is a DuckDB community extension maintained and supported by Query.Farm.
Install Radio in DuckDB by running:
INSTALL radio FROM community;
Then load it with:
LOAD radio;
Functionality
The Radio extension introduces a few core concepts.
At its core is a single radio object that manages multiple subscriptions to event sources. Think of it as a receiver that can tune in to several broadcasts at the same time. The event sources that the Radio extension can currently interact with are WebSocket servers and Redis Publish/Subscribe. Support for additional event systems will be added over time.
Additional event sources planned are: Google Pub/Sub, Azure Service Bus, MQTT / Mosquitto, and Subprocesses / Pipes.
If you're interested in these additional event sources, contact us.
You can create multiple subscriptions to event sources, and each subscription is independent of the others. Each subscription has an independent connection to the event source and an independent send and receive thread. Multiple subscriptions can exist simultaneously in the same DuckDB process (even to the same event source).
Every subscription maintains two internal queues:
- Received messages
- Messages to transmit
The received messages queue is size-limited: as it fills up, older messages are discarded based on the configured queue size. The size of the receive queue is configurable when the subscription is created.
Received messages have a field called seen_count, which tracks the number of times the message has been seen by DuckDB. Newly received messages will have a seen_count of 1.
Architecture Diagram
This diagram shows the internal architecture of the extension and how it is integrated into DuckDB.
Integration Flowchart
This flowchart shows which functions to use for each type of activity with the extension.
When Should I Use This?
Use the Radio extension when you want to receive or send events from a WebSocket server or Redis Publish/Subscribe service within DuckDB, without needing external event-handling infrastructure or orchestration logic.
Here's an example of connecting to a WebSocket server and receiving messages:
-- Connect to the server
CALL radio_subscribe('ws://127.0.0.1:20400');
┌─────────────────┐
│ subscription_id │
│     uint64      │
├─────────────────┤
│               0 │
└─────────────────┘
-- Block until a message is received, or timeout after 10 seconds
CALL radio_listen(true, interval '10 seconds');
┌─────────────────┬──────────────────────┐
│ subscription_id │   subscription_url   │
│     uint64      │       varchar        │
├─────────────────┼──────────────────────┤
│               0 │ ws://127.0.0.1:20400 │
└─────────────────┴──────────────────────┘
-- Return all received messages in the received message queue
--
-- In this case it shows that the connection was made to the websocket server.
-- The second message is just a normal message from the server.
SELECT * FROM radio_received_messages();
 subscription_id = 0
subscription_url = ws://127.0.0.1:20400
      message_id = 1000
    message_type = connection
    receive_time = 2025-06-02 15:11:26.676
      seen_count = 1
         channel = NULL
         message =

 subscription_id = 0
subscription_url = ws://127.0.0.1:20400
      message_id = 1001
    message_type = message
    receive_time = 2025-06-02 15:11:26.676
      seen_count = 1
         channel = NULL
         message = Message counter: 263 Server time: 2025-06-02T15:11:26.675966Z
API
The extension adds these functions:
- Subscriptions
- Blocking / Waiting
- Messages
- Messages from subscriptions
- Messages for a single subscription
- Transmit Messages
- Versioning
radio_subscribe
This function creates a new subscription.
Signature:
radio_subscribe(VARCHAR,
    receive_message_capacity : INTEGER,
    transmit_retry_max_delay_ms : INTEGER,
    transmit_retry_initial_delay_ms : DOUBLE,
    transmit_retry_multiplier : DOUBLE)
Positional Arguments
Name | Type | Description |
---|---|---|
url | VARCHAR | The URL of the event source. For WebSockets, use wss:// or ws://. |
Named Parameters
All named parameters are optional. If a parameter is not passed, its default value is used.
Name | Type | Default Value | Description |
---|---|---|---|
receive_message_capacity | INTEGER | 1000 | Maximum number of messages that can be queued for receipt. |
transmit_retry_max_delay_ms | INTEGER | 10000 | Maximum delay (in milliseconds) between retry attempts when transmitting. |
transmit_retry_initial_delay_ms | INTEGER | 100.0 | Initial delay (in milliseconds) before the first retry attempt when transmitting. |
transmit_retry_multiplier | DOUBLE | 1.5 | Multiplier for exponential backoff between retry attempts during transmission. |
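As a rough sketch of how the named parameters above might be passed, the call below overrides all four defaults using DuckDB's `name := value` syntax for named arguments. The parameter names come from the table; the specific values are illustrative assumptions, not recommendations.

```sql
-- Subscribe with a smaller receive queue and a slower retry policy.
-- Parameter names are taken from the table above; the values are
-- illustrative only.
CALL radio_subscribe(
    'ws://127.0.0.1:20400',
    receive_message_capacity := 500,
    transmit_retry_initial_delay_ms := 250,
    transmit_retry_max_delay_ms := 30000,
    transmit_retry_multiplier := 2.0
);
```

Any parameter left out of the call keeps its default value.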
Return Value
The unique subscription identifier is returned for the new subscription.
Example
CALL radio_subscribe('ws://127.0.0.1:20400');
┌─────────────────┐
│ subscription_id │
│     uint64      │
├─────────────────┤
│               0 │
└─────────────────┘
radio_unsubscribe
This function deletes a previously created subscription.
Signature: radio_unsubscribe(VARCHAR)
Positional Arguments
Name | Type | Description |
---|---|---|
url | VARCHAR | The URL of the event source. For WebSockets, use wss:// or ws://. |
Return Value
The unique identifier of the subscription that was removed.
Example
CALL radio_unsubscribe('ws://127.0.0.1:20400');
┌─────────────────┐
│ subscription_id │
│     uint64      │
├─────────────────┤
│               0 │
└─────────────────┘
radio_subscriptions
This function returns a table that contains information about all current subscriptions.
Signature: radio_subscriptions()
Return Value
Returns a table with this schema:
Column Name | Type | Description |
---|---|---|
subscription_id | UBIGINT | Unique identifier for the subscription. |
url | VARCHAR | The event source URL associated with the subscription. |
creation_time | TIMESTAMP_MS | Timestamp when the subscription was created. |
activation_time | TIMESTAMP_MS | Timestamp when the subscription was activated (i.e., connected to the URL). |
disabled | BOOLEAN | Indicates whether the subscription is currently disabled. |
received_last_message_time | TIMESTAMP_MS | Timestamp of the most recent received message. |
received_messages_processed | UBIGINT | Total number of messages successfully received. |
transmit_messages_processed | UBIGINT | Total number of transmit messages queued. |
transmit_last_queue_time | TIMESTAMP_MS | Timestamp when the last transmit message was queued. |
transmit_last_success_time | TIMESTAMP_MS | Timestamp of the last successful message transmission. |
transmit_last_failure_time | TIMESTAMP_MS | Timestamp of the last failed message transmission attempt. |
transmit_successes | UBIGINT | Total number of transmit messages sent successfully. |
transmit_failures | UBIGINT | Total number of failed transmit attempts. |
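One possible use of this schema, sketched here as an assumption about what a simple health check might look like, is to list subscriptions that are disabled or have recorded transmit failures. Only columns from the table above are used.

```sql
-- List subscriptions that may need attention: either disabled or with
-- at least one failed transmit attempt.
SELECT
    subscription_id,
    url,
    disabled,
    transmit_successes,
    transmit_failures,
    transmit_last_failure_time,
    received_last_message_time
FROM radio_subscriptions()
WHERE disabled OR transmit_failures > 0
ORDER BY transmit_failures DESC;
```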
Example
SELECT * FROM radio_subscriptions();
            subscription_id = 0
                        url = ws://127.0.0.1:20400
              creation_time = 2025-05-28 23:11:33.749
            activation_time = 2025-05-28 23:11:33.755
                   disabled = false
 received_last_message_time = 2025-05-28 23:11:33.755
received_messages_processed = 1
transmit_messages_processed = 0
  transmit_messages_dropped = 0
   transmit_last_queue_time = NULL
 transmit_last_success_time = NULL
 transmit_last_failure_time = NULL
         transmit_successes = 0
          transmit_failures = 0
radio_listen
radio_listen checks for newly arrived messages across all subscriptions. A message is considered newly arrived if it has never been seen before, as indicated by the seen_count on the message record. radio_listen can optionally block execution for a specified time interval, waiting for any new message to arrive on any subscription. If no wait time is provided, radio_listen returns immediately with information about any subscriptions that currently have unread messages. Use this function to pause execution until at least one message is ready to be processed.
Signature: radio_listen(BOOLEAN, INTERVAL)
Positional Arguments
Name | Type | Description |
---|---|---|
wait_for_messages | BOOLEAN | Wait for messages to arrive before returning a result. |
duration | INTERVAL | The amount of time to wait for messages to arrive. |
Return Value
Returns a table with this schema:
Column Name | Type | Description |
---|---|---|
subscription_id | UBIGINT | The unique identifier of the subscription. |
subscription_url | VARCHAR | The URL of the subscription that has new messages. |
Example
CALL radio_listen(true, interval '10 seconds');
┌─────────────────┬──────────────────────┐
│ subscription_id │   subscription_url   │
│     uint64      │       varchar        │
├─────────────────┼──────────────────────┤
│               0 │ ws://127.0.0.1:20400 │
└─────────────────┴──────────────────────┘
radio_flush
radio_flush waits for any pending messages to be sent. A time interval is supplied, after which radio_flush returns a row indicating whether all messages have been sent.
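A minimal sketch of a send-then-flush sequence, assuming a subscription to ws://127.0.0.1:20400 already exists. It queues one message with radio_transmit_message (documented later on this page) and then waits for the transmit queue to drain. The payload, attempt limit, and intervals are illustrative assumptions.

```sql
-- Queue a message for transmission: up to 5 attempts within 30 seconds.
CALL radio_transmit_message(
    'ws://127.0.0.1:20400',
    NULL,
    'hello'::BLOB,
    5,
    INTERVAL '30 seconds');

-- Wait up to 5 seconds for pending messages to be sent.
-- all_messages_flushed reports whether everything went out in time.
CALL radio_flush(INTERVAL '5 seconds');
```

If all_messages_flushed comes back false, the per-subscription transmit queue can be inspected to see which messages are still pending.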
Signature: radio_flush(INTERVAL)
Positional Arguments
Name | Type | Description |
---|---|---|
duration | INTERVAL | The amount of time to wait for messages to be sent. |
Return Value
Returns a table with this schema:
Column Name | Type | Description |
---|---|---|
all_messages_flushed | BOOLEAN | Indicates whether all messages from all subscriptions have been sent. |
Example
CALL radio_flush(interval '10 seconds');
┌──────────────────────┐
│ all_messages_flushed │
│       boolean        │
├──────────────────────┤
│ true                 │
└──────────────────────┘
radio_received_messages
This function returns messages received from all subscriptions.
Messages remain in the queue until they are automatically evicted due to limited queue capacity. The queue follows a first-in, first-out (FIFO) policy, so newer messages will overwrite the oldest ones when the queue is full. You can filter for new messages by adding seen_count = 0 to the WHERE clause if desired.
Each message has an identifier that is unique within the subscription where it was received.
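As an illustrative sketch, the query below reads only regular messages and converts the BLOB payload to text. It assumes the payloads are valid UTF-8, since DuckDB's built-in decode function raises an error otherwise. The column names come from the schema of radio_received_messages.

```sql
-- Read regular messages (skipping connection, disconnection and error
-- records) and decode the BLOB payload as UTF-8 text.
SELECT
    subscription_url,
    message_id,
    receive_time,
    decode(message) AS message_text
FROM radio_received_messages()
WHERE message_type = 'message'
ORDER BY receive_time;
```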
Signature: radio_received_messages()
Return Value
Returns a table with this schema:
Column Name | Type | Description |
---|---|---|
subscription_id | VARCHAR | The unique id of the subscription. |
subscription_url | VARCHAR | The URL of the subscription where the message was received. |
message_type | ENUM | The type of message: one of message, error, connection, disconnection. |
message_id | UBIGINT | The unique id of the message in the subscription. |
receive_time | TIMESTAMP_NS | The timestamp at which the message was received. |
seen_count | UBIGINT | The number of times the message has been seen or retrieved. |
channel | VARCHAR | The name of the channel where the message was received. |
message | BLOB | The message from the event bus. |
Example
SELECT * FROM radio_received_messages();
 subscription_id = 0
subscription_url = ws://127.0.0.1:20400
      message_id = 1000
         channel = NULL
    message_type = message
    receive_time = 2025-05-30 00:04:40.289
      seen_count = 2
         message = Message counter: 74 Server time: 2025-05-30T00:04:40.289464Z
radio_subscription_received_messages
Retrieves messages received from a given subscription.
Errors are stored separately from regular messages to ensure clarity, as they may not be easily distinguished in the message stream. Received messages are held in a queue until they are automatically evicted due to limited capacity. The queue operates in a first-in, first-out (FIFO) manner: newer messages will replace the oldest when the queue is full.
Signature: radio_subscription_received_messages(VARCHAR, VARCHAR)
Positional Arguments
Name | Type | Description |
---|---|---|
url | VARCHAR | The URL of the subscription. |
Return Value
Returns a table with this schema:
Column Name | Type | Description |
---|---|---|
message_id | UBIGINT | The unique id of the message in the subscription. |
receive_time | TIMESTAMP_NS | The timestamp at which the message was received. |
seen_count | UBIGINT | The number of times the message has been retrieved. |
channel | VARCHAR | The name of the channel where the message was received. |
message | BLOB | The message from the event bus. |
Example
SELECT * FROM radio_subscription_received_messages('ws://127.0.0.1:20400', 'message');
  message_id = 1000
receive_time = 2025-05-29 03:55:59.109
     channel = NULL
  seen_count = 4
     message = Message counter: 70 Server time: 2025-05-29T03:55:59.109567Z
radio_subscription_transmit_messages
Retrieves transmit messages that will be sent via the specified subscription. Messages that have been sent will continue to be returned until evicted from the send queue. This is useful for delivery tracking and determining if a message was actually sent.
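For delivery tracking, one hedged example is to list messages that have not yet been sent, together with how many attempts they have used. The URL is the one used in this page's other examples, and all column names come from the return schema of radio_subscription_transmit_messages.

```sql
-- Show transmit messages that are still unsent, oldest first.
SELECT
    message_id,
    creation_time,
    last_attempt_time,
    try_count,
    max_attempts,
    expire_time
FROM radio_subscription_transmit_messages('ws://127.0.0.1:20400')
WHERE NOT sent
ORDER BY creation_time;
```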
Signature: radio_subscription_transmit_messages(VARCHAR)
Positional Arguments
Name | Type | Description |
---|---|---|
url | VARCHAR | The URL of the subscription. |
Return Value
Returns a table with this schema:
Column Name | Type | Description |
---|---|---|
message_id | VARCHAR | The unique identifier of the transmit message. |
creation_time | TIMESTAMP_MS | The timestamp when the transmit message was created. |
last_attempt_time | TIMESTAMP_MS | The timestamp of the most recent attempt to transmit the message. |
sent | BOOLEAN | Indicates whether the message has been successfully sent. |
expire_time | TIMESTAMP_MS | The time at which attempts to transmit the message will end. |
max_attempts | UINTEGER | The maximum number of attempts to send the message. |
try_count | UINTEGER | The number of attempts that have been made to transmit the message. |
channel | VARCHAR | The channel where the message should be sent. |
message | BLOB | The message to transmit. |
Example
SELECT * FROM radio_subscription_transmit_messages('ws://127.0.0.1:20400');
       message_id = 1001
    creation_time = 2025-05-29 03:56:00.206
last_attempt_time = NULL
             sent = false
      expire_time = 2025-05-29 03:57:00.202
     max_attempts = 10
        try_count = 0
          channel = radio_test
          message = test_message
radio_transmit_message
Adds a new message to be transmitted via a subscription. Returns the unique identifier of the newly created message.
If the message cannot be transmitted, it will be retried using exponential backoff until either the maximum number of attempts is exhausted or the expiration time is reached.
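To get a feel for the retry timing, the query below tabulates a hypothetical backoff schedule in plain DuckDB SQL, using the documented defaults of radio_subscribe (initial delay 100 ms, multiplier 1.5, maximum delay 10000 ms). It assumes the delay before retry n is least(initial * multiplier^n, max). This page does not state the extension's exact formula, so treat the result as an approximation.

```sql
-- Hypothetical retry schedule under the assumed formula
--   delay(n) = least(initial_delay * multiplier^n, max_delay)
-- with initial_delay = 100 ms, multiplier = 1.5, max_delay = 10000 ms.
SELECT
    attempt,
    least(100.0 * pow(1.5, attempt), 10000.0) AS delay_ms
FROM range(0, 15) AS t(attempt)
ORDER BY attempt;
```

Under this assumption the first delays would be 100, 150, and 225 ms, growing until they reach the 10000 ms cap.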
Signature: radio_transmit_message(VARCHAR, VARCHAR, BLOB, INTEGER, INTERVAL)
Positional Arguments
Name | Type | Description |
---|---|---|
url | VARCHAR | The URL of the subscription where the message should be sent. |
channel | VARCHAR | The channel where the message should be sent. |
message | BLOB | The message to send. |
max_attempts | INTEGER | The maximum number of attempts to transmit this message. Must be greater than zero. |
expire_interval | INTERVAL | The maximum amount of time to spend attempting to transmit this message. If the message is not sent within this interval, it is marked as unable to be sent. |
Example
CALL radio_transmit_message(
    'ws://127.0.0.1:20400',
    NULL,
    'test message'::blob,
    10,
    interval '1 minute');
┌────────────┐
│ message_id │
│   uint64   │
├────────────┤
│       1002 │
└────────────┘
radio_transmit_messages_delete_finished
Removes all transmit messages that have reached a terminal state from the transmit queue.
Signature: radio_transmit_messages_delete_finished(VARCHAR)
Positional Arguments
Name | Type | Description |
---|---|---|
url | VARCHAR | The URL of the subscription where the message should be sent. |
Example
CALL radio_transmit_messages_delete_finished('ws://127.0.0.1:20400');
┌─────────┐
│   ok    │
│ boolean │
├─────────┤
│ true    │
└─────────┘
radio_subscription_transmit_message_delete
Remove a transmit message from the transmit queue by supplying its message identifier.
Signature: radio_subscription_transmit_message_delete(VARCHAR, UBIGINT)
Positional Arguments
Name | Type | Description |
---|---|---|
url | VARCHAR | The URL of the subscription where the message should be sent. |
message_id | UBIGINT | The unique identifier of the transmit message to delete. |
Example
CALL radio_subscription_transmit_message_delete('ws://127.0.0.1:20400', 3022::UBIGINT);
┌─────────┐
│   ok    │
│ boolean │
├─────────┤
│ true    │
└─────────┘
radio_version
Return the current version of the radio extension.
Signature: radio_version()
Example
SELECT radio_version();
┌─────────────────┐
│ radio_version() │
│     varchar     │
├─────────────────┤
│ 20250601.01     │
└─────────────────┘