This guide walks you through writing Python nodes and operators for adora dataflows.
Install the CLI and the Python package:

    cargo install adora-cli   # CLI (adora command)
    pip install adora-rs      # Python node/operator API

The adora-rs package includes pyarrow as a dependency.

Building from source (instead of pip install adora-rs):

    pip install maturin   # requires >= 1.8
    cd apis/python/node && maturin develop --uv && cd ../../..

The operator API is compiled into the adora Python module automatically -- there is no separate install step for it.
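Before writing any nodes, it can help to confirm that the packages are importable from the Python environment you intend to use. The sketch below assumes the import names are `adora` (as in the `from adora import Node` examples in this guide) and `pyarrow`; `missing_packages` is a hypothetical helper, not part of adora.

```python
import importlib.util


def missing_packages(names):
    """Return the names that cannot be found as importable top-level modules."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Assumed import names; adjust if your installation differs.
    missing = missing_packages(["adora", "pyarrow"])
    if missing:
        print("Not importable:", ", ".join(missing))
    else:
        print("adora and pyarrow are importable")
```

If a name is reported as not importable, the usual cause is that pip or maturin installed into a different virtual environment than the one running the script.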
Create three files:

sender.py -- sends 100 numbered messages:

    import pyarrow as pa
    from adora import Node

    node = Node()
    for i in range(100):
        node.send_output("message", pa.array([i]))

receiver.py -- receives and prints messages:

    from adora import Node

    node = Node()
    for event in node:
        if event["type"] == "INPUT":
            values = event["value"].to_pylist()
            print(f"Received {event['id']}: {values}")
        elif event["type"] == "STOP":
            break

dataflow.yml -- connects sender to receiver:

    nodes:
      - id: sender
        path: sender.py
        outputs:
          - message
      - id: receiver
        path: receiver.py
        inputs:
          message: sender/message

Run it:

    adora run dataflow.yml

Every call to node.next(), and each iteration of `for event in node`, returns an event dictionary:
| Key | Type | Description |
|---|---|---|
| type | str | "INPUT", "INPUT_CLOSED", "STOP", or "ERROR" |
| id | str | Input name (e.g. "message") -- only for INPUT events |
| value | pyarrow.Array or None | The data payload |
| metadata | dict | Tracing/routing metadata |
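The table above can be written down as a typed dictionary, which makes it easier to sanity-check events in unit tests without a running dataflow. This is a minimal sketch: `Event` and `event_problems` are hypothetical names, not part of the adora API, and the check only covers what the table states.

```python
from typing import Any, TypedDict


class Event(TypedDict, total=False):
    """Typed view of the event dictionary described in the table above."""
    type: str        # "INPUT", "INPUT_CLOSED", "STOP", or "ERROR"
    id: str          # input name
    value: Any       # pyarrow.Array or None
    metadata: dict   # tracing/routing metadata


KNOWN_TYPES = {"INPUT", "INPUT_CLOSED", "STOP", "ERROR"}


def event_problems(event: dict) -> list[str]:
    """Return human-readable problems; an empty list means the event looks well-formed."""
    problems = []
    kind = event.get("type")
    if kind not in KNOWN_TYPES:
        problems.append(f"unknown type: {kind!r}")
    # The table documents `id` as the input name on INPUT events.
    if kind == "INPUT" and not isinstance(event.get("id"), str):
        problems.append("INPUT event without a string id")
    return problems
```

For example, `event_problems({"type": "STOP"})` returns an empty list, while an INPUT event with no `id` key yields one problem string.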
Handle events by checking `event["type"]` (the match statement requires Python 3.10 or later):

    for event in node:
        match event["type"]:
            case "INPUT":
                # process() stands for your own handler function
                process(event["id"], event["value"])
            case "INPUT_CLOSED":
                print(f"Input {event['id']} closed")
            case "STOP":
                break

All data flows through adora as Apache Arrow arrays. Common patterns:

    import pyarrow as pa

    # Simple values
    node.send_output("count", pa.array([42]))
    node.send_output("names", pa.array(["alice", "bob"]))

    # Read values back
    values = event["value"].to_pylist()  # [42] or ["alice", "bob"]

    # Structured data
    struct = pa.StructArray.from_arrays(
        [pa.array([1.5]), pa.array(["hello"])],
        names=["x", "y"],
    )
    node.send_output("point", struct)

    # Raw bytes (images, serialized data, etc.)
    node.send_output("frame", pa.array(raw_bytes))

Operators are lightweight alternatives to nodes. They run inside the adora runtime process (no separate OS process), which makes them faster for simple transformations.
Define an Operator class with an on_event method:

    # doubler_op.py
    import pyarrow as pa
    from adora import AdoraStatus


    class Operator:
        def on_event(self, event, send_output) -> AdoraStatus:
            if event["type"] == "INPUT":
                value = event["value"].to_pylist()[0]
                send_output("doubled", pa.array([value * 2]), event["metadata"])
            return AdoraStatus.CONTINUE

Reference it in YAML with operator instead of path:

    nodes:
      - id: timer
        path: adora/timer/millis/500
        outputs:
          - tick
      - id: doubler
        operator:
          python: doubler_op.py
          inputs:
            tick: timer/tick
          outputs:
            - doubled

When to use operators vs nodes:
| | Nodes | Operators |
|---|---|---|
| Process model | Separate OS process | In-process (shared runtime) |
| Startup cost | Higher | Lower |
| Isolation | Full process isolation | Shared memory space |
| Best for | Long-running, heavy compute | Lightweight transforms, filters |
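Because an operator is a plain class with an `on_event(event, send_output)` method, its logic can be exercised in a unit test without starting a dataflow. The sketch below mirrors doubler_op.py but uses stand-ins so that it runs without adora or pyarrow installed: `Status`, `FakeArray`, and `run_once` are hypothetical names, and plain lists take the place of Arrow arrays. In a real operator you would keep `AdoraStatus` and `pa.array` as shown earlier.

```python
from enum import Enum


class Status(Enum):
    """Stand-in for adora's AdoraStatus so the sketch runs without adora installed."""
    CONTINUE = 0


class FakeArray:
    """Stand-in for a pyarrow.Array; only to_pylist() is modeled."""

    def __init__(self, items):
        self._items = list(items)

    def to_pylist(self):
        return list(self._items)


class Operator:
    """Same logic as doubler_op.py, with a plain list in place of an Arrow array."""

    def on_event(self, event, send_output):
        if event["type"] == "INPUT":
            value = event["value"].to_pylist()[0]
            send_output("doubled", [value * 2], event["metadata"])
        return Status.CONTINUE


def run_once(operator, event):
    """Feed one event to the operator and return (status, recorded outputs)."""
    sent = []

    def send_output(output_id, data, metadata):
        # Record the call instead of publishing to a dataflow.
        sent.append((output_id, data, metadata))

    status = operator.on_event(event, send_output)
    return status, sent
```

Calling `run_once(Operator(), {"type": "INPUT", "id": "tick", "value": FakeArray([21]), "metadata": {}})` records a single `("doubled", [42], {})` output, while a STOP event records nothing.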
For nodes that need async I/O (HTTP calls, database queries, etc.), use recv_async():

    import asyncio
    from adora import Node


    async def main():
        node = Node()
        for _ in range(50):  # handle at most 50 events
            event = await node.recv_async()
            if event["type"] == "STOP":
                break
            # Do async work here (fetch_data is a placeholder for your own coroutine)
            result = await fetch_data(event["value"])
            node.send_output("result", result)


    asyncio.run(main())

See examples/python-async for a complete example.
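One practical concern with async work inside the receive loop is that a slow call can stall the node. The sketch below bounds each call with `asyncio.wait_for`. It runs without adora: `FakeNode` is a hypothetical stand-in that mirrors only the `recv_async()` and `send_output()` calls used above, and `fetch_data` is a placeholder for real I/O.

```python
import asyncio


class FakeNode:
    """Stand-in for adora's Node: replays a fixed list of events, then reports STOP."""

    def __init__(self, events):
        self._events = list(events)
        self.sent = []

    async def recv_async(self):
        await asyncio.sleep(0)  # yield control to the event loop
        if self._events:
            return self._events.pop(0)
        return {"type": "STOP"}

    def send_output(self, output_id, data):
        self.sent.append((output_id, data))


async def fetch_data(value):
    """Placeholder async work (e.g. an HTTP call); here it just uppercases a string."""
    await asyncio.sleep(0.01)
    return value.upper()


async def run(node, timeout_s=1.0):
    """Receive events until STOP, bounding each piece of async work with a timeout."""
    while True:
        event = await node.recv_async()
        if event["type"] == "STOP":
            break
        if event["type"] != "INPUT":
            continue  # only INPUT events are processed in this sketch
        try:
            result = await asyncio.wait_for(fetch_data(event["value"]), timeout_s)
        except asyncio.TimeoutError:
            continue  # skip this input rather than stall the node
        node.send_output("result", result)


node = FakeNode([{"type": "INPUT", "id": "message", "value": "hello"}])
asyncio.run(run(node))
```

Whether to skip, retry, or emit an error output on timeout depends on the dataflow; skipping is used here only to keep the sketch short.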
Use node.log() for structured logging that integrates with adora logs:

    node.log("info", "Processing item", {"count": str(i)})

Or use Python's standard logging module -- adora captures stdout/stderr automatically:

    import logging

    # The root logger defaults to WARNING, which would drop info() messages
    logging.basicConfig(level=logging.INFO)
    logging.info("Processing item %d", i)

See examples/python-logging for logging module integration.
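When using the standard logging module, note that Python's root logger defaults to the WARNING level, so info messages are dropped unless a level and handler are configured. The sketch below sets up a per-node logger that writes to stdout by default; `make_logger` and the `"my-node"` name are hypothetical, and the example writes to an in-memory buffer only so the result can be inspected.

```python
import io
import logging
import sys


def make_logger(name, stream=None, level=logging.INFO):
    """Return a logger that writes 'LEVEL name: message' lines to `stream` (stdout by default)."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    logger.propagate = False  # avoid duplicate lines if the root logger is also configured
    if not logger.handlers:
        target = stream if stream is not None else sys.stdout
        handler = logging.StreamHandler(target)
        handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
        logger.addHandler(handler)
    return logger


buffer = io.StringIO()
log = make_logger("my-node", stream=buffer)
log.debug("hidden at INFO level")
log.info("Processing item %d", 3)
```

In a real node you would omit the `stream` argument so that the lines go to stdout.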
Built-in timer nodes generate periodic ticks without writing any code:

    nodes:
      - id: tick-source
        path: adora/timer/millis/100  # tick every 100 ms
        outputs:
          - tick
      - id: my-node
        path: my_node.py
        inputs:
          tick: tick-source/tick

Also available: adora/timer/hz/30 for 30 Hz.
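The two timer forms express the same thing in different units: a millis timer states the period directly, while an hz timer states the rate, so the period is 1000 / hz milliseconds (about 33.3 ms at 30 Hz). The helper below is a hypothetical sketch that handles only the two path forms shown in this guide; it is not part of adora, and it assumes the last path segment is a positive number.

```python
def tick_period_ms(timer_path):
    """Return the tick period in milliseconds for a built-in timer path."""
    parts = timer_path.split("/")
    if len(parts) != 4 or parts[:2] != ["adora", "timer"]:
        raise ValueError(f"not a timer path: {timer_path!r}")
    unit, amount = parts[2], float(parts[3])
    if amount <= 0:
        raise ValueError("timer value must be positive")
    if unit == "millis":
        return amount            # the period is given directly
    if unit == "hz":
        return 1000.0 / amount   # convert a rate to a period
    raise ValueError(f"unknown timer unit: {unit!r}")
```

This can be handy when a node needs to reason about how much work fits between ticks.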
- Python API Reference -- full API docs for Node, Operator, DataflowBuilder, CUDA
- Communication Patterns -- service (request/reply), action (goal/feedback/result), and streaming (session/segment/chunk) patterns
- Examples -- python-dataflow, python-async, python-drain, python-concurrent-rw, python-multiple-arrays
- Distributed Deployment -- running across multiple machines with `adora up`