Skip to content

Commit d2cd248

Browse files
committed
feat: New stream-related features
1 parent e5e9f7b commit d2cd248

File tree

37 files changed

+1355
-430
lines changed

37 files changed

+1355
-430
lines changed

dubbo/classes.py

Lines changed: 213 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,44 @@
1313
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
16-
16+
import abc
1717
import threading
18+
from typing import Callable, Optional, Tuple, Any, Union
19+
20+
from dubbo.types import DeserializingFunction, RpcType, RpcTypes, SerializingFunction
21+
22+
__all__ = [
23+
"EOF",
24+
"SingletonBase",
25+
"MethodDescriptor",
26+
"ReadStream",
27+
"WriteStream",
28+
"ReadWriteStream",
29+
]
30+
31+
32+
class _EOF:
33+
"""
34+
EOF is a class representing the end flag.
35+
"""
36+
37+
_repr_str = "<dubbo.classes.EOF>"
38+
39+
def __bool__(self):
40+
return False
41+
42+
def __len__(self):
43+
return 0
1844

19-
__all__ = ["SingletonBase"]
45+
def __repr__(self) -> str:
46+
return self._repr_str
47+
48+
def __str__(self) -> str:
49+
return self._repr_str
50+
51+
52+
# The EOF object -> global constant
53+
EOF = _EOF()
2054

2155

2256
class SingletonBase:
@@ -39,3 +73,180 @@ def __new__(cls, *args, **kwargs):
3973
if cls._instance is None:
4074
cls._instance = super(SingletonBase, cls).__new__(cls)
4175
return cls._instance
76+
77+
78+
class MethodDescriptor:
79+
"""
80+
MethodDescriptor is a descriptor for a method.
81+
It contains the method name, the method, and the method's serialization and deserialization methods.
82+
"""
83+
84+
__slots__ = [
85+
"_callable_method",
86+
"_method_name",
87+
"_rpc_type",
88+
"_arg_serialization",
89+
"_return_serialization",
90+
]
91+
92+
def __init__(
93+
self,
94+
method_name: str,
95+
arg_serialization: Tuple[
96+
Optional[SerializingFunction], Optional[DeserializingFunction]
97+
],
98+
return_serialization: Tuple[
99+
Optional[SerializingFunction], Optional[DeserializingFunction]
100+
],
101+
rpc_type: Union[RpcType, RpcTypes, str] = RpcTypes.UNARY.value,
102+
callable_method: Optional[Callable] = None,
103+
):
104+
"""
105+
Initialize the method model.
106+
107+
:param method_name:
108+
The name of the method.
109+
:type method_name: str
110+
111+
:param arg_serialization:
112+
A tuple containing serialization and deserialization methods for the function's arguments.
113+
:type arg_serialization: Optional[Tuple[SerializingFunction, DeserializingFunction]]
114+
115+
:param return_serialization:
116+
A tuple containing serialization and deserialization methods for the function's return values.
117+
:type return_serialization: Optional[Tuple[SerializingFunction, DeserializingFunction]]
118+
119+
:param rpc_type:
120+
The RPC type. default is RpcTypes.UNARY.
121+
:type rpc_type: RpcType
122+
123+
:param callable_method:
124+
The main callable method to be executed.
125+
:type callable_method: Optional[Callable]
126+
"""
127+
self._method_name = method_name
128+
self._arg_serialization = arg_serialization
129+
self._return_serialization = return_serialization
130+
self._callable_method = callable_method
131+
132+
if isinstance(rpc_type, str):
133+
rpc_type = RpcTypes.from_name(rpc_type)
134+
elif isinstance(rpc_type, RpcTypes):
135+
rpc_type = rpc_type.value
136+
elif not isinstance(rpc_type, RpcType):
137+
raise TypeError(
138+
f"rpc_type must be of type RpcType, RpcTypes, or str, not {type(rpc_type)}"
139+
)
140+
self._rpc_type = rpc_type
141+
142+
def get_method(self) -> Callable:
143+
"""
144+
Get the callable method.
145+
:return: The callable method.
146+
:rtype: Callable
147+
"""
148+
return self._callable_method
149+
150+
def get_method_name(self) -> str:
151+
"""
152+
Get the method name.
153+
:return: The method name.
154+
:rtype: str
155+
"""
156+
return self._method_name
157+
158+
def get_rpc_type(self) -> RpcType:
159+
"""
160+
Get the RPC type.
161+
:return: The RPC type.
162+
:rtype: RpcType
163+
"""
164+
return self._rpc_type
165+
166+
def get_arg_serializer(self) -> Optional[SerializingFunction]:
167+
"""
168+
Get the argument serializer.
169+
:return: The argument serializer. If not set, return None.
170+
:rtype: Optional[SerializingFunction]
171+
"""
172+
return self._arg_serialization[0] if self._arg_serialization else None
173+
174+
def get_arg_deserializer(self) -> Optional[DeserializingFunction]:
175+
"""
176+
Get the argument deserializer.
177+
:return: The argument deserializer. If not set, return None.
178+
:rtype: Optional[DeserializingFunction]
179+
"""
180+
return self._arg_serialization[1] if self._arg_serialization else None
181+
182+
def get_return_serializer(self) -> Optional[SerializingFunction]:
183+
"""
184+
Get the return value serializer.
185+
:return: The return value serializer. If not set, return None.
186+
:rtype: Optional[SerializingFunction]
187+
"""
188+
return self._return_serialization[0] if self._return_serialization else None
189+
190+
def get_return_deserializer(self) -> Optional[DeserializingFunction]:
191+
"""
192+
Get the return value deserializer.
193+
:return: The return value deserializer. If not set, return None.
194+
:rtype: Optional[DeserializingFunction]
195+
"""
196+
return self._return_serialization[1] if self._return_serialization else None
197+
198+
199+
class ReadStream(abc.ABC):
200+
"""
201+
ReadStream is an abstract class for reading streams.
202+
"""
203+
204+
@abc.abstractmethod
205+
def read(self, *args, **kwargs) -> Any:
206+
"""
207+
Read the stream.
208+
:param args: The arguments to pass to the read method.
209+
:param kwargs: The keyword arguments to pass to the read method.
210+
:return: The read value.
211+
"""
212+
raise NotImplementedError()
213+
214+
215+
class WriteStream(abc.ABC):
216+
"""
217+
WriteStream is an abstract class for writing streams.
218+
"""
219+
220+
@abc.abstractmethod
221+
def can_write_more(self) -> bool:
222+
"""
223+
Check if the stream can write more data.
224+
:return: True if the stream can write more data, False otherwise.
225+
:rtype: bool
226+
"""
227+
raise NotImplementedError()
228+
229+
@abc.abstractmethod
230+
def write(self, *args, **kwargs) -> None:
231+
"""
232+
Write to the stream.
233+
:param args: The arguments to pass to the write method.
234+
:param kwargs: The keyword arguments to pass to the write method.
235+
"""
236+
raise NotImplementedError()
237+
238+
@abc.abstractmethod
239+
def done_writing(self, **kwargs) -> None:
240+
"""
241+
Done writing to the stream.
242+
:param kwargs: The keyword arguments to pass to the done
243+
"""
244+
raise NotImplementedError()
245+
246+
247+
class ReadWriteStream(ReadStream, WriteStream, abc.ABC):
248+
"""
249+
ReadWriteStream is an abstract class for reading and writing streams.
250+
"""
251+
252+
pass

