Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d51e9c4
gsoc pull request
aditya0yadav Jul 4, 2025
a785c34
fix the duplicate code issue in client.py
aditya0yadav Jul 14, 2025
95321d7
fixed with changes
aditya0yadav Jul 17, 2025
109a1e4
Resolve the error for the high dependency on pydantic
aditya0yadav Jul 17, 2025
57b0129
completed the protbuf implementation
aditya0yadav Aug 23, 2025
9cd492e
remove some unneccary debug logic
aditya0yadav Aug 23, 2025
6e13e9f
add the license template in the handlers.py
aditya0yadav Aug 24, 2025
85d0bdd
add the simple test for json and protobuf
aditya0yadav Aug 27, 2025
b94f3cf
add the license header for missing or newly generated file
aditya0yadav Aug 27, 2025
46e660c
fixed some naming convenion , clean the code and fix some bug
aditya0yadav Aug 31, 2025
1ff7083
completed the hessian with pyhessian and manual serialization
aditya0yadav Sep 1, 2025
dfa1b07
upgraded the manual , automatic fallback to manual if the type cant …
aditya0yadav Sep 1, 2025
ed8c30b
addding the neccasy license
aditya0yadav Sep 1, 2025
da70485
remove the hessian implementation
aditya0yadav Sep 2, 2025
fc6e5dd
fixing the issue
aditya0yadav Sep 3, 2025
598703c
formating the code and remake the json serializer plugable architecture
aditya0yadav Sep 3, 2025
65de7d6
fixed all error issue by the reviewer
aditya0yadav Sep 4, 2025
fbb9d81
changing the type for organisation requirement..
aditya0yadav Sep 4, 2025
4f914b5
fixing the minor bugs
aditya0yadav Sep 4, 2025
e12f69e
remove some debug statement
aditya0yadav Sep 4, 2025
3424ccb
clean the code and make it easy to understand
aditya0yadav Sep 4, 2025
1393855
fixing bugs
aditya0yadav Sep 4, 2025
23f21d9
remove the debug statement
aditya0yadav Sep 4, 2025
e3a5244
added the use of extension loader in the betterproto handler
aditya0yadav Sep 5, 2025
1b5d668
using ruff format for formatting
aditya0yadav Sep 5, 2025
c2f1424
added the protoc handler if the google protoc being used
aditya0yadav Sep 5, 2025
25a0496
divide the base line code from dubbo codec in interface
aditya0yadav Sep 5, 2025
bd2b42b
fixing the import issue
aditya0yadav Sep 5, 2025
a5fd49e
add _interface in every file and remove the codehelper from the class…
aditya0yadav Sep 5, 2025
5a4b4eb
added more visibility in the codebase
aditya0yadav Sep 6, 2025
fc96853
remove the file json_codec
aditya0yadav Sep 6, 2025
1dc61a0
chat_pb2.py
aditya0yadav Sep 6, 2025
bfee72d
changing the deprecated type to inbuilt one
aditya0yadav Sep 6, 2025
96b02ed
test being change
aditya0yadav Sep 6, 2025
91a4ba8
added more pytest for protobuf
aditya0yadav Sep 6, 2025
ebbb532
fix the bug related to json transportcodecbridge
aditya0yadav Sep 6, 2025
552137c
remove the use the json as a fallback in the handlers.py
aditya0yadav Sep 6, 2025
dea7c78
changing the marker according to real time market state
aditya0yadav Sep 7, 2025
db9f70a
imporvise the testcase and improve the return type checker
aditya0yadav Sep 7, 2025
e016672
remove the debug statement
aditya0yadav Sep 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion samples/llm/chat_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion samples/llm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
# limitations under the License.
from time import sleep

import chat_pb2
from lmdeploy import GenerationConfig, TurbomindEngineConfig, pipeline

from dubbo import Dubbo
from dubbo.configs import RegistryConfig, ServiceConfig
from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler
import chat_pb2

# the path of a model. It could be one of the following options:
# 1. A local directory path of a turbomind model
Expand Down
1 change: 1 addition & 0 deletions src/dubbo/classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import threading
from typing import Any, Callable, Optional, Union
Expand Down
159 changes: 113 additions & 46 deletions src/dubbo/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
Expand All @@ -13,11 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import threading
from typing import Optional

from dubbo.bootstrap import Dubbo
from dubbo.classes import MethodDescriptor
from dubbo.codec import DubboSerializationService
from dubbo.configs import ReferenceConfig
from dubbo.constants import common_constants
from dubbo.extension import extensionLoader
Expand Down Expand Up @@ -54,20 +54,19 @@ def __init__(self, reference: ReferenceConfig, dubbo: Optional[Dubbo] = None):

def _initialize(self):
"""
Initialize the invoker.
Initialize the invoker with protocol and URL.
"""
with self._global_lock:
if self._initialized:
return

# get the protocol
# get the protocol extension
protocol = extensionLoader.get_extension(Protocol, self._reference.protocol)()

registry_config = self._dubbo.registry_config
self._protocol = RegistryProtocol(registry_config, protocol) if registry_config else protocol

self._protocol = RegistryProtocol(registry_config, protocol) if self._dubbo.registry_config else protocol

# build url
# build the reference URL
reference_url = self._reference.to_url()
if registry_config:
self._url = registry_config.to_url().copy()
Expand All @@ -77,87 +76,155 @@ def _initialize(self):
else:
self._url = reference_url

