This document covers the Python APIs for building adora nodes, operators, and dataflows. Install with:
```shell
pip install adora-rs
```

```python
from adora import Node
```

The `Node` class is the primary interface for custom nodes. It connects to a running dataflow, receives input events, and sends outputs.
Create a new node and connect to the running dataflow.
```python
# Standard: node ID is read from environment variables set by the daemon
node = Node()

# Dynamic: connect to a running dataflow by explicit node ID
node = Node(node_id="my-dynamic-node")
```

Parameters:

- `node_id` (str, optional) -- Explicit node ID for dynamic nodes. When omitted, the node reads its identity from environment variables set by the adora daemon.

Raises: `RuntimeError` if the node cannot connect to the dataflow.
Retrieve the next event from the event stream. Blocks until an event is available or the timeout expires.
```python
event = node.next()             # block indefinitely
event = node.next(timeout=2.0)  # block up to 2 seconds
```

Parameters:

- `timeout` (float, optional) -- Maximum wait time in seconds.

Returns: `dict` -- An event dictionary, or `None` if all senders have been dropped or the timeout expired.
Retrieve all buffered events without blocking.
```python
events = node.drain()
for event in events:
    print(event["type"])
```

Returns: `list[dict]` -- A list of event dictionaries. Returns an empty list if no events are buffered.
Non-blocking receive. Returns the next buffered event if one is available.
```python
event = node.try_recv()
if event is not None:
    print(event["type"])
```

Returns: `dict | None` -- An event dictionary, or `None` if no event is buffered.
Asynchronous receive. For use with asyncio.
```python
event = await node.recv_async()
event = await node.recv_async(timeout=5.0)
```

Parameters:

- `timeout` (float, optional) -- Maximum wait time in seconds. Returns an error if the timeout is reached.

Returns: `dict | None` -- An event dictionary, or `None` if all senders have been dropped.
Note: This method is experimental. The pyo3 async (Rust-Python FFI) integration is still in development.
Check whether there are any buffered events in the event stream.
```python
if not node.is_empty():
    event = node.try_recv()
```

Returns: `bool`
Send data on an output channel.
```python
import pyarrow as pa

# Send raw bytes
node.send_output("status", b"OK")

# Send an Apache Arrow array (zero-copy capable)
node.send_output("values", pa.array([1, 2, 3]))

# Send with metadata
node.send_output("image", pa.array(pixels), {"camera_id": "front"})
```

Parameters:

- `output_id` (str) -- The output name as declared in the dataflow YAML.
- `data` (bytes | pyarrow.Array) -- The payload. Use `bytes` for simple data or `pyarrow.Array` for zero-copy shared-memory transport.
- `metadata` (dict, optional) -- Key-value pairs attached to the message. Supported value types: `bool`, `int`, `float`, `str`, `list[int]`, `list[float]`, `list[str]`, `datetime.datetime`.

Raises: `RuntimeError` if `data` is neither `bytes` nor a `pyarrow.Array`.
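The supported metadata value types can be checked before sending. The following is a hypothetical pre-flight helper, not part of the adora API; `validate_metadata` and `_is_homogeneous_list` are names invented here for illustration:

```python
import datetime

# Scalar types that send_output accepts as metadata values (per the list above)
_SCALARS = (bool, int, float, str, datetime.datetime)

def _is_homogeneous_list(value):
    # list[int], list[float], or list[str]: one element type throughout
    return (isinstance(value, list)
            and all(type(v) in (int, float, str) for v in value)
            and len({type(v) for v in value}) <= 1)

def validate_metadata(metadata):
    """Raise TypeError for any metadata value send_output would reject."""
    for key, value in metadata.items():
        if isinstance(value, _SCALARS) or _is_homogeneous_list(value):
            continue
        raise TypeError(
            f"unsupported metadata value for {key!r}: {type(value).__name__}"
        )

validate_metadata({"camera_id": "front", "exposure": 0.01, "roi": [0, 0, 640, 480]})
```

Running such a check in development catches type errors before they surface as a `RuntimeError` deep in the send path.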
Python nodes use the same metadata key conventions as Rust for communication patterns. Parameters are plain dicts with string keys.
Well-known metadata keys:
| Key | Description |
|---|---|
| `"request_id"` | Service request/response correlation (UUID v7) |
| `"goal_id"` | Action goal identification (UUID v7) |
| `"goal_status"` | Action result status: `"succeeded"`, `"aborted"`, or `"canceled"` |
| `"session_id"` | Streaming session identifier |
| `"segment_id"` | Streaming segment within a session (integer) |
| `"seq"` | Streaming chunk sequence number (integer) |
| `"fin"` | Last chunk of a streaming segment (bool) |
| `"flush"` | Discard older queued messages on input (bool) |
Service client example:
```python
import uuid

# Send a request with a unique request_id
request_id = str(uuid.uuid7())  # Python 3.14+; use uuid_utils or uuid.uuid4() on older versions
node.send_output("request", data, {"request_id": request_id})
```

Service server example:

```python
# Pass through the metadata (includes request_id) from the incoming request
node.send_output("response", result, event["metadata"])
```

Action client example:

```python
goal_id = str(uuid.uuid7())
node.send_output("goal", data, {"goal_id": goal_id})
```

Streaming example (flush downstream queues on user interruption):

```python
params = {
    "session_id": session_id,
    "segment_id": 1,
    "seq": 0,
    "fin": False,
    "flush": True,
}
node.send_output("text", data, metadata={"parameters": params})
```

See patterns.md for the full guide.
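On the receiving side, the `seq` and `fin` keys let a consumer reassemble a streamed segment. The class below is a hypothetical helper sketched for illustration (it is not part of the adora API); it only assumes the well-known metadata keys from the table above:

```python
class SegmentAssembler:
    """Reassemble streaming chunks keyed by (session_id, segment_id)."""

    def __init__(self):
        self._buffers = {}  # (session_id, segment_id) -> {seq: payload}

    def push(self, metadata, payload):
        """Buffer one chunk; return the joined segment once "fin" arrives."""
        key = (metadata["session_id"], metadata["segment_id"])
        chunks = self._buffers.setdefault(key, {})
        chunks[metadata["seq"]] = payload
        if metadata.get("fin"):
            # Join chunks in sequence order and release the buffer
            joined = b"".join(chunks[seq] for seq in sorted(chunks))
            del self._buffers[key]
            return joined
        return None

assembler = SegmentAssembler()
first = assembler.push({"session_id": "s1", "segment_id": 1, "seq": 0, "fin": False}, b"Hel")
result = assembler.push({"session_id": "s1", "segment_id": 1, "seq": 1, "fin": True}, b"lo")
print(result)  # b'Hello'
```

In a real node, `push` would be called with `event["metadata"]` and the event payload for each incoming chunk.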
Python nodes can log using either Python's built-in logging module (recommended) or the explicit node API.
Python logging module (auto-bridged):
When Node() is created, it automatically installs a handler that routes Python's logging module through the adora daemon. No configuration needed:
```python
import logging
from adora import Node

node = Node()  # Installs the logging bridge
logging.info("Sensor initialized")    # -> structured "info" log entry
logging.warning("High temperature")   # -> structured "warn" log entry
logging.debug("Raw bytes: %s", data)  # -> structured "debug" log entry
```

These log entries are captured with full metadata (level, message, file path, line number) and work with `min_log_level` filtering, `send_logs_as` routing, and `adora/logs` subscribers.

Note: Do not call `logging.basicConfig()` before creating `Node()`. The constructor sets up the bridge; calling `basicConfig()` first may install a conflicting handler.
Explicit node API:
Emit a structured log message with optional target and key-value fields.
```python
node.log("info", "Processing frame", target="vision")
node.log("error", "Sensor timeout", fields={"sensor": "lidar", "retry": "3"})
```

Parameters:

- `level` (str) -- Log level: `"error"`, `"warn"`, `"info"`, `"debug"`, or `"trace"`.
- `message` (str) -- The log message.
- `target` (str, optional) -- Target module or subsystem name.
- `fields` (dict[str, str], optional) -- Structured key-value context fields.
Works with the daemon's min_log_level filtering, send_logs_as routing, and adora/logs subscribers.
Convenience methods for common log levels:
```python
node.log_error("Connection failed")
node.log_warn("Temperature elevated")
node.log_info("Sensor initialized")
node.log_debug("Raw bytes received")
node.log_trace("Entering loop iteration")
```

Each is equivalent to `node.log(level, message)`.
When to use which:
| Method | Structured? | Fields? | Best for |
|---|---|---|---|
| `logging.info()` | Yes | No | General-purpose logging |
| `node.log("info", msg, fields={...})` | Yes | Yes | Structured context (sensor_id, etc.) |
| `node.log_info(msg)` | Yes | No | Quick one-liner |
| `print()` | No | No | Legacy code, quick debugging |
Return the full dataflow descriptor (the parsed dataflow YAML) as a Python dictionary.
```python
descriptor = node.dataflow_descriptor()
print(descriptor["nodes"])
```

Returns: `dict`
Return the configuration block for this node from the dataflow descriptor.
```python
config = node.node_config()
model_path = config.get("model", "default.pt")
```

Returns: `dict`
Return the unique identifier of the running dataflow.
```python
print(node.dataflow_id())  # e.g. "a1b2c3d4-..."
```

Returns: `str`
Check whether this node was restarted after a previous exit or failure. Useful for deciding whether to restore saved state or start fresh.
```python
if node.is_restart():
    restore_checkpoint()
```

Returns: `bool`
Return how many times this node has been restarted. Returns 0 on the first run, 1 after the first restart, and so on.
```python
print(f"Restart #{node.restart_count()}")
```

Returns: `int`
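One plausible use of the restart counter is backing off before re-initializing flaky hardware or network connections. This is a hedged sketch, not adora API; `startup_delay` is a name invented here, and the counter would come from `node.restart_count()` in a real node:

```python
def startup_delay(restart_count, base=0.5, cap=30.0):
    """Seconds to wait before re-initializing, doubling per restart."""
    return min(cap, base * (2 ** restart_count))

print(startup_delay(0))   # 0.5  (first run: almost no delay)
print(startup_delay(3))   # 4.0
print(startup_delay(10))  # 30.0 (capped)
```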
Merge a ROS2 subscription stream into the node's main event loop. After calling this method, ROS2 messages arrive as events with kind set to "external".
```python
from adora import Node, Ros2Context, Ros2Node, Ros2NodeOptions, Ros2Topic

node = Node()
ros2_context = Ros2Context()
ros2_node = ros2_context.new_node("listener", Ros2NodeOptions())
topic = Ros2Topic("/chatter", "std_msgs/String", ros2_node)
subscription = ros2_node.create_subscription(topic)
node.merge_external_events(subscription)

for event in node:
    if event["kind"] == "external":
        print("ROS2:", event["value"])
    elif event["type"] == "INPUT":
        print("Adora:", event["id"])
```

Parameters:

- `subscription` (adora.Ros2Subscription) -- A ROS2 subscription created via the adora ROS2 bridge.
The `Node` class implements `__iter__` and `__next__`, so you can iterate directly:

```python
for event in node:
    match event["type"]:
        case "INPUT":
            process(event["value"])
        case "STOP":
            break
```

The iterator calls `next()` with no timeout on each iteration. When the event stream is closed, `next()` returns `None` and iteration stops, terminating the loop.
Events are returned as plain Python dictionaries. The structure depends on the event type.
An input message arrived from another node.
```python
{
    "type": "INPUT",
    "id": "camera_image",      # input ID as declared in the dataflow YAML
    "kind": "adora",           # "adora" for dataflow events, "external" for ROS2
    "value": <pyarrow.Array>,  # the payload as an Apache Arrow array
    "metadata": {
        "timestamp": datetime,            # UTC-aware datetime.datetime
        "open_telemetry_context": "...",  # tracing context (if enabled)
        ...                               # any user-supplied metadata
    },
}
```

Access the data:

```python
values = event["value"].to_pylist()  # convert to Python list
array = event["value"].to_numpy()    # convert to NumPy array
```

An input channel was closed (the upstream node finished).
```python
{
    "type": "INPUT_CLOSED",
    "id": "camera_image",
    "kind": "adora",
}
```

The dataflow is shutting down.

```python
{
    "type": "STOP",
    "id": "MANUAL" | "ALL_INPUTS_CLOSED",  # stop cause
    "kind": "adora",
}
```

An error occurred in the runtime.

```python
{
    "type": "ERROR",
    "error": "description of the error",
    "kind": "adora",
}
```

When using `merge_external_events`, ROS2 messages arrive as:

```python
{
    "kind": "external",
    "value": <pyarrow.Array>,  # the ROS2 message as an Arrow array
}
```

Used as the return value from operator `on_event` methods to control the event loop.
```python
from adora import AdoraStatus
```

| Value | Meaning |
|---|---|
| `AdoraStatus.CONTINUE` | Continue processing events (value 0) |
| `AdoraStatus.STOP` | Stop this operator (value 1) |
| `AdoraStatus.STOP_ALL` | Stop the entire dataflow (value 2) |
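The return values above drive the operator's lifetime. The sketch below shows a hypothetical operator that stops itself after three inputs; `AdoraStatus` is stubbed as an `IntEnum` with the values from the table so the example runs without the adora runtime:

```python
from enum import IntEnum

class AdoraStatus(IntEnum):
    """Stand-in for adora's AdoraStatus, values per the table above."""
    CONTINUE = 0
    STOP = 1
    STOP_ALL = 2

class Operator:
    """Hypothetical operator: stops itself after processing 3 inputs."""

    def __init__(self):
        self.count = 0

    def on_event(self, adora_event, send_output):
        if adora_event["type"] == "INPUT":
            self.count += 1
            if self.count >= 3:
                return AdoraStatus.STOP  # stop this operator only
        return AdoraStatus.CONTINUE

op = Operator()
statuses = [op.on_event({"type": "INPUT"}, None) for _ in range(3)]
assert statuses[-1] is AdoraStatus.STOP
```

Returning `STOP_ALL` instead would shut down the entire dataflow, not just this operator.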
Operators run inside the adora runtime process (no separate OS process). They are defined as a Python class named Operator with an on_event method.
Create a Python file with an Operator class:
```python
from adora import AdoraStatus

class Operator:
    def __init__(self):
        # Initialize state here
        self.count = 0

    def on_event(self, adora_event, send_output) -> AdoraStatus:
        if adora_event["type"] == "INPUT":
            self.count += 1
            # Process the input and optionally send output
            send_output("result", b"processed", adora_event["metadata"])
        return AdoraStatus.CONTINUE
```

Methods:

- `__init__(self)` -- Called once when the operator is loaded. Initialize any state or models here.
- `on_event(self, adora_event, send_output) -> AdoraStatus` -- Called for every incoming event. Must return an `AdoraStatus` value.

Parameters of `on_event`:

- `adora_event` (dict) -- An event dictionary.
- `send_output` (callable) -- Callback to send output data (see below).
The runtime also sets self.dataflow_descriptor on the operator instance with the parsed dataflow YAML as a dictionary.
The send_output callback is passed to on_event for sending data from an operator.
```python
send_output(output_id, data, metadata=None)
```

Parameters:

- `output_id` (str) -- The output name as declared in the dataflow YAML.
- `data` (bytes | pyarrow.Array) -- The payload.
- `metadata` (dict, optional) -- Metadata to attach. Pass `adora_event["metadata"]` to propagate tracing context.
Example:
```python
import pyarrow as pa
from adora import AdoraStatus

class Operator:
    def on_event(self, adora_event, send_output) -> AdoraStatus:
        if adora_event["type"] == "INPUT":
            result = pa.array([42], type=pa.int64())
            send_output("output", result, adora_event["metadata"])
        return AdoraStatus.CONTINUE
```

```python
from adora.builder import DataflowBuilder, Node, Operator, Output
```

Build dataflow YAML programmatically in Python.
Create a new dataflow builder.
```python
flow = DataflowBuilder("my-robot")
```

Parameters:

- `name` (str, optional) -- Name of the dataflow. Defaults to `"adora-dataflow"`.
Add a node to the dataflow. Returns a Node object for further configuration.
```python
sender = flow.add_node("sender")
```

Parameters:

- `id` (str) -- Unique node identifier.
- `**kwargs` -- Additional node configuration passed through to the YAML.
Returns: Node (builder)
Generate the YAML representation of the dataflow. If path is given, writes to file and returns None. Otherwise returns the YAML string.
```python
# Write to file
flow.to_yaml("dataflow.yml")

# Get as string
yaml_str = flow.to_yaml()
```

Parameters:

- `path` (str, optional) -- File path to write the YAML.
Returns: str | None
DataflowBuilder supports the with statement:
```python
with DataflowBuilder("my-flow") as flow:
    flow.add_node("sender").path("sender.py")
    flow.to_yaml("dataflow.yml")
```

Returned by `DataflowBuilder.add_node()`. All setter methods return `self` for chaining.
Set the path to the node's executable or script.
```python
node.path("my_node.py")
```

Set command-line arguments for the node.

```python
node.args("--verbose --port 8080")
```

Set environment variables for the node.

```python
node.env({"MODEL_PATH": "/models/yolo.pt"})
```

Set the build command for the node (run before starting).

```python
node.build("pip install -r requirements.txt")
```

Set a Git repository as the source for the node.

```python
node.git("https://github.com/org/repo.git", branch="main")
```

Attach an `Operator` to this node.

```python
op = Operator("detector", python="object_detection.py")
node.add_operator(op)
```

Declare an output on this node and return an `Output` reference for use as an input source.
```python
output = sender.add_output("data")
```
```python
# Using an Output object
output = sender.add_output("data")
receiver.add_input("data", output)

# Using a string reference
receiver.add_input("tick", "adora/timer/millis/100")

# With a custom queue size
receiver.add_input("images", camera_output, queue_size=2)

# Lossless input (blocks sender when full)
receiver.add_input("commands", cmd_output, queue_size=100, queue_policy="backpressure")
```

Parameters:

- `input_id` (str) -- Name of the input on this node.
- `source` (str | Output) -- Either a string (`"node_id/output_id"`) or an `Output` object.
- `queue_size` (int, optional) -- Maximum number of buffered messages for this input.
- `queue_policy` (str, optional) -- `"drop_oldest"` (default) or `"backpressure"` (buffers up to 10x `queue_size` before dropping).
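The default `"drop_oldest"` policy can be pictured with a bounded queue in plain Python. This is only an illustration of the behavior described above, not adora's internal implementation:

```python
from collections import deque

# A bounded input queue with queue_size=2: when full, appending
# drops the oldest buffered message (drop_oldest semantics)
queue = deque(maxlen=2)
for msg in ["a", "b", "c", "d"]:
    queue.append(msg)

print(list(queue))  # ['c', 'd']
```

Under `"backpressure"`, by contrast, the sender is eventually blocked instead of old messages being silently discarded, which is the right choice for command streams where every message matters.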
Return the dictionary representation of the node for YAML serialization.
Returned by Node.add_output(). Represents a reference to a node's output, used as a source in add_input().
```python
output = sender.add_output("data")
receiver.add_input("sensor_data", output)
str(output)  # "sender/data"
```

Defines an operator for embedding in a node's YAML configuration.
```python
__init__(id, name=None, description=None, build=None, python=None, shared_library=None, send_stdout_as=None)
```

```python
op = Operator(
    id="detector",
    python="object_detection.py",
    send_stdout_as="detection_text",
)
```

Parameters:

- `id` (str) -- Unique operator identifier.
- `name` (str, optional) -- Display name.
- `description` (str, optional) -- Human-readable description.
- `build` (str, optional) -- Build command to run before loading.
- `python` (str, optional) -- Path to the Python operator file.
- `shared_library` (str, optional) -- Path to a shared library operator.
- `send_stdout_as` (str, optional) -- Route the operator's stdout as an output with this ID.
Return the dictionary representation for YAML serialization.
```python
from adora.cuda import torch_to_ipc_buffer, ipc_buffer_to_ipc_handle, open_ipc_handle
```

Utilities for zero-copy GPU tensor sharing between nodes via CUDA IPC. Requires PyTorch with CUDA and Numba with CUDA support.
Convert a PyTorch CUDA tensor into an Arrow array containing the CUDA IPC handle, plus a metadata dictionary. Send both through the dataflow to share GPU memory without copying.
```python
import torch
import pyarrow as pa
from adora import Node
from adora.cuda import torch_to_ipc_buffer

node = Node()
tensor = torch.randn(1024, 768, device="cuda")
ipc_buffer, metadata = torch_to_ipc_buffer(tensor)
node.send_output("gpu_data", ipc_buffer, metadata)
```

Parameters:

- `tensor` (torch.Tensor) -- A CUDA tensor.

Returns: `tuple[pyarrow.Array, dict]` -- The IPC handle as an int8 Arrow array, and metadata with shape, strides, dtype, size, offset, and source info.
Reconstruct a CUDA IPC handle from a received Arrow buffer and metadata.
```python
from adora.cuda import ipc_buffer_to_ipc_handle

event = node.next()
ipc_handle = ipc_buffer_to_ipc_handle(event["value"], event["metadata"])
```

Parameters:

- `handle_buffer` (pyarrow.Array) -- The Arrow array from `event["value"]`.
- `metadata` (dict) -- The metadata from `event["metadata"]`.
Returns: numba.cuda.cudadrv.driver.IpcHandle
Open a CUDA IPC handle and yield a PyTorch tensor. Use as a context manager to ensure proper cleanup.
```python
from adora.cuda import ipc_buffer_to_ipc_handle, open_ipc_handle

event = node.next()
ipc_handle = ipc_buffer_to_ipc_handle(event["value"], event["metadata"])
with open_ipc_handle(ipc_handle, event["metadata"]) as tensor:
    result = tensor * 2  # use the GPU tensor directly
```

Parameters:

- `ipc_handle` (IpcHandle) -- Handle from `ipc_buffer_to_ipc_handle`.
- `metadata` (dict) -- The metadata dictionary with shape, strides, and dtype info.
Returns: Context manager yielding a torch.Tensor on CUDA.
A complete node that receives images, processes them, and sends results:
```python
#!/usr/bin/env python3
"""Example node: receives messages, transforms them, and sends output."""
import logging

import pyarrow as pa
from adora import Node


def main():
    node = Node()
    for event in node:
        if event["type"] == "INPUT":
            input_id = event["id"]
            if input_id == "message":
                values = event["value"].to_pylist()
                number = values[0]
                # Create a struct array with multiple fields
                result = pa.StructArray.from_arrays(
                    [
                        pa.array([number * 2]),
                        pa.array([f"Message #{number}"]),
                    ],
                    names=["doubled", "description"],
                )
                node.send_output("transformed", result)
                logging.info("Transformed message %d", number)
        elif event["type"] == "STOP":
            logging.info("Node stopping")
            break


if __name__ == "__main__":
    main()
```

Run with:

```shell
adora run dataflow.yml
```

Build a dataflow programmatically instead of writing YAML by hand:
```python
#!/usr/bin/env python3
"""Build a simple sender -> receiver dataflow."""
from adora.builder import DataflowBuilder, Operator

flow = DataflowBuilder("example-flow")

# Add a timer-driven sender node
sender = flow.add_node("sender")
sender.path("sender.py")
tick_output = sender.add_output("message")

# Add a receiver that subscribes to the sender
receiver = flow.add_node("receiver")
receiver.path("receiver.py")
receiver.add_input("message", tick_output)

# Add a node with a timer input
timed_node = flow.add_node("periodic")
timed_node.path("periodic.py")
timed_node.add_input("tick", "adora/timer/millis/100")

# Add a node with an operator
runtime_node = flow.add_node("runtime-node")
op = Operator("detector", python="object_detection.py")
runtime_node.add_operator(op)
runtime_node.add_input("image", "camera/image")

# Write the YAML to a file, then print it
flow.to_yaml("dataflow.yml")
print(flow.to_yaml())
```