Skip to content

Commit a22da7c

Browse files
committed
[InfluxDB] Improve export adapter interfaces, and naming things
This patch sets the stage for more easily bringing in different database adapters when _exporting_ data.
1 parent b2817b6 commit a22da7c

File tree

11 files changed

+255
-191
lines changed

11 files changed

+255
-191
lines changed

kotori/daq/storage/influx.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from copy import deepcopy
55
from funcy import project
66
from collections import OrderedDict
7+
8+
from munch import Munch
79
from twisted.logger import Logger
810
from influxdb.client import InfluxDBClient, InfluxDBClientError
911

@@ -12,7 +14,7 @@
1214
log = Logger()
1315

1416

15-
class InfluxDBAdapter(object):
17+
class InfluxDBAdapter:
1618

1719
def __init__(self, settings=None, database=None):
1820

@@ -64,6 +66,10 @@ def is_udp_database(self, name):
6466
return True
6567
return False
6668

69+
def query(self, expression: str, tdata: Munch = None):
70+
log.info(f"Database query: {expression}")
71+
return self.influx_client.query(expression)
72+
6773
def write(self, meta, data):
6874

6975
meta_copy = deepcopy(dict(meta))
@@ -122,7 +128,7 @@ def get_tags(data):
122128
return project(data, ['gateway', 'node'])
123129

124130

