1616#
1717
1818import logging
19+ import traceback
1920from queue import Queue
2021
2122import grpc
@@ -40,22 +41,33 @@ def __init__(self):
4041 self .channel , header_adder_interceptor ('authentication' , config .authentication )
4142 )
4243
43- def cb (state ):
44- logger .debug ('grpc channel connectivity changed, [%s -> %s]' , self .state , state )
45- self .state = state
46- if self .connected ():
47- self .service_management .send_instance_props ()
48-
49- self .channel .subscribe (cb , try_to_connect = True )
44+ self .channel .subscribe (self ._cb , try_to_connect = True )
5045 self .service_management = GrpcServiceManagementClient (self .channel )
5146 self .traces_reporter = GrpcTraceSegmentReportService (self .channel )
5247
48+ def _cb (self , state ):
49+ logger .debug ('grpc channel connectivity changed, [%s -> %s]' , self .state , state )
50+ self .state = state
51+ if self .connected ():
52+ try :
53+ self .service_management .send_instance_props ()
54+ except grpc .RpcError :
55+ self .on_error ()
56+
5357 def heartbeat (self ):
54- self .service_management .send_heart_beat ()
58+ try :
59+ self .service_management .send_heart_beat ()
60+ except grpc .RpcError :
61+ self .on_error ()
5562
5663 def connected (self ):
5764 return self .state == grpc .ChannelConnectivity .READY
5865
66+ def on_error (self ):
67+ traceback .print_exc ()
68+ self .channel .unsubscribe (self ._cb )
69+ self .channel .subscribe (self ._cb , try_to_connect = True )
70+
5971 def report (self , queue : Queue ):
6072 def generator ():
6173 while True :
@@ -104,4 +116,7 @@ def generator():
104116
105117 queue .task_done ()
106118
107- self .traces_reporter .report (generator ())
119+ try :
120+ self .traces_reporter .report (generator ())
121+ except grpc .RpcError :
122+ self .on_error ()
0 commit comments