dubbo/client.py

Lines changed: 40 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,23 @@
1717
from typing import Optional
1818

1919
from dubbo.bootstrap import Dubbo
20+
from dubbo.classes import MethodDescriptor
2021
from dubbo.configs import ReferenceConfig
2122
from dubbo.constants import common_constants
2223
from dubbo.extension import extensionLoader
2324
from dubbo.protocol import Invoker, Protocol
24-
from dubbo.proxy import RpcCallable
25-
from dubbo.proxy.callables import MultipleRpcCallable
25+
from dubbo.proxy import RpcCallable, RpcCallableFactory
26+
from dubbo.proxy.callables import DefaultRpcCallableFactory
2627
from dubbo.registry.protocol import RegistryProtocol
2728
from dubbo.types import (
28-
BiStreamCallType,
29-
CallType,
30-
ClientStreamCallType,
3129
DeserializingFunction,
3230
SerializingFunction,
33-
ServerStreamCallType,
34-
UnaryCallType,
31+
RpcTypes,
3532
)
33+
from dubbo.url import URL
3634

3735
__all__ = ["Client"]
3836

39-
from dubbo.url import URL
40-
4137

4238
class Client:
4339
def __init__(self, reference: ReferenceConfig, dubbo: Optional[Dubbo] = None):
@@ -51,6 +47,8 @@ def __init__(self, reference: ReferenceConfig, dubbo: Optional[Dubbo] = None):
5147
self._protocol: Optional[Protocol] = None
5248
self._invoker: Optional[Invoker] = None
5349

