Skip to content

Commit f335f8b

Browse files
committed
[CrateDB] Add support for the data export subsystem
1 parent 6a7aea6 commit f335f8b

File tree

9 files changed

+187
-6
lines changed

9 files changed

+187
-6
lines changed

etc/test/cratedb.ini

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,18 @@
33
; ######################################
44

55

6+
; ==========================================
7+
; General settings
8+
; ==========================================
9+
10+
; http server
11+
[kotori]
12+
13+
; TODO: Refactor to [http] section, or, even better, into the channel configuration.
14+
http_listen = localhost
15+
http_port = 24643
16+
17+
618
; =====================
719
; Connectivity settings
820
; =====================
@@ -39,3 +51,25 @@ type = application
3951
realm = mqttkit-2
4052
mqtt_topics = mqttkit-2/#
4153
application = kotori.daq.application.mqttkit:mqttkit_application
54+
55+
[mqttkit-2.http-api-generic]
56+
enable = true
57+
58+
type = application
59+
application = kotori.io.protocol.forwarder:boot
60+
61+
realm = mqttkit-2
62+
source = http:/api/mqttkit-2/{address:.*}/{slot:(data|event)} [POST]
63+
target = mqtt:/mqttkit-2/{address}/{slot}.json
64+
65+
[mqttkit-2.cratedb-data-export]
66+
enable = true
67+
68+
type = application
69+
application = kotori.io.protocol.forwarder:boot
70+
71+
realm = mqttkit-2
72+
source = http:/api/{realm:mqttkit-2}/{network:.*}/{gateway:.*}/{node:.*}/{slot:(data|event)}.{suffix} [GET]
73+
target = cratedb:/{database}?measurement={measurement}
74+
transform = kotori.daq.strategy.wan:WanBusStrategy.topology_to_storage,
75+
kotori.io.protocol.cratedb:QueryTransformer.transform

etc/test/main.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ include =
1515
; http server
1616
[kotori]
1717

18-
; TODO: Refactor to [http] section.
18+
; TODO: Refactor to [http] section, or, even better, into the channel configuration.
1919
http_listen = localhost
2020
http_port = 24642
2121

kotori/daq/storage/cratedb.py

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
11
# -*- coding: utf-8 -*-
22
# (c) 2023 Andreas Motl <[email protected]>
33
import calendar
4+
import functools
45
import json
6+
from collections import OrderedDict
57
from decimal import Decimal
68
from copy import deepcopy
7-
from datetime import datetime, date
9+
from datetime import datetime, date, timezone
810

911
import crate.client.http
1012
import pytz
1113
import requests
1214
from crate import client
15+
from crate.client.converter import DefaultTypeConverter
1316
from crate.client.exceptions import ProgrammingError
1417
from funcy import project
18+
from munch import Munch
1519
from twisted.logger import Logger
1620

1721
from kotori.daq.storage.util import format_chunk
1822

1923
log = Logger()
2024

2125

