@@ -146,3 +146,93 @@ impl EventPublisher for RabbitMqEventPublisher {
146146// Ensure the publisher is Send and Sync for use in async contexts
147147unsafe impl Send for RabbitMqEventPublisher { }
148148unsafe impl Sync for RabbitMqEventPublisher { }
149+
150+ #[ cfg( test) ]
151+ #[ cfg( feature = "integration-tests-events-rabbitmq" ) ]
152+ mod integration_tests_events_rabbitmq {
153+ use super :: * ;
154+ use lapin:: {
155+ options:: { BasicAckOptions , BasicConsumeOptions , QueueBindOptions , QueueDeclareOptions } ,
156+ types:: FieldTable ,
157+ Channel , Connection ,
158+ } ;
159+ use ldk_server_protos:: events:: event_envelope:: Event ;
160+ use ldk_server_protos:: events:: PaymentForwarded ;
161+ use std:: io;
162+ use std:: time:: Duration ;
163+ use tokio;
164+
165+ use futures_util:: stream:: StreamExt ;
166+ #[ tokio:: test]
167+ async fn test_publish_and_consume_event ( ) {
168+ let config = RabbitMqConfig {
169+ connection_string : "amqp://guest:guest@localhost:5672/%2f" . to_string ( ) ,
170+ exchange_name : "test_exchange" . to_string ( ) ,
171+ } ;
172+
173+ let publisher = RabbitMqEventPublisher :: new ( config. clone ( ) ) ;
174+
175+ let conn = Connection :: connect ( & config. connection_string , ConnectionProperties :: default ( ) )
176+ . await
177+ . expect ( "Failed make rabbitmq connection" ) ;
178+ let channel = conn. create_channel ( ) . await . expect ( "Failed to create rabbitmq channel" ) ;
179+
180+ let queue_name = "test_queue" ;
181+ setup_queue ( & queue_name, & channel, & config) . await ;
182+
183+ let event = EventEnvelope {
184+ event_type : 1 ,
185+ event : Some ( Event :: PaymentForwarded ( PaymentForwarded :: default ( ) ) ) ,
186+ } ;
187+ publisher. publish ( event. clone ( ) ) . await . expect ( "Failed to publish event" ) ;
188+
189+ consume_event ( & queue_name, & channel, & event) . await . expect ( "Failed to consume event" ) ;
190+ }
191+
192+ async fn setup_queue ( queue_name : & str , channel : & Channel , config : & RabbitMqConfig ) {
193+ channel
194+ . queue_declare ( queue_name, QueueDeclareOptions :: default ( ) , FieldTable :: default ( ) )
195+ . await
196+ . unwrap ( ) ;
197+ channel
198+ . exchange_declare (
199+ & config. exchange_name ,
200+ ExchangeKind :: Fanout ,
201+ ExchangeDeclareOptions { durable : true , ..Default :: default ( ) } ,
202+ FieldTable :: default ( ) ,
203+ )
204+ . await
205+ . unwrap ( ) ;
206+
207+ channel
208+ . queue_bind (
209+ queue_name,
210+ & config. exchange_name ,
211+ "" ,
212+ QueueBindOptions :: default ( ) ,
213+ FieldTable :: default ( ) ,
214+ )
215+ . await
216+ . unwrap ( ) ;
217+ }
218+
219+ async fn consume_event (
220+ queue_name : & str , channel : & Channel , expected_event : & EventEnvelope ,
221+ ) -> io:: Result < ( ) > {
222+ let mut consumer = channel
223+ . basic_consume (
224+ queue_name,
225+ "test_consumer" ,
226+ BasicConsumeOptions :: default ( ) ,
227+ FieldTable :: default ( ) ,
228+ )
229+ . await
230+ . unwrap ( ) ;
231+ let delivery =
232+ tokio:: time:: timeout ( Duration :: from_secs ( 10 ) , consumer. next ( ) ) . await ?. unwrap ( ) . unwrap ( ) ;
233+ let received_event = EventEnvelope :: decode ( & * delivery. data ) ?;
234+ assert_eq ! ( received_event. event_type, expected_event. event_type, "Event type mismatch" ) ;
235+ channel. basic_ack ( delivery. delivery_tag , BasicAckOptions :: default ( ) ) . await . unwrap ( ) ;
236+ Ok ( ( ) )
237+ }
238+ }
0 commit comments