Skip to content

Commit 8847a99

Browse files
author
Bar Nehemia
authored
Merge pull request #13 from Lightricks/add-line-protocol-reporter
Reporter: Add new reporter `LineProtocolReporter`.
2 parents cdd11a6 + 48a603c commit 8847a99

File tree

8 files changed

+288
-46
lines changed

8 files changed

+288
-46
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ pyformance.egg-info
44
/build/
55
/dist/
66
*.py[cod]
7+
playground

pyformance/reporters/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ def InfluxReporter(*args, **kwargs):
4545
return cls(*args, **kwargs)
4646

4747

48+
def LineProtocolReporter(*args, **kwargs):
49+
from .line_protocol_reporter import LineProtocolReporter as cls
50+
51+
return cls(*args, **kwargs)
52+
53+
4854
def OpenTSDBReporter(*args, **kwargs):
4955
from .opentsdb_reporter import OpenTSDBReporter as cls
5056

pyformance/reporters/influx.py

Lines changed: 29 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
import base64
44
import logging
55
import re
6-
from enum import Enum
6+
from pathlib import Path
77

88
from six import iteritems
99

10+
from .utils import ReportingPrecision, to_timestamp_in_precision
11+
1012
try:
1113
from urllib2 import quote, urlopen, Request, URLError
1214
except ImportError:
@@ -27,15 +29,6 @@
2729
DEFAULT_INFLUX_PASSWORD = None
2830
DEFAULT_INFLUX_PROTOCOL = "http"
2931

