Skip to content

Commit f774a90

Browse files
committed
[CrateDB] Add basic data acquisition support for CrateDB
1 parent 5950201 commit f774a90

File tree

15 files changed

+497
-10
lines changed

15 files changed

+497
-10
lines changed

.env

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ MOSQUITTO_VERSION=2.0
66
MOSQUITTO_MQTT_PORT=1883
77
MOSQUITTO_WS_PORT=9001
88

9+
# CrateDB
10+
CRATEDB_VERSION=latest
11+
CRATEDB_HTTP_PORT=4200
12+
CRATEDB_POSTGRESQL_PORT=5432
13+
914
# InfluxDB
1015
INFLUXDB_VERSION=1.8
1116
INFLUXDB_HTTP_PORT=8086

.github/workflows/tests.yml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,19 @@ jobs:
2525
os: [ ubuntu-20.04 ] # , macos-latest, windows-latest ]
2626
python-version: [ "3.7", "3.8", "3.9", "3.10", "3.11" ]
2727
mosquitto-version: [ "2.0" ]
28+
cratedb-version: [ "5.3" ]
2829
influxdb-version: [ "1.8" ]
2930
grafana-version: [ "7.5.17", "8.5.15", "9.3.0" ]
3031

3132
# https://docs.github.com/en/free-pro-team@latest/actions/guides/about-service-containers
3233
services:
3334

35+
cratedb:
36+
image: crate:${{ matrix.cratedb-version }}
37+
ports:
38+
- 4200:4200
39+
- 5432:5432
40+
3441
influxdb:
3542
image: influxdb:${{ matrix.influxdb-version }}
3643
ports:
@@ -53,7 +60,7 @@ jobs:
5360
OS: ${{ matrix.os }}
5461
PYTHON: ${{ matrix.python-version }}
5562

56-
name: Python ${{ matrix.python-version }}, Grafana ${{ matrix.grafana-version }}, Mosquitto ${{ matrix.mosquitto-version }}, InfluxDB ${{ matrix.influxdb-version }}
63+
name: Py ${{ matrix.python-version }}, Grafana ${{ matrix.grafana-version }}, Mosquitto ${{ matrix.mosquitto-version }}, InfluxDB ${{ matrix.influxdb-version }}, CrateDB ${{ matrix.cratedb-version }}
5764
steps:
5865

5966
- name: Acquire sources

doc/source/development/tests.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ Run specific tests with maximum verbosity::
6363
# Run tests marked with "tasmota", "homie" or "airrohr".
6464
pytest test ${PYTEST_OPTIONS} -m 'tasmota or homie or airrohr'
6565

66+
# Run tests with CrateDB as database backend.
67+
pytest test ${PYTEST_OPTIONS} -m cratedb
68+
6669
To see available markers, type::
6770

6871
pytest --markers

docker-compose.yml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,19 @@ services:
1616
- ${PATH_VAR_LIB}/mosquitto:/mosquitto/data
1717
- ${PATH_VAR_LOG}/mosquitto:/mosquitto/log
1818

19+
cratedb:
20+
image: crate:${CRATEDB_VERSION}
21+
ports:
22+
- "${CRATEDB_HTTP_PORT}:${CRATEDB_HTTP_PORT}"
23+
- "${CRATEDB_POSTGRESQL_PORT}:${CRATEDB_POSTGRESQL_PORT}"
24+
environment:
25+
CRATE_HEAP_SIZE: 2g
26+
27+
command: ["crate",
28+
"-Cdiscovery.type=single-node",
29+
"-Ccluster.routing.allocation.disk.threshold_enabled=false",
30+
]
31+
1932
# https://github.com/robcowart/docker_compose_cookbook/blob/master/STACKS/influx_oss/docker-compose.yml#L21
2033
influxdb:
2134
image: influxdb:${INFLUXDB_VERSION}

