Skip to content

Commit 7345423

Browse files
committed
nslookup code integration with main
1 parent 9dad2c4 commit 7345423

File tree

2 files changed

+272
-5
lines changed

2 files changed

+272
-5
lines changed

.pre-commit-config.yaml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,29 +8,30 @@ exclude: \w*(_pb2)\w*
88

99
repos:
1010
- repo: https://github.com/pre-commit/pre-commit-hooks
11-
rev: cef0300fd0fc4d2a87a85fa2093c6b283ea36f4b # frozen: v5.0.0
11+
rev: v5.0.0
1212
hooks:
1313
- id: trailing-whitespace
1414
- id: end-of-file-fixer
1515
- id: check-yaml
1616
- id: check-added-large-files
1717

1818
- repo: https://github.com/PyCQA/flake8
19-
rev: e43806be3607110919eff72939fda031776e885a # frozen: 7.1.1
19+
rev: 7.1.1
2020
hooks:
2121
- id: flake8
2222

2323
- repo: https://github.com/psf/black
24-
rev: 1b2427a2b785cc4aac97c19bb4b9a0de063f9547 # frozen: 24.10.0
24+
rev: 24.10.0
2525
hooks:
2626
- id: black
27+
language_version: python3.9
2728

2829
- repo: https://github.com/pre-commit/mirrors-mypy
29-
rev: f56614daa94d5cd733d3b7004c5df9caad267b4a # frozen: v1.13.0
30+
rev: v1.13.0
3031
hooks:
3132
- id: mypy
3233

3334
- repo: https://github.com/PyCQA/isort
34-
rev: c235f5e450b4b84e58d114ed4c589cbf454175a3 # frozen: 5.13.2
35+
rev: 5.13.2
3536
hooks:
3637
- id: isort