125-
class BusInfluxForwarder(object):
131+
class BusInfluxForwarder: # pragma: nocover
126132
"""
127133
Generic software bus -> influxdb forwarder based on prototypic implementation at HiveEyes
128134
TODO: Generalize and refactor

kotori/io/export/database.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
# -*- coding: utf-8 -*-
2+
# (c) 2016-2023 Andreas Motl <[email protected]>
3+
import collections.abc
4+
from twisted.logger import Logger
5+
6+
log = Logger()
7+
8+
9+
class DataFrameQuery:
10+
"""
11+
Query database, reshape result, and return as pandas DataFrame.
12+
"""
13+
14+
def __init__(self, settings=None, bucket=None):
15+
self.settings = settings
16+
self.bucket = bucket
17+
self.request = bucket.request
18+
19+
def query(self):
20+
21+
bucket = self.bucket
22+
23+
log.info("Creating database adapter")
24+
25+
# Get a database adapter object.
26+
# TODO: Support multiple databases at the same time.
27+
# TODO: Pool adapter instances, keyed by database.
28+
if "influxdb" in self.settings:
29+
from kotori.daq.storage.influx import InfluxDBAdapter
30+
database = InfluxDBAdapter(
31+
settings=self.settings.influxdb,
32+
database=bucket.tdata.database,
33+
)
34+
else:
35+
log.warn("No time-series database configured")
36+
return
37+
38+
# Get query expression from transformation dictionary.
39+
expression = bucket.tdata.expression
40+
log.info('Query expression: {expression}', expression=expression)
41+
42+
# Run database query.
43+
result = database.query(expression, tdata=bucket.tdata)
44+
45+
# Bring results into records format.
46+
# [{'time': '2020-03-10T03:29:42Z', 'humidity': 51.8, 'temperature': 25.26}]
47+
records = list(flatten(result))
48+
49+
# Stop when having no results.
50+
if not records:
51+
return
52+
53+
# Bring column names in order, `time` should be the first column.
54+
columns = list(records[0].keys())
55+
if 'time' in columns and columns.index('time') != 0:
56+
columns.remove('time')
57+
columns.insert(0, 'time')
58+
59+
# Produce pandas DataFrame from database results.
60+
import pandas
61+
df = pandas.DataFrame(records, columns=columns)
62+
63+
# Convert `time` column to a pandas datetime object.
64+
df['time'] = pandas.to_datetime(df['time'])
65+
66+
return df
67+
68+
69+
def flatten(l):
70+
"""
71+
Flatten irregular/nested results.
72+
73+
See also: https://stackoverflow.com/questions/21461140/flatten-an-irregular-list-of-lists-in-python-respecting-pandas-dataframes
74+
"""
75+
import pandas
76+
for el in l:
77+
if isinstance(el, collections.abc.Iterable) and not isinstance(el, (str, pandas.DataFrame, dict)):
78+
for sub in flatten(el):
79+
yield sub
80+
else:
81+
yield el

kotori/io/export/influx.py

Lines changed: 0 additions & 73 deletions
This file was deleted.

kotori/io/protocol/forwarder.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ def __init__(self, channel=None):
5858

5959
def setupService(self):
6060
#self.log(log.info, u'Setting up')
61-
log.info(u'Starting {name}'.format(name=self.logname))
61+
log.info(u'Starting ProtocolForwarderService: {name}'.format(name=self.logname))
6262

6363
self.settings = self.parent.settings
6464

kotori/io/protocol/http.py

Lines changed: 53 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
2-
# (c) 2016-2021 Andreas Motl <[email protected]>
2+
# (c) 2016-2023 Andreas Motl <[email protected]>
3+
import dataclasses
34
import re
45
import json
56
import mimetypes
@@ -48,61 +49,89 @@ def log(self, request):
4849
log.debug(line)
4950

5051

52+
@dataclasses.dataclass
53+
class HttpServerAddress:
54+
"""
55+
Represent a typical host/port pair for configuring IP server listening addresses.
56+
Other than this, provide sensible factory and helper methods.
57+
"""
58+
host: str
59+
port: int
60+
61+
@classmethod
62+
def from_settings(cls, settings):
63+
return cls(host=settings.kotori.http_listen, port=int(settings.kotori.http_port))
64+
65+
@property
66+
def combined(self):
67+
return f"{self.host}:{self.port}"
68+
69+
@property
70+
def slug(self):
71+
return f"{self.host}-{self.port}"
72+
73+
5174
class HttpServerService(Service):
5275
"""
53-
Singleton instance of a Twisted service wrapping
54-
the Twisted TCP/HTTP server object "Site", in turn
55-
obtaining a ``HttpChannelContainer`` as root resource.
76+
A Twisted service for managing multiple Twisted TCP/HTTP `Site` server objects,
77+
and associating them with corresponding `HttpChannelContainer` root resources.
5678
"""
5779

58-
_instance = None
80+
_instances = {}
5981

6082
def __init__(self, settings):
83+
log.info(f"Initializing HttpServerService. settings={settings}")
6184

62-
# Propagate global settings
85+
# Propagate global settings.
6386
self.settings = settings
6487

65-
# Unique name of this service
66-
self.name = 'http-server-default'
88+
# Extract listen address settings.
89+
self.address = HttpServerAddress.from_settings(self.settings)
6790

68-
# Root resource object representing a channel
69-
# Contains routing machinery
91+
# Assign a unique name to the Twisted service object.
92+
self.name = f'http-server-{self.address.slug}'
93+
94+
# Assign a root resource object, representing
95+
# a channel containing the routing machinery.
7096
self.root = HttpChannelContainer(self.settings)
7197

72-
# Forward route registration method to channel object
98+
# Forward route registration method to channel object.
7399
self.registerEndpoint = self.root.registerEndpoint
74100

75101
def startService(self):
76102
"""
77-
Start TCP listener on designated HTTP port,
78-
serving ``HttpChannelContainer`` as root resource.
103+
Start TCP listener on designated HTTP port, serving a
104+
`HttpChannelContainer` as root resource.
79105
"""
80106

81-
# Don't start service twice
107+
# Don't start service twice.
82108
if self.running == 1:
83109
return
84110

85111
self.running = 1
86112

87-
# Prepare startup
88-
http_listen = self.settings.kotori.http_listen
89-
http_port = int(self.settings.kotori.http_port)
90-
log.info('Starting HTTP service on {http_listen}:{http_port}', http_listen=http_listen, http_port=http_port)
113+
# Prepare startup.
114+
log.info(f"Starting HTTP service on {self.address.combined}")
91115

92116
# Configure root Site object and start listening to requests.
93117
# This must take place only once - can't bind to the same port multiple times!
94118
factory = LocalSite(self.root)
95-
reactor.listenTCP(http_port, factory, interface=http_listen)
119+
reactor.listenTCP(self.address.port, factory, interface=self.address.host)
96120

97121
@classmethod
98122
def create(cls, settings):
99123
"""
100-
Singleton factory
124+
Factory method for creating `HttpServerService` instances.
125+
126+
It makes sure to create only one instance per listening address,
127+
in order not to bind to the same port multiple times.
101128
"""
102-
if not cls._instance:
103-
cls._instance = HttpServerService(settings)
104-
cls._instance.startService()
105-
return cls._instance
129+
key = HttpServerAddress.from_settings(settings).combined
130+
if key not in cls._instances:
131+
instance = HttpServerService(settings)
132+
instance.startService()
133+
cls._instances[key] = instance
134+
return cls._instances[key]
106135

107136

108137
class HttpChannelContainer(Resource):

0 commit comments

Comments
 (0)