etc/test/cratedb.ini

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
; ######################################
2+
; Kotori test configuration with CrateDB
3+
; ######################################
4+
5+
6+
; =====================
7+
; Connectivity settings
8+
; =====================
9+
10+
; MQTT bus adapter
11+
[mqtt]
12+
host = localhost
13+
#port = 1883
14+
username = kotori
15+
password = kotori
16+
17+
; Storage adapter
18+
[cratedb]
19+
; host = localhost
20+
; port = 4200
21+
; username = crate
22+
; password =
23+
24+
; User interface
25+
[grafana]
26+
host = localhost
27+
#port = 3000
28+
username = admin
29+
password = admin
30+
31+
32+
; ================
33+
; Channel settings
34+
; ================
35+
36+
[mqttkit-2]
37+
enable = true
38+
type = application
39+
realm = mqttkit-2
40+
mqtt_topics = mqttkit-2/#
41+
application = kotori.daq.application.mqttkit:mqttkit_application

etc/test/main.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ http_listen = localhost
2020
http_port = 24642
2121

2222
; TODO: Implement backend database selection.
23+
; use_database = cratedb
2324
; use_database = influxdb
2425

2526
; mqtt bus adapter

kotori/daq/services/mig.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
2-
# (c) 2015-2021 Andreas Motl <[email protected]>
2+
# (c) 2015-2023 Andreas Motl <[email protected]>
3+
import os
34
import time
45
import json
56

@@ -17,6 +18,7 @@
1718
from kotori.daq.decoder.schema import MessageType, TopicMatchers
1819
from kotori.daq.services import MultiServiceMixin
1920
from kotori.daq.intercom.mqtt import MqttAdapter
21+
from kotori.daq.storage.cratedb import CrateDBAdapter
2022
from kotori.daq.storage.influx import InfluxDBAdapter
2123
from kotori.util.configuration import read_list
2224
from kotori.util.thimble import Thimble
@@ -79,7 +81,14 @@ def setupService(self):
7981

8082
self.registerService(self.mqtt_service)
8183

82-
self.influx = InfluxDBAdapter(settings = self.settings.influxdb)
84+
# TODO: Support multiple databases at the same time.
85+
log.info("Creating database adapter")
86+
if "influxdb" in self.settings:
87+
self.database = InfluxDBAdapter(settings=self.settings.influxdb)
88+
elif "cratedb" in self.settings:
89+
self.database = CrateDBAdapter(settings = self.settings.cratedb)
90+
else:
91+
log.warn("No time-series database configured")
8392

8493
# Perform MQTT message processing using a different thread pool
8594
self.threadpool = ThreadPool()
@@ -311,7 +320,8 @@ def store_message(self, storage, data):
311320
:param storage: The storage location object
312321
:param data: The data ready for storing
313322
"""
314-
self.influx.write(storage, data)
323+
if self.database is not None:
324+
self.database.write(storage, data)
315325

316326
def mqtt_process_error(self, failure, topic, payload):
317327
"""

