diff --git a/samples/llm/chat_pb2.py b/samples/llm/chat_pb2.py index de9488e..716bf7f 100644 --- a/samples/llm/chat_pb2.py +++ b/samples/llm/chat_pb2.py @@ -29,4 +29,4 @@ _globals["_CHATREPLY"]._serialized_end = 136 _globals["_DEEPSEEKAISERVICE"]._serialized_start = 138 _globals["_DEEPSEEKAISERVICE"]._serialized_end = 259 -# @@protoc_insertion_point(module_scope) +# @@protoc_insertion_point(module_scope) \ No newline at end of file diff --git a/samples/llm/main.py b/samples/llm/main.py index e315716..97baaa6 100644 --- a/samples/llm/main.py +++ b/samples/llm/main.py @@ -15,12 +15,12 @@ # limitations under the License. from time import sleep +import chat_pb2 from lmdeploy import GenerationConfig, TurbomindEngineConfig, pipeline from dubbo import Dubbo from dubbo.configs import RegistryConfig, ServiceConfig from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler -import chat_pb2 # the path of a model. It could be one of the following options: # 1. A local directory path of a turbomind model diff --git a/src/dubbo/classes.py b/src/dubbo/classes.py index 8d87299..a07f56f 100644 --- a/src/dubbo/classes.py +++ b/src/dubbo/classes.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import abc import threading from typing import Any, Callable, Optional, Union diff --git a/src/dubbo/client.py b/src/dubbo/client.py index 33e6264..390a28c 100644 --- a/src/dubbo/client.py +++ b/src/dubbo/client.py @@ -3,8 +3,6 @@ # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # @@ -13,11 +11,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import threading from typing import Optional from dubbo.bootstrap import Dubbo from dubbo.classes import MethodDescriptor +from dubbo.codec import DubboSerializationService from dubbo.configs import ReferenceConfig from dubbo.constants import common_constants from dubbo.extension import extensionLoader @@ -54,20 +54,19 @@ def __init__(self, reference: ReferenceConfig, dubbo: Optional[Dubbo] = None): def _initialize(self): """ - Initialize the invoker. + Initialize the invoker with protocol and URL. 
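+ Wraps the protocol in a RegistryProtocol when a registry is configured, builds the reference URL, and refers the invoker through the resulting protocol.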
""" with self._global_lock: if self._initialized: return - # get the protocol + # get the protocol extension protocol = extensionLoader.get_extension(Protocol, self._reference.protocol)() registry_config = self._dubbo.registry_config + self._protocol = RegistryProtocol(registry_config, protocol) if registry_config else protocol - self._protocol = RegistryProtocol(registry_config, protocol) if self._dubbo.registry_config else protocol - - # build url + # build the reference URL reference_url = self._reference.to_url() if registry_config: self._url = registry_config.to_url().copy() @@ -77,87 +76,155 @@ def _initialize(self): else: self._url = reference_url - # create invoker + # create the invoker using the protocol self._invoker = self._protocol.refer(self._url) self._initialized = True - def unary( + def _create_rpc_callable( self, + rpc_type: str, method_name: str, + params_types: list[type], + return_type: type, + codec: Optional[str] = None, request_serializer: Optional[SerializingFunction] = None, response_deserializer: Optional[DeserializingFunction] = None, ) -> RpcCallable: - return self._callable( - MethodDescriptor( - method_name=method_name, - arg_serialization=(request_serializer, None), - return_serialization=(None, response_deserializer), - rpc_type=RpcTypes.UNARY.value, + """ + Create an RPC callable with the specified type. + + :param rpc_type: Type of RPC (unary, client_stream, server_stream, bi_stream) + :param method_name: Name of the method to call + :param params_types: List of parameter types + :param return_type: Return type of the method + :param codec: Optional codec to use for serialization + :param request_serializer: Optional custom request serializer + :param response_deserializer: Optional custom response deserializer + :return: RPC callable proxy + :rtype: RpcCallable + """ + # determine serializers + if request_serializer and response_deserializer: + req_ser = request_serializer + res_deser = response_deserializer + else: + req_ser, res_deser = DubboSerializationService.create_serialization_functions( + codec, + parameter_types=params_types, + return_type=return_type, ) + + # create method descriptor + descriptor = MethodDescriptor( + method_name=method_name, + arg_serialization=(req_ser, None), + return_serialization=(None, res_deser), + rpc_type=rpc_type, + ) + + return self._callable(descriptor) + + def unary( + self, + method_name: str, + params_types: list[type], + return_type: type, + codec: Optional[str] = None, + request_serializer: Optional[SerializingFunction] = None, + response_deserializer: Optional[DeserializingFunction] = None, + ) -> RpcCallable: + """ + Create a unary RPC callable. + """ + return self._create_rpc_callable( + rpc_type=RpcTypes.UNARY.value, + method_name=method_name, + params_types=params_types, + return_type=return_type, + codec=codec, + request_serializer=request_serializer, + response_deserializer=response_deserializer, ) def client_stream( self, method_name: str, + params_types: list[type], + return_type: type, + codec: Optional[str] = None, request_serializer: Optional[SerializingFunction] = None, response_deserializer: Optional[DeserializingFunction] = None, ) -> RpcCallable: - return self._callable( - MethodDescriptor( - method_name=method_name, - arg_serialization=(request_serializer, None), - return_serialization=(None, response_deserializer), - rpc_type=RpcTypes.CLIENT_STREAM.value, - ) + """ + Create a client-streaming RPC callable. 
+ """ + return self._create_rpc_callable( + rpc_type=RpcTypes.CLIENT_STREAM.value, + method_name=method_name, + params_types=params_types, + return_type=return_type, + codec=codec, + request_serializer=request_serializer, + response_deserializer=response_deserializer, ) def server_stream( self, method_name: str, + params_types: list[type], + return_type: type, + codec: Optional[str] = None, request_serializer: Optional[SerializingFunction] = None, response_deserializer: Optional[DeserializingFunction] = None, ) -> RpcCallable: - return self._callable( - MethodDescriptor( - method_name=method_name, - arg_serialization=(request_serializer, None), - return_serialization=(None, response_deserializer), - rpc_type=RpcTypes.SERVER_STREAM.value, - ) + """ + Create a server-streaming RPC callable. + """ + return self._create_rpc_callable( + rpc_type=RpcTypes.SERVER_STREAM.value, + method_name=method_name, + params_types=params_types, + return_type=return_type, + codec=codec, + request_serializer=request_serializer, + response_deserializer=response_deserializer, ) def bi_stream( self, method_name: str, + params_types: list[type], + return_type: type, + codec: Optional[str] = None, request_serializer: Optional[SerializingFunction] = None, response_deserializer: Optional[DeserializingFunction] = None, ) -> RpcCallable: - # create method descriptor - return self._callable( - MethodDescriptor( - method_name=method_name, - arg_serialization=(request_serializer, None), - return_serialization=(None, response_deserializer), - rpc_type=RpcTypes.BI_STREAM.value, - ) + """ + Create a bidirectional-streaming RPC callable. + """ + return self._create_rpc_callable( + rpc_type=RpcTypes.BI_STREAM.value, + method_name=method_name, + params_types=params_types, + return_type=return_type, + codec=codec, + request_serializer=request_serializer, + response_deserializer=response_deserializer, ) def _callable(self, method_descriptor: MethodDescriptor) -> RpcCallable: """ - Generate a proxy for the given method + Generate a proxy for the given method. + :param method_descriptor: The method descriptor. - :return: The proxy. + :return: The RPC callable proxy. :rtype: RpcCallable """ - # get invoker - url = self._invoker.get_url() - - # clone url - url = url.copy() + # get invoker URL and clone it + url = self._invoker.get_url().copy() url.parameters[common_constants.METHOD_KEY] = method_descriptor.get_method_name() - # set method descriptor url.attributes[common_constants.METHOD_DESCRIPTOR_KEY] = method_descriptor - # create proxy + # create proxy callable return self._callable_factory.get_callable(self._invoker, url) diff --git a/src/dubbo/codec/__init__.py b/src/dubbo/codec/__init__.py new file mode 100644 index 0000000..fb17b9d --- /dev/null +++ b/src/dubbo/codec/__init__.py @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from ._interface import Codec +from .dubbo_codec import DubboSerializationService + +__all__ = ["DubboSerializationService", "Codec"] diff --git a/src/dubbo/codec/_interface.py b/src/dubbo/codec/_interface.py new file mode 100644 index 0000000..4350b82 --- /dev/null +++ b/src/dubbo/codec/_interface.py @@ -0,0 +1,157 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import logging +from dataclasses import dataclass +from typing import Any, Callable, Optional + +__all__ = [ + "ParameterDescriptor", + "MethodDescriptor", + "TransportCodec", + "SerializationEncoder", + "SerializationDecoder", + "Codec", +] + +logger = logging.getLogger(__name__) + + +@dataclass +class ParameterDescriptor: + """Information about a method parameter""" + + name: str + annotation: Any + is_required: bool = True + default_value: Any = None + + +@dataclass +class MethodDescriptor: + """Method descriptor with function details""" + + function: Callable + name: str + parameters: list[ParameterDescriptor] + return_parameter: ParameterDescriptor + documentation: Optional[str] = None + + +class TransportCodec(abc.ABC): + """ + The transport codec interface. + """ + + @classmethod + @abc.abstractmethod + def get_transport_type(cls) -> str: + """ + Get transport type of current codec + :return: The transport type. + :rtype: str + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_encoder(self) -> "SerializationEncoder": + """ + Get encoder instance + :return: The encoder. + :rtype: SerializationEncoder + """ + raise NotImplementedError() + + @abc.abstractmethod + def get_decoder(self) -> "SerializationDecoder": + """ + Get decoder instance + :return: The decoder. + :rtype: SerializationDecoder + """ + raise NotImplementedError() + + +class SerializationEncoder(abc.ABC): + """ + The serialization encoder interface. + """ + + @abc.abstractmethod + def encode(self, arguments: tuple[Any, ...]) -> bytes: + """ + Encode arguments to bytes. + :param arguments: The arguments to encode. + :type arguments: tuple[Any, ...] + :return: The encoded bytes. + :rtype: bytes + """ + raise NotImplementedError() + + +class SerializationDecoder(abc.ABC): + """ + The serialization decoder interface. + """ + + @abc.abstractmethod + def decode(self, data: bytes) -> Any: + """ + Decode bytes to object. + :param data: The data to decode. + :type data: bytes + :return: The decoded object. + :rtype: Any + """ + raise NotImplementedError() + + +class Codec(abc.ABC): + """ + Base codec interface for encoding and decoding data. 
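+
+ Concrete codecs implement encode and decode and may use model_type to guide structured encoding and decoding.
+
+ Minimal illustrative subclass (Utf8Codec is hypothetical, not part of the library):
+
+     class Utf8Codec(Codec):
+         def encode(self, data): return str(data).encode("utf-8")
+         def decode(self, data): return data.decode("utf-8")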
+ """ + + def __init__(self, model_type: Optional[type[Any]] = None, **kwargs): + """ + Initialize a codec + :param model_type: Optional model type for structured encoding/decoding + :type model_type: Optional[type[Any]] + :param kwargs: Additional codec configuration + """ + self.model_type = model_type + + @abc.abstractmethod + def encode(self, data: Any) -> bytes: + """ + Encode data into bytes + :param data: The data to encode. + :type data: Any + :return: Encoded byte representation + :rtype: bytes + """ + raise NotImplementedError() + + @abc.abstractmethod + def decode(self, data: bytes) -> Any: + """ + Decode bytes into object + :param data: The bytes to decode. + :type data: bytes + :return: Decoded object + :rtype: Any + """ + raise NotImplementedError() diff --git a/src/dubbo/codec/dubbo_codec.py b/src/dubbo/codec/dubbo_codec.py new file mode 100644 index 0000000..ae5f51f --- /dev/null +++ b/src/dubbo/codec/dubbo_codec.py @@ -0,0 +1,237 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import inspect +import logging +from typing import Any, Callable, Optional + +from ._interface import ( + Codec, + MethodDescriptor, + ParameterDescriptor, + SerializationDecoder, + SerializationEncoder, + TransportCodec, +) + +__all__ = [ + "DubboSerializationService", +] + +logger = logging.getLogger(__name__) + + +class DubboSerializationService: + """Dubbo serialization service with type handling""" + + @staticmethod + def create_transport_codec( + transport_type: str = "json", + parameter_types: Optional[list[type]] = None, + return_type: Optional[type] = None, + **codec_options, + ) -> TransportCodec: + """ + Create transport codec + + :param transport_type: The transport type (e.g., 'json', 'protobuf') + :param parameter_types: list of parameter types + :param return_type: Return value type + :param codec_options: Additional codec options + :return: Transport codec instance + :raises ImportError: If required modules cannot be imported + :raises Exception: If codec creation fails + """ + try: + from dubbo.extension.extension_loader import ExtensionLoader + + codec_class = ExtensionLoader().get_extension(Codec, transport_type) + return codec_class(parameter_types=parameter_types or [], return_type=return_type, **codec_options) + except ImportError as e: + logger.error("Failed to import required modules: %s", e) + raise + except Exception as e: + logger.error("Failed to create transport codec: %s", e) + raise + + @staticmethod + def create_encoder_decoder_pair( + transport_type: str, + parameter_types: Optional[list[type]] = None, + return_type: Optional[type] = None, + **codec_options, + ) -> tuple[SerializationEncoder, SerializationDecoder]: + """ + Create encoder and decoder instances + + :param transport_type: The transport type + :param parameter_types: 
list of parameter types + :param return_type: Return value type + :param codec_options: Additional codec options + :return: tuple of (encoder, decoder) + :raises ValueError: If codec returns None encoder/decoder + :raises Exception: If creation fails + """ + try: + codec_instance = DubboSerializationService.create_transport_codec( + transport_type=transport_type, + parameter_types=parameter_types, + return_type=return_type, + **codec_options, + ) + + encoder = codec_instance.encoder() + decoder = codec_instance.decoder() + + if encoder is None or decoder is None: + raise ValueError(f"Codec for transport type '{transport_type}' returned None encoder/decoder") + + return encoder, decoder + + except Exception as e: + logger.error("Failed to create encoder/decoder pair: %s", e) + raise + + @staticmethod + def create_serialization_functions( + transport_type: str, + parameter_types: Optional[list[type]] = None, + return_type: Optional[type] = None, + **codec_options, + ) -> tuple[Callable[..., bytes], Callable[[bytes], Any]]: + """ + Create serializer and deserializer functions + + :param transport_type: The transport type + :param parameter_types: list of parameter types + :param return_type: Return value type + :param codec_options: Additional codec options + :return: tuple of (serializer_function, deserializer_function) + :raises Exception: If creation fails + """ + try: + parameter_encoder, return_decoder = DubboSerializationService.create_encoder_decoder_pair( + transport_type=transport_type, + parameter_types=parameter_types, + return_type=return_type, + **codec_options, + ) + + def serialize_method_parameters(*args) -> bytes: + """Serialize method parameters to bytes""" + try: + return parameter_encoder.encode(args) + except Exception as e: + logger.error("Failed to serialize parameters: %s", e) + raise + + def deserialize_method_return(data: bytes) -> Any: + """Deserialize bytes to return value""" + if not isinstance(data, bytes): + raise TypeError(f"Expected bytes, got {type(data)}") + try: + return return_decoder.decode(data) + except Exception as e: + logger.error("Failed to deserialize return value: %s", e) + raise + + return serialize_method_parameters, deserialize_method_return + + except Exception as e: + logger.error("Failed to create serialization functions: %s", e) + raise + + @staticmethod + def create_method_descriptor( + func: Callable, + method_name: Optional[str] = None, + parameter_types: Optional[list[type]] = None, + return_type: Optional[type] = None, + interface: Optional[Callable[..., Any]] = None, + ) -> MethodDescriptor: + """ + Create a method descriptor from function and configuration + + :param func: The function to create descriptor for + :param method_name: Override method name + :param parameter_types: Override parameter types + :param return_type: Override return type + :param interface: Interface to use for signature inspection + :return: Method descriptor + :raises TypeError: If func is not callable + :raises ValueError: If signature cannot be inspected + """ + if not callable(func): + raise TypeError("func must be callable") + + # Use interface signature if provided, otherwise use func signature + target_function = interface if interface is not None else func + name = method_name or target_function.__name__ + + try: + sig = inspect.signature(target_function) + except ValueError as e: + logger.error("Cannot inspect signature of %s: %s", target_function, e) + raise + + parameters = [] + resolved_parameter_types = parameter_types or [] + param_index = 0 + + 
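+ # Resolve each signature parameter: explicit parameter_types first, then annotations, then Any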
for param_name, param in sig.parameters.items(): + # Skip 'self' parameter for methods + if param_name == "self": + continue + + # Get parameter type from provided types, annotation, or default to Any + if param_index < len(resolved_parameter_types): + param_type = resolved_parameter_types[param_index] + elif param.annotation != inspect.Parameter.empty: + param_type = param.annotation + else: + param_type = Any + + is_required = param.default == inspect.Parameter.empty + default_value = param.default if not is_required else None + + parameters.append( + ParameterDescriptor( + name=param_name, + annotation=param_type, + is_required=is_required, + default_value=default_value, + ) + ) + + param_index += 1 + + # Resolve return type + if return_type is not None: + resolved_return_type = return_type + elif sig.return_annotation != inspect.Signature.empty: + resolved_return_type = sig.return_annotation + else: + resolved_return_type = Any + + return_parameter = ParameterDescriptor(name="return_value", annotation=resolved_return_type) + + return MethodDescriptor( + function=func, + name=name, + parameters=parameters, + return_parameter=return_parameter, + documentation=inspect.getdoc(target_function), + ) diff --git a/src/dubbo/codec/json_codec/__init__.py b/src/dubbo/codec/json_codec/__init__.py new file mode 100644 index 0000000..05b8599 --- /dev/null +++ b/src/dubbo/codec/json_codec/__init__.py @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._interfaces import JsonCodec, TypeHandler +from .collections_handler import CollectionHandler +from .dataclass_handler import DataclassHandler +from .datetime_handler import DateTimeHandler +from .decimal_handler import DecimalHandler +from .enum_handler import EnumHandler +from .json_codec_handler import JsonTransportCodec +from .orjson_codec import OrJsonCodec +from .pydantic_handler import PydanticHandler +from .simple_types_handler import SimpleTypesHandler +from .standard_json import StandardJsonCodec +from .ujson_codec import UJsonCodec + +__all__ = [ + "JsonCodec", + "TypeHandler", + "StandardJsonCodec", + "OrJsonCodec", + "UJsonCodec", + "DateTimeHandler", + "PydanticHandler", + "CollectionHandler", + "DecimalHandler", + "SimpleTypesHandler", + "EnumHandler", + "DataclassHandler", + "JsonTransportCodec", +] diff --git a/src/dubbo/codec/json_codec/_interfaces.py b/src/dubbo/codec/json_codec/_interfaces.py new file mode 100644 index 0000000..af2b1a7 --- /dev/null +++ b/src/dubbo/codec/json_codec/_interfaces.py @@ -0,0 +1,98 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +from typing import Any + +__all__ = ["JsonCodec", "TypeHandler"] + + +class JsonCodec(abc.ABC): + """ + The JSON codec interface for encoding and decoding objects to/from JSON bytes. + """ + + @abc.abstractmethod + def encode(self, obj: Any) -> bytes: + """ + Encode an object to JSON bytes. + + :param obj: The object to encode. + :type obj: Any + :return: The encoded JSON bytes. + :rtype: bytes + """ + raise NotImplementedError() + + @abc.abstractmethod + def decode(self, data: bytes) -> Any: + """ + Decode JSON bytes to an object. + + :param data: The JSON bytes to decode. + :type data: bytes + :return: The decoded object. + :rtype: Any + """ + raise NotImplementedError() + + @abc.abstractmethod + def can_handle(self, obj: Any) -> bool: + """ + Check if this codec can handle the given object. + + :param obj: The object to check. + :type obj: Any + :return: True if this codec can handle the object. + :rtype: bool + """ + raise NotImplementedError() + + +class TypeHandler(abc.ABC): + """ + Base interface for all type-specific serializers. + + Each type handler should implement: + - can_serialize_type: determine if the object can be serialized + - serialize_to_dict: return a dict representation of the object + """ + + @abc.abstractmethod + def can_serialize_type(self, obj: Any, obj_type: type) -> bool: + """ + Returns True if this handler can serialize the given object/type. + + :param obj: The object to check. + :type obj: Any + :param obj_type: The type of the object. + :type obj_type: type + :return: True if this handler can serialize the object. + :rtype: bool + """ + raise NotImplementedError() + + @abc.abstractmethod + def serialize_to_dict(self, obj: Any) -> dict[str, Any]: + """ + Serialize the object into a dictionary representation. + + :param obj: The object to serialize. + :type obj: Any + :return: The dictionary representation of the object. + :rtype: dict[str, Any] + """ + raise NotImplementedError() diff --git a/src/dubbo/codec/json_codec/collections_handler.py b/src/dubbo/codec/json_codec/collections_handler.py new file mode 100644 index 0000000..0e569cd --- /dev/null +++ b/src/dubbo/codec/json_codec/collections_handler.py @@ -0,0 +1,57 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Union + +from dubbo.codec.json_codec import TypeHandler + +__all__ = ["CollectionHandler"] + + +class CollectionHandler(TypeHandler): + """ + Type handler for set and frozenset collections. + + Serializes sets and frozensets to list format with type markers + for proper reconstruction. + """ + + def can_serialize_type(self, obj: Any, obj_type: type) -> bool: + """ + Check if this handler can serialize collection types. + + :param obj: The object to check. + :type obj: Any + :param obj_type: The type of the object. + :type obj_type: type + :return: True if object is set or frozenset. + :rtype: bool + """ + return obj_type in (set, frozenset) + + def serialize_to_dict(self, obj: Union[set, frozenset]) -> dict[str, list]: + """ + Serialize set/frozenset to dictionary representation. + + :param obj: The collection to serialize. + :type obj: Union[set, frozenset] + :return: dictionary representation with type marker. + :rtype: dict[str, list] + """ + if isinstance(obj, frozenset): + return {"$frozenset": list(obj)} + else: + return {"$set": list(obj)} diff --git a/src/dubbo/codec/json_codec/dataclass_handler.py b/src/dubbo/codec/json_codec/dataclass_handler.py new file mode 100644 index 0000000..6ce2eae --- /dev/null +++ b/src/dubbo/codec/json_codec/dataclass_handler.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import asdict, is_dataclass +from typing import Any + +from dubbo.codec.json_codec import TypeHandler + +__all__ = ["DataclassHandler"] + + +class DataclassHandler(TypeHandler): + """ + Type handler for dataclass objects. + + Serializes dataclass instances with module path and field data + for proper reconstruction. + """ + + def can_serialize_type(self, obj: Any, obj_type: type) -> bool: + """ + Check if this handler can serialize dataclass types. + + :param obj: The object to check. + :type obj: Any + :param obj_type: The type of the object. + :type obj_type: type + :return: True if object is a dataclass instance. + :rtype: bool + """ + return is_dataclass(obj) + + def serialize_to_dict(self, obj: Any) -> dict[str, Any]: + """ + Serialize dataclass to dictionary representation. + + :param obj: The dataclass to serialize. + :type obj: Any + :return: dictionary with class path and field data. 
+ :rtype: dict[str, Any] + """ + return {"$dataclass": f"{obj.__class__.__module__}.{obj.__class__.__qualname__}", "$fields": asdict(obj)} diff --git a/src/dubbo/codec/json_codec/datetime_handler.py b/src/dubbo/codec/json_codec/datetime_handler.py new file mode 100644 index 0000000..a20a23c --- /dev/null +++ b/src/dubbo/codec/json_codec/datetime_handler.py @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import date, datetime, time +from typing import Any, Union + +from dubbo.codec.json_codec import TypeHandler + +__all__ = ["DateTimeHandler"] + + +class DateTimeHandler(TypeHandler): + """ + Type handler for datetime, date, and time objects. + + Serializes datetime objects to ISO format with timezone information. + """ + + def can_serialize_type(self, obj: Any, obj_type: type) -> bool: + """ + Check if this handler can serialize datetime-related types. + + :param obj: The object to check. + :type obj: Any + :param obj_type: The type of the object. + :type obj_type: type + :return: True if object is datetime, date, or time. + :rtype: bool + """ + return isinstance(obj, (datetime, date, time)) + + def serialize_to_dict(self, obj: Union[datetime, date, time]) -> dict[str, Any]: + """ + Serialize datetime objects to dictionary representation. + + :param obj: The datetime object to serialize. + :type obj: Union[datetime, date, time] + :return: dictionary representation with type markers. + :rtype: dict[str, Any] + """ + if isinstance(obj, datetime): + # Convert to ISO format with Z suffix for UTC + iso_string = obj.isoformat() + if obj.tzinfo is None: + # Assume naive datetime is UTC and add Z + iso_string += "Z" + elif str(obj.tzinfo) == "UTC" or obj.utcoffset().total_seconds() == 0: + # Replace +00:00 with Z for UTC + iso_string = iso_string.replace("+00:00", "Z") + return {"$date": iso_string} + elif isinstance(obj, date): + return {"$dateOnly": obj.isoformat()} + elif isinstance(obj, time): + return {"$timeOnly": obj.isoformat()} + else: + raise ValueError(f"Unsupported datetime type: {type(obj)}") diff --git a/src/dubbo/codec/json_codec/decimal_handler.py b/src/dubbo/codec/json_codec/decimal_handler.py new file mode 100644 index 0000000..60c36f9 --- /dev/null +++ b/src/dubbo/codec/json_codec/decimal_handler.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from decimal import Decimal +from typing import Any + +from dubbo.codec.json_codec import TypeHandler + +__all__ = ["DecimalHandler"] + + +class DecimalHandler(TypeHandler): + """ + Type handler for Decimal objects. + + Serializes Decimal objects to string representation + for precision preservation. + """ + + def can_serialize_type(self, obj: Any, obj_type: type) -> bool: + """ + Check if this handler can serialize Decimal types. + + :param obj: The object to check. + :type obj: Any + :param obj_type: The type of the object. + :type obj_type: type + :return: True if object is Decimal. + :rtype: bool + """ + return obj_type is Decimal + + def serialize_to_dict(self, obj: Decimal) -> dict[str, str]: + """ + Serialize Decimal to dictionary representation. + + :param obj: The Decimal to serialize. + :type obj: Decimal + :return: dictionary representation with string value. + :rtype: dict[str, str] + """ + return {"$decimal": str(obj)} diff --git a/src/dubbo/codec/json_codec/enum_handler.py b/src/dubbo/codec/json_codec/enum_handler.py new file mode 100644 index 0000000..980c3bc --- /dev/null +++ b/src/dubbo/codec/json_codec/enum_handler.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from enum import Enum +from typing import Any + +from dubbo.codec.json_codec import TypeHandler + +__all__ = ["EnumHandler"] + + +class EnumHandler(TypeHandler): + """ + Type handler for Enum objects. + + Serializes Enum instances with module path and value + for proper reconstruction. + """ + + def can_serialize_type(self, obj: Any, obj_type: type) -> bool: + """ + Check if this handler can serialize Enum types. + + :param obj: The object to check. + :type obj: Any + :param obj_type: The type of the object. + :type obj_type: type + :return: True if object is an Enum instance. + :rtype: bool + """ + return isinstance(obj, Enum) + + def serialize_to_dict(self, obj: Enum) -> dict[str, Any]: + """ + Serialize Enum to dictionary representation. + + :param obj: The Enum to serialize. + :type obj: Enum + :return: Dictionary with enum class path and value. 
+ :rtype: dict[str, Any] + """ + return {"$enum": f"{obj.__class__.__module__}.{obj.__class__.__qualname__}", "$value": obj.value} diff --git a/src/dubbo/codec/json_codec/json_codec_handler.py b/src/dubbo/codec/json_codec/json_codec_handler.py new file mode 100644 index 0000000..4cc95f0 --- /dev/null +++ b/src/dubbo/codec/json_codec/json_codec_handler.py @@ -0,0 +1,446 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Optional, get_args, get_origin, Union + +from ._interfaces import JsonCodec, TypeHandler +from .orjson_codec import OrJsonCodec +from .standard_json import StandardJsonCodec +from .ujson_codec import UJsonCodec + +__all__ = ["JsonTransportCodec", "SerializationException", "DeserializationException"] + + +class SerializationException(Exception): + """Exception raised during serialization""" + + def __init__(self, message: str): + super().__init__(message) + self.message = message + + +class DeserializationException(Exception): + """Exception raised during deserialization""" + + def __init__(self, message: str): + super().__init__(message) + self.message = message + + +class JsonTransportCodec: + """ + JSON Transport Codec with integrated encoder/decoder functionality. + + This class serves as both a transport codec and provides encoder/decoder + interface compatibility for services that expect separate encoder/decoder objects. + """ + + def __init__( + self, + parameter_types: Optional[list[type]] = None, + return_type: Optional[type] = None, + maximum_depth: int = 100, + strict_validation: bool = True, + **kwargs, + ): + """ + Initialize the JSON transport codec. + + :param parameter_types: list of parameter types for the method. + :param return_type: Return type for the method. + :param maximum_depth: Maximum serialization depth. + :param strict_validation: Whether to use strict validation. + """ + self.parameter_types = parameter_types or [] + self.return_type = return_type + self.maximum_depth = maximum_depth + self.strict_validation = strict_validation + + # Initialize codecs and handlers using the extension pattern + self._json_codecs = self._setup_json_codecs() + self._type_handlers = self._setup_type_handlers() + + def _setup_json_codecs(self) -> list[JsonCodec]: + """ + Setup JSON codecs in priority order. 
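+
+ orjson is tried first for speed, then ujson, and the standard-library codec is always appended as a fallback.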
+ """ + codecs = [] + + # Try orjson first (fastest) + orjson_codec = OrJsonCodec() + if orjson_codec.can_handle(None): # Check availability + codecs.append(orjson_codec) + + # Try ujson second + ujson_codec = UJsonCodec() + if ujson_codec.can_handle(None): # Check availability + codecs.append(ujson_codec) + + # Always include standard json as fallback + codecs.append(StandardJsonCodec()) + + return codecs + + def _setup_type_handlers(self) -> list[TypeHandler]: + """ + Setup type handlers for different object types. + """ + handlers = [] + + from dubbo.extension import extensionLoader + + handler_names = ["datetime", "pydantic", "decimal", "enum", "simple", "dataclass", "collection"] + + for name in handler_names: + try: + plugin_class = extensionLoader.get_extension(TypeHandler, name) + if plugin_class: + plugin_instance = plugin_class() + handlers.append(plugin_instance) + except Exception as e: + print(f"Warning: Could not load type handler plugin '{name}': {e}") + return handlers + + # Core encoding/decoding methods + def encode_parameters(self, *arguments) -> bytes: + """ + Encode parameters to JSON bytes. + + :param arguments: The arguments to encode. + :return: Encoded JSON bytes. + :rtype: bytes + """ + try: + if not arguments: + return self._encode_with_codecs([]) + + # Handle single parameter case + if len(self.parameter_types) == 1: + serialized = self._serialize_object(arguments[0]) + return self._encode_with_codecs(serialized) + + # Handle multiple parameters + elif len(self.parameter_types) > 1: + serialized_args = [self._serialize_object(arg) for arg in arguments] + return self._encode_with_codecs(serialized_args) + + # No type constraints + else: + if len(arguments) == 1: + serialized = self._serialize_object(arguments[0]) + return self._encode_with_codecs(serialized) + else: + serialized_args = [self._serialize_object(arg) for arg in arguments] + return self._encode_with_codecs(serialized_args) + + except Exception as e: + raise SerializationException(f"Parameter encoding failed: {e}") from e + + def decode_return_value(self, data: bytes) -> Any: + """ + Decode return value from JSON bytes and validate against self.return_type. + Supports nested generics and marker-wrapped types. + """ + if not data: + return None + + # Step 1: Decode JSON bytes to Python object + json_data = self._decode_with_codecs(data) + + # Step 2: Reconstruct marker-based objects (datetime, UUID, set, frozenset, dataclass, pydantic) + obj = self._reconstruct_objects(json_data) + + # Step 3: Validate type recursively + if self.return_type: + if not self._validate_type(obj, self.return_type): + raise DeserializationException( + f"Decoded object type {type(obj).__name__} does not match expected {self.return_type}" + ) + + return obj + + def _validate_type(self, obj: Any, expected_type: type) -> bool: + """ + Recursively validate obj against expected_type. + Supports Union, List, Tuple, Set, frozenset, dataclass, Enum, Pydantic models. 
+ """ + origin = get_origin(expected_type) + args = get_args(expected_type) + + # Handle Union types + if origin is Union: + return any(self._validate_type(obj, t) for t in args) + + # Handle container types + if origin in (list, tuple, set, frozenset): + if not isinstance(obj, origin): + return False + if args: + return all(self._validate_type(item, args[0]) for item in obj) + return True + + # Dataclass + if hasattr(expected_type, "__dataclass_fields__"): + return hasattr(obj, "__dataclass_fields__") and type(obj) == expected_type + + # Enum + import enum + + if isinstance(expected_type, type) and issubclass(expected_type, enum.Enum): + return isinstance(obj, expected_type) + + # Pydantic + try: + from pydantic import BaseModel + + if issubclass(expected_type, BaseModel): + return isinstance(obj, expected_type) + except Exception: + pass + + # Plain types + return isinstance(obj, expected_type) + + # Encoder/Decoder interface compatibility methods + def encoder(self): + """ + Get the parameter encoder instance (returns self for compatibility). + + :return: Self as encoder. + :rtype: JsonTransportCodec + """ + return self + + def decoder(self): + """ + Get the return value decoder instance (returns self for compatibility). + + :return: Self as decoder. + :rtype: JsonTransportCodec + """ + return self + + def encode(self, arguments: tuple) -> bytes: + """ + Encode method for encoder interface compatibility. + + :param arguments: The method arguments to encode. + :type arguments: tuple + :return: Encoded parameter bytes. + :rtype: bytes + """ + return self.encode_parameters(*arguments) + + def decode(self, data: bytes) -> Any: + """ + Decode method for decoder interface compatibility. + + :param data: The bytes to decode. + :type data: bytes + :return: Decoded return value. + :rtype: Any + """ + return self.decode_return_value(data) + + # Internal serialization methods + def _serialize_object(self, obj: Any, depth: int = 0) -> Any: + """ + Serialize an object using the appropriate type handler. + + :param obj: The object to serialize. + :param depth: Current serialization depth. + :return: Serialized representation. 
+ """ + if depth > self.maximum_depth: + raise SerializationException(f"Maximum depth {self.maximum_depth} exceeded") + + # Handle simple types + if obj is None or isinstance(obj, (bool, int, float, str)): + return obj + + # Handle collections + if isinstance(obj, (list, tuple)): + return [self._serialize_object(item, depth + 1) for item in obj] + + elif isinstance(obj, dict): + result = {} + for key, value in obj.items(): + if not isinstance(key, str): + if self.strict_validation: + raise SerializationException(f"Dictionary key must be string, got {type(key).__name__}") + key = str(key) + result[key] = self._serialize_object(value, depth + 1) + return result + + # Use type handlers for complex objects + obj_type = type(obj) + for handler in self._type_handlers: + if handler.can_serialize_type(obj, obj_type): + try: + serialized = handler.serialize_to_dict(obj) + return self._serialize_object(serialized, depth + 1) + except Exception as e: + if self.strict_validation: + raise SerializationException(f"Handler failed for {type(obj).__name__}: {e}") from e + return {"$error": str(e), "$type": type(obj).__name__} + + # Fallback for unknown types + if self.strict_validation: + raise SerializationException(f"No handler for type {type(obj).__name__}") + return {"$fallback": str(obj), "$type": type(obj).__name__} + + def _encode_with_codecs(self, obj: Any) -> bytes: + """ + Encode object using the first available JSON codec. + + :param obj: The object to encode. + :return: JSON bytes. + :rtype: bytes + """ + last_error = None + + for codec in self._json_codecs: + try: + return codec.encode(obj) + except Exception as e: + last_error = e + continue + + raise SerializationException(f"All JSON codecs failed. Last error: {last_error}") + + def _decode_with_codecs(self, data: bytes) -> Any: + """ + Decode JSON bytes using the first available codec. + + :param data: The JSON bytes to decode. + :return: Decoded object. + :rtype: Any + """ + last_error = None + + for codec in self._json_codecs: + try: + return codec.decode(data) + except Exception as e: + last_error = e + continue + + raise DeserializationException(f"All JSON codecs failed. Last error: {last_error}") + + def _reconstruct_objects(self, data: Any) -> Any: + """ + Reconstruct objects from their serialized form. + + :param data: The data to reconstruct. + :return: Reconstructed object. 
+ :rtype: Any + """ + if not isinstance(data, dict): + if isinstance(data, list): + return [self._reconstruct_objects(item) for item in data] + return data + + if "$date" in data: + from datetime import datetime, timezone + + dt = datetime.fromisoformat(data["$date"].replace("Z", "+00:00")) + return dt.astimezone(timezone.utc) + + elif "$uuid" in data: + from uuid import UUID + + return UUID(data["$uuid"]) + + elif "$set" in data: + return set(self._reconstruct_objects(item) for item in data["$set"]) + + elif "$frozenset" in data: + return frozenset(self._reconstruct_objects(item) for item in data["$frozenset"]) + + elif "$tuple" in data: + return tuple(self._reconstruct_objects(item) for item in data["$tuple"]) + + elif "$binary" in data: + import base64 + + binary_data = base64.b64decode(data["$binary"]) + return binary_data + + elif "$decimal" in data: + from decimal import Decimal + + return Decimal(data["$decimal"]) + + elif "$pydantic" in data and "$data" in data: + return self._reconstruct_pydantic_model(data) + + elif "$dataclass" in data: + return self._reconstruct_dataclass(data) + + elif "$enum" in data: + return self._reconstruct_enum(data) + + else: + return {key: self._reconstruct_objects(value) for key, value in data.items()} + + def _reconstruct_pydantic_model(self, data: dict) -> Any: + """Reconstruct a Pydantic model from serialized data""" + try: + model_path = data.get("$pydantic") or data.get("__pydantic_model__") + model_data = data.get("$data") or data.get("__model_data__") + + module_name, class_name = model_path.rsplit(".", 1) + + import importlib + + module = importlib.import_module(module_name) + model_class = getattr(module, class_name) + + reconstructed_data = self._reconstruct_objects(model_data) + return model_class(**reconstructed_data) + except Exception: + return self._reconstruct_objects(model_data or {}) + + def _reconstruct_dataclass(self, data: dict) -> Any: + """Reconstruct a dataclass from serialized data""" + + class_path = data.get("$dataclass") or data.get("__dataclass__") + fields_data = data.get("$fields") or data.get("fields") + + module_name, class_name = class_path.rsplit(".", 1) + + import importlib + + module = importlib.import_module(module_name) + cls = getattr(module, class_name) + + fields = self._reconstruct_objects(fields_data) + return cls(**fields) + + def _reconstruct_enum(self, data: dict) -> Any: + """Reconstruct an enum from serialized data""" + + enum_path = data.get("$enum") or data.get("__enum__") + enum_value = data.get("$value") or data.get("value") + + module_name, class_name = enum_path.rsplit(".", 1) + + import importlib + + module = importlib.import_module(module_name) + cls = getattr(module, class_name) + + return cls(enum_value) diff --git a/src/dubbo/codec/json_codec/orjson_codec.py b/src/dubbo/codec/json_codec/orjson_codec.py new file mode 100644 index 0000000..6277c8a --- /dev/null +++ b/src/dubbo/codec/json_codec/orjson_codec.py @@ -0,0 +1,109 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import date, datetime, time +from decimal import Decimal +from pathlib import Path +from typing import Any +from uuid import UUID + +from dubbo.codec.json_codec import JsonCodec + +__all__ = ["OrJsonCodec"] + + +class OrJsonCodec(JsonCodec): + """ + orjson codec implementation for high-performance JSON encoding/decoding. + + Uses the orjson library if available, otherwise falls back gracefully. + """ + + def __init__(self): + try: + import orjson + + self.orjson = orjson + self.available = True + except ImportError: + self.available = False + + def encode(self, obj: Any) -> bytes: + """ + Encode an object to JSON bytes using orjson. + + :param obj: The object to encode. + :type obj: Any + :return: The encoded JSON bytes. + :rtype: bytes + """ + if not self.available: + raise ImportError("orjson not available") + return self.orjson.dumps(obj, default=self._default_handler) + + def decode(self, data: bytes) -> Any: + """ + Decode JSON bytes to an object using orjson. + + :param data: The JSON bytes to decode. + :type data: bytes + :return: The decoded object. + :rtype: Any + """ + if not self.available: + raise ImportError("orjson not available") + return self.orjson.loads(data) + + def can_handle(self, obj: Any) -> bool: + """ + Check if this codec can handle the given object. + + :param obj: The object to check. + :type obj: Any + :return: True if orjson is available. + :rtype: bool + """ + return self.available + + def _default_handler(self, obj): + """ + Handle types not supported natively by orjson. + + :param obj: The object to serialize. + :return: Serialized representation. + """ + if isinstance(obj, datetime): + iso_string = obj.isoformat() + if obj.tzinfo is None: + iso_string += "Z" + elif str(obj.tzinfo) == "UTC" or obj.utcoffset().total_seconds() == 0: + iso_string = iso_string.replace("+00:00", "Z") + return {"$date": iso_string} + elif isinstance(obj, date): + return {"$dateOnly": obj.isoformat()} + elif isinstance(obj, time): + return {"$timeOnly": obj.isoformat()} + elif isinstance(obj, Decimal): + return {"$decimal": str(obj)} + elif isinstance(obj, set): + return {"$set": list(obj)} + elif isinstance(obj, frozenset): + return {"$frozenset": list(obj)} + elif isinstance(obj, UUID): + return {"$uuid": str(obj)} + elif isinstance(obj, Path): + return {"$path": str(obj)} + return {"$fallback": str(obj), "$type": type(obj).__name__} diff --git a/src/dubbo/codec/json_codec/pydantic_handler.py b/src/dubbo/codec/json_codec/pydantic_handler.py new file mode 100644 index 0000000..92410dd --- /dev/null +++ b/src/dubbo/codec/json_codec/pydantic_handler.py @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Optional + +from dubbo.codec.json_codec import TypeHandler + +__all__ = ["PydanticHandler"] + + +class PydanticHandler(TypeHandler): + """ + Type handler for Pydantic models. + + Handles serialization of Pydantic BaseModel instances with proper + model reconstruction support. + """ + + def __init__(self): + try: + from pydantic import BaseModel, create_model + + self.BaseModel = BaseModel + self.create_model = create_model + self.available = True + except ImportError: + self.available = False + + def can_serialize_type(self, obj: Any, obj_type: type) -> bool: + """ + Check if this handler can serialize Pydantic models. + + :param obj: The object to check. + :type obj: Any + :param obj_type: The type of the object. + :type obj_type: type + :return: True if object is a Pydantic BaseModel and library is available. + :rtype: bool + """ + return self.available and isinstance(obj, self.BaseModel) + + def serialize_to_dict(self, obj: Any) -> dict[str, Any]: + """ + Serialize Pydantic model to dictionary representation. + + :param obj: The Pydantic model to serialize. + :type obj: BaseModel + :return: Dictionary representation with model metadata. + :rtype: dict[str, Any] + """ + if not self.available: + raise ImportError("Pydantic not available") + + # Use model_dump if available (Pydantic v2), otherwise use dict (Pydantic v1) + if hasattr(obj, "model_dump"): + model_data = obj.model_dump() + else: + model_data = obj.dict() + + return { + "__pydantic_model__": f"{obj.__class__.__module__}.{obj.__class__.__qualname__}", + "__model_data__": model_data, + } + + def create_parameter_model(self, parameter_types: Optional[list[type]] = None): + """ + Create a Pydantic model for parameter wrapping. + + :param parameter_types: List of parameter types to wrap. + :type parameter_types: Optional[list[type]] + :return: Dynamically created Pydantic model or None. + """ + if not self.available or parameter_types is None: + return None + + model_fields = {f"param_{i}": (param_type, ...) for i, param_type in enumerate(parameter_types)} + return self.create_model("ParametersModel", **model_fields) diff --git a/src/dubbo/codec/json_codec/simple_types_handler.py b/src/dubbo/codec/json_codec/simple_types_handler.py new file mode 100644 index 0000000..835ee8d --- /dev/null +++ b/src/dubbo/codec/json_codec/simple_types_handler.py @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from pathlib import Path +from typing import Any, Union +from uuid import UUID + +from dubbo.codec.json_codec import TypeHandler + +__all__ = ["SimpleTypesHandler"] + + +class SimpleTypesHandler(TypeHandler): + """ + Type handler for simple types like UUID and Path. + + Handles serialization of UUID and Path objects to string representations. + """ + + def can_serialize_type(self, obj: Any, obj_type: type) -> bool: + """ + Check if this handler can serialize simple types. + + :param obj: The object to check. + :type obj: Any + :param obj_type: The type of the object. + :type obj_type: type + :return: True if object is UUID or Path. + :rtype: bool + """ + return obj_type in (UUID, Path) or isinstance(obj, Path) + + def serialize_to_dict(self, obj: Union[UUID, Path]) -> dict[str, str]: + """ + Serialize UUID or Path to dictionary representation. + + :param obj: The object to serialize. + :type obj: Union[UUID, Path] + :return: Dictionary representation with type marker. + :rtype: Dict[str, str] + """ + if isinstance(obj, UUID): + return {"$uuid": str(obj)} + elif isinstance(obj, Path): + return {"$path": str(obj)} + else: + raise ValueError(f"Unsupported simple type: {type(obj)}") diff --git a/src/dubbo/codec/json_codec/standard_json.py b/src/dubbo/codec/json_codec/standard_json.py new file mode 100644 index 0000000..150ccfc --- /dev/null +++ b/src/dubbo/codec/json_codec/standard_json.py @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from typing import Any + +from dubbo.codec.json_codec import JsonCodec + +__all__ = ["StandardJsonCodec"] + + +class StandardJsonCodec(JsonCodec): + """ + Standard library JSON codec implementation. + + Uses Python's built-in json module for encoding and decoding. + This is the fallback codec that can handle any object. + """ + + def encode(self, obj: Any) -> bytes: + """ + Encode an object to JSON bytes using standard library. + + :param obj: The object to encode. + :type obj: Any + :return: The encoded JSON bytes. + :rtype: bytes + """ + return json.dumps(obj, ensure_ascii=False, separators=(",", ":")).encode("utf-8") + + def decode(self, data: bytes) -> Any: + """ + Decode JSON bytes to an object using standard library. + + :param data: The JSON bytes to decode. + :type data: bytes + :return: The decoded object. + :rtype: Any + """ + return json.loads(data.decode("utf-8")) + + def can_handle(self, obj: Any) -> bool: + """ + Check if this codec can handle the given object. + Standard JSON can handle any object as fallback. + + :param obj: The object to check. + :type obj: Any + :return: Always True (fallback codec). 
+ :rtype: bool + """ + return True diff --git a/src/dubbo/codec/json_codec/ujson_codec.py b/src/dubbo/codec/json_codec/ujson_codec.py new file mode 100644 index 0000000..595261b --- /dev/null +++ b/src/dubbo/codec/json_codec/ujson_codec.py @@ -0,0 +1,109 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import date, datetime, time +from decimal import Decimal +from pathlib import Path +from typing import Any +from uuid import UUID + +from dubbo.codec.json_codec import JsonCodec + +__all__ = ["UJsonCodec"] + + +class UJsonCodec(JsonCodec): + """ + ujson codec implementation for high-performance JSON encoding/decoding. + + Uses the ujson library if available, otherwise falls back gracefully. + """ + + def __init__(self): + try: + import ujson + + self.ujson = ujson + self.available = True + except ImportError: + self.available = False + + def encode(self, obj: Any) -> bytes: + """ + Encode an object to JSON bytes using ujson. + + :param obj: The object to encode. + :type obj: Any + :return: The encoded JSON bytes. + :rtype: bytes + """ + if not self.available: + raise ImportError("ujson not available") + return self.ujson.dumps(obj, ensure_ascii=False, default=self._default_handler).encode("utf-8") + + def decode(self, data: bytes) -> Any: + """ + Decode JSON bytes to an object using ujson. + + :param data: The JSON bytes to decode. + :type data: bytes + :return: The decoded object. + :rtype: Any + """ + if not self.available: + raise ImportError("ujson not available") + return self.ujson.loads(data.decode("utf-8")) + + def can_handle(self, obj: Any) -> bool: + """ + Check if this codec can handle the given object. + + :param obj: The object to check. + :type obj: Any + :return: True if ujson is available. + :rtype: bool + """ + return self.available + + def _default_handler(self, obj): + """ + Handle types not supported natively by ujson. + + :param obj: The object to serialize. + :return: Serialized representation. 
+ """ + if isinstance(obj, datetime): + iso_string = obj.isoformat() + if obj.tzinfo is None: + iso_string += "Z" + elif str(obj.tzinfo) == "UTC" or obj.utcoffset().total_seconds() == 0: + iso_string = iso_string.replace("+00:00", "Z") + return {"$date": iso_string} + elif isinstance(obj, date): + return {"$dateOnly": obj.isoformat()} + elif isinstance(obj, time): + return {"$timeOnly": obj.isoformat()} + elif isinstance(obj, Decimal): + return {"$decimal": str(obj)} + elif isinstance(obj, set): + return {"$set": list(obj)} + elif isinstance(obj, frozenset): + return {"$frozenset": list(obj)} + elif isinstance(obj, UUID): + return {"$uuid": str(obj)} + elif isinstance(obj, Path): + return {"$path": str(obj)} + return {"$fallback": str(obj), "$type": type(obj).__name__} diff --git a/src/dubbo/codec/protobuf_codec/__init__.py b/src/dubbo/codec/protobuf_codec/__init__.py new file mode 100644 index 0000000..4bbf7b4 --- /dev/null +++ b/src/dubbo/codec/protobuf_codec/__init__.py @@ -0,0 +1,33 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._interface import ProtobufDecoder, ProtobufEncoder +from .betterproto_handler import BetterprotoMessageHandler +from .primitive_handler import PrimitiveHandler +from .protobuf_codec import ProtobufTransportCodec, ProtobufTransportDecoder, ProtobufTransportEncoder +from .protoc_handler import GoogleProtobufMessageHandler + +__all__ = [ + "ProtobufTransportCodec", + "ProtobufTransportDecoder", + "ProtobufTransportEncoder", + "ProtobufEncoder", + "ProtobufDecoder", + "BetterprotoMessageHandler", + "PrimitiveHandler", + "GoogleProtobufMessageHandler", + "PrimitiveHandler", +] diff --git a/src/dubbo/codec/protobuf_codec/_interface.py b/src/dubbo/codec/protobuf_codec/_interface.py new file mode 100644 index 0000000..df188b7 --- /dev/null +++ b/src/dubbo/codec/protobuf_codec/_interface.py @@ -0,0 +1,103 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import abc +from typing import Any, Optional + +__all__ = [ + "SerializationException", + "DeserializationException", + "ProtobufSerialization", + "ProtobufEncoder", + "ProtobufDecoder", +] + + +class SerializationException(Exception): + """Exception raised when encoding or serialization fails.""" + + def __init__(self, message: str, *, cause: Optional[Exception] = None): + super().__init__(message) + self.cause = cause + + +class DeserializationException(Exception): + """Exception raised when decoding or deserialization fails.""" + + def __init__(self, message: str, *, cause: Optional[Exception] = None): + super().__init__(message) + self.cause = cause + + +class ProtobufSerialization(abc.ABC): + """ + The protobuf serialization interface. + """ + + @classmethod + @abc.abstractmethod + def get_serialization_type(cls) -> str: + """ + Get serialization type of current implementation + :return: The serialization type. + :rtype: str + """ + raise NotImplementedError() + + @abc.abstractmethod + def can_handle(self, obj: Any, obj_type: Optional[type] = None) -> bool: + """ + Check if this serialization can handle the given object/type + :param obj: The object to check + :param obj_type: The type to check + :return: True if can handle, False otherwise + :rtype: bool + """ + raise NotImplementedError() + + +class ProtobufEncoder(ProtobufSerialization, abc.ABC): + """ + The protobuf encoding interface. + """ + + @abc.abstractmethod + def encode(self, obj: Any, obj_type: Optional[type] = None) -> bytes: + """ + Encode the object to bytes. + :param obj: The object to encode. + :param obj_type: The type hint for encoding. + :return: The encoded bytes. + :rtype: bytes + """ + raise NotImplementedError() + + +class ProtobufDecoder(ProtobufSerialization, abc.ABC): + """ + The protobuf decoding interface. + """ + + @abc.abstractmethod + def decode(self, data: bytes, target_type: type) -> Any: + """ + Decode the data to object. + :param data: The data to decode. + :param target_type: The target type for decoding. + :return: The decoded object. + :rtype: Any + """ + raise NotImplementedError() diff --git a/src/dubbo/codec/protobuf_codec/betterproto_handler.py b/src/dubbo/codec/protobuf_codec/betterproto_handler.py new file mode 100644 index 0000000..605cad8 --- /dev/null +++ b/src/dubbo/codec/protobuf_codec/betterproto_handler.py @@ -0,0 +1,196 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
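# A minimal sketch (illustrative, not part of the patch): implementing the ProtobufEncoder and
# ProtobufDecoder interfaces defined above with a handler that round-trips plain dicts as
# UTF-8 JSON. The class name is hypothetical; it assumes both interfaces are re-exported from
# the protobuf_codec package as declared in its __init__.
import json
from typing import Any, Optional

from dubbo.codec.protobuf_codec import ProtobufDecoder, ProtobufEncoder


class DictJsonHandler(ProtobufEncoder, ProtobufDecoder):
    _SERIALIZATION_TYPE = "dict-json"

    @classmethod
    def get_serialization_type(cls) -> str:
        return cls._SERIALIZATION_TYPE

    def can_handle(self, obj: Any, obj_type: Optional[type] = None) -> bool:
        # claim plain dicts, whether matched by value or by declared type
        return isinstance(obj, dict) or obj_type is dict

    def encode(self, obj: Any, obj_type: Optional[type] = None) -> bytes:
        return json.dumps(obj).encode("utf-8")

    def decode(self, data: bytes, target_type: type) -> Any:
        return json.loads(data.decode("utf-8"))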
+ +import json +from typing import Any, Optional + +from ._interface import DeserializationException, ProtobufDecoder, ProtobufEncoder, SerializationException + +try: + import betterproto + + HAS_BETTERPROTO = True +except ImportError: + HAS_BETTERPROTO = False + +__all__ = ["BetterprotoMessageHandler", "PrimitiveHandler"] + + +class BetterprotoMessageHandler(ProtobufEncoder, ProtobufDecoder): + """ + The BetterProto message handler for protobuf messages. + """ + + _SERIALIZATION_TYPE = "betterproto" + + def __init__(self): + if not HAS_BETTERPROTO: + raise ImportError("betterproto library is required for BetterprotoMessageHandler") + + @classmethod + def get_serialization_type(cls) -> str: + """ + Get serialization type of current implementation + :return: The serialization type. + :rtype: str + """ + return cls._SERIALIZATION_TYPE + + def can_handle(self, obj: Any, obj_type: Optional[type] = None) -> bool: + """ + Check if this handler can handle the given object/type + :param obj: The object to check + :param obj_type: The type to check + :return: True if can handle, False otherwise + :rtype: bool + """ + if obj is not None and isinstance(obj, betterproto.Message): + return True + if obj_type is not None: + return self._is_betterproto_message(obj_type) + return False + + def encode(self, obj: Any, obj_type: Optional[type] = None) -> bytes: + """ + Encode the betterproto message to bytes. + :param obj: The message to encode. + :param obj_type: The type hint for encoding. + :return: The encoded bytes. + :rtype: bytes + """ + try: + if isinstance(obj, betterproto.Message): + return bytes(obj) + + if obj_type and self._is_betterproto_message(obj_type): + if isinstance(obj, obj_type): + return bytes(obj) + elif isinstance(obj, dict): + message = obj_type().from_dict(obj) + return bytes(message) + else: + raise SerializationException(f"Cannot convert {type(obj)} to {obj_type}") + + raise SerializationException(f"Cannot encode {type(obj)} as betterproto message") + except Exception as e: + raise SerializationException(f"BetterProto encoding failed: {e}") from e + + def decode(self, data: bytes, target_type: type) -> Any: + """ + Decode the data to betterproto message. + :param data: The data to decode. + :param target_type: The target message type. + :return: The decoded message. + :rtype: Any + """ + try: + if not self._is_betterproto_message(target_type): + raise DeserializationException(f"{target_type} is not a betterproto message type") + return target_type().parse(data) + except Exception as e: + raise DeserializationException(f"BetterProto decoding failed: {e}") from e + + def _is_betterproto_message(self, obj_type: type) -> bool: + """Check if the type is a betterproto message""" + try: + return hasattr(obj_type, "__dataclass_fields__") and issubclass(obj_type, betterproto.Message) + except (TypeError, AttributeError): + return False + + +class PrimitiveHandler(ProtobufEncoder, ProtobufDecoder): + """ + The primitive type handler for basic Python types. + """ + + _SERIALIZATION_TYPE = "primitive" + + @classmethod + def get_serialization_type(cls) -> str: + """ + Get serialization type of current implementation + :return: The serialization type. 
+ :rtype: str + """ + return cls._SERIALIZATION_TYPE + + def can_handle(self, obj: Any, obj_type: Optional[type] = None) -> bool: + """ + Check if this handler can handle the given object/type + :param obj: The object to check + :param obj_type: The type to check + :return: True if can handle, False otherwise + :rtype: bool + """ + if obj is not None: + return isinstance(obj, (str, int, float, bool, bytes)) + if obj_type is not None: + return obj_type in (str, int, float, bool, bytes) + return False + + def encode(self, obj: Any, obj_type: Optional[type] = None) -> bytes: + """ + Encode the primitive object to bytes. + :param obj: The object to encode. + :param obj_type: The type hint for encoding. + :return: The encoded bytes. + :rtype: bytes + """ + try: + if not isinstance(obj, (str, int, float, bool, bytes)): + raise SerializationException(f"Cannot encode {type(obj)} as primitive") + + json_str = json.dumps({"value": obj, "type": type(obj).__name__}) + return json_str.encode("utf-8") + except Exception as e: + raise SerializationException(f"Primitive encoding failed: {e}") from e + + def decode(self, data: bytes, target_type: type) -> Any: + """ + Decode the data to primitive object. + :param data: The data to decode. + :param target_type: The target primitive type. + :return: The decoded object. + :rtype: Any + """ + try: + if target_type not in (str, int, float, bool, bytes): + raise DeserializationException(f"{target_type} is not a supported primitive type") + + json_str = data.decode("utf-8") + parsed = json.loads(json_str) + value = parsed.get("value") + + if target_type is str: + return str(value) + elif target_type is int: + return int(value) + elif target_type is float: + return float(value) + elif target_type is bool: + return bool(value) + elif target_type is bytes: + if isinstance(value, bytes): + return value + elif isinstance(value, list): + return bytes(value) + else: + return str(value).encode() + else: + return value + + except Exception as e: + raise DeserializationException(f"Primitive decoding failed: {e}") from e diff --git a/src/dubbo/codec/protobuf_codec/primitive_handler.py b/src/dubbo/codec/protobuf_codec/primitive_handler.py new file mode 100644 index 0000000..8a68091 --- /dev/null +++ b/src/dubbo/codec/protobuf_codec/primitive_handler.py @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from typing import Any, Optional + +from ._interface import DeserializationException, ProtobufDecoder, ProtobufEncoder, SerializationException + +__all__ = ["PrimitiveHandler"] + + +class PrimitiveHandler(ProtobufEncoder, ProtobufDecoder): + """ + The primitive type handler for basic Python types. 
+ """ + + _SERIALIZATION_TYPE = "primitive" + + @classmethod + def get_serialization_type(cls) -> str: + """ + Get serialization type of current implementation + :return: The serialization type. + :rtype: str + """ + return cls._SERIALIZATION_TYPE + + def can_handle(self, obj: Any, obj_type: Optional[type] = None) -> bool: + """ + Check if this handler can handle the given object/type + :param obj: The object to check + :param obj_type: The type to check + :return: True if can handle, False otherwise + :rtype: bool + """ + if obj is not None: + return isinstance(obj, (str, int, float, bool, bytes)) + if obj_type is not None: + return obj_type in (str, int, float, bool, bytes) + return False + + def encode(self, obj: Any, obj_type: Optional[type] = None) -> bytes: + """ + Encode the primitive object to bytes. + :param obj: The object to encode. + :param obj_type: The type hint for encoding. + :return: The encoded bytes. + :rtype: bytes + """ + try: + if not isinstance(obj, (str, int, float, bool, bytes)): + raise SerializationException(f"Cannot encode {type(obj)} as primitive") + + json_str = json.dumps({"value": obj, "type": type(obj).__name__}) + return json_str.encode("utf-8") + except Exception as e: + raise SerializationException(f"Primitive encoding failed: {e}") from e + + def decode(self, data: bytes, target_type: type) -> Any: + """ + Decode the data to primitive object. + :param data: The data to decode. + :param target_type: The target primitive type. + :return: The decoded object. + :rtype: Any + """ + try: + if target_type not in (str, int, float, bool, bytes): + raise DeserializationException(f"{target_type} is not a supported primitive type") + + json_str = data.decode("utf-8") + parsed = json.loads(json_str) + value = parsed.get("value") + + if target_type is str: + return str(value) + elif target_type is int: + return int(value) + elif target_type is float: + return float(value) + elif target_type is bool: + return bool(value) + elif target_type is bytes: + if isinstance(value, bytes): + return value + elif isinstance(value, list): + return bytes(value) + else: + return str(value).encode() + else: + return value + + except Exception as e: + raise DeserializationException(f"Primitive decoding failed: {e}") from e diff --git a/src/dubbo/codec/protobuf_codec/protobuf_codec.py b/src/dubbo/codec/protobuf_codec/protobuf_codec.py new file mode 100644 index 0000000..c175bb3 --- /dev/null +++ b/src/dubbo/codec/protobuf_codec/protobuf_codec.py @@ -0,0 +1,169 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from typing import Any, Optional + +from ._interface import DeserializationException, ProtobufDecoder, ProtobufEncoder, SerializationException + +__all__ = ["ProtobufTransportCodec"] + + +class ProtobufTransportEncoder: + """Protobuf encoder for parameters""" + + def __init__(self, handlers: list[ProtobufEncoder], parameter_types: Optional[list[type]] = None): + self._handlers = handlers + self._parameter_types = parameter_types or [] + + def encode(self, arguments: tuple) -> bytes: + """Encode arguments tuple to bytes""" + try: + if not arguments: + return b"" + if len(arguments) == 1: + return self._encode_single(arguments[0]) + raise SerializationException( + f"Multiple parameters not supported. Got {len(arguments)} arguments, expected 1." + ) + except Exception as e: + raise SerializationException(f"Parameter encoding failed: {e}") from e + + def _encode_single(self, argument: Any) -> bytes: + """Encode a single argument""" + if argument is None: + return b"" + + # Try to get parameter type from configuration + param_type = self._parameter_types[0] if self._parameter_types else None + + for handler in self._handlers: + if handler.can_handle(argument, param_type): + return handler.encode(argument, param_type) + + raise SerializationException(f"No handler found for {type(argument)}") + + +class ProtobufTransportDecoder: + """Protobuf decoder for return values""" + + def __init__(self, handlers: list[ProtobufDecoder], return_type: Optional[type] = None): + self._handlers = handlers + self._return_type = return_type + + def decode(self, data: bytes) -> Any: + """Decode bytes to return value""" + try: + if not data: + return None + if not self._return_type: + raise DeserializationException("No return_type specified for decoding") + + for handler in self._handlers: + if handler.can_handle(None, self._return_type): + return handler.decode(data, self._return_type) + + raise DeserializationException(f"No handler found for {self._return_type}") + except Exception as e: + raise DeserializationException(f"Return value decoding failed: {e}") from e + + +class ProtobufTransportCodec: + """ + Main protobuf codec class + """ + + def __init__( + self, + parameter_types: Optional[list[type]] = None, + return_type: Optional[type] = None, + **kwargs, + ): + self._parameter_types = parameter_types or [] + self._return_type = return_type + + # Initialize handlers + self._encoders: list[ProtobufEncoder] = [] + self._decoders: list[ProtobufDecoder] = [] + + # Load default handlers + self._load_default_handlers() + + def _load_default_handlers(self): + """Load default encoding and decoding handlers""" + from dubbo.extension import extensionLoader + + # Try BetterProto handler + try: + betterproto_handler = extensionLoader.get_extension(ProtobufEncoder, "betterproto")() + self._encoders.append(betterproto_handler) + self._decoders.append(betterproto_handler) + except ImportError: + print("Warning: BetterProto handler not available") + + # Try Google Protoc handler + try: + protoc_handler = extensionLoader.get_extension(ProtobufEncoder, "googleproto")() + self._encoders.append(protoc_handler) + self._decoders.append(protoc_handler) + except ImportError: + print("Warning: Protoc handler not available") + + # Always load primitive handler + primitive_handler = extensionLoader.get_extension(ProtobufEncoder, "primitive")() + self._encoders.append(primitive_handler) + self._decoders.append(primitive_handler) + + def encoder(self) -> ProtobufTransportEncoder: + """ + Create and return an encoder instance. 
+ """ + return ProtobufTransportEncoder(self._encoders, self._parameter_types) + + def decoder(self) -> ProtobufTransportDecoder: + """ + Create and return a decoder instance. + """ + return ProtobufTransportDecoder(self._decoders, self._return_type) + + # Convenience methods for direct usage (backward compatibility) + def encode_parameter(self, argument: Any) -> bytes: + """Encode a single parameter""" + encoder = self.encoder() + return encoder.encode((argument,)) + + def encode_parameters(self, arguments: tuple) -> bytes: + """Encode parameters tuple""" + encoder = self.encoder() + return encoder.encode(arguments) + + def decode_return_value(self, data: bytes) -> Any: + """Decode return value""" + decoder = self.decoder() + return decoder.decode(data) + + def register_encoder(self, encoder: ProtobufEncoder): + """Register a custom encoder""" + self._encoders.append(encoder) + + def register_decoder(self, decoder: ProtobufDecoder): + """Register a custom decoder""" + self._decoders.append(decoder) + + def get_encoders(self) -> list[ProtobufEncoder]: + """Get all registered encoders""" + return self._encoders.copy() + + def get_decoders(self) -> list[ProtobufDecoder]: + """Get all registered decoders""" + return self._decoders.copy() diff --git a/src/dubbo/codec/protobuf_codec/protoc_handler.py b/src/dubbo/codec/protobuf_codec/protoc_handler.py new file mode 100644 index 0000000..5dda34e --- /dev/null +++ b/src/dubbo/codec/protobuf_codec/protoc_handler.py @@ -0,0 +1,83 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Optional + +from ._interface import DeserializationException, ProtobufDecoder, ProtobufEncoder, SerializationException + +try: + from google.protobuf.message import Message as GoogleMessage + + HAS_PROTOC = True +except ImportError: + HAS_PROTOC = False + + +class GoogleProtobufMessageHandler(ProtobufEncoder, ProtobufDecoder): + """ + The Google protoc message handler for protobuf messages. 
+ """ + + _SERIALIZATION_TYPE = "protoc" + + def __init__(self): + if not HAS_PROTOC: + raise ImportError("google.protobuf is required for GoogleProtobufMessageHandler") + + @classmethod + def get_serialization_type(cls) -> str: + return cls._SERIALIZATION_TYPE + + def can_handle(self, obj: Any, obj_type: Optional[type] = None) -> bool: + if obj is not None and HAS_PROTOC and isinstance(obj, GoogleMessage): + return True + if obj_type is not None: + return self._is_protoc_message(obj_type) + return False + + def encode(self, obj: Any, obj_type: Optional[type] = None) -> bytes: + try: + if isinstance(obj, GoogleMessage): + return obj.SerializeToString() + + if obj_type and self._is_protoc_message(obj_type): + if isinstance(obj, obj_type): + return obj.SerializeToString() + elif isinstance(obj, dict): + message = obj_type(**obj) + return message.SerializeToString() + else: + raise SerializationException(f"Cannot convert {type(obj)} to {obj_type}") + + raise SerializationException(f"Cannot encode {type(obj)} as protoc message") + except Exception as e: + raise SerializationException(f"Protoc encoding failed: {e}") from e + + def decode(self, data: bytes, target_type: type) -> Any: + try: + if not self._is_protoc_message(target_type): + raise DeserializationException(f"{target_type} is not a protoc message type") + message = target_type() + message.ParseFromString(data) + return message + except Exception as e: + raise DeserializationException(f"Protoc decoding failed: {e}") from e + + def _is_protoc_message(self, obj_type: type) -> bool: + try: + return HAS_PROTOC and issubclass(obj_type, GoogleMessage) + except (TypeError, AttributeError): + return False diff --git a/src/dubbo/extension/registries.py b/src/dubbo/extension/registries.py index cf23ae7..08b40a7 100644 --- a/src/dubbo/extension/registries.py +++ b/src/dubbo/extension/registries.py @@ -18,6 +18,9 @@ from typing import Any from dubbo.cluster import LoadBalance +from dubbo.codec import Codec +from dubbo.codec.json_codec import TypeHandler +from dubbo.codec.protobuf_codec import ProtobufEncoder from dubbo.compression import Compressor, Decompressor from dubbo.protocol import Protocol from dubbo.registry import RegistryFactory @@ -47,6 +50,9 @@ class ExtendedRegistry: "compressorRegistry", "decompressorRegistry", "transporterRegistry", + "codecRegistry", + "jsonTypeHandlerRegistry", + "protoHandlerRegistry", ] # RegistryFactory registry @@ -84,7 +90,6 @@ class ExtendedRegistry: }, ) - # Decompressor registry decompressorRegistry = ExtendedRegistry( interface=Decompressor, @@ -95,7 +100,6 @@ class ExtendedRegistry: }, ) - # Transporter registry transporterRegistry = ExtendedRegistry( interface=Transporter, @@ -103,3 +107,36 @@ class ExtendedRegistry: "aio": "dubbo.remoting.aio.aio_transporter.AioTransporter", }, ) + +# Codec registry +codecRegistry = ExtendedRegistry( + interface=Codec, + impls={ + "json": "dubbo.codec.json_codec.JsonTransportCodec", + "protobuf": "dubbo.codec.protobuf_codec.ProtobufTransportCodec", + }, +) + +# Protobuf handler registry +protoHandlerRegistry = ExtendedRegistry( + interface=ProtobufEncoder, + impls={ + "betterproto": "dubbo.codec.protobuf_codec.BetterprotoMessageHandler", + "primitive": "dubbo.codec.protobuf_codec.PrimitiveHandler", + "googleproto": "dubbo.codec.protobuf_codec.GoogleProtobufMessageHandler", + }, +) + +# JSON type handler registry +jsonTypeHandlerRegistry = ExtendedRegistry( + interface=TypeHandler, + impls={ + "datetime": "dubbo.codec.json_codec.DateTimeHandler", + "decimal": 
"dubbo.codec.json_codec.DecimalHandler", + "collection": "dubbo.codec.json_codec.CollectionHandler", + "enum": "dubbo.codec.json_codec.EnumHandler", + "dataclass": "dubbo.codec.json_codec.DataclassHandler", + "simple": "dubbo.codec.json_codec.SimpleTypesHandler", + "pydantic": "dubbo.codec.json_codec.PydanticHandler", + }, +) diff --git a/src/dubbo/protocol/triple/protocol.py b/src/dubbo/protocol/triple/protocol.py index f4aa4d3..f5333cd 100644 --- a/src/dubbo/protocol/triple/protocol.py +++ b/src/dubbo/protocol/triple/protocol.py @@ -30,9 +30,7 @@ from dubbo.remoting import Server, Transporter from dubbo.remoting.aio import constants as aio_constants from dubbo.remoting.aio.http2.protocol import Http2ClientProtocol, Http2ServerProtocol -from dubbo.remoting.aio.http2.stream_handler import ( - StreamClientMultiplexHandler -) +from dubbo.remoting.aio.http2.stream_handler import StreamClientMultiplexHandler from dubbo.url import URL _LOGGER = loggerFactory.get_logger() diff --git a/src/dubbo/proxy/handlers.py b/src/dubbo/proxy/handlers.py index 8c89663..498ce3d 100644 --- a/src/dubbo/proxy/handlers.py +++ b/src/dubbo/proxy/handlers.py @@ -14,9 +14,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Callable, Optional +import inspect +from typing import Any, Callable, Optional, get_type_hints from dubbo.classes import MethodDescriptor +from dubbo.codec import DubboSerializationService from dubbo.types import ( DeserializingFunction, RpcTypes, @@ -26,9 +28,17 @@ __all__ = ["RpcMethodHandler", "RpcServiceHandler"] +class RpcMethodConfigurationError(Exception): + """ + Raised when an RPC method is configured incorrectly. + """ + + pass + + class RpcMethodHandler: """ - Rpc method handler + Rpc method handler that wraps metadata and serialization logic for a callable. """ __slots__ = ["_method_descriptor"] @@ -36,7 +46,7 @@ class RpcMethodHandler: def __init__(self, method_descriptor: MethodDescriptor): """ Initialize the RpcMethodHandler - :param method_descriptor: the method descriptor. + :param method_descriptor: the method descriptor :type method_descriptor: MethodDescriptor """ self._method_descriptor = method_descriptor @@ -50,34 +60,117 @@ def method_descriptor(self) -> MethodDescriptor: """ return self._method_descriptor + @staticmethod + def get_codec(**kwargs) -> tuple: + """ + Get serialization and deserialization functions + :param kwargs: codec configuration like transport_type, parameter_types, return_type + :return: serializer and deserializer functions + :rtype: Tuple[SerializingFunction, DeserializingFunction] + """ + return DubboSerializationService.create_serialization_functions(**kwargs) + + @classmethod + def _infer_types_from_method(cls, method: Callable) -> tuple: + """ + Infer method name, parameter types, and return type + :param method: the callable method + :type method: Callable + :return: tuple(method_name, param_types, return_type) + """ + try: + type_hints = get_type_hints(method) + sig = inspect.signature(method) + method_name = method.__name__ + params = list(sig.parameters.values()) + + # Detect unbound methods + if params and params[0].name == "self": + raise RpcMethodConfigurationError( + f"Method '{method_name}' appears unbound with 'self'. Pass a bound method or standalone function." 
+ ) + + params_types = [type_hints.get(p.name, Any) for p in params] + return_type = type_hints.get("return", Any) + return method_name, params_types, return_type + except RpcMethodConfigurationError: + raise + except Exception: + return method.__name__, [Any], Any + + @classmethod + def _create_method_descriptor( + cls, + method: Callable, + method_name: str, + params_types: list[type], + return_type: type, + rpc_type: str, + codec: Optional[str] = None, + request_deserializer: Optional[DeserializingFunction] = None, + response_serializer: Optional[SerializingFunction] = None, + **kwargs, + ) -> MethodDescriptor: + """ + Create a MethodDescriptor with serialization configuration + :param method: callable method + :param method_name: RPC method name + :param params_types: parameter types + :param return_type: return type + :param rpc_type: RPC type (unary, client_stream, server_stream, bi_stream) + :param codec: serialization codec + :param request_deserializer: request deserialization function + :param response_serializer: response serialization function + :param kwargs: additional codec arguments + :return: MethodDescriptor instance + :rtype: MethodDescriptor + """ + if request_deserializer is None or response_serializer is None: + codec_kwargs = { + "transport_type": codec, + "parameter_types": params_types, + "return_type": return_type, + **kwargs, + } + serializer, deserializer = cls.get_codec(**codec_kwargs) + request_deserializer = request_deserializer or deserializer + response_serializer = response_serializer or serializer + + return MethodDescriptor( + callable_method=method, + method_name=method_name or method.__name__, + arg_serialization=(None, request_deserializer), + return_serialization=(response_serializer, None), + rpc_type=rpc_type, + ) + @classmethod def unary( cls, method: Callable, method_name: Optional[str] = None, + params_types: Optional[list[type]] = None, + return_type: Optional[type] = None, + codec: Optional[str] = None, request_deserializer: Optional[DeserializingFunction] = None, response_serializer: Optional[SerializingFunction] = None, + **kwargs, ) -> "RpcMethodHandler": """ Create a unary method handler - :param method: the method. - :type method: Callable - :param method_name: the method name. If not provided, the method name will be used. - :type method_name: Optional[str] - :param request_deserializer: the request deserializer. - :type request_deserializer: Optional[DeserializingFunction] - :param response_serializer: the response serializer. - :type response_serializer: Optional[SerializingFunction] - :return: the unary method handler. 
- :rtype: RpcMethodHandler """ + name, param_types, ret_type = cls._infer_types_from_method(method) return cls( - MethodDescriptor( - callable_method=method, - method_name=method_name or method.__name__, - arg_serialization=(None, request_deserializer), - return_serialization=(response_serializer, None), + cls._create_method_descriptor( + method=method, + method_name=method_name or name, + params_types=params_types or param_types, + return_type=return_type or ret_type, rpc_type=RpcTypes.UNARY.value, + codec=codec or "json", + request_deserializer=request_deserializer, + response_serializer=response_serializer, + **kwargs, ) ) @@ -86,29 +179,28 @@ def client_stream( cls, method: Callable, method_name: Optional[str] = None, + params_types: Optional[list[type]] = None, + return_type: Optional[type] = None, + codec: Optional[str] = None, request_deserializer: Optional[DeserializingFunction] = None, response_serializer: Optional[SerializingFunction] = None, - ): + **kwargs, + ) -> "RpcMethodHandler": """ - Create a client stream method handler - :param method: the method. - :type method: Callable - :param method_name: the method name. If not provided, the method name will be used. - :type method_name: Optional[str] - :param request_deserializer: the request deserializer. - :type request_deserializer: Optional[DeserializingFunction] - :param response_serializer: the response serializer. - :type response_serializer: Optional[SerializingFunction] - :return: the client stream method handler. - :rtype: RpcMethodHandler + Create a client-streaming method handler """ + name, param_types, ret_type = cls._infer_types_from_method(method) return cls( - MethodDescriptor( - callable_method=method, - method_name=method_name or method.__name__, - arg_serialization=(None, request_deserializer), - return_serialization=(response_serializer, None), + cls._create_method_descriptor( + method=method, + method_name=method_name or name, + params_types=params_types or param_types, + return_type=return_type or ret_type, rpc_type=RpcTypes.CLIENT_STREAM.value, + codec=codec or "json", + request_deserializer=request_deserializer, + response_serializer=response_serializer, + **kwargs, ) ) @@ -117,29 +209,28 @@ def server_stream( cls, method: Callable, method_name: Optional[str] = None, + params_types: Optional[list[type]] = None, + return_type: Optional[type] = None, + codec: Optional[str] = None, request_deserializer: Optional[DeserializingFunction] = None, response_serializer: Optional[SerializingFunction] = None, - ): + **kwargs, + ) -> "RpcMethodHandler": """ - Create a server stream method handler - :param method: the method. - :type method: Callable - :param method_name: the method name. If not provided, the method name will be used. - :type method_name: Optional[str] - :param request_deserializer: the request deserializer. - :type request_deserializer: Optional[DeserializingFunction] - :param response_serializer: the response serializer. - :type response_serializer: Optional[SerializingFunction] - :return: the server stream method handler. 
- :rtype: RpcMethodHandler + Create a server-streaming method handler """ + name, param_types, ret_type = cls._infer_types_from_method(method) return cls( - MethodDescriptor( - callable_method=method, - method_name=method_name or method.__name__, - arg_serialization=(None, request_deserializer), - return_serialization=(response_serializer, None), + cls._create_method_descriptor( + method=method, + method_name=method_name or name, + params_types=params_types or param_types, + return_type=return_type or ret_type, rpc_type=RpcTypes.SERVER_STREAM.value, + codec=codec or "json", + request_deserializer=request_deserializer, + response_serializer=response_serializer, + **kwargs, ) ) @@ -148,36 +239,35 @@ def bi_stream( cls, method: Callable, method_name: Optional[str] = None, + params_types: Optional[list[type]] = None, + return_type: Optional[type] = None, + codec: Optional[str] = None, request_deserializer: Optional[DeserializingFunction] = None, response_serializer: Optional[SerializingFunction] = None, - ): + **kwargs, + ) -> "RpcMethodHandler": """ - Create a bidi stream method handler - :param method: the method. - :type method: Callable - :param method_name: the method name. If not provided, the method name will be used. - :type method_name: Optional[str] - :param request_deserializer: the request deserializer. - :type request_deserializer: Optional[DeserializingFunction] - :param response_serializer: the response serializer. - :type response_serializer: Optional[SerializingFunction] - :return: the bidi stream method handler. - :rtype: RpcMethodHandler + Create a bidirectional-streaming method handler """ + name, param_types, ret_type = cls._infer_types_from_method(method) return cls( - MethodDescriptor( - callable_method=method, - method_name=method_name or method.__name__, - arg_serialization=(None, request_deserializer), - return_serialization=(response_serializer, None), + cls._create_method_descriptor( + method=method, + method_name=method_name or name, + params_types=params_types or param_types, + return_type=return_type or ret_type, rpc_type=RpcTypes.BI_STREAM.value, + codec=codec or "json", + request_deserializer=request_deserializer, + response_serializer=response_serializer, + **kwargs, ) ) class RpcServiceHandler: """ - Rpc service handler + Rpc service handler that maps method names to their corresponding RpcMethodHandler """ __slots__ = ["_service_name", "_method_handlers"] @@ -185,10 +275,10 @@ class RpcServiceHandler: def __init__(self, service_name: str, method_handlers: list[RpcMethodHandler]): """ Initialize the RpcServiceHandler - :param service_name: the name of the service. + :param service_name: the name of the service :type service_name: str - :param method_handlers: the method handlers. 
- :type method_handlers: List[RpcMethodHandler] + :param method_handlers: the list of RPC method handlers + :type method_handlers: list[RpcMethodHandler] """ self._service_name = service_name self._method_handlers: dict[str, RpcMethodHandler] = {} @@ -209,8 +299,8 @@ def service_name(self) -> str: @property def method_handlers(self) -> dict[str, RpcMethodHandler]: """ - Get the method handlers - :return: the method handlers - :rtype: Dict[str, RpcMethodHandler] + Get the registered method handlers + :return: method handlers dictionary + :rtype: dict[str, RpcMethodHandler] """ return self._method_handlers diff --git a/src/dubbo/remoting/aio/http2/protocol.py b/src/dubbo/remoting/aio/http2/protocol.py index a762d0a..c99876d 100644 --- a/src/dubbo/remoting/aio/http2/protocol.py +++ b/src/dubbo/remoting/aio/http2/protocol.py @@ -26,7 +26,6 @@ from dubbo.loggers import loggerFactory from dubbo.remoting.aio import ConnectionStateListener, EmptyConnectionStateListener, constants as h2_constants from dubbo.remoting.aio.exceptions import ProtocolError -from dubbo.remoting.aio.http2.stream_handler import StreamServerMultiplexHandler from dubbo.remoting.aio.http2.controllers import RemoteFlowController from dubbo.remoting.aio.http2.frames import ( DataFrame, @@ -39,6 +38,7 @@ ) from dubbo.remoting.aio.http2.registries import Http2FrameType from dubbo.remoting.aio.http2.stream import Http2Stream +from dubbo.remoting.aio.http2.stream_handler import StreamServerMultiplexHandler from dubbo.remoting.aio.http2.utils import Http2EventUtils from dubbo.url import URL from dubbo.utils import EventHelper, FutureHelper diff --git a/tests/json/json_test.py b/tests/json/json_test.py new file mode 100644 index 0000000..805a651 --- /dev/null +++ b/tests/json/json_test.py @@ -0,0 +1,77 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
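# A minimal sketch (illustrative, not part of the patch): with the reworked RpcMethodHandler
# above, parameter and return types are inferred from ordinary type hints, while params_types,
# return_type, and codec remain available as explicit overrides (the codec defaults to "json").
# The service and method names below are made up for the example.
from dubbo.proxy.handlers import RpcMethodHandler, RpcServiceHandler


def say_hello(name: str) -> str:
    return f"Hello, {name}!"


method_handler = RpcMethodHandler.unary(say_hello, method_name="sayHello")
service_handler = RpcServiceHandler("org.example.GreeterService", [method_handler])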
+ +from dataclasses import asdict, dataclass +from datetime import datetime, timezone +from decimal import Decimal +from enum import Enum +from uuid import UUID + +import pytest + +from dubbo.codec.json_codec import JsonTransportCodec + + +# Optional dataclass and enum examples +@dataclass +class SampleDataClass: + field1: int + field2: str + + +class Color(Enum): + RED = "red" + GREEN = "green" + + +# List of test cases: (input_value, expected_type_after_decoding) +test_cases = [ + ("simple string", str), + (12345, int), + (12.34, float), + (True, bool), + (datetime(2025, 8, 27, 13, 0, tzinfo=timezone.utc), datetime), + (Decimal("123.45"), Decimal), + (set([1, 2, 3]), set), + (frozenset(["a", "b"]), frozenset), + (UUID("12345678-1234-5678-1234-567812345678"), UUID), + (Color.RED, Color), + (SampleDataClass(field1=1, field2="abc"), SampleDataClass), +] + + +@pytest.mark.parametrize("value,expected_type", test_cases) +def test_json_codec_roundtrip(value, expected_type): + codec = JsonTransportCodec(parameter_types=[type(value)], return_type=expected_type) + + # Encode + encoded = codec.encode_parameters(value) + assert isinstance(encoded, bytes) + + # Decode + decoded = codec.decode_return_value(encoded) + + # For dataclass, compare asdict + if hasattr(value, "__dataclass_fields__"): + assert asdict(decoded) == asdict(value) + # For sets/frozensets, compare as sets + elif isinstance(value, (set, frozenset)): + assert decoded == value + # For enum + elif isinstance(value, Enum): + assert decoded.value == value.value + else: + assert decoded == value diff --git a/tests/protobuf/generated/__init__.py b/tests/protobuf/generated/__init__.py new file mode 100644 index 0000000..bcba37a --- /dev/null +++ b/tests/protobuf/generated/__init__.py @@ -0,0 +1,15 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/protobuf/generated/greet_pb2.py b/tests/protobuf/generated/greet_pb2.py new file mode 100644 index 0000000..f4d8461 --- /dev/null +++ b/tests/protobuf/generated/greet_pb2.py @@ -0,0 +1,29 @@ +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# source: greet.proto +"""Generated protocol buffer code.""" + +from google.protobuf import ( + descriptor as _descriptor, + descriptor_pool as _descriptor_pool, + symbol_database as _symbol_database, +) +from google.protobuf.internal import builder as _builder + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x0bgreet.proto\x12\rprotobuf_test"\x1e\n\x0eGreeterRequest\x12\x0c\n\x04name\x18\x01 \x01(\t"\x1f\n\x0cGreeterReply\x12\x0f\n\x07message\x18\x01 \x01(\tb\x06proto3' +) + +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "greet_pb2", globals()) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _GREETERREQUEST._serialized_start = 30 + _GREETERREQUEST._serialized_end = 60 + _GREETERREPLY._serialized_start = 62 + _GREETERREPLY._serialized_end = 93 +# @@protoc_insertion_point(module_scope) diff --git a/tests/protobuf/generated/protobuf_test.py b/tests/protobuf/generated/protobuf_test.py new file mode 100644 index 0000000..38bd7f6 --- /dev/null +++ b/tests/protobuf/generated/protobuf_test.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generated by the protocol buffer compiler. DO NOT EDIT! +# sources: greet.proto +# plugin: python-betterproto +from dataclasses import dataclass + +import betterproto + + +@dataclass +class GreeterRequest(betterproto.Message): + name: str = betterproto.string_field(1) + + +@dataclass +class GreeterReply(betterproto.Message): + message: str = betterproto.string_field(1) diff --git a/tests/protobuf/greet.proto b/tests/protobuf/greet.proto new file mode 100644 index 0000000..5b453a7 --- /dev/null +++ b/tests/protobuf/greet.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package protobuf_test; + +message GreeterRequest { + string name = 1; +} + +message GreeterReply { + string message = 1; +} diff --git a/tests/protobuf/protobuf_test.py b/tests/protobuf/protobuf_test.py new file mode 100644 index 0000000..3f03cae --- /dev/null +++ b/tests/protobuf/protobuf_test.py @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pytest + +from dubbo.codec.protobuf_codec import PrimitiveHandler, ProtobufTransportCodec +from dubbo.codec.protobuf_codec.protobuf_codec import DeserializationException, SerializationException + + +def test_primitive_roundtrip_string(): + codec = ProtobufTransportCodec(parameter_types=[str], return_type=str) + + # Encode + encoded = codec.encode_parameter("hello world") + assert isinstance(encoded, bytes) + + # Decode + decoded = codec.decode_return_value(encoded) + assert decoded == "hello world" + + +def test_primitive_roundtrip_int(): + codec = ProtobufTransportCodec(parameter_types=[int], return_type=int) + + encoded = codec.encode_parameter(12345) + decoded = codec.decode_return_value(encoded) + + assert isinstance(decoded, int) + assert decoded == 12345 + + +def test_primitive_invalid_type_raises(): + codec = ProtobufTransportCodec(parameter_types=[dict], return_type=dict) + + with pytest.raises(SerializationException): + codec.encode_parameter({"a": 1}) + + +def test_decode_with_no_return_type_raises(): + codec = ProtobufTransportCodec(parameter_types=[str], return_type=None) + + data = PrimitiveHandler().encode("hello", str) + + with pytest.raises(DeserializationException): + codec.decode_return_value(data) + + +def test_google_protobuf_roundtrip(): + pytest.importorskip("google.protobuf") + from generated.greet_pb2 import GreeterReply, GreeterRequest + + codec = ProtobufTransportCodec(parameter_types=[GreeterRequest], return_type=GreeterReply) + + req = GreeterRequest(name="Alice") + encoded = codec.encode_parameter(req) + + assert isinstance(encoded, bytes) + + # Fake server response + reply = GreeterReply(message="Hello Alice") + reply_bytes = reply.SerializeToString() + + decoded = codec.decode_return_value(reply_bytes) + assert isinstance(decoded, GreeterReply) + assert decoded.message == "Hello Alice"
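# A minimal sketch (illustrative, not part of the patch): the new protoHandlerRegistry makes
# the protobuf handlers resolvable through the existing extension loader, mirroring how
# ProtobufTransportCodec._load_default_handlers looks them up in this patch.
from dubbo.codec.protobuf_codec import ProtobufEncoder
from dubbo.extension import extensionLoader

primitive_handler = extensionLoader.get_extension(ProtobufEncoder, "primitive")()
data = primitive_handler.encode("ping", str)
assert primitive_handler.decode(data, str) == "ping"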