@@ -143,6 +143,91 @@ impl EventPublisher for RabbitMqEventPublisher {
143143 }
144144}
145145
146- // Ensure the publisher is Send and Sync for use in async contexts
147- unsafe impl Send for RabbitMqEventPublisher { }
148- unsafe impl Sync for RabbitMqEventPublisher { }
146+ #[ cfg( test) ]
147+ #[ cfg( feature = "integration-tests-events-rabbitmq" ) ]
148+ mod integration_tests_events_rabbitmq {
149+ use super :: * ;
150+ use lapin:: {
151+ options:: { BasicAckOptions , BasicConsumeOptions , QueueBindOptions , QueueDeclareOptions } ,
152+ types:: FieldTable ,
153+ Channel , Connection ,
154+ } ;
155+ use ldk_server_protos:: events:: event_envelope:: Event ;
156+ use ldk_server_protos:: events:: PaymentForwarded ;
157+ use std:: io;
158+ use std:: time:: Duration ;
159+ use tokio;
160+
161+ use futures_util:: stream:: StreamExt ;
162+ #[ tokio:: test]
163+ async fn test_publish_and_consume_event ( ) {
164+ let config = RabbitMqConfig {
165+ connection_string : "amqp://guest:guest@localhost:5672/%2f" . to_string ( ) ,
166+ exchange_name : "test_exchange" . to_string ( ) ,
167+ } ;
168+
169+ let publisher = RabbitMqEventPublisher :: new ( config. clone ( ) ) ;
170+
171+ let conn = Connection :: connect ( & config. connection_string , ConnectionProperties :: default ( ) )
172+ . await
173+ . expect ( "Failed make rabbitmq connection" ) ;
174+ let channel = conn. create_channel ( ) . await . expect ( "Failed to create rabbitmq channel" ) ;
175+
176+ let queue_name = "test_queue" ;
177+ setup_queue ( & queue_name, & channel, & config) . await ;
178+
179+ let event = EventEnvelope {
180+ event : Some ( Event :: PaymentForwarded ( PaymentForwarded :: default ( ) ) ) ,
181+ } ;
182+ publisher. publish ( event. clone ( ) ) . await . expect ( "Failed to publish event" ) ;
183+
184+ consume_event ( & queue_name, & channel, & event) . await . expect ( "Failed to consume event" ) ;
185+ }
186+
187+ async fn setup_queue ( queue_name : & str , channel : & Channel , config : & RabbitMqConfig ) {
188+ channel
189+ . queue_declare ( queue_name, QueueDeclareOptions :: default ( ) , FieldTable :: default ( ) )
190+ . await
191+ . unwrap ( ) ;
192+ channel
193+ . exchange_declare (
194+ & config. exchange_name ,
195+ ExchangeKind :: Fanout ,
196+ ExchangeDeclareOptions { durable : true , ..Default :: default ( ) } ,
197+ FieldTable :: default ( ) ,
198+ )
199+ . await
200+ . unwrap ( ) ;
201+
202+ channel
203+ . queue_bind (
204+ queue_name,
205+ & config. exchange_name ,
206+ "" ,
207+ QueueBindOptions :: default ( ) ,
208+ FieldTable :: default ( ) ,
209+ )
210+ . await
211+ . unwrap ( ) ;
212+ }
213+
214+ async fn consume_event (
215+ queue_name : & str , channel : & Channel , expected_event : & EventEnvelope ,
216+ ) -> io:: Result < ( ) > {
217+ let mut consumer = channel
218+ . basic_consume (
219+ queue_name,
220+ "test_consumer" ,
221+ BasicConsumeOptions :: default ( ) ,
222+ FieldTable :: default ( ) ,
223+ )
224+ . await
225+ . unwrap ( ) ;
226+ let delivery =
227+ tokio:: time:: timeout ( Duration :: from_secs ( 10 ) , consumer. next ( ) ) . await ?. unwrap ( ) . unwrap ( ) ;
228+ let received_event = EventEnvelope :: decode ( & * delivery. data ) ?;
229+ assert_eq ! ( received_event, * expected_event, "Event mismatch" ) ;
230+ channel. basic_ack ( delivery. delivery_tag , BasicAckOptions :: default ( ) ) . await . unwrap ( ) ;
231+ Ok ( ( ) )
232+ }
233+ }
0 commit comments