11# Copyright (c) OpenMMLab. All rights reserved.
22import asyncio
3- from contextlib import contextmanager
4- from dataclasses import dataclass , field
53
64from lmdeploy .messages import ResponseType , ScheduleMetrics
5+ from lmdeploy .pytorch .utils import singleton
76from lmdeploy .utils import get_logger
87
98from .stats import SchedulerStats
109
1110logger = get_logger ('lmdeploy' )
1211
1312
14- @dataclass
15- class MetricsContext :
16- enable_metrics : bool = False
17- scheduler_stats : SchedulerStats = field (default_factory = SchedulerStats )
18-
19-
20- class MetricsManager :
21-
22- def __init__ (self ):
23- """Initialize metrics manager."""
24- self ._current_ctx = MetricsContext ()
25-
26- def set_context (self , ctx : MetricsContext ):
27- """Set metrics context."""
28- self ._current_ctx = ctx
29-
30- def get_context (self ):
31- """Get current context."""
32- return self ._current_ctx
33-
34- @contextmanager
35- def context (self , ctx : MetricsContext ):
36- """Context manager."""
37- old_ctx = self .get_context ()
38- self .set_context (ctx )
39- try :
40- yield ctx
41- finally :
42- self .set_context (old_ctx )
43-
44-
45- _METRICS_MANAGER = None
46-
47-
48- def get_metrics_manager ():
49- global _METRICS_MANAGER
50- if _METRICS_MANAGER is None :
51- _METRICS_MANAGER = MetricsManager ()
52-
53- return _METRICS_MANAGER
54-
55-
56- # Metrics getters
57- def is_metrics_enabled ():
58- return get_metrics_manager ().get_context ().enable_metrics
59-
60-
61- def get_current_metrics_context ():
62- return get_metrics_manager ().get_context ()
63-
64-
65- def get_current_scheduler_stats ():
66- return get_metrics_manager ().get_context ().scheduler_stats
67-
68-
69- # Metrics setters
70- def set_metrics_enabled_flag (enable_metrics : bool ):
71- """Set metrics enabled flag."""
72- ctx = get_current_metrics_context ()
73- ctx .enable_metrics = enable_metrics
74-
75- if enable_metrics :
76- logger .info ('Metrics are enabled.' )
77-
78-
79- def increment_async_engine_scheduler_stats_total_req ():
80- """Set scheduler stats in async engine."""
81- get_current_scheduler_stats ().num_total_reqs += 1
82-
83-
84- def increment_async_engine_scheduler_stats_finished_req ():
85- """Set scheduler stats in async engine."""
86- get_current_scheduler_stats ().num_finished_reqs += 1
87-
88-
89- # Metrics processor
13+ @singleton
9014class MetricsProcessor ():
9115 """Metrics processor."""
9216
9317 def __init__ (self ):
18+ """Init metrics processor."""
19+ self .enable_metrics : bool = False
20+ self .scheduler_stats = SchedulerStats ()
21+ self .stat_loggers = []
9422 self .metrics_queue : asyncio .Queue = None
9523 self .metrics_handler : asyncio .Task = None
9624
9725 def start_metrics_handler (self , enable_metrics : bool ):
98- set_metrics_enabled_flag ( enable_metrics )
99-
26+ """Start metrics handler."""
27+ self . enable_metrics = enable_metrics
10028 if enable_metrics and self .metrics_handler is None :
10129 self .metrics_queue = asyncio .Queue ()
10230 self .metrics_handler = asyncio .create_task (self ._run_metrics_handler ())
10331 logger .info ('Metrics handler task started.' )
10432
10533 async def stop_metrics_handler (self ):
34+ """Stop metrics handler."""
10635 if self .metrics_handler is not None :
10736 self .metrics_handler .cancel ()
10837 try :
@@ -117,20 +46,20 @@ async def _run_metrics_handler(self):
11746 """A background task that consumes and processes metrics data."""
11847 while True :
11948 try :
120- # fetch
49+ # fetch data from the queue
12150 update_data = await self .metrics_queue .get ()
122- outputs , req_state , iteration_stats , specdecode_stats = update_data
51+ outputs , req_stats , iteration_stats , specdecode_stats = update_data
12352
124- # update request state according the engine events
53+ # update request stats
12554 if outputs and outputs .req_metrics :
12655 # when users visit "/abort_request" endpoint, `req_metrics` might be None
127- req_state .update_from_events (outputs .req_metrics .engine_events )
56+ req_stats .update_from_events (outputs .req_metrics .engine_events )
12857
129- # update iteration stats based on outputs and request state.
130- # some attributes of req_state will also be updated, e.g., lastest_token_time
131- iteration_stats .update_from_output (outputs , req_state )
58+ # update iteration stats
59+ # some attributes of req_stats will also be updated, e.g., lastest_token_time
60+ iteration_stats .update_from_output (outputs , req_stats )
13261
133- # spec decode
62+ # update spec decode stats
13463 if specdecode_stats is not None :
13564 specdecode_stats .update_from_output (outputs )
13665
@@ -140,34 +69,37 @@ async def _run_metrics_handler(self):
14069 if specdecode_stats is not None :
14170 stat_logger .record_specdecode (specdecode_stats )
14271
72+ # record finished request stats
14373 if outputs .status == ResponseType .FINISH :
144- # record finished request stats
14574 for stat_logger in self .stat_loggers :
146- stat_logger .record_finish (req_state . finish_stats )
75+ stat_logger .record_finish (req_stats )
14776
14877 self .metrics_queue .task_done ()
14978 except asyncio .CancelledError :
15079 break
15180 except Exception as e :
15281 logger .exception (f'Metrics handler background task failed: { e } ' )
15382
154- async def udpate_schedule_stats (self , schedule_metrics : ScheduleMetrics ):
155- stats = get_current_scheduler_stats ()
156- stats .update_from_schedule_metrics (schedule_metrics )
83+ async def update_schedule_stats (self , schedule_metrics : ScheduleMetrics ):
84+ """Update schedule stats."""
85+ self . scheduler_stats .update_from_schedule_metrics (schedule_metrics )
15786 # record schedule stats
15887 for stat_logger in self .stat_loggers :
159- stat_logger .record_schedule (stats )
88+ stat_logger .record_schedule (self . scheduler_stats )
16089
16190 def queue_update (self , update_data : tuple ):
162- if not is_metrics_enabled () or self .metrics_queue is None :
91+ """Queue update."""
92+ if not self .enable_metrics or self .metrics_queue is None :
16393 return
16494 self .metrics_queue .put_nowait (update_data )
16595
16696 def increment_total_requests (self ):
167- increment_async_engine_scheduler_stats_total_req ()
97+ """Increment total requests."""
98+ self .scheduler_stats .num_total_reqs += 1
16899
169100 def increment_finished_requests (self ):
170- increment_async_engine_scheduler_stats_finished_req ()
101+ """Increment finished requests."""
102+ self .scheduler_stats .num_finished_reqs += 1
171103
172104
173105metrics_processor = MetricsProcessor ()
0 commit comments