src/coherence/nslookup.py

Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
import asyncio
2+
import struct
3+
from typing import List, Optional, Tuple
4+
5+
DEFAULT_PORT = 7574
6+
DEFAULT_HOST = "localhost"
7+
CLUSTER_NAME_LOOKUP = "Cluster/name"
8+
CLUSTER_INFO_LOOKUP = "Cluster/info"
9+
CLUSTER_FOREIGN_LOOKUP = "Cluster/foreign"
10+
MANAGEMENT_LOOKUP = "management/HTTPManagementURL"
11+
JMX_LOOKUP = "management/JMXServiceURL"
12+
METRICS_LOOKUP = "metrics/HTTPMetricsURL"
13+
GRPC_PROXY_LOOKUP = "$GRPC:GrpcProxy"
14+
NS_PREFIX = "NameService/string/"
15+
NS_LOCAL_PORT = "/NameService/localPort"
16+
DEFAULT_TIMEOUT = 10
17+
18+
MULTIPLEXED_SOCKET = bytes([90, 193, 224, 0])
19+
NAME_SERVICE_SUB_PORT = bytes([0, 0, 0, 3])
20+
# fmt: off
21+
CONNECTION_OPEN = bytes([
22+
0, 1, 2, 0, 66, 0, 1, 14, 0, 0, 66, 166, 182, 159, 222, 178, 81,
23+
1, 65, 227, 243, 228, 221, 15, 2, 65, 143, 246, 186, 153, 1, 3,
24+
65, 248, 180, 229, 242, 4, 4, 65, 196, 254, 220, 245, 5, 5, 65, 215,
25+
206, 195, 141, 7, 6, 65, 219, 137, 220, 213, 10, 64, 2, 110, 3,
26+
93, 78, 87, 2, 17, 77, 101, 115, 115, 97, 103, 105, 110, 103, 80,
27+
114, 111, 116, 111, 99, 111, 108, 2, 65, 2, 65, 2, 19, 78, 97, 109,
28+
101, 83, 101, 114, 118, 105, 99, 101, 80, 114, 111, 116, 111, 99,
29+
111, 108, 2, 65, 1, 65, 1, 5, 160, 2, 0, 0, 14, 0, 0, 66, 174, 137,
30+
158, 222, 178, 81, 1, 65, 129, 128, 128, 240, 15, 5, 65, 152, 159,
31+
129, 128, 8, 6, 65, 147, 158, 1, 64, 1, 106, 2, 110, 3, 106, 4, 113,
32+
5, 113, 6, 78, 8, 67, 108, 117, 115, 116, 101, 114, 66, 9, 78, 9, 108,
33+
111, 99, 97, 108, 104, 111, 115, 116, 10, 78, 5, 50, 48, 50, 51, 51, 12,
34+
78, 16, 67, 111, 104, 101, 114, 101, 110, 99, 101, 67, 111, 110, 115,
35+
111, 108, 101, 64, 64,
36+
])
37+
CHANNEL_OPEN = bytes([
38+
0, 11, 2, 0, 66, 1, 1, 78, 19, 78, 97, 109, 101, 83, 101, 114, 118,
39+
105, 99, 101, 80, 114, 111, 116, 111, 99, 111, 108, 2, 78, 11, 78,
40+
97, 109, 101, 83, 101, 114, 118, 105, 99, 101, 64,
41+
])
42+
NS_LOOKUP_REQ_ID = bytes([1, 1, 0, 66, 0, 1, 78])
43+
REQ_END_MARKER = bytes([64])
44+
# fmt: on
45+
46+
47+
class DiscoveredCluster:
48+
def __init__(self) -> None:
49+
self.cluster_name = ""
50+
self.connection_name = ""
51+
self.ns_port = 0
52+
self.host = ""
53+
self.management_urls: List[str] = []
54+
self.selected_url = ""
55+
self.metrics_urls: List[str] = []
56+
self.jmx_urls: List[str] = []
57+
self.grpc_proxy_endpoints: List[str] = []
58+
59+
60+
class ClusterNSPort:
61+
def __init__(
62+
self, host_name: str = "", cluster_name: str = "", port: Optional[int] = 0, is_local: bool = False
63+
) -> None:
64+
self.host_name = host_name
65+
self.cluster_name = cluster_name
66+
self.port = port
67+
self.is_local = is_local
68+
69+
70+
class AsyncNSLookup:
71+
def __init__(self, host: str = DEFAULT_HOST, port: int = DEFAULT_PORT):
72+
self.host = host
73+
self.port = port
74+
self.channel = bytes()
75+
76+
# These get initialized in connect(). Commented below to satisfy mypy
77+
# self.reader: Optional[asyncio.StreamReader] = None
78+
# self.writer: Optional[asyncio.StreamWriter] = None
79+
80+
@staticmethod
81+
async def open(host_port: str = "", timeout: int = DEFAULT_TIMEOUT) -> "AsyncNSLookup":
82+
ns_lookup = AsyncNSLookup()
83+
if host_port:
84+
parts = host_port.split(":")
85+
if len(parts) == 1:
86+
ns_lookup.host = parts[0]
87+
ns_lookup.port = DEFAULT_PORT
88+
elif len(parts) == 2:
89+
ns_lookup.host = parts[0]
90+
ns_lookup.port = int(parts[1])
91+
else:
92+
raise ValueError(f"Invalid value for host/port: {host_port}")
93+
else:
94+
ns_lookup.host = DEFAULT_HOST
95+
ns_lookup.port = DEFAULT_PORT
96+
97+
await ns_lookup.connect((ns_lookup.host, ns_lookup.port), timeout)
98+
return ns_lookup
99+
100+
async def connect(self, address: Tuple[str, int], timeout: int = DEFAULT_TIMEOUT) -> None:
101+
self.reader, self.writer = await asyncio.open_connection(address[0], address[1])
102+
self.writer.write(MULTIPLEXED_SOCKET)
103+
self.writer.write(NAME_SERVICE_SUB_PORT)
104+
self.writer.write(self.write_packed_int(len(CONNECTION_OPEN)))
105+
self.writer.write(CONNECTION_OPEN)
106+
self.writer.write(self.write_packed_int(len(CHANNEL_OPEN)))
107+
self.writer.write(CHANNEL_OPEN)
108+
await self.writer.drain()
109+
110+
await self.read_response()
111+
data = await self.read_response()
112+
self.channel = data[8 : 8 + len(data) - 9]
113+
114+
async def lookup(self, name: str) -> str:
115+
response = await self.lookup_internal(name)
116+
if len(response) <= 7:
117+
return ""
118+
return self.read_string(response)
119+
120+
async def lookup_internal(self, name: str) -> bytes:
121+
request = self.channel + NS_LOOKUP_REQ_ID
122+
request += self.write_packed_int(len(name)) + name.encode() + REQ_END_MARKER
123+
self.writer.write(self.write_packed_int(len(request)))
124+
self.writer.write(request)
125+
await self.writer.drain()
126+
127+
response = await self.read_response()
128+
return response[len(self.channel) + 1 :]
129+
130+
async def read_response(self) -> bytes:
131+
length, _ = await self.read_packed_int()
132+
data = await self.reader.read(length)
133+
return data
134+
135+
async def read_packed_int(self) -> Tuple[int, int]:
136+
value = ord(await self.reader.read(1))
137+
negative = value & 0x40 != 0
138+
result = value & 0x3F
139+
bits = 6
140+
141+
while value & 0x80:
142+
value = ord(await self.reader.read(1))
143+
result |= (value & 0x7F) << bits
144+
bits += 7
145+
146+
if negative:
147+
result = ~result
148+
149+
return result, bits
150+
151+
@staticmethod
152+
def write_packed_int(n: int) -> bytes:
153+
result = b""
154+
b = 0
155+
if n < 0:
156+
b = 0x40
157+
n = ~n
158+
159+
b |= n & 0x3F
160+
n >>= 6
161+
162+
while n != 0:
163+
result += struct.pack("B", b | 0x80)
164+
b = n & 0x7F
165+
n >>= 7
166+
167+
result += struct.pack("B", b)
168+
return result
169+
170+
def read_string(self, data: bytes) -> str:
171+
length, pos = self.read_packed_int_from_string(data)
172+
return data[7 + (pos // 7) : 7 + (pos // 7) + length].decode()
173+
174+
@staticmethod
175+
def read_packed_int_from_string(s: bytes) -> Tuple[int, int]:
176+
value = s[6]
177+
negative = value & 0x40 != 0
178+
result = value & 0x3F
179+
bits = 6
180+
181+
while value & 0x80:
182+
value = s[7]
183+
result |= (value & 0x7F) << bits
184+
bits += 7
185+
186+
if negative:
187+
result = ~result
188+
189+
return result, bits
190+
191+
async def close(self) -> None:
192+
if self.writer:
193+
self.writer.close()
194+
await self.writer.wait_closed()
195+
196+
async def discover_cluster_info(self) -> DiscoveredCluster:
197+
cluster = DiscoveredCluster()
198+
cluster.ns_port = self.port
199+
cluster.host = self.host
200+
201+
cluster.cluster_name = await self.lookup(CLUSTER_NAME_LOOKUP)
202+
cluster.management_urls = parse_results(await self.lookup(NS_PREFIX + MANAGEMENT_LOOKUP))
203+
cluster.jmx_urls = parse_results(await self.lookup(NS_PREFIX + JMX_LOOKUP))
204+
cluster.metrics_urls = parse_results(await self.lookup(NS_PREFIX + METRICS_LOOKUP))
205+
cluster.grpc_proxy_endpoints = parse_results(await self.lookup(NS_PREFIX + GRPC_PROXY_LOOKUP))
206+
207+
return cluster
208+
209+
async def discover_name_service_ports(self) -> List[ClusterNSPort]:
210+
local_cluster = await self.lookup(CLUSTER_NAME_LOOKUP)
211+
other_clusters = await self.lookup(NS_PREFIX + CLUSTER_FOREIGN_LOOKUP)
212+
other_clusters_list = parse_results(other_clusters)
213+
214+
cluster_names = [local_cluster] + other_clusters_list
215+
list_clusters = [
216+
ClusterNSPort(
217+
cluster_name=name,
218+
port=self.port if name == local_cluster else None,
219+
host_name=self.host,
220+
is_local=name == local_cluster,
221+
)
222+
for name in cluster_names
223+
]
224+
225+
for cluster_ns_port in list_clusters[1:]:
226+
cluster_ns_port.port = int(
227+
await self.lookup(f"{NS_PREFIX}{CLUSTER_FOREIGN_LOOKUP}/{cluster_ns_port.cluster_name}{NS_LOCAL_PORT}")
228+
)
229+
230+
return list_clusters
231+
232+
233+
def parse_results(results: str) -> List[str]:
234+
results = results.strip("[]")
235+
return results.split(", ") if results else []
236+
237+
238+
async def main() -> None:
239+
try:
240+
nslookup = await AsyncNSLookup.open("localhost:7574")
241+
print(f"Connected to {nslookup.host}:{nslookup.port}")
242+
243+
# Example: Perform a lookup
244+
cluster_info = await nslookup.discover_cluster_info()
245+
print(f"Cluster Name: {cluster_info.cluster_name}")
246+
print(f"Management URLs: {cluster_info.management_urls}")
247+
print(f"JMX URLs: {cluster_info.jmx_urls}")
248+
print(f"Metrics URLs: {cluster_info.metrics_urls}")
249+
print(f"GRPC Endpoints: {cluster_info.grpc_proxy_endpoints}")
250+
251+
ns_ports = await nslookup.discover_name_service_ports()
252+
for port_info in ns_ports:
253+
print(
254+
f"Cluster Name: {port_info.cluster_name}, Port: {port_info.port}, "
255+
f"Host: {port_info.host_name}, Is Local: {port_info.is_local}"
256+
)
257+
258+
except Exception as e:
259+
print(f"Error: {e}")
260+
finally:
261+
if "nslookup" in locals():
262+
await nslookup.close()
263+
264+
265+
if __name__ == "__main__":
266+
asyncio.run(main())

0 commit comments

Comments
 (0)