-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathtransformer.py
More file actions
41 lines (30 loc) · 1.14 KB
/
transformer.py
File metadata and controls
41 lines (30 loc) · 1.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
"""Transformer node that receives messages and outputs a struct with multiple fields."""
import logging
import pyarrow as pa
from adora import Node
def main():
    """Run the transformer node's event loop.

    Creates a ``Node`` and iterates over the events it yields. For each
    ``INPUT`` event the first element of the incoming value is read and a
    single-row struct array is sent on the ``transformed`` output with
    three fields:

    * ``doubled`` -- the number multiplied by two
    * ``description`` -- the string ``"Message #<number>"``
    * ``is_even`` -- whether the number is divisible by two

    A ``STOP`` event ends the loop; the loop also ends if the node stops
    yielding events. Events of any other type are ignored. Inputs whose
    value is empty, or whose first element is null, are skipped with a
    warning instead of raising and taking the node down.
    """
    node = Node()

    for event in node:
        event_type = event["type"]

        if event_type == "STOP":
            logging.info("Transformer stopping")
            break
        if event_type != "INPUT":
            continue

        # Only the first element of the incoming array is used.
        values = event["value"].to_pylist()
        if not values or values[0] is None:
            # Indexing an empty list or doubling a null would raise; a
            # single malformed message should not crash the whole node.
            logging.warning("Transformer received an empty or null input; skipping")
            continue

        number = values[0]
        doubled = number * 2

        # Build a one-row struct array; each field is a length-1 array.
        struct_array = pa.StructArray.from_arrays(
            [
                pa.array([doubled]),  # doubled value
                pa.array([f"Message #{number}"]),  # string description
                pa.array([number % 2 == 0]),  # is_even flag
            ],
            names=["doubled", "description", "is_even"],
        )
        node.send_output("transformed", struct_array)
        logging.info(
            "Transformed message %d -> struct with doubled=%d", number, doubled
        )

    logging.info("Transformer finished")
if __name__ == "__main__":
    # Start the node loop only when executed as a script, not on import.
    main()