kotori/daq/storage/cratedb.py

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
# -*- coding: utf-8 -*-
2+
# (c) 2023 Andreas Motl <[email protected]>
3+
import calendar
4+
import json
5+
from decimal import Decimal
6+
from copy import deepcopy
7+
from datetime import datetime, date
8+
9+
import crate.client.http
10+
import pytz
11+
import requests
12+
from crate import client
13+
from crate.client.exceptions import ProgrammingError
14+
from funcy import project
15+
from twisted.logger import Logger
16+
17+
from kotori.daq.storage.util import format_chunk
18+
19+
# Module-level Twisted logger used by the code in this module.
log = Logger()
20+
21+
22+
class CrateDBAdapter(object):
    """
    Kotori database backend adapter for CrateDB.

    CrateDB is a distributed SQL database for storing and analyzing
    massive amounts of data in real-time. Built on top of Lucene.

    https://github.com/crate/crate
    """

    # Substrings of CrateDB error messages signalling that the target schema
    # or the target table does not exist yet. Both cases are handled by
    # creating the table on demand and retrying the write once.
    MISSING_TABLE_MARKERS = ("SchemaUnknownException", "RelationUnknown")

    def __init__(self, settings=None, database=None):
        """
        Carry over connectivity parameters and create the client connection.

        :param settings: Mapping with optional ``host``, ``port``, ``username``,
                         ``password`` and ``database`` entries. A deep copy is
                         used, so the caller's object is not modified.
        :param database: Fallback for the ``database`` setting when absent.

        TODO: Verify with CrateDB Cloud.
        """

        settings = deepcopy(settings) or {}
        settings.setdefault("host", "localhost")
        settings.setdefault("port", "4200")
        settings.setdefault("username", "crate")
        settings.setdefault("password", "")
        settings.setdefault("database", database)

        # TODO: Bring back pool size configuration.
        # settings.setdefault('pool_size', 10)

        settings["port"] = int(settings["port"])

        # FIXME: This is bad style. Well, but it is currently
        #        inherited from ~10 year old code, so c'est la vie.
        self.__dict__.update(**settings)

        # Bookkeeping for all databases having been touched already
        self.databases_written_once = set()

        self.host_uri = "{host}:{port}".format(**self.__dict__)

        # TODO: Bring back pool size configuration.
        # log.info('Storage target is {uri}, pool size is {pool_size}', uri=self.host_uri, pool_size=self.pool_size)
        log.info("Storage target is {uri}", uri=self.host_uri)
        self.db_client = client.connect(
            self.host_uri, username=self.username, password=self.password, pool_size=20,
        )

    def get_tablename(self, meta):
        """
        Get table name for SensorWAN channel.

        :param meta: Object exposing ``database`` and ``measurement`` attributes.
        :return: The string ``<database>.<measurement>``.

        NOTE(review): The result is interpolated verbatim into SQL statements
        by `create_table` and `write_chunk`. This assumes both parts have been
        sanitized upstream -- TODO confirm against the caller.
        """
        return f"{meta.database}.{meta.measurement}"

    def create_table(self, tablename):
        """
        Create database table for SensorWAN channel.

        The statement uses ``IF NOT EXISTS``, so calling this for an already
        existing table is harmless.

        :param tablename: Table name as returned by `get_tablename`.
        """
        # Pass the table name as a log field instead of pre-formatting it,
        # because the Twisted logger treats the first argument as a format
        # string.
        log.info("Creating table: {tablename}", tablename=tablename)
        sql_ddl = f"""
            CREATE TABLE IF NOT EXISTS {tablename} (
                time TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
                tags OBJECT(DYNAMIC),
                fields OBJECT(DYNAMIC)
            );
        """
        cursor = self.db_client.cursor()
        try:
            cursor.execute(sql_ddl)
        finally:
            # Release the cursor even when the DDL statement fails.
            cursor.close()

    def write(self, meta, data):
        """
        Format ingress data chunk and store it into database table.

        When the first write fails because the schema or table is unknown,
        the table is created and the write is attempted a second time.

        :param meta: Channel metadata; needs ``database`` and ``measurement``
                     attributes and must be convertible with ``dict()``.
        :param data: The data ready for storing.
        :return: The result of `write_chunk`.
        :raises: Any formatting, connection or database error is logged
                 where applicable and re-raised.

        TODO: This dearly needs efficiency improvements. Currently, there is no
              batching, just single records/inserts. That yields bad performance.
        """

        # Keep copies for the error log below.
        # NOTE(review): presumably because `format_chunk` may modify its
        # arguments -- confirm.
        meta_copy = deepcopy(dict(meta))
        data_copy = deepcopy(data)

        try:
            chunk = format_chunk(meta, data)

        except Exception as ex:
            log.failure(
                "Could not format chunk (ex={ex_name}: {ex}): data={data}, meta={meta}",
                ex_name=ex.__class__.__name__,
                ex=ex,
                meta=meta_copy,
                data=data_copy,
            )
            raise

        try:
            return self.write_chunk(meta, chunk)

        # The crate driver raises its own `ConnectionError`; the `requests`
        # variant is kept for backward compatibility.
        except (
            requests.exceptions.ConnectionError,
            crate.client.exceptions.ConnectionError,
        ) as ex:
            log.failure(
                "Problem connecting to CrateDB at {uri}: {ex}", uri=self.host_uri, ex=ex
            )
            raise

        except ProgrammingError as ex:
            if not self._is_missing_table_error(ex):
                raise

            self.create_table(self.get_tablename(meta))

            # Attempt second write
            return self.write_chunk(meta, chunk)

    def write_chunk(self, meta, chunk):
        """
        Run the SQL `INSERT` operation.

        :param meta: Channel metadata, see `get_tablename`.
        :param chunk: Mapping with ``tags`` and ``fields`` entries, and an
                      optional ``time`` entry. Without ``time``, the column
                      default of the table applies.
        :return: ``True`` after the statement has been executed. Failures
                 surface as exceptions raised by the database driver.
        """
        db_table = self.get_tablename(meta)
        cursor = self.db_client.cursor()

        try:
            # With or without timestamp.
            if "time" in chunk:
                cursor.execute(
                    f"INSERT INTO {db_table} (time, tags, fields) VALUES (?, ?, ?)",
                    (chunk["time"], chunk["tags"], chunk["fields"]),
                )
            else:
                cursor.execute(
                    f"INSERT INTO {db_table} (tags, fields) VALUES (?, ?)",
                    (chunk["tags"], chunk["fields"]),
                )
        finally:
            # Release the cursor even when the insert fails.
            cursor.close()

        self.databases_written_once.add(meta.database)
        log.debug("Storage success: {chunk}", chunk=chunk)
        return True

    @classmethod
    def _is_missing_table_error(cls, ex):
        """
        Tell whether an exception reports an unknown schema or table.

        Falls back to ``str(ex)`` when the exception carries no ``message``
        attribute.
        """
        message = str(getattr(ex, "message", None) or ex)
        return any(marker in message for marker in cls.MISSING_TABLE_MARKERS)

    @staticmethod
    def get_tags(data):
        """
        Derive tags from topology information.

        Returns the ``gateway`` and ``node`` entries of `data`.

        TODO: Verify if this is used at all.
        """
        return project(data, ["gateway", "node"])