# create invoker
# create the invoker using the protocol
self._invoker = self._protocol.refer(self._url)

self._initialized = True

def unary(
def _create_rpc_callable(
self,
rpc_type: str,
method_name: str,
params_types: list[type],
return_type: type,
codec: Optional[str] = None,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
) -> RpcCallable:
return self._callable(
MethodDescriptor(
method_name=method_name,
arg_serialization=(request_serializer, None),
return_serialization=(None, response_deserializer),
rpc_type=RpcTypes.UNARY.value,
"""
Create an RPC callable with the specified type.

:param rpc_type: Type of RPC (unary, client_stream, server_stream, bi_stream)
:param method_name: Name of the method to call
:param params_types: List of parameter types
:param return_type: Return type of the method
:param codec: Optional codec to use for serialization
:param request_serializer: Optional custom request serializer
:param response_deserializer: Optional custom response deserializer
:return: RPC callable proxy
:rtype: RpcCallable
"""
# determine serializers
if request_serializer and response_deserializer:
req_ser = request_serializer
res_deser = response_deserializer
else:
req_ser, res_deser = DubboSerializationService.create_serialization_functions(
codec,
parameter_types=params_types,
return_type=return_type,
)

# create method descriptor
descriptor = MethodDescriptor(
method_name=method_name,
arg_serialization=(req_ser, None),
return_serialization=(None, res_deser),
rpc_type=rpc_type,
)

return self._callable(descriptor)

def unary(
self,
method_name: str,
params_types: list[type],
return_type: type,
codec: Optional[str] = None,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
) -> RpcCallable:
"""
Create a unary RPC callable.
"""
return self._create_rpc_callable(
rpc_type=RpcTypes.UNARY.value,
method_name=method_name,
params_types=params_types,
return_type=return_type,
codec=codec,
request_serializer=request_serializer,
response_deserializer=response_deserializer,
)

def client_stream(
self,
method_name: str,
params_types: list[type],
return_type: type,
codec: Optional[str] = None,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
) -> RpcCallable:
return self._callable(
MethodDescriptor(
method_name=method_name,
arg_serialization=(request_serializer, None),
return_serialization=(None, response_deserializer),
rpc_type=RpcTypes.CLIENT_STREAM.value,
)
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also hope to see the implementation on the server side

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cnzakii
If you don't mind can you explain me about this

like most of the work of server can be handle by the rpc handler

or
Am i missing something ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can review the proposal I sent in Slack, which also contains the corresponding Server interface design. You should also implement the construction method descriptors on the Server side, select the serialization method, and so on.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok
Now i understand
Thx for clarification

Create a client-streaming RPC callable.
"""
return self._create_rpc_callable(
rpc_type=RpcTypes.CLIENT_STREAM.value,
method_name=method_name,
params_types=params_types,
return_type=return_type,
codec=codec,
request_serializer=request_serializer,
response_deserializer=response_deserializer,
)

def server_stream(
self,
method_name: str,
params_types: list[type],
return_type: type,
codec: Optional[str] = None,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
) -> RpcCallable:
return self._callable(
MethodDescriptor(
method_name=method_name,
arg_serialization=(request_serializer, None),
return_serialization=(None, response_deserializer),
rpc_type=RpcTypes.SERVER_STREAM.value,
)
"""
Create a server-streaming RPC callable.
"""
return self._create_rpc_callable(
rpc_type=RpcTypes.SERVER_STREAM.value,
method_name=method_name,
params_types=params_types,
return_type=return_type,
codec=codec,
request_serializer=request_serializer,
response_deserializer=response_deserializer,
)

def bi_stream(
self,
method_name: str,
params_types: list[type],
return_type: type,
codec: Optional[str] = None,
request_serializer: Optional[SerializingFunction] = None,
response_deserializer: Optional[DeserializingFunction] = None,
) -> RpcCallable:
# create method descriptor
return self._callable(
MethodDescriptor(
method_name=method_name,
arg_serialization=(request_serializer, None),
return_serialization=(None, response_deserializer),
rpc_type=RpcTypes.BI_STREAM.value,
)
"""
Create a bidirectional-streaming RPC callable.
"""
return self._create_rpc_callable(
rpc_type=RpcTypes.BI_STREAM.value,
method_name=method_name,
params_types=params_types,
return_type=return_type,
codec=codec,
request_serializer=request_serializer,
response_deserializer=response_deserializer,
)

def _callable(self, method_descriptor: MethodDescriptor) -> RpcCallable:
"""
Generate a proxy for the given method
Generate a proxy for the given method.

:param method_descriptor: The method descriptor.
:return: The proxy.
:return: The RPC callable proxy.
:rtype: RpcCallable
"""
# get invoker
url = self._invoker.get_url()

# clone url
url = url.copy()
# get invoker URL and clone it
url = self._invoker.get_url().copy()
url.parameters[common_constants.METHOD_KEY] = method_descriptor.get_method_name()
# set method descriptor
url.attributes[common_constants.METHOD_DESCRIPTOR_KEY] = method_descriptor

# create proxy
# create proxy callable
return self._callable_factory.get_callable(self._invoker, url)
20 changes: 20 additions & 0 deletions src/dubbo/codec/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ._interface import Codec
from .dubbo_codec import DubboSerializationService

__all__ = ["DubboSerializationService", "Codec"]
Loading