22-
class CrateDBAdapter(object):
26+
class CrateDBAdapter:
2327
"""
2428
Kotori database backend adapter for CrateDB.
2529
@@ -86,6 +90,79 @@ def create_table(self, tablename):
8690
cursor.execute(sql_ddl)
8791
cursor.close()
8892

93+
def query(self, expression: str, tdata: Munch = None):
    """
    Query CrateDB and respond with results in "records" shape.

    When `tdata` is given, the corresponding table is synchronized by
    running `REFRESH TABLE ...` before the actual statement is executed.
    This is mostly needed in test case scenarios.

    Each database row is expected to provide the columns `time`, `tags`
    and `fields`, like::

        {
            "time": ...,
            "tags": {"city": "berlin", "location": "balcony"},
            "fields": {"temperature": 42.42, "humidity": 84.84},
        }

    Each returned record is an `OrderedDict` where the items of `tags` and
    `fields` are flattened into the top level, after `time`::

        {"time": ..., "city": "berlin", "location": "balcony",
         "temperature": 42.42, "humidity": 84.84}

    :param expression: SQL statement to execute.
    :param tdata: Optional transformation data, used to resolve the table to refresh.
    :return: Lazy iterator (`map`) over the records. All rows have been fetched
             already, so it can be consumed after the cursor has been closed.

    TODO: Unify with `kotori.test.util:CrateDBWrapper.query`.
    """

    # Use a format string with fields instead of an f-string: The Twisted logger
    # interprets braces within the message as format fields.
    log.info("Database query: {expression}", expression=expression)

    tdata = tdata or {}

    # Before reading data from CrateDB, synchronize it.
    # Currently, it is mostly needed to satisfy synchronization constraints when running the test suite.
    # However, users also may expect to see data "immediately". On the other hand, in order to satisfy
    # different needs, this should be made configurable per realm, channel and/or request.
    # TODO: Maybe just _optionally_ synchronize with the database when reading data.
    if tdata:
        refresh_sql = f"REFRESH TABLE {self.get_tablename(tdata)}"
        self.execute(refresh_sql)

    def record_from_row(columns, row):
        """
        Convert a result row into a flat record, using the column names as keys.

        https://stackoverflow.com/questions/3300464/how-can-i-get-dict-from-sqlite-query
        https://stackoverflow.com/questions/4147707/python-mysqldb-sqlite-result-as-dictionary
        """
        item = dict(zip(columns, row))
        record = OrderedDict()
        record["time"] = item["time"]
        # Tolerate rows where an object column is NULL.
        record.update(item["tags"] or {})
        record.update(item["fields"] or {})
        return record

    # Query database, with convenience data type converters. Assume timestamps to be in UTC.
    cursor = self.db_client.cursor(converter=DefaultTypeConverter(), time_zone=timezone.utc)
    try:
        cursor.execute(expression)
        data_raw = cursor.fetchall()

        # Provide fully-qualified records to downstream components, including column names.
        column_names = [column_info[0] for column_info in cursor.description]
    finally:
        # Release the cursor, also when the statement fails.
        cursor.close()

    # Bring results into classic "records" shape.
    return map(functools.partial(record_from_row, column_names), data_raw)
155+
156+
def execute(self, expression: str):
    """
    Execute a database statement, using a short-lived cursor, and return its result.

    The cursor is closed in any case, also when executing the statement raises.

    :param expression: SQL statement to execute.
    :return: The value of the cursor's `_result` attribute after execution.
    """
    cursor = self.db_client.cursor()
    try:
        cursor.execute(expression)
        # NOTE(review): `_result` is a private attribute of the cursor object;
        # confirm there is no public accessor which provides the same data.
        # The value is picked up before `finally` closes the cursor.
        return cursor._result
    finally:
        cursor.close()
165+
89166
def write(self, meta, data):
90167
"""
91168
Format ingress data chunk and store it into database table.

kotori/io/export/database.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ def query(self):
3131
settings=self.settings.influxdb,
3232
database=bucket.tdata.database,
3333
)
34+
elif "cratedb" in self.settings:
35+
from kotori.daq.storage.cratedb import CrateDBAdapter
36+
database = CrateDBAdapter(
37+
settings=self.settings.cratedb,
38+
)
3439
else:
3540
log.warn("No time-series database configured")
3641
return

kotori/io/protocol/cratedb.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# -*- coding: utf-8 -*-
2+
# (c) 2023 Andreas Motl <[email protected]>
3+
from twisted.logger import Logger
4+
5+
from kotori.io.protocol.util import compute_daterange
6+
7+
log = Logger()
8+
9+
10+
class QueryTransformer:
    """
    Compute CrateDB query expressions from transformation data.
    """

    @classmethod
    def transform(cls, data):
        """
        Compute CrateDB query expression from data in transformation dictionary.
        Also compute date range from query parameters "from" and "to".

        :param data: Transformation data. It is accessed using `.get('from')`,
                     `.get('to')`, `.database`, and `.measurement`.
        :return: Dictionary with the keys `expression` (the query as string),
                 `time_begin`, and `time_end`.
        """

        # Pass `data` as a log field instead of interpolating it with an f-string:
        # Its representation may contain braces, which the Twisted logger would
        # interpret as format fields, and fail to format the event.
        log.info("Querying database: {data}", data=data)

        # The PyInfluxQL query generator is versatile enough to be used for all SQL databases.
        from pyinfluxql import Query

        # TODO: Use ".date_range" API method
        time_begin, time_end = compute_daterange(data.get('from'), data.get('to'))

        # TODO: Add querying by tags.
        tags = {}
        # tags = CrateDBAdapter.get_tags(data)

        # NOTE(review): The table name is interpolated into the query verbatim.
        # Presumably `database` and `measurement` have been sanitized by an
        # upstream transformation step -- confirm.
        table = f"{data.database}.{data.measurement}"
        expression = Query('*').from_(table).where(time__gte=time_begin, time__lte=time_end, **tags)

        return {
            'expression': str(expression),
            'time_begin': time_begin,
            'time_end': time_end,
        }

