|
1 | 1 | ## Streaming Calls |
2 | 2 |
|
3 | | -Dubbo-python supports streaming calls, including `ClientStream`, `ServerStream`, and `BidirectionalStream`. The key difference in these calls is the use of iterators: passing an iterator as a parameter for `ClientStream`, receiving an iterator for `ServerStream`, or both passing and receiving iterators for `BidirectionalStream`. |
| 3 | +Dubbo-Python supports streaming calls, including `ClientStream`, `ServerStream`, and `BidirectionalStream` modes. |
4 | 4 |
|
5 | | -When using `BidirectionalStream`, the client needs to pass an iterator as a parameter to send multiple data points, while also receiving an iterator to handle multiple responses from the server. |
| 5 | +Streaming calls can be divided into write-streams and read-streams. For `ClientStream`, it’s multiple writes with a single read; for `ServerStream`, a single write with multiple reads; and `BidirectionalStream` allows multiple writes and reads. |
6 | 6 |
|
7 | | -Here’s an example of the client-side code: |
| 7 | +### Write-Stream |
| 8 | + |
| 9 | +Write operations in streaming calls can be divided into single write (`ServerStream`) and multiple writes (`ClientStream` and `BidirectionalStream`). |
| 10 | + |
| 11 | +#### Single Write |
| 12 | + |
| 13 | +Single write calls are similar to unary mode. For example: |
8 | 14 |
|
9 | 15 | ```python |
10 | | -class GreeterServiceStub: |
11 | | - def __init__(self, client: dubbo.Client): |
12 | | - self.bidi_stream = client.bidi_stream( |
13 | | - method_name="biStream", |
14 | | - request_serializer=greeter_pb2.GreeterRequest.SerializeToString, |
15 | | - response_deserializer=greeter_pb2.GreeterReply.FromString, |
16 | | - ) |
| 16 | +stub.server_stream(greeter_pb2.GreeterRequest(name="hello world from dubbo-python")) |
| 17 | +``` |
17 | 18 |
|
18 | | - def bi_stream(self, values): |
19 | | - return self.bidi_stream(values) |
| 19 | +#### Multiple Writes |
20 | 20 |
|
| 21 | +For multiple writes, users can write data using either an iterator or `writeStream` (only one of these options should be used). |
21 | 22 |
|
22 | | -if __name__ == "__main__": |
23 | | - reference_config = ReferenceConfig.from_url( |
24 | | - "tri://127.0.0.1:50051/org.apache.dubbo.samples.proto.Greeter" |
25 | | - ) |
26 | | - dubbo_client = dubbo.Client(reference_config) |
| 23 | +1. **Iterator-based Write**: Writing via iterator is similar to unary mode, with the main difference being the use of an iterator for multiple writes. For example: |
27 | 24 |
|
28 | | - stub = GreeterServiceStub(dubbo_client) |
| 25 | + ```python |
| 26 | + # Use an iterator to send multiple requests |
| 27 | + def request_generator(): |
| 28 | + for i in ["hello", "world", "from", "dubbo-python"]: |
| 29 | + yield greeter_pb2.GreeterRequest(name=str(i)) |
29 | 30 |
|
30 | | - # Iterator of request |
31 | | - def request_generator(): |
32 | | - for item in ["hello", "world", "from", "dubbo-python"]: |
33 | | - yield greeter_pb2.GreeterRequest(name=str(item)) |
| 31 | + # Call the remote method and return a read_stream |
| 32 | + stream = stub.client_stream(request_generator()) |
| 33 | + ``` |
34 | 34 |
|
35 | | - result = stub.bi_stream(request_generator()) |
| 35 | +2. **Using `writeStream`**: This method requires an empty argument, after which data is written incrementally using `write`, and `done_writing` is called to end the write-stream. For example: |
36 | 36 |
|
37 | | - for i in result: |
38 | | - print(f"Received response: {i.message}") |
39 | | -``` |
| 37 | + ```python |
| 38 | + stream = stub.bi_stream() |
| 39 | + # Use the write method to send messages |
| 40 | + stream.write(greeter_pb2.GreeterRequest(name="jock")) |
| 41 | + stream.write(greeter_pb2.GreeterRequest(name="jane")) |
| 42 | + stream.write(greeter_pb2.GreeterRequest(name="alice")) |
| 43 | + stream.write(greeter_pb2.GreeterRequest(name="dave")) |
| 44 | + # Call done_writing to notify the server that the client has finished writing |
| 45 | + stream.done_writing() |
| 46 | + ``` |
40 | 47 |
|
41 | | -And here’s the server-side code: |
| 48 | +### Read-Stream |
| 49 | + |
| 50 | +Read operations for streaming calls can be single read (`ClientStream`) or multiple reads (`ServerStream` and `BidirectionalStream`). A `ReadStream` is returned in all cases, and data can be read using the `read` method or an iterator. When using `read`, please note: |
| 51 | + |
| 52 | +1. The `read` method supports a `timeout` parameter (in seconds). |
| 53 | +2. The `read` method can return one of three values: the expected data, `None` (timeout exceeded), or `EOF` (end of the read-stream). |
| 54 | + |
| 55 | +#### Single Read |
| 56 | + |
| 57 | +A single call to the `read` method will retrieve the data, for example: |
42 | 58 |
|
43 | 59 | ```python |
44 | | -def bi_stream(request_stream): |
45 | | - for request in request_stream: |
46 | | - print(f"Received message from {request.name}") |
47 | | - yield greeter_pb2.GreeterReply(message=request.name) |
48 | | - |
49 | | - |
50 | | -if __name__ == "__main__": |
51 | | - # build a method handler |
52 | | - method_handler = RpcMethodHandler.bi_stream( |
53 | | - bi_stream, |
54 | | - request_deserializer=greeter_pb2.GreeterRequest.FromString, |
55 | | - response_serializer=greeter_pb2.GreeterReply.SerializeToString, |
56 | | - ) |
57 | | - # build a service handler |
58 | | - service_handler = RpcServiceHandler( |
59 | | - service_name="org.apache.dubbo.samples.data.Greeter", |
60 | | - method_handlers={"biStream": method_handler}, |
61 | | - ) |
62 | | - |
63 | | - service_config = ServiceConfig(service_handler) |
64 | | - |
65 | | - # start the server |
66 | | - server = dubbo.Server(service_config).start() |
67 | | - |
68 | | - input("Press Enter to stop the server...\n") |
| 60 | +result = stream.read() |
| 61 | +print(f"Received response: {result.message}") |
69 | 62 | ``` |
70 | 63 |
|
| 64 | +#### Multiple Reads |
| 65 | + |
| 66 | +Multiple reads can be done by repeatedly calling `read`, with handling for `None` and `EOF` values. Since `ReadStream` implements `__iter__` and `__next__`, an iterator-based approach can also be used, which automatically handles these values but doesn’t support a timeout. |
| 67 | + |
| 68 | +1. **Using Iterator (Recommended)**: |
| 69 | + |
| 70 | + ```python |
| 71 | + def client_stream(self, request_iterator): |
| 72 | + response = "" |
| 73 | + for request in request_iterator: |
| 74 | + print(f"Received request: {request.name}") |
| 75 | + response += f"{request.name} " |
| 76 | + return greeter_pb2.GreeterReply(message=response) |
| 77 | + ``` |
| 78 | + |
| 79 | +2. **Multiple Calls to `read` Method**: |
| 80 | + |
| 81 | + ```python |
| 82 | + # Use read method to receive messages |
| 83 | + # If no message arrives within the specified time, returns None |
| 84 | + # If the server has finished sending messages, returns EOF |
| 85 | + while True: |
| 86 | + i = stream.read(timeout=0.5) |
| 87 | + if i is dubbo.classes.EOF: |
| 88 | + break |
| 89 | + elif i is None: |
| 90 | + print("No message received") |
| 91 | + continue |
| 92 | + print(f"Received response: {i.message}") |
| 93 | + ``` |
| 94 | + |
0 commit comments