Skip to content

Commit 9fa1324

Browse files
committed
feat(nats/client): make stats extensible
Replace the stats dictionary by a class and interface, that is extensible to allow users to replace the stats collection by their own custom stats collector. Resolves #720 Signed-off-by: Tim Drijvers <[email protected]>
1 parent 8f53949 commit 9fa1324

File tree

2 files changed

+85
-15
lines changed

2 files changed

+85
-15
lines changed

nats/aio/client.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252

5353
from .errors import ErrInvalidUserCredentials, ErrStaleConnection
5454
from .msg import Msg
55+
from .stats import ClientStats, StatsInterface
5556
from .subscription import (
5657
DEFAULT_SUB_PENDING_BYTES_LIMIT,
5758
DEFAULT_SUB_PENDING_MSGS_LIMIT,
@@ -127,6 +128,7 @@ class Srv:
127128
server_version: Optional[str] = None
128129

129130

131+
130132
class ServerVersion:
131133

132134
def __init__(self, server_version: str) -> None:
@@ -230,6 +232,7 @@ class Client:
230232
"""
231233

232234
msg_class: type[Msg] = Msg
235+
stats: StatsInterface
233236

234237
# FIXME: Use an enum instead.
235238
DISCONNECTED = 0
@@ -314,14 +317,7 @@ def __init__(self) -> None:
314317
self._public_nkey: Optional[str] = None
315318

316319
self.options: Dict[str, Any] = {}
317-
self.stats = {
318-
"in_msgs": 0,
319-
"out_msgs": 0,
320-
"in_bytes": 0,
321-
"out_bytes": 0,
322-
"reconnects": 0,
323-
"errors_received": 0,
324-
}
320+
self.stats = ClientStats()
325321

326322
async def connect(
327323
self,
@@ -947,8 +943,7 @@ async def _send_publish(
947943
hdr.extend(_CRLF_)
948944
pub_cmd = prot_command.hpub_cmd(subject, reply, hdr, payload)
949945

950-
self.stats["out_msgs"] += 1
951-
self.stats["out_bytes"] += payload_size
946+
self.stats.message_sent(subject, payload_size, headers)
952947
await self._send_command(pub_cmd)
953948
if self._flush_queue is not None and self._flush_queue.empty():
954949
await self._flush_pending()
@@ -1510,7 +1505,7 @@ async def _attempt_reconnect(self) -> None:
15101505

15111506
# Consider a reconnect to be done once CONNECT was
15121507
# processed by the server successfully.
1513-
self.stats["reconnects"] += 1
1508+
self.stats.client_reconnected()
15141509

15151510
# Reset reconnect attempts for this server
15161511
# since have successfully connected.
@@ -1749,8 +1744,8 @@ async def _process_msg(
17491744
Process MSG sent by server.
17501745
"""
17511746
payload_size = len(data)
1752-
self.stats["in_msgs"] += 1
1753-
self.stats["in_bytes"] += payload_size
1747+
hdr = await self._process_headers(headers)
1748+
self.stats.message_received(subject.decode(), payload_size, hdr)
17541749

17551750
sub = self._subs.get(sid)
17561751
if not sub:
@@ -1764,8 +1759,6 @@ async def _process_msg(
17641759
# internal queue and the task will finish once the last
17651760
# message is processed.
17661761
self._subs.pop(sid, None)
1767-
1768-
hdr = await self._process_headers(headers)
17691762
msg = self._build_message(sid, subject, reply, data, hdr)
17701763
if not msg:
17711764
return

nats/aio/stats.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
from __future__ import annotations
2+
3+
from abc import ABC, abstractmethod
4+
from collections import UserDict
5+
from typing import Dict, Optional
6+
7+
8+
class StatsInterface(ABC):
9+
"""
10+
Abstract base class defining the interface for NATS client statistics tracking.
11+
"""
12+
13+
@abstractmethod
14+
def message_received(
15+
self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None
16+
) -> None:
17+
"""Record an incoming message with its payload size."""
18+
pass
19+
20+
@abstractmethod
21+
def message_sent(
22+
self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None
23+
) -> None:
24+
"""Record an outgoing message with its payload size."""
25+
pass
26+
27+
@abstractmethod
28+
def client_reconnected(self) -> None:
29+
"""Record a client reconnection."""
30+
pass
31+
32+
@abstractmethod
33+
def error_received(self) -> None:
34+
"""Record a server error."""
35+
pass
36+
37+
38+
class ClientStats(StatsInterface, UserDict):
39+
"""
40+
ClientStats tracks NATS client connection statistics and acts as a dict
41+
for backward compatibility while providing structured methods for updates.
42+
"""
43+
44+
def __init__(self) -> None:
45+
super().__init__(
46+
{
47+
"in_msgs": 0,
48+
"out_msgs": 0,
49+
"in_bytes": 0,
50+
"out_bytes": 0,
51+
"reconnects": 0,
52+
"errors_received": 0,
53+
}
54+
)
55+
56+
def message_received(
57+
self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None
58+
) -> None:
59+
"""Record an incoming message with its payload size."""
60+
self.data["in_msgs"] += 1
61+
self.data["in_bytes"] += payload_size
62+
63+
def message_sent(
64+
self, subject: str, payload_size: int, headers: Optional[Dict[str, str]] = None
65+
) -> None:
66+
"""Record an outgoing message with its payload size."""
67+
self.data["out_msgs"] += 1
68+
self.data["out_bytes"] += payload_size
69+
70+
def client_reconnected(self) -> None:
71+
"""Record a client reconnection."""
72+
self.data["reconnects"] += 1
73+
74+
def error_received(self) -> None:
75+
"""Record a server error."""
76+
self.data["errors_received"] += 1
77+

0 commit comments

Comments
 (0)