Skip to content

Commit 9bab773

Browse files
authored
[Feature] CPU load balancing implementation and code structure optimization (apache#38)
* feat&fix: Optimize the logic of h2, remove duplicate code, add cpu monitoring * fix: Optimize some code
1 parent d103f72 commit 9bab773

40 files changed

+394
-2240
lines changed

dubbo/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
__version__ = "1.0.0b1"
17+
__version__ = "3.0.0b1"

dubbo/cluster/failfast_cluster.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
# limitations under the License.
1616

1717
from dubbo.cluster import Cluster, Directory, LoadBalance
18+
from dubbo.cluster.loadbalances import CpuLoadBalance
1819
from dubbo.constants import common_constants
1920
from dubbo.extension import extensionLoader
2021
from dubbo.protocol import Invoker, Result
@@ -27,13 +28,16 @@ class FailfastInvoker(Invoker):
2728
FailfastInvoker
2829
"""
2930

30-
def __init__(self, directory: Directory, url: URL):
31+
def __init__(self, directory, url: URL):
3132
self._directory = directory
3233

3334
self._load_balance = extensionLoader.get_extension(
3435
LoadBalance, url.parameters.get(common_constants.LOADBALANCE_KEY, "random")
3536
)()
3637

38+
if isinstance(self._load_balance, CpuLoadBalance):
39+
self._load_balance.set_monitor(directory)
40+
3741
def invoke(self, invocation) -> Result:
3842

3943
# get the invokers

dubbo/cluster/loadbalances.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from typing import List, Optional
1919

2020
from dubbo.cluster import LoadBalance
21+
from dubbo.cluster.monitor.cpu import CpuMonitor
2122
from dubbo.protocol import Invocation, Invoker
2223

2324

@@ -63,3 +64,46 @@ def do_select(
6364
) -> Optional[Invoker]:
6465
randint = random.randint(0, len(invokers) - 1)
6566
return invokers[randint]
67+
68+
69+
class CpuLoadBalance(AbstractLoadBalance):
70+
"""
71+
CPU load balance.
72+
"""
73+
74+
def __init__(self):
75+
self._monitor: Optional[CpuMonitor] = None
76+
77+
def set_monitor(self, monitor: CpuMonitor) -> None:
78+
"""
79+
Set the CPU monitor.
80+
:param monitor: The CPU monitor.
81+
:type monitor: CpuMonitor
82+
"""
83+
self._monitor = monitor
84+
85+
def do_select(
86+
self, invokers: List[Invoker], invocation: Invocation
87+
) -> Optional[Invoker]:
88+
# get the CPU usage
89+
cpu_usages = self._monitor.get_cpu_usage(invokers)
90+
# Select the caller with the lowest CPU usage, 0 means CPU usage is unknown.
91+
available_invokers = []
92+
unknown_invokers = []
93+
94+
for invoker, cpu_usage in cpu_usages.items():
95+
if cpu_usage == 0:
96+
unknown_invokers.append((cpu_usage, invoker))
97+
else:
98+
available_invokers.append((cpu_usage, invoker))
99+
100+
if available_invokers:
101+
# sort and select the invoker with the lowest CPU usage
102+
available_invokers.sort(key=lambda x: x[0])
103+
return available_invokers[0][1]
104+
elif unknown_invokers:
105+
# get the invoker with unknown CPU usage randomly
106+
randint = random.randint(0, len(unknown_invokers) - 1)
107+
return unknown_invokers[randint][1]
108+
else:
109+
return None

dubbo/logger/logging/__init__.py renamed to dubbo/cluster/monitor/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,3 @@
1313
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
16-
17-
from .logger_adapter import LoggerAdapter
18-
19-
__all__ = ["LoggerAdapter"]

dubbo/cluster/monitor/cpu.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
import threading
17+
from typing import Dict, List
18+
from dubbo.cluster.directories import RegistryDirectory
19+
from dubbo.constants import common_constants
20+
from dubbo.loggers import loggerFactory
21+
from dubbo.protocol import Protocol, Invoker
22+
from dubbo.protocol.invocation import RpcInvocation
23+
from dubbo.proxy.handlers import RpcServiceHandler, RpcMethodHandler
24+
from dubbo.registry import Registry
25+
from dubbo.types import UnaryCallType
26+
from dubbo.url import URL
27+
from dubbo.utils import CpuUtils
28+
29+
_LOGGER = loggerFactory.get_logger()
30+
31+
_cpu_invocation = RpcInvocation(
32+
"org.apache.dubbo.MetricsService",
33+
"cpu",
34+
str(1).encode("utf-8"),
35+
attributes={
36+
common_constants.CALL_KEY: UnaryCallType,
37+
},
38+
)
39+
40+
41+
class CpuMonitor(RegistryDirectory):
42+
"""
43+
The CPU monitor.
44+
"""
45+
46+
def __init__(self, registry: Registry, protocol: Protocol, url: URL):
47+
super().__init__(registry, protocol, url)
48+
49+
# interval
50+
self._interval = 5
51+
52+
# about CPU usage
53+
self._usages_lock = threading.Lock()
54+
self._cpu_usages: Dict[Invoker, float] = {}
55+
56+
# running invokers
57+
self._running_invokers: Dict[str, Invoker] = {}
58+
59+
# thread
60+
self._started = False
61+
self._thread: threading.Thread = threading.Thread(
62+
target=self._monitor_cpu, daemon=True
63+
)
64+
self._stop_event: threading.Event = threading.Event()
65+
66+
# start the monitor
67+
self.start()
68+
69+
def start(self) -> None:
70+
"""
71+
Start the monitor.
72+
"""
73+
if self._stop_event.is_set():
74+
raise RuntimeError("The monitor has been stopped.")
75+
elif self._started:
76+
return
77+
78+
self._started = True
79+
self._thread.start()
80+
_LOGGER.info("The CPU monitor has been started.")
81+
82+
def stop(self) -> None:
83+
"""
84+
Stop the monitor.
85+
"""
86+
if self._stop_event.is_set():
87+
return
88+
# notify the thread to stop
89+
self._stop_event.set()
90+
91+
def _monitor_cpu(self) -> None:
92+
"""
93+
Monitor the CPU usage.
94+
"""
95+
while True:
96+
# get available invokers
97+
available_invokers = {
98+
url: invoker
99+
for url, invoker in self._invokers.items()
100+
if invoker.is_available()
101+
}
102+
103+
# update the running invokers
104+
self._running_invokers = available_invokers
105+
106+
# update the CPU usage
107+
with self._usages_lock:
108+
self._cpu_usages = {
109+
invoker: usage
110+
for invoker, usage in self._cpu_usages.items()
111+
if invoker in available_invokers.values()
112+
}
113+
114+
# get the CPU usage for each invoker
115+
for url, invoker in self._running_invokers.items():
116+
if invoker.is_available():
117+
try:
118+
result = invoker.invoke(_cpu_invocation)
119+
cpu_usage = float(result.value().decode("utf-8"))
120+
self._cpu_usages[invoker] = cpu_usage
121+
except Exception as e:
122+
_LOGGER.error(
123+
f"Failed to get the CPU usage for invoker {url}: {str(e)}"
124+
)
125+
# remove the cpu usage
126+
self._remove_cpu_usage(invoker)
127+
128+
# wait for the interval or stop
129+
if self._stop_event.wait(self._interval):
130+
_LOGGER.info("The CPU monitor has been stopped.")
131+
break
132+
133+
def get_cpu_usage(self, invokers: List[Invoker]) -> Dict[Invoker, float]:
134+
"""
135+
Get the CPU usage for the invoker.
136+
:param invokers: The invokers.
137+
:type invokers: List[Invoker]
138+
:return: The CPU usage.
139+
:rtype: Dict[Invoker, float]
140+
"""
141+
with self._usages_lock:
142+
return {invoker: self._cpu_usages.get(invoker, 0) for invoker in invokers}
143+
144+
def _remove_cpu_usage(self, invoker: Invoker) -> None:
145+
with self._usages_lock:
146+
self._cpu_usages.pop(invoker)
147+
148+
149+
class CpuInnerRpcHandler:
150+
"""
151+
The CPU inner RPC handler.
152+
"""
153+
154+
@staticmethod
155+
def get_service_handler() -> RpcServiceHandler:
156+
"""
157+
Get the service handler.
158+
:return: The service handler.
159+
:rtype: RpcServiceHandler
160+
"""
161+
return RpcServiceHandler(
162+
"org.apache.dubbo.MetricsService",
163+
{"cpu": RpcMethodHandler.unary(CpuInnerRpcHandler.get_cpu_usage)},
164+
)
165+
166+
@staticmethod
167+
def get_cpu_usage(interval) -> bytes:
168+
"""
169+
Get the CPU usage.
170+
:param interval: The interval.
171+
:type interval: bytes
172+
:return: The CPU usage.
173+
:rtype: bytes
174+
"""
175+
float_value = CpuUtils.get_total_cpu_usage(interval=int(interval.decode("utf-8")))
176+
return str(float_value).encode("utf-8")

dubbo/common/__init__.py

Lines changed: 0 additions & 32 deletions
This file was deleted.

dubbo/common/classes.py

Lines changed: 0 additions & 41 deletions
This file was deleted.

0 commit comments

Comments
 (0)