30-
class ReportingPrecision(Enum):
31-
HOURS = "h"
32-
MINUTES = "m"
33-
SECONDS = "s"
34-
MILLISECONDS = "ms"
35-
MICROSECONDS = "u"
36-
NANOSECONDS = "ns"
37-
38-
3932
class InfluxReporter(Reporter):
4033
"""
4134
InfluxDB reporter using native http api
@@ -77,6 +70,7 @@ def __init__(
7770
self.autocreate_database = autocreate_database
7871
self._did_create_database = False
7972
self.retention_policy = retention_policy
73+
self.reported_files = []
8074

8175
if global_tags is None:
8276
self.global_tags = {}
@@ -109,7 +103,7 @@ def report_now(self, registry=None, timestamp=None):
109103
if self.autocreate_database and not self._did_create_database:
110104
self._create_database()
111105
timestamp = timestamp or self.clock.time()
112-
timestamp_in_reporting_precision = _to_timestamp_in_precision(
106+
timestamp_in_reporting_precision = to_timestamp_in_precision(
113107
timestamp=timestamp,
114108
precision=self.reporting_precision
115109
)
@@ -145,7 +139,7 @@ def _get_influx_protocol_lines(self, metrics, timestamp):
145139
for event in metric_values.get("events", []):
146140
values = InfluxReporter._stringify_values(event.values)
147141

148-
event_timestamp = _to_timestamp_in_precision(
142+
event_timestamp = to_timestamp_in_precision(
149143
timestamp=event.time,
150144
precision=self.reporting_precision
151145
)
@@ -160,6 +154,25 @@ def _get_influx_protocol_lines(self, metrics, timestamp):
160154

161155
return lines
162156

157+
def report_from_files(self, files_path: Path) -> None:
158+
"""
159+
Report to Influx from list of file in a given directory.
160+
NOTE: The files in the path must be in line protocol format.
161+
162+
:param files_path: The path where all the files stored.
163+
:return: None
164+
"""
165+
if not files_path.exists():
166+
raise FileNotFoundError
167+
168+
files = [f for f in files_path.glob("*.txt") if f not in self.reported_files]
169+
170+
for file in files:
171+
with open(file, "r") as metrics_file:
172+
url = self._get_url()
173+
if self._try_send(url, metrics_file.read()):
174+
self.reported_files.append(file)
175+
163176
@staticmethod
164177
def _stringify_values(metric_values):
165178
return ",".join(
@@ -196,16 +209,16 @@ def _add_auth_data(self, request):
196209
auth = _encode_username(self.username, self.password)
197210
request.add_header("Authorization", "Basic %s" % auth.decode('utf-8'))
198211

199-
def _try_send(self, url, data):
212+
def _try_send(self, url, data) -> bool:
200213
request = Request(url, data.encode("utf-8"))
201214
if self.username:
202215
self._add_auth_data(request)
203216
try:
204217
response = urlopen(request)
205218
response.read()
219+
return True
206220
except URLError as err:
207-
response = err.read().decode("utf-8")
208-
221+
response = str(err)
209222
LOG.warning(
210223
"Cannot write to %s: %s ,url: %s, data: %s, response: %s",
211224
self.server,
@@ -214,28 +227,7 @@ def _try_send(self, url, data):
214227
data,
215228
response
216229
)
217-
218-
def _to_timestamp_in_precision(timestamp: float, precision: ReportingPrecision) -> int:
219-
if precision == ReportingPrecision.HOURS:
220-
return int(timestamp / 60 / 60)
221-
222-
if precision == ReportingPrecision.MINUTES:
223-
return int(timestamp / 60)
224-
225-
if precision == ReportingPrecision.SECONDS:
226-
return int(timestamp)
227-
228-
if precision == ReportingPrecision.MILLISECONDS:
229-
return int(timestamp * 1e3)
230-
231-
if precision == ReportingPrecision.MICROSECONDS:
232-
return int(timestamp * 1e6)
233-
234-
if precision == ReportingPrecision.NANOSECONDS:
235-
return int(timestamp * 1e9)
236-
237-
raise Exception("Unsupported ReportingPrecision")
238-
230+
return False
239231

240232
def _format_field_value(value):
241233
if isinstance(value, MarkInt):
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# Copyright (c) 2023 Lightricks. All rights reserved.
2+
# -*- coding: utf-8 -*-
3+
import logging
4+
import os
5+
import re
6+
import time
7+
import uuid
8+
9+
from six import iteritems
10+
11+
from pyformance.reporters.reporter import Reporter
12+
from pyformance.reporters.utils import to_timestamp_in_precision, ReportingPrecision
13+
from pyformance.registry import MetricsRegistry
14+
from pyformance.mark_int import MarkInt
15+
from copy import copy
16+
17+
LOG = logging.getLogger(__name__)
18+
19+
20+
class LineProtocolReporter(Reporter):
21+
"""
22+
Line Protocol reporter using InfluxDB format.
23+
"""
24+
25+
def __init__(
26+
self,
27+
registry: MetricsRegistry = None,
28+
reporting_interval: int = 30,
29+
path: str = "/tmp/metrics",
30+
prefix: str = "",
31+
clock: time = None,
32+
global_tags: dict = None,
33+
reporting_precision = ReportingPrecision.SECONDS,
34+
):
35+
"""
36+
:param reporting_precision: The precision in which the reporter reports.
37+
The default is seconds. This is a tradeoff between precision and performance. More
38+
coarse precision may result in significant improvements in compression and vice versa.
39+
"""
40+
super(LineProtocolReporter, self).__init__(registry, reporting_interval, clock)
41+
self.path = path
42+
self.prefix = prefix
43+
44+
if not os.path.exists(self.path):
45+
os.makedirs(self.path)
46+
47+
if global_tags is None:
48+
self.global_tags = {}
49+
else:
50+
self.global_tags = global_tags
51+
52+
self.reporting_precision = reporting_precision
53+
54+
def report_now(self, registry=None, timestamp=None) -> None:
55+
timestamp = timestamp or self.clock.time()
56+
timestamp_in_reporting_precision = to_timestamp_in_precision(
57+
timestamp=timestamp,
58+
precision=self.reporting_precision
59+
)
60+
metrics = (registry or self.registry).dump_metrics(key_is_metric=True)
61+
influx_lines = self._get_influx_protocol_lines(metrics, timestamp_in_reporting_precision)
62+
63+
if influx_lines:
64+
with open(f"{self.path}/{uuid.uuid4().hex}.txt", "a") as file:
65+
post_data = "\n".join(influx_lines)
66+
file.write(post_data)
67+
68+
def _get_table_name(self, metric_key) -> str:
69+
if not self.prefix:
70+
return metric_key
71+
else:
72+
return "%s.%s" % (self.prefix, metric_key)
73+
74+
def _get_influx_protocol_lines(self, metrics, timestamp) -> list:
75+
lines = []
76+
for key, metric_values in metrics.items():
77+
metric_name = key.get_key()
78+
table = self._get_table_name(metric_name)
79+
values = LineProtocolReporter._stringify_values(metric_values)
80+
tags = self._stringify_tags(key)
81+
82+
# there's a special case where only events are present, which are skipped by
83+
# _stringify_values function
84+
if values:
85+
line = "%s%s %s %s" % (table, tags, values, timestamp)
86+
lines.append(line)
87+
88+
for event in metric_values.get("events", []):
89+
values = LineProtocolReporter._stringify_values(event.values)
90+
91+
event_timestamp = to_timestamp_in_precision(
92+
timestamp=event.time,
93+
precision=self.reporting_precision
94+
)
95+
line = "%s%s %s %s" % (
96+
table,
97+
tags,
98+
values,
99+
event_timestamp
100+
)
101+
102+
lines.append(line)
103+
104+
return lines
105+
106+
@staticmethod
107+
def _stringify_values(metric_values) -> str:
108+
return ",".join(
109+
[
110+
"%s=%s" % (k, _format_field_value(v))
111+
for (k, v) in iteritems(metric_values) if k != "tags" and k != "events"
112+
]
113+
)
114+
115+
def _stringify_tags(self, metric) -> str:
116+
# start with the global reporter tags
117+
# (copy to avoid mutating to global values)
118+
all_tags = copy(self.global_tags)
119+
120+
# add the local tags on top of those
121+
tags = metric.get_tags()
122+
all_tags.update(tags)
123+
124+
if all_tags:
125+
return "," + ",".join(
126+
[
127+
"%s=%s" % (k, _format_tag_value(v))
128+
for (k, v) in iteritems(all_tags)
129+
]
130+
)
131+
132+
return ""
133+
134+
def _format_field_value(value) -> str:
135+
if isinstance(value, MarkInt):
136+
return f"{value.value}i"
137+
if type(value) is not str:
138+
return value
139+
else:
140+
return '"{}"'.format(value)
141+
142+
143+
def _format_tag_value(value) -> str:
144+
if type(value) is not str:
145+
return value
146+
else:
147+
# Escape special characters
148+
return re.sub("([ ,=])", r"\\\1", value)

pyformance/reporters/reporter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import time
22
from threading import Thread, Event
33
import six
4-
from ..registry import global_registry
5-
from ..decorators import get_qualname
4+
from pyformance.registry import global_registry
5+
from pyformance.decorators import get_qualname
66

77

88
class Reporter(object):

pyformance/reporters/utils.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# Copyright (c) 2023 Lightricks. All rights reserved.
2+
from enum import Enum
3+
4+
5+
class ReportingPrecision(Enum):
6+
HOURS = "h"
7+
MINUTES = "m"
8+
SECONDS = "s"
9+
MILLISECONDS = "ms"
10+
MICROSECONDS = "u"
11+
NANOSECONDS = "ns"
12+
13+
def to_timestamp_in_precision(timestamp: float, precision: ReportingPrecision) -> int:
14+
if precision == ReportingPrecision.HOURS:
15+
return int(timestamp / 60 / 60)
16+
17+
if precision == ReportingPrecision.MINUTES:
18+
return int(timestamp / 60)
19+
20+
if precision == ReportingPrecision.SECONDS:
21+
return int(timestamp)
22+
23+
if precision == ReportingPrecision.MILLISECONDS:
24+
return int(timestamp * 1e3)
25+
26+
if precision == ReportingPrecision.MICROSECONDS:
27+
return int(timestamp * 1e6)
28+
29+
if precision == ReportingPrecision.NANOSECONDS:
30+
return int(timestamp * 1e9)
31+
32+
raise Exception("Unsupported ReportingPrecision")

0 commit comments

Comments
 (0)