kotori/io/protocol/target.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,8 @@ def setupService(self):
6161
except Exception as ex:
6262
log.failure("Connecting to MQTT broker failed: {ex}", ex=last_error_and_traceback())
6363

64-
elif self.scheme == 'influxdb':
65-
# InfluxDB has no subsystem service, it's just an adapter
64+
# CrateDB and InfluxDB are not subsystem services, they are just adapters.
65+
elif self.scheme in ['cratedb', 'influxdb']:
6666
pass
6767

6868
else:
@@ -87,7 +87,7 @@ def emit(self, uri, bucket):
8787
# TODO: Use threads.deferToThread here?
8888
return self.downstream.publish(topic, payload)
8989

90-
elif self.scheme == 'influxdb':
90+
elif self.scheme in ['cratedb', 'influxdb']:
9191

9292
# Database query wrapper (CrateDB or InfluxDB) using expression derived from transformation data
9393
dfq = DataFrameQuery(settings=self.settings, bucket=bucket)

test/settings/mqttkit.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ class TestSettings:
1616
cratedb_measurement_events = 'foo_bar_events'
1717
mqtt2_topic_json = 'mqttkit-2/itest/foo/bar/data.json'
1818
grafana2_dashboards = ['mqttkit-2-itest', 'mqttkit-2-itest3']
19+
io_channel_path = '/mqttkit-2/itest/foo/bar/data'
1920

2021
# InfluxDB settings.
2122
influx_database = 'mqttkit_1_itest'

test/test_export.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,25 @@
2323
ts_to = '2020-03-10T23:59:59.000Z'
2424

2525

26+
@pytest_twisted.inlineCallbacks
@pytest.mark.http
@pytest.mark.export
@pytest.mark.cratedb
def test_export_cratedb_general(machinery_cratedb, reset_cratedb):
    """
    Submit a single reading in JSON format to the HTTP API, and prove
    it can be retrieved back from the HTTP API in different formats.

    This uses CrateDB as timeseries database.
    """

    # Both the submit and the fetch helper talk to the same HTTP port.
    http_port = 24643
    submit = functools.partial(http_json_sensor, port=http_port)
    fetch = functools.partial(http_get_data, port=http_port)

    yield verify_export_general(settings.io_channel_path, submit, fetch)
43+
44+
2645
@pytest_twisted.inlineCallbacks
2746
@pytest.mark.http
2847
@pytest.mark.export

test/util.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,9 @@ def query(self):
259259
},
260260
...
261261
]
262+
263+
TODO: Refactor to / unify with `kotori.daq.storage.cratedb:CrateDBAdapter.query`.
264+
TODO: Add dict-based record retrieval as a row factory to `crate` library.
262265
"""
263266
logger.info('CrateDB: Querying database')
264267
db_table = self.get_tablename()
@@ -280,6 +283,8 @@ def query(self):
280283
def execute(self, expression):
281284
"""
282285
Actually execute the database query, using a cursor.
286+
287+
TODO: Use `kotori.daq.storage.cratedb:CrateDBAdapter.execute`.
283288
"""
284289
cursor = self.client.cursor()
285290
cursor.execute(expression)

0 commit comments

Comments
 (0)