-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathprocessor.py
More file actions
60 lines (44 loc) · 1.75 KB
/
processor.py
File metadata and controls
60 lines (44 loc) · 1.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
"""Processor node demonstrating send_logs_as routing.
Receives sensor readings, computes a running average, and emits results.
Uses Python logging at various levels. Because the dataflow YAML sets
send_logs_as: log_entries, structured log entries from this node are
routed as data messages to any downstream node subscribing to
processor/log_entries.
Note: raw print() output is NOT routed by send_logs_as -- only parsed
structured log entries are forwarded.
"""
import json
import logging
from collections import deque

import pyarrow as pa

from adora import Node
def main():
    """Run the processor's event loop until a STOP event arrives.

    For each INPUT event: extract the first value of the incoming Arrow
    array, keep a sliding window of the 20 most recent readings, and emit
    the window's running average on the "result" output as a pyarrow
    array.  Logging calls at info/warning/error level demonstrate the
    send_logs_as routing described in the module docstring.
    """
    node = Node()
    # deque(maxlen=20) evicts the oldest reading automatically in O(1),
    # replacing the original manual list + pop(0) sliding window (O(n)
    # per eviction) with identical window semantics.
    readings = deque(maxlen=20)
    error_injected = False  # the demo error below must fire exactly once
    for event in node:
        if event["type"] == "INPUT":
            arr = event["value"]
            if len(arr) == 0:
                # Skip empty batches rather than indexing out of range.
                continue
            value = arr[0].as_py()
            readings.append(value)
            avg = sum(readings) / len(readings)
            logging.info("processed: avg=%.2f samples=%d", avg, len(readings))
            # Inject one error for demonstration of error-level routing.
            if not error_injected and len(readings) >= 10:
                logging.error("simulated processing error at sample %d", len(readings))
                error_injected = True
            # Threshold is demo-specific (presumably degrees C -- confirm
            # against the upstream sensor's units).
            if avg > 26.0:
                logging.warning("average temperature elevated: %.2f", avg)
            # Raw print -- NOT captured by send_logs_as
            print(f"processor stdout: avg={avg:.2f}")
            result = pa.array([avg])
            node.send_output("result", result)
        elif event["type"] == "STOP":
            logging.info("processor stopping, processed %d readings", len(readings))
            break
if __name__ == "__main__":
main()