@@ -198,6 +198,14 @@ type LightningClient interface {
198198 RegisterRPCMiddleware (ctx context.Context , middlewareName ,
199199 customCaveatName string , readOnly bool , timeout time.Duration ,
200200 intercept InterceptFunction ) (chan error , error )
201+
202+ // SendCustomMessage sends a custom message to a peer.
203+ SendCustomMessage (ctx context.Context , msg CustomMessage ) error
204+
205+ // SubscribeCustomMessages creates a subscription to custom messages
206+ // received from our peers.
207+ SubscribeCustomMessages (ctx context.Context ) (<- chan CustomMessage ,
208+ <- chan error , error )
201209}
202210
203211// Info contains info about the connected lnd node.
@@ -1076,6 +1084,18 @@ type QueryRoutesResponse struct {
10761084 TotalAmtMsat lnwire.MilliSatoshi
10771085}
10781086
1087+ // CustomMessage describes custom messages exchanged with peers.
1088+ type CustomMessage struct {
1089+ // Peer is the peer that the message was exchanged with.
1090+ Peer route.Vertex
1091+
1092+ // MsgType is the protocol message type number for the custom message.
1093+ MsgType uint32
1094+
1095+ // Data is the data exchanged.
1096+ Data []byte
1097+ }
1098+
10791099var (
10801100 // ErrNoRouteFound is returned if we can't find a path with the passed
10811101 // parameters.
@@ -3723,3 +3743,84 @@ func (s *lightningClient) RegisterRPCMiddleware(ctx context.Context,
37233743
37243744 return errChan , nil
37253745}
3746+
3747+ // SendCustomMessage sends a custom message to one of our existing peers. Note
3748+ // that lnd must already be connected to a peer to send it messages.
3749+ func (s * lightningClient ) SendCustomMessage (ctx context.Context ,
3750+ msg CustomMessage ) error {
3751+
3752+ rpcCtx , cancel := context .WithTimeout (ctx , s .timeout )
3753+ defer cancel ()
3754+
3755+ rpcCtx = s .adminMac .WithMacaroonAuth (rpcCtx )
3756+ rpcReq := & lnrpc.SendCustomMessageRequest {
3757+ Peer : msg .Peer [:],
3758+ Type : msg .MsgType ,
3759+ Data : msg .Data ,
3760+ }
3761+
3762+ _ , err := s .client .SendCustomMessage (rpcCtx , rpcReq )
3763+ return err
3764+ }
3765+
3766+ // SubscribeCustomMessages subscribes to a stream of custom messages, optionally
3767+ // filtering by peer and message type. The channels returned will be closed
3768+ // when the subscription exits.
3769+ func (s * lightningClient ) SubscribeCustomMessages (ctx context.Context ) (
3770+ <- chan CustomMessage , <- chan error , error ) {
3771+
3772+ rpcCtx := s .adminMac .WithMacaroonAuth (ctx )
3773+ rpcReq := & lnrpc.SubscribeCustomMessagesRequest {}
3774+
3775+ client , err := s .client .SubscribeCustomMessages (rpcCtx , rpcReq )
3776+ if err != nil {
3777+ return nil , nil , err
3778+ }
3779+
3780+ var (
3781+ // Buffer error channel by 1 so that consumer reading from this
3782+ // channel does not block our exit.
3783+ errChan = make (chan error , 1 )
3784+ msgChan = make (chan CustomMessage )
3785+ )
3786+
3787+ s .wg .Add (1 )
3788+ go func () {
3789+ defer func () {
3790+ // Close channels on exit so that callers know the
3791+ // subscription has finished.
3792+ close (errChan )
3793+ close (msgChan )
3794+
3795+ s .wg .Done ()
3796+ }()
3797+
3798+ for {
3799+ msg , err := client .Recv ()
3800+ if err != nil {
3801+ errChan <- fmt .Errorf ("receive failed: %w" , err )
3802+ return
3803+ }
3804+
3805+ peer , err := route .NewVertexFromBytes (msg .Peer )
3806+ if err != nil {
3807+ errChan <- fmt .Errorf ("invalid peer: %w" , err )
3808+ return
3809+ }
3810+
3811+ customMsg := CustomMessage {
3812+ Peer : peer ,
3813+ Data : msg .Data ,
3814+ MsgType : msg .Type ,
3815+ }
3816+
3817+ select {
3818+ case msgChan <- customMsg :
3819+ case <- ctx .Done ():
3820+ return
3821+ }
3822+ }
3823+ }()
3824+
3825+ return msgChan , errChan , nil
3826+ }
0 commit comments