170+
171+
172+
class TimezoneAwareCrateJsonEncoder(json.JSONEncoder):
    """
    JSON encoder converting decimal and temporal values.

    - `Decimal` values are emitted as strings.
    - `datetime` values are emitted as integer milliseconds since
      1970-01-01. Aware values are measured against a UTC epoch, naive
      values against a naive epoch.
    - `date` values are emitted as integer milliseconds derived from
      `calendar.timegm`.
    """

    epoch_aware = datetime(1970, 1, 1, tzinfo=pytz.UTC)
    epoch_naive = datetime(1970, 1, 1)

    def default(self, o):
        """
        Convert objects the stock JSON encoder does not know about.

        :param o: The object to serialize.
        :return: A JSON-serializable representation of `o`.
        :raises TypeError: From the base class, for unsupported types.
        """
        if isinstance(o, Decimal):
            return str(o)
        # `datetime` is a subclass of `date`, so it must be tested first.
        if isinstance(o, datetime):
            # A datetime is aware only when `utcoffset()` yields a value.
            # Testing `tzinfo` alone would misclassify objects whose tzinfo
            # returns `None`, and the subtraction below would then fail.
            if o.utcoffset() is not None:
                delta = o - self.epoch_aware
            else:
                delta = o - self.epoch_naive
            return int(delta.microseconds / 1000.0 +
                       (delta.seconds + delta.days * 24 * 3600) * 1000.0)
        if isinstance(o, date):
            return calendar.timegm(o.timetuple()) * 1000
        return json.JSONEncoder.default(self, o)
189+
190+
191+
# Monkey patch.
192+
# TODO: Submit upstream.
193+
# Rebind the encoder name inside `crate.client.http` to the subclass defined
# in this module. NOTE(review): presumably the driver resolves
# `CrateJsonEncoder` at call time, making this effective process-wide --
# confirm against the crate client source.
crate.client.http.CrateJsonEncoder = TimezoneAwareCrateJsonEncoder

pytest.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ markers =
2929
http: Tests using HTTP.
3030
export: Tests for exporting data.
3131
mqtt: Tests only doing MQTT.
32+
cratedb: Tests specific to CrateDB.
3233
influxdb: Tests specific to InfluxDB.
3334
grafana: Tests interacting with Grafana.
3435
mongodb: Tests using MongoDB.

setup.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@
6767

6868
extras = {
6969
'daq': [
70+
'crash<1',
71+
'crate[sqlalchemy]<1',
7072
'influxdb>=5.3.0,<6',
7173
'pytz>=2020.1',
7274
'requests>=2.12.4,<3',

0 commit comments

Comments
 (0)