Skip to content

Commit 65c6e4a

Browse files
[Plugin] add aiohttp plugin (#101)
* add test case Co-authored-by: kezhenxu94 <[email protected]>
1 parent 3d90e1e commit 65c6e4a

File tree

10 files changed

+399
-0
lines changed

10 files changed

+399
-0
lines changed

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
asgiref==3.2.10
22
aiofiles==0.6.0
3+
aiohttp==3.7.3
34
attrs==19.3.0
45
blindspin==2.0.1
56
certifi==2020.6.20

skywalking/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class Component(Enum):
3838
Elasticsearch = 47
3939
Urllib3 = 7006
4040
Sanic = 7007
41+
AioHttp = 7008
4142

4243

4344
class Layer(Enum):

skywalking/plugins/sw_aiohttp.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from skywalking import Layer, Component
19+
from skywalking.trace import tags
20+
from skywalking.trace.carrier import Carrier
21+
from skywalking.trace.context import get_context
22+
from skywalking.trace.tags import Tag
23+
24+
25+
def install():
26+
from aiohttp import ClientSession
27+
from aiohttp.web_protocol import RequestHandler
28+
from multidict import CIMultiDict, MultiDict, MultiDictProxy
29+
from yarl import URL
30+
31+
async def _sw_request(self: ClientSession, method: str, str_or_url, **kwargs):
32+
url = URL(str_or_url).with_user(None).with_password(None)
33+
peer = '%s:%d' % (url.host or '', url.port)
34+
context = get_context()
35+
36+
with context.new_exit_span(op=url.path or "/", peer=peer) as span:
37+
span.layer = Layer.Http
38+
span.component = Component.AioHttp
39+
span.tag(Tag(key=tags.HttpMethod, val=method.upper())) # pyre-ignore
40+
span.tag(Tag(key=tags.HttpUrl, val=url)) # pyre-ignore
41+
42+
carrier = span.inject()
43+
headers = kwargs.get('headers')
44+
45+
if headers is None:
46+
headers = kwargs['headers'] = CIMultiDict()
47+
elif not isinstance(headers, (MultiDictProxy, MultiDict)):
48+
headers = CIMultiDict(headers)
49+
50+
for item in carrier:
51+
headers.add(item.key, item.val)
52+
53+
res = await _request(self, method, str_or_url, **kwargs)
54+
55+
span.tag(Tag(key=tags.HttpStatus, val=res.status, overridable=True))
56+
57+
if res.status >= 400:
58+
span.error_occurred = True
59+
60+
return res
61+
62+
_request = ClientSession._request
63+
ClientSession._request = _sw_request
64+
65+
async def _sw_handle_request(self, request, start_time: float):
66+
context = get_context()
67+
carrier = Carrier()
68+
69+
for item in carrier:
70+
val = request.headers.get(item.key)
71+
72+
if val is not None:
73+
item.val = val
74+
75+
with context.new_entry_span(op=request.path, carrier=carrier) as span:
76+
span.layer = Layer.Http
77+
span.component = Component.AioHttp
78+
span.peer = '%s:%d' % request._transport_peername if isinstance(request._transport_peername, (list, tuple))\
79+
else request._transport_peername
80+
81+
span.tag(Tag(key=tags.HttpMethod, val=request.method)) # pyre-ignore
82+
span.tag(Tag(key=tags.HttpUrl, val=str(request.url))) # pyre-ignore
83+
84+
resp, reset = await _handle_request(self, request, start_time)
85+
86+
span.tag(Tag(key=tags.HttpStatus, val=resp.status, overridable=True))
87+
88+
if resp.status >= 400:
89+
span.error_occurred = True
90+
91+
return resp, reset
92+
93+
_handle_request = RequestHandler._handle_request
94+
RequestHandler._handle_request = _sw_handle_request
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
version: '2.1'
19+
20+
services:
21+
collector:
22+
extends:
23+
service: collector
24+
file: ../docker/docker-compose.base.yml
25+
26+
provider:
27+
extends:
28+
service: agent
29+
file: ../docker/docker-compose.base.yml
30+
ports:
31+
- 9091:9091
32+
volumes:
33+
- .:/app
34+
command: ['bash', '-c', 'pip install -r /app/requirements.txt && python3 /app/services/provider.py']
35+
depends_on:
36+
collector:
37+
condition: service_healthy
38+
healthcheck:
39+
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9091"]
40+
interval: 5s
41+
timeout: 60s
42+
retries: 120
43+
44+
consumer:
45+
extends:
46+
service: agent
47+
file: ../docker/docker-compose.base.yml
48+
ports:
49+
- 9090:9090
50+
volumes:
51+
- .:/app
52+
command: ['bash', '-c', 'pip install -r /app/requirements.txt && python3 /app/services/consumer.py']
53+
depends_on:
54+
collector:
55+
condition: service_healthy
56+
provider:
57+
condition: service_healthy
58+
59+
networks:
60+
beyond:
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
segmentItems:
19+
- serviceName: provider
20+
segmentSize: 1
21+
segments:
22+
- segmentId: not null
23+
spans:
24+
- operationName: /skywalking
25+
operationId: 0
26+
parentSpanId: -1
27+
spanId: 0
28+
spanLayer: Http
29+
tags:
30+
- key: http.method
31+
value: GET
32+
- key: url
33+
value: http://provider:9091/skywalking
34+
- key: status.code
35+
value: '200'
36+
refs:
37+
- parentEndpoint: /skywalking
38+
networkAddress: provider:9091
39+
refType: CrossProcess
40+
parentSpanId: 1
41+
parentTraceSegmentId: not null
42+
parentServiceInstance: not null
43+
parentService: consumer
44+
traceId: not null
45+
startTime: gt 0
46+
endTime: gt 0
47+
componentId: 7008
48+
spanType: Entry
49+
peer: not null
50+
skipAnalysis: false
51+
- serviceName: consumer
52+
segmentSize: 1
53+
segments:
54+
- segmentId: not null
55+
spans:
56+
- operationName: /skywalking
57+
operationId: 0
58+
parentSpanId: 0
59+
spanId: 1
60+
spanLayer: Http
61+
tags:
62+
- key: http.method
63+
value: GET
64+
- key: url
65+
value: http://provider:9091/skywalking
66+
- key: status.code
67+
value: '200'
68+
startTime: gt 0
69+
endTime: gt 0
70+
componentId: 7008
71+
spanType: Exit
72+
peer: provider:9091
73+
skipAnalysis: false
74+
- operationName: /skywalking
75+
operationId: 0
76+
parentSpanId: -1
77+
spanId: 0
78+
spanLayer: Http
79+
tags:
80+
- key: http.method
81+
value: GET
82+
- key: url
83+
value: http://0.0.0.0:9090/skywalking
84+
- key: status.code
85+
value: '200'
86+
startTime: gt 0
87+
endTime: gt 0
88+
componentId: 7008
89+
spanType: Entry
90+
peer: not null
91+
skipAnalysis: false
92+
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
import time
18+
19+
import aiohttp
20+
from aiohttp import web
21+
22+
from skywalking import agent
23+
from skywalking import config
24+
25+
26+
async def handle(request):
27+
name = request.match_info.get('name', "Anonymous")
28+
29+
async with aiohttp.ClientSession() as session:
30+
async with session.get(f'http://user:pass@provider:9091/{name}') as response:
31+
time.sleep(.5)
32+
json = await response.json()
33+
return web.Response(text=str(json))
34+
35+
36+
app = web.Application()
37+
app.add_routes([web.get('/', handle), web.get('/{name}', handle)])
38+
39+
if __name__ == '__main__':
40+
config.service_name = 'consumer'
41+
config.logging_level = 'DEBUG'
42+
agent.start()
43+
44+
web.run_app(app, port=9090)
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
from aiohttp import web
19+
20+
from skywalking import agent
21+
from skywalking import config
22+
23+
24+
async def handle(request):
25+
name = request.match_info.get('name', "Anonymous")
26+
return web.json_response({
27+
name: name,
28+
})
29+
30+
31+
app = web.Application()
32+
app.add_routes([web.get('/', handle), web.get('/{name}', handle)])
33+
34+
if __name__ == '__main__':
35+
config.service_name = 'provider'
36+
config.logging_level = 'DEBUG'
37+
agent.start()
38+
39+
web.run_app(app, port=9091)

0 commit comments

Comments
 (0)