Skip to content

Commit 50388c5

Browse files
authored
Support Kafka reporter protocol (#74)
1 parent cbf689f commit 50388c5

File tree

6 files changed

+224
-0
lines changed

6 files changed

+224
-0
lines changed

docs/EnvVars.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,7 @@ Environment Variable | Description | Default
2222
| `SW_TRACE_IGNORE`| This config item controls whether the trace should be ignored | `false` |
2323
| `SW_TRACE_IGNORE_PATH`| You can set up multiple URL path patterns; endpoints matching these patterns won't be traced. The current matching rules follow the Ant Path match style, like /path/*, /path/**, /path/?. | `''` |
2424
| `SW_ELASTICSEARCH_TRACE_DSL`| If true, trace all the DSL(Domain Specific Language) in ElasticSearch access, default is false | `false` |
25+
| `SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS` | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. It is in the form host1:port1,host2:port2,... | `localhost:9092` |
26+
| `SW_KAFKA_REPORTER_TOPIC_MANAGEMENT` | Specifying Kafka topic name for service instance reporting and registering. | `skywalking-managements` |
27+
| `SW_KAFKA_REPORTER_TOPIC_SEGMENT` | Specifying Kafka topic name for Tracing data. | `skywalking-segments` |
28+
| `SW_KAFKA_REPORTER_CONFIG_key` | The configs used to initialize KafkaProducer. It supports the basic arguments (whose type is either `str`, `bool`, or `int`) listed [here](https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html#kafka.KafkaProducer) | unset |

skywalking/agent/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ def __init():
6262
elif config.protocol == 'http':
6363
from skywalking.agent.protocol.http import HttpProtocol
6464
__protocol = HttpProtocol()
65+
elif config.protocol == "kafka":
66+
from skywalking.agent.protocol.kafka import KafkaProtocol
67+
__protocol = KafkaProtocol()
6568

6669
plugins.install()
6770

skywalking/agent/protocol/kafka.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import logging
19+
from queue import Queue
20+
21+
from skywalking import config
22+
from skywalking.agent import Protocol
23+
from skywalking.client.kafka import KafkaServiceManagementClient, KafkaTraceSegmentReportService
24+
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
25+
from skywalking.protocol.language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log, SegmentReference
26+
from skywalking.trace.segment import Segment
27+
28+
logger = logging.getLogger(__name__)
29+
30+
# avoid too many kafka logs
31+
logger_kafka = logging.getLogger('kafka')
32+
logger_kafka.setLevel(logging.WARN)
33+
34+
35+
class KafkaProtocol(Protocol):
    """Reporter protocol that ships agent data through Kafka.

    Instance registration and heartbeats are published to the management
    topic by KafkaServiceManagementClient; finished trace segments are
    converted to SegmentObject protobufs and streamed to the segment topic
    by KafkaTraceSegmentReportService.
    """

    def __init__(self):
        # Creating the management client also registers this service
        # instance (its __init__ calls send_instance_props()).
        self.service_management = KafkaServiceManagementClient()
        self.traces_reporter = KafkaTraceSegmentReportService()

    def connected(self):
        # The Kafka producer buffers and retries internally, so this
        # protocol always reports itself as connected.
        return True

    def heartbeat(self):
        self.service_management.send_heart_beat()

    def report(self, queue: Queue):
        """Consume segments from *queue* forever and hand them, converted to
        SegmentObject protobufs, to the Kafka segment reporter.

        NOTE(review): the inner generator never terminates -- it blocks on
        queue.get() when the queue is empty.
        """
        def generator():
            while True:
                segment = queue.get()  # type: Segment

                logger.debug('reporting segment %s', segment)

                s = SegmentObject(
                    # NOTE(review): assumes every segment carries at least
                    # one related trace -- confirm an empty list can't occur.
                    traceId=str(segment.related_traces[0]),
                    traceSegmentId=str(segment.segment_id),
                    service=config.service_name,
                    serviceInstance=config.service_instance,
                    spans=[SpanObject(
                        spanId=span.sid,
                        parentSpanId=span.pid,
                        startTime=span.start_time,
                        endTime=span.end_time,
                        operationName=span.op,
                        peer=span.peer,
                        spanType=span.kind.name,
                        spanLayer=span.layer.name,
                        componentId=span.component.value,
                        isError=span.error_occurred,
                        logs=[Log(
                            # presumably seconds -> milliseconds (the *1000)
                            time=int(log.timestamp * 1000),
                            data=[KeyStringValuePair(key=item.key, value=item.val) for item in log.items],
                        ) for log in span.logs],
                        tags=[KeyStringValuePair(
                            key=str(tag.key),
                            value=str(tag.val),
                        ) for tag in span.tags],
                        refs=[SegmentReference(
                            # 0 for CrossProcess refs, 1 otherwise
                            # (presumably CrossThread -- confirm).
                            refType=0 if ref.ref_type == "CrossProcess" else 1,
                            traceId=ref.trace_id,
                            parentTraceSegmentId=ref.segment_id,
                            parentSpanId=ref.span_id,
                            parentService=ref.service,
                            parentServiceInstance=ref.service_instance,
                            parentEndpoint=ref.endpoint,
                            networkAddressUsedAtPeer=ref.client_address,
                        ) for ref in span.refs if ref.trace_id],
                    ) for span in segment.spans],
                )

                yield s

                # Marked done only after the reporter has consumed the item.
                queue.task_done()

        self.traces_reporter.report(generator())

skywalking/client/kafka.py

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import os
19+
import ast
20+
import logging
21+
22+
from skywalking import config
23+
from skywalking.client import ServiceManagementClient, TraceSegmentReportService
24+
from skywalking.protocol.common.Common_pb2 import KeyStringValuePair
25+
from skywalking.protocol.management.Management_pb2 import InstancePingPkg, InstanceProperties
26+
27+
from kafka import KafkaProducer
28+
29+
logger = logging.getLogger(__name__)
30+
31+
# kwargs handed to every KafkaProducer this reporter creates.
kafka_configs = {}


def __init_kafka_configs():
    """Build the KafkaProducer kwargs from agent config and environment.

    Every ``SW_KAFKA_REPORTER_CONFIG_<key>`` environment variable becomes a
    ``<key>`` keyword argument for KafkaProducer.  Values are coerced to int
    or bool when they look like one; anything else is passed through as a
    plain string (the documented contract supports str, bool and int --
    previously string values were silently dropped by a ``continue``).

    Raises:
        KafkaConfigDuplicated: if the same producer option is set twice.
    """
    kafka_configs["bootstrap_servers"] = config.kafka_bootstrap_servers.split(",")
    # process all kafka configs in env
    prefix = "SW_KAFKA_REPORTER_CONFIG_"
    kafka_keys = [key for key in os.environ.keys() if key.startswith(prefix)]
    for kafka_key in kafka_keys:
        key = kafka_key[len(prefix):]
        val = os.environ.get(kafka_key)

        if val is not None:
            # Coerce env strings into the matching argument type; values
            # that are neither numeric nor a bool literal stay strings
            # (e.g. client_id, security_protocol).
            if val.isnumeric():
                val = int(val)
            elif val in ["True", "False"]:
                val = ast.literal_eval(val)

            # check if the key was already set
            if kafka_configs.get(key) is None:
                kafka_configs[key] = val
            else:
                raise KafkaConfigDuplicated(key)


__init_kafka_configs()
58+
59+
60+
class KafkaServiceManagementClient(ServiceManagementClient):
    """Publishes service-instance registration and heartbeats to the Kafka
    management topic."""

    def __init__(self):
        logger.debug("kafka reporter configs: %s", kafka_configs)
        self.producer = KafkaProducer(**kafka_configs)
        self.topic_key_register = "register-"
        self.topic = config.kafka_topic_management

        # Register this instance as soon as the client exists.
        self.send_instance_props()

    def send_instance_props(self):
        """Publish the instance-properties record, keyed 'register-<instance>'."""
        instance = InstanceProperties(
            service=config.service_name,
            serviceInstance=config.service_instance,
            properties=[KeyStringValuePair(key='language', value='Python')],
        )

        self.producer.send(
            topic=self.topic,
            key=bytes(self.topic_key_register + instance.serviceInstance, encoding='utf-8'),
            value=bytes(instance.SerializeToString()),
        )

    def send_heart_beat(self):
        """Publish a ping keyed by the instance name and wait for the ack."""
        logger.debug(
            'service heart beats, [%s], [%s]',
            config.service_name,
            config.service_instance,
        )

        ping = InstancePingPkg(
            service=config.service_name,
            serviceInstance=config.service_instance,
        )

        future = self.producer.send(
            topic=self.topic,
            key=bytes(ping.serviceInstance, encoding="utf-8"),
            value=bytes(ping.SerializeToString()),
        )
        res = future.get(timeout=10)
        logger.debug('heartbeat response: %s', res)
97+
98+
99+
class KafkaTraceSegmentReportService(TraceSegmentReportService):
    """Streams serialized SegmentObject messages to the Kafka segment topic."""

    def __init__(self):
        self.producer = KafkaProducer(**kafka_configs)
        self.topic = config.kafka_topic_segment

    def report(self, generator):
        """Drain *generator*, publishing each segment keyed by its segment id."""
        send = self.producer.send
        for segment in generator:
            send(
                topic=self.topic,
                key=bytes(segment.traceSegmentId, encoding="utf-8"),
                value=bytes(segment.SerializeToString()),
            )
109+
110+
111+
class KafkaConfigDuplicated(Exception):
    """Raised when the same KafkaProducer option is supplied more than once.

    Attributes:
        key: the duplicated producer option name.
    """

    def __init__(self, key):
        # Pass a message to Exception so str(e) and log output are
        # informative; the original bare __init__ left str(e) empty.
        super().__init__('Kafka reporter config key is duplicated: %s' % key)
        self.key = key

skywalking/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,9 @@
4949
trace_ignore_path = (os.getenv('SW_TRACE_IGNORE_PATH') or '').split(',') # type: List[str]
5050
elasticsearch_trace_dsl = True if os.getenv('SW_ELASTICSEARCH_TRACE_DSL') and \
5151
os.getenv('SW_ELASTICSEARCH_TRACE_DSL') == 'True' else False # type: bool
52+
# Kafka reporter settings -- used when the agent protocol is 'kafka'.
# Comma-separated host:port pairs for the initial cluster connection.
kafka_bootstrap_servers = os.getenv('SW_KAFKA_REPORTER_BOOTSTRAP_SERVERS') or "localhost:9092"  # type: str
# Topic for instance registration and heartbeat messages.
kafka_topic_management = os.getenv('SW_KAFKA_REPORTER_TOPIC_MANAGEMENT') or "skywalking-managements"  # type: str
# Topic for trace segment messages.
kafka_topic_segment = os.getenv('SW_KAFKA_REPORTER_TOPIC_SEGMENT') or "skywalking-segments"  # type: str
5255

5356

5457
def init(

skywalking/plugins/sw_kafka.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#
1717
import logging
1818

19+
from skywalking import config
1920
from skywalking import Layer, Component
2021
from skywalking.trace import tags
2122
from skywalking.trace.carrier import Carrier
@@ -48,6 +49,7 @@ def _sw__poll_once(this, timeout_ms, max_records, update_offsets=True):
4849
context = get_context()
4950
topics = ";".join(this._subscription.subscription or
5051
[t.topic for t in this._subscription._user_assignment])
52+
5153
with context.new_entry_span(
5254
op="Kafka/" + topics + "/Consumer/" + (this.config["group_id"] or "")) as span:
5355
for consumerRecords in res.values():
@@ -71,6 +73,11 @@ def _sw__poll_once(this, timeout_ms, max_records, update_offsets=True):
7173

7274
def _sw_send_func(_send):
7375
def _sw_send(this, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
76+
# ignore trace skywalking self request
77+
if config.protocol == 'kafka' and config.kafka_topic_segment == topic or config.kafka_topic_management == topic:
78+
return _send(this, topic, value=value, key=key, headers=headers, partition=partition,
79+
timestamp_ms=timestamp_ms)
80+
7481
peer = ";".join(this.config["bootstrap_servers"])
7582
context = get_context()
7683
carrier = Carrier()

0 commit comments

Comments
 (0)