1616
1717package fr .acinq .eclair .io
1818
19- import akka .actor .typed .Behavior
2019import akka .actor .typed .eventstream .EventStream
2120import akka .actor .typed .scaladsl .adapter .TypedActorRefOps
2221import akka .actor .typed .scaladsl .{ActorContext , Behaviors }
22+ import akka .actor .typed .{Behavior , SupervisorStrategy }
2323import akka .actor .{ActorRef , typed }
2424import fr .acinq .bitcoin .scalacompat .ByteVector32
2525import fr .acinq .bitcoin .scalacompat .Crypto .PublicKey
@@ -32,6 +32,8 @@ import fr.acinq.eclair.router.Router
3232import fr .acinq .eclair .wire .protocol .OnionMessage
3333import fr .acinq .eclair .{EncodedNodeId , NodeParams , ShortChannelId }
3434
35+ import scala .concurrent .duration .DurationInt
36+
3537object MessageRelay {
3638 // @formatter:off
3739 sealed trait Command
@@ -42,29 +44,18 @@ object MessageRelay {
4244 policy : RelayPolicy ,
4345 replyTo_opt : Option [typed.ActorRef [Status ]]) extends Command
4446 case class WrappedPeerInfo (peerInfo : PeerInfoResponse ) extends Command
45- case class WrappedConnectionResult (result : PeerConnection .ConnectionResult ) extends Command
46- case class WrappedOptionalNodeId (nodeId_opt : Option [PublicKey ]) extends Command
47+ private case class WrappedConnectionResult (result : PeerConnection .ConnectionResult ) extends Command
48+ private case class WrappedOptionalNodeId (nodeId_opt : Option [PublicKey ]) extends Command
49+ private case class WrappedPeerReadyResult (result : PeerReadyNotifier .Result ) extends Command
4750
48- sealed trait Status {
49- val messageId : ByteVector32
50- }
51+ sealed trait Status { val messageId : ByteVector32 }
5152 case class Sent (messageId : ByteVector32 ) extends Status
5253 sealed trait Failure extends Status
53- case class AgainstPolicy (messageId : ByteVector32 , policy : RelayPolicy ) extends Failure {
54- override def toString : String = s " Relay prevented by policy $policy"
55- }
56- case class ConnectionFailure (messageId : ByteVector32 , failure : PeerConnection .ConnectionResult .Failure ) extends Failure {
57- override def toString : String = s " Can't connect to peer: ${failure.toString}"
58- }
59- case class Disconnected (messageId : ByteVector32 ) extends Failure {
60- override def toString : String = " Peer is not connected"
61- }
62- case class UnknownChannel (messageId : ByteVector32 , channelId : ShortChannelId ) extends Failure {
63- override def toString : String = s " Unknown channel: $channelId"
64- }
65- case class DroppedMessage (messageId : ByteVector32 , reason : DropReason ) extends Failure {
66- override def toString : String = s " Message dropped: $reason"
67- }
54+ case class AgainstPolicy (messageId : ByteVector32 , policy : RelayPolicy ) extends Failure { override def toString : String = s " Relay prevented by policy $policy" }
55+ case class ConnectionFailure (messageId : ByteVector32 , failure : PeerConnection .ConnectionResult .Failure ) extends Failure { override def toString : String = s " Can't connect to peer: ${failure.toString}" }
56+ case class Disconnected (messageId : ByteVector32 ) extends Failure { override def toString : String = " Peer is not connected" }
57+ case class UnknownChannel (messageId : ByteVector32 , channelId : ShortChannelId ) extends Failure { override def toString : String = s " Unknown channel: $channelId" }
58+ case class DroppedMessage (messageId : ByteVector32 , reason : DropReason ) extends Failure { override def toString : String = s " Message dropped: $reason" }
6859
6960 sealed trait RelayPolicy
7061 case object RelayChannelsOnly extends RelayPolicy
@@ -100,15 +91,15 @@ private class MessageRelay(nodeParams: NodeParams,
10091 def queryNextNodeId (msg : OnionMessage , nextNode : Either [ShortChannelId , EncodedNodeId ]): Behavior [Command ] = {
10192 nextNode match {
10293 case Left (outgoingChannelId) if outgoingChannelId == ShortChannelId .toSelf =>
103- withNextNodeId(msg, nodeParams.nodeId)
94+ withNextNodeId(msg, EncodedNodeId . WithPublicKey . Plain ( nodeParams.nodeId) )
10495 case Left (outgoingChannelId) =>
10596 register ! Register .GetNextNodeId (context.messageAdapter(WrappedOptionalNodeId ), outgoingChannelId)
10697 waitForNextNodeId(msg, outgoingChannelId)
10798 case Right (EncodedNodeId .ShortChannelIdDir (isNode1, scid)) =>
10899 router ! Router .GetNodeId (context.messageAdapter(WrappedOptionalNodeId ), scid, isNode1)
109100 waitForNextNodeId(msg, scid)
110101 case Right (encodedNodeId : EncodedNodeId .WithPublicKey ) =>
111- withNextNodeId(msg, encodedNodeId.publicKey )
102+ withNextNodeId(msg, encodedNodeId)
112103 }
113104 }
114105
@@ -118,33 +109,39 @@ private class MessageRelay(nodeParams: NodeParams,
118109 replyTo_opt.foreach(_ ! UnknownChannel (messageId, channelId))
119110 Behaviors .stopped
120111 case WrappedOptionalNodeId (Some (nextNodeId)) =>
121- withNextNodeId(msg, nextNodeId)
112+ withNextNodeId(msg, EncodedNodeId . WithPublicKey . Plain ( nextNodeId) )
122113 }
123114 }
124115
125- private def withNextNodeId (msg : OnionMessage , nextNodeId : PublicKey ): Behavior [Command ] = {
126- if (nextNodeId == nodeParams.nodeId) {
127- OnionMessages .process(nodeParams.privateKey, msg) match {
128- case OnionMessages .DropMessage (reason) =>
129- replyTo_opt.foreach(_ ! DroppedMessage (messageId, reason))
130- Behaviors .stopped
131- case OnionMessages .SendMessage (nextNode, nextMessage) =>
132- // We need to repeat the process until we identify the (real) next node, or find out that we're the recipient.
133- queryNextNodeId(nextMessage, nextNode)
134- case received : OnionMessages .ReceiveMessage =>
135- context.system.eventStream ! EventStream .Publish (received)
136- replyTo_opt.foreach(_ ! Sent (messageId))
137- Behaviors .stopped
138- }
139- } else {
140- policy match {
141- case RelayChannelsOnly =>
142- switchboard ! GetPeerInfo (context.messageAdapter(WrappedPeerInfo ), prevNodeId)
143- waitForPreviousPeerForPolicyCheck(msg, nextNodeId)
144- case RelayAll =>
145- switchboard ! Peer .Connect (nextNodeId, None , context.messageAdapter(WrappedConnectionResult ).toClassic, isPersistent = false )
146- waitForConnection(msg)
147- }
116+ private def withNextNodeId (msg : OnionMessage , nextNodeId : EncodedNodeId .WithPublicKey ): Behavior [Command ] = {
117+ nextNodeId match {
118+ case EncodedNodeId .WithPublicKey .Plain (nodeId) if nodeId == nodeParams.nodeId =>
119+ OnionMessages .process(nodeParams.privateKey, msg) match {
120+ case OnionMessages .DropMessage (reason) =>
121+ replyTo_opt.foreach(_ ! DroppedMessage (messageId, reason))
122+ Behaviors .stopped
123+ case OnionMessages .SendMessage (nextNode, nextMessage) =>
124+ // We need to repeat the process until we identify the (real) next node, or find out that we're the recipient.
125+ queryNextNodeId(nextMessage, nextNode)
126+ case received : OnionMessages .ReceiveMessage =>
127+ context.system.eventStream ! EventStream .Publish (received)
128+ replyTo_opt.foreach(_ ! Sent (messageId))
129+ Behaviors .stopped
130+ }
131+ case EncodedNodeId .WithPublicKey .Plain (nodeId) =>
132+ policy match {
133+ case RelayChannelsOnly =>
134+ switchboard ! GetPeerInfo (context.messageAdapter(WrappedPeerInfo ), prevNodeId)
135+ waitForPreviousPeerForPolicyCheck(msg, nodeId)
136+ case RelayAll =>
137+ switchboard ! Peer .Connect (nodeId, None , context.messageAdapter(WrappedConnectionResult ).toClassic, isPersistent = false )
138+ waitForConnection(msg)
139+ }
140+ case EncodedNodeId .WithPublicKey .Wallet (nodeId) =>
141+ context.log.info(" trying to wake up next peer to relay onion message (nodeId={})" , nodeId)
142+ val notifier = context.spawnAnonymous(Behaviors .supervise(PeerReadyNotifier (nodeId, timeout_opt = Some (Left (nodeParams.wakeUpTimeout)))).onFailure(SupervisorStrategy .stop))
143+ notifier ! PeerReadyNotifier .NotifyWhenPeerReady (context.messageAdapter(WrappedPeerReadyResult ))
144+ waitForWalletNodeUp(msg)
148145 }
149146 }
150147
@@ -180,4 +177,15 @@ private class MessageRelay(nodeParams: NodeParams,
180177 Behaviors .stopped
181178 }
182179 }
180+
181+ private def waitForWalletNodeUp (msg : OnionMessage ): Behavior [Command ] = {
182+ Behaviors .receiveMessagePartial {
183+ case WrappedPeerReadyResult (r : PeerReadyNotifier .PeerReady ) =>
184+ r.peer ! Peer .RelayOnionMessage (messageId, msg, replyTo_opt)
185+ Behaviors .stopped
186+ case WrappedPeerReadyResult (_ : PeerReadyNotifier .PeerUnavailable ) =>
187+ replyTo_opt.foreach(_ ! Disconnected (messageId))
188+ Behaviors .stopped
189+ }
190+ }
183191}
0 commit comments