50+
self._callable_factory: RpcCallableFactory = DefaultRpcCallableFactory()
51+
5452
# initialize the invoker
5553
self._initialize()
5654

@@ -97,10 +95,12 @@ def unary(
9795
response_deserializer: Optional[DeserializingFunction] = None,
9896
) -> RpcCallable:
9997
return self._callable(
100-
UnaryCallType,
101-
method_name,
102-
request_serializer,
103-
response_deserializer,
98+
MethodDescriptor(
99+
method_name=method_name,
100+
arg_serialization=(request_serializer, None),
101+
return_serialization=(None, response_deserializer),
102+
rpc_type=RpcTypes.UNARY.value,
103+
)
104104
)
105105

106106
def client_stream(
@@ -110,10 +110,12 @@ def client_stream(
110110
response_deserializer: Optional[DeserializingFunction] = None,
111111
) -> RpcCallable:
112112
return self._callable(
113-
ClientStreamCallType,
114-
method_name,
115-
request_serializer,
116-
response_deserializer,
113+
MethodDescriptor(
114+
method_name=method_name,
115+
arg_serialization=(request_serializer, None),
116+
return_serialization=(None, response_deserializer),
117+
rpc_type=RpcTypes.CLIENT_STREAM.value,
118+
)
117119
)
118120

119121
def server_stream(
@@ -123,10 +125,12 @@ def server_stream(
123125
response_deserializer: Optional[DeserializingFunction] = None,
124126
) -> RpcCallable:
125127
return self._callable(
126-
ServerStreamCallType,
127-
method_name,
128-
request_serializer,
129-
response_deserializer,
128+
MethodDescriptor(
129+
method_name=method_name,
130+
arg_serialization=(request_serializer, None),
131+
return_serialization=(None, response_deserializer),
132+
rpc_type=RpcTypes.SERVER_STREAM.value,
133+
)
130134
)
131135

132136
def bidi_stream(
@@ -135,30 +139,20 @@ def bidi_stream(
135139
request_serializer: Optional[SerializingFunction] = None,
136140
response_deserializer: Optional[DeserializingFunction] = None,
137141
) -> RpcCallable:
142+
# create method descriptor
138143
return self._callable(
139-
BiStreamCallType,
140-
method_name,
141-
request_serializer,
142-
response_deserializer,
144+
MethodDescriptor(
145+
method_name=method_name,
146+
arg_serialization=(request_serializer, None),
147+
return_serialization=(None, response_deserializer),
148+
rpc_type=RpcTypes.BI_STREAM.value,
149+
)
143150
)
144151

145-
def _callable(
146-
self,
147-
call_type: CallType,
148-
method_name: str,
149-
request_serializer: Optional[SerializingFunction] = None,
150-
response_deserializer: Optional[DeserializingFunction] = None,
151-
) -> RpcCallable:
152+
def _callable(self, method_descriptor: MethodDescriptor) -> RpcCallable:
152153
"""
153154
Generate a proxy for the given method
154-
:param call_type: The call type.
155-
:type call_type: str
156-
:param method_name: The method name.
157-
:type method_name: str
158-
:param request_serializer: The request serializer.
159-
:type request_serializer: Optional[SerializingFunction]
160-
:param response_deserializer: The response deserializer.
161-
:type response_deserializer: Optional[DeserializingFunction]
155+
:param method_descriptor: The method descriptor.
162156
:return: The proxy.
163157
:rtype: RpcCallable
164158
"""
@@ -167,13 +161,11 @@ def _callable(
167161

168162
# clone url
169163
url = url.copy()
170-
url.parameters[common_constants.METHOD_KEY] = method_name
171-
# set call type
172-
url.attributes[common_constants.CALL_KEY] = call_type
173-
174-
# set serializer and deserializer
175-
url.attributes[common_constants.SERIALIZER_KEY] = request_serializer
176-
url.attributes[common_constants.DESERIALIZER_KEY] = response_deserializer
164+
url.parameters[common_constants.METHOD_KEY] = (
165+
method_descriptor.get_method_name()
166+
)
167+
# set method descriptor
168+
url.attributes[common_constants.METHOD_DESCRIPTOR_KEY] = method_descriptor
177169

178170
# create proxy
179-
return MultipleRpcCallable(self._invoker, url)
171+
return self._callable_factory.get_callable(self._invoker, url)

0 commit comments

Comments
 (0)