Skip to content

Commit c6a1199

Browse files
committed
Add liquidity purchases to the AuditDb
Whenever liquidity is purchased, we store it in the `AuditDb`. This lets node operators gather useful statistics on their peers, and which ones are actively using the liquidity that is purchased. We store minimal information about the liquidity ads itself to be more easily compatible with potential changes in the spec.
1 parent 5b5452e commit c6a1199

File tree

12 files changed

+420
-18
lines changed

12 files changed

+420
-18
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelEvents.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,11 @@ package fr.acinq.eclair.channel
1818

1919
import akka.actor.ActorRef
2020
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
21-
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction}
21+
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, Transaction, TxId}
2222
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
2323
import fr.acinq.eclair.channel.Helpers.Closing.ClosingType
24-
import fr.acinq.eclair.io.Peer.OpenChannelResponse
25-
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate}
26-
import fr.acinq.eclair.{BlockHeight, Features, ShortChannelId}
24+
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, LiquidityAds}
25+
import fr.acinq.eclair.{BlockHeight, Features, MilliSatoshi, ShortChannelId}
2726

2827
/**
2928
* Created by PM on 17/08/2016.
@@ -79,6 +78,14 @@ case class ChannelSignatureSent(channel: ActorRef, commitments: Commitments) ext
7978

8079
case class ChannelSignatureReceived(channel: ActorRef, commitments: Commitments) extends ChannelEvent
8180

81+
case class LiquidityPurchase(fundingTxId: TxId, fundingTxIndex: Long, isBuyer: Boolean, amount: Satoshi, fees: LiquidityAds.Fees, capacity: Satoshi, localContribution: Satoshi, remoteContribution: Satoshi, localBalance: MilliSatoshi, remoteBalance: MilliSatoshi, outgoingHtlcCount: Long, incomingHtlcCount: Long) {
82+
val previousCapacity: Satoshi = capacity - localContribution - remoteContribution
83+
val previousLocalBalance: MilliSatoshi = if (isBuyer) localBalance - localContribution + fees.total else localBalance - localContribution - fees.total
84+
val previousRemoteBalance: MilliSatoshi = if (isBuyer) remoteBalance - remoteContribution - fees.total else remoteBalance - remoteContribution + fees.total
85+
}
86+
87+
case class ChannelLiquidityPurchased(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, purchase: LiquidityPurchase) extends ChannelEvent
88+
8289
case class ChannelErrorOccurred(channel: ActorRef, channelId: ByteVector32, remoteNodeId: PublicKey, error: ChannelError, isFatal: Boolean) extends ChannelEvent
8390

8491
// NB: the fee should be set to 0 when we're not paying it.

eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -982,7 +982,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
982982
sessionId,
983983
nodeParams, fundingParams,
984984
channelParams = d.commitments.params,
985-
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment),
985+
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes),
986986
localPushAmount = spliceAck.pushAmount, remotePushAmount = msg.pushAmount,
987987
liquidityPurchase_opt = willFund_opt.map(_.purchase),
988988
wallet
@@ -1029,7 +1029,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
10291029
sessionId,
10301030
nodeParams, fundingParams,
10311031
channelParams = d.commitments.params,
1032-
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment),
1032+
purpose = InteractiveTxBuilder.SpliceTx(parentCommitment, d.commitments.changes),
10331033
localPushAmount = cmd.pushAmount, remotePushAmount = msg.pushAmount,
10341034
liquidityPurchase_opt = liquidityPurchase_opt,
10351035
wallet

eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package fr.acinq.eclair.channel.fund
1818

19+
import akka.actor.typed.eventstream.EventStream
20+
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
1921
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer}
2022
import akka.actor.typed.{ActorRef, Behavior}
2123
import akka.event.LoggingAdapter
@@ -163,6 +165,8 @@ object InteractiveTxBuilder {
163165
def previousFundingAmount: Satoshi
164166
def localCommitIndex: Long
165167
def remoteCommitIndex: Long
168+
def localNextHtlcId: Long
169+
def remoteNextHtlcId: Long
166170
def remotePerCommitmentPoint: PublicKey
167171
def commitTxFeerate: FeeratePerKw
168172
def fundingTxIndex: Long
@@ -175,15 +179,19 @@ object InteractiveTxBuilder {
175179
override val previousFundingAmount: Satoshi = 0 sat
176180
override val localCommitIndex: Long = 0
177181
override val remoteCommitIndex: Long = 0
182+
override val localNextHtlcId: Long = 0
183+
override val remoteNextHtlcId: Long = 0
178184
override val fundingTxIndex: Long = 0
179185
override val localHtlcs: Set[DirectedHtlc] = Set.empty
180186
}
181-
case class SpliceTx(parentCommitment: Commitment) extends Purpose {
187+
case class SpliceTx(parentCommitment: Commitment, changes: CommitmentChanges) extends Purpose {
182188
override val previousLocalBalance: MilliSatoshi = parentCommitment.localCommit.spec.toLocal
183189
override val previousRemoteBalance: MilliSatoshi = parentCommitment.remoteCommit.spec.toLocal
184190
override val previousFundingAmount: Satoshi = parentCommitment.capacity
185191
override val localCommitIndex: Long = parentCommitment.localCommit.index
186192
override val remoteCommitIndex: Long = parentCommitment.remoteCommit.index
193+
override val localNextHtlcId: Long = changes.localNextHtlcId
194+
override val remoteNextHtlcId: Long = changes.remoteNextHtlcId
187195
override val remotePerCommitmentPoint: PublicKey = parentCommitment.remoteCommit.remotePerCommitmentPoint
188196
override val commitTxFeerate: FeeratePerKw = parentCommitment.localCommit.spec.commitTxFeerate
189197
override val fundingTxIndex: Long = parentCommitment.fundingTxIndex + 1
@@ -199,6 +207,8 @@ object InteractiveTxBuilder {
199207
override val previousFundingAmount: Satoshi = (previousLocalBalance + previousRemoteBalance).truncateToSatoshi
200208
override val localCommitIndex: Long = replacedCommitment.localCommit.index
201209
override val remoteCommitIndex: Long = replacedCommitment.remoteCommit.index
210+
override val localNextHtlcId: Long = 0
211+
override val remoteNextHtlcId: Long = 0
202212
override val remotePerCommitmentPoint: PublicKey = replacedCommitment.remoteCommit.remotePerCommitmentPoint
203213
override val commitTxFeerate: FeeratePerKw = replacedCommitment.localCommit.spec.commitTxFeerate
204214
override val fundingTxIndex: Long = replacedCommitment.fundingTxIndex
@@ -792,6 +802,29 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon
792802
Behaviors.receiveMessagePartial {
793803
case SignTransactionResult(signedTx) =>
794804
log.info(s"interactive-tx txid=${signedTx.txId} partially signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", signedTx.tx.localInputs.length, signedTx.tx.remoteInputs.length, signedTx.tx.localOutputs.length, signedTx.tx.remoteOutputs.length)
805+
// At this point, we're not completely sure that the transaction will succeed: if our peer doesn't send their
806+
// commit_sig, the transaction will be aborted. But it's a best effort, because after sending our commit_sig,
807+
// we won't store details about the liquidity purchase so we'll be unable to emit that event later. Even after
808+
// fully signing the transaction, it may be double-spent by a force-close, which would invalidate it as well.
809+
// The right solution is to check confirmations on the funding transaction before considering that a liquidity
810+
// purchase is completed, which is what we do in our AuditDb.
811+
liquidityPurchase_opt.foreach { p =>
812+
val purchase = LiquidityPurchase(
813+
fundingTxId = signedTx.txId,
814+
fundingTxIndex = purpose.fundingTxIndex,
815+
isBuyer = fundingParams.isInitiator,
816+
amount = p.amount,
817+
fees = p.fees,
818+
capacity = fundingParams.fundingAmount,
819+
localContribution = fundingParams.localContribution,
820+
remoteContribution = fundingParams.remoteContribution,
821+
localBalance = localCommit.spec.toLocal,
822+
remoteBalance = localCommit.spec.toRemote,
823+
outgoingHtlcCount = purpose.localNextHtlcId,
824+
incomingHtlcCount = purpose.remoteNextHtlcId,
825+
)
826+
context.system.eventStream ! EventStream.Publish(ChannelLiquidityPurchased(replyTo.toClassic, channelParams.channelId, remoteNodeId, purchase))
827+
}
795828
replyTo ! Succeeded(InteractiveTxSigningSession.WaitingForSigs(fundingParams, purpose.fundingTxIndex, signedTx, Left(localCommit), remoteCommit), commitSig)
796829
Behaviors.stopped
797830
case WalletFailure(t) =>

eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ trait Databases {
4444
def peers: PeersDb
4545
def payments: PaymentsDb
4646
def pendingCommands: PendingCommandsDb
47+
def liquidity: LiquidityDb
4748
//@formatter:on
4849
}
4950

@@ -60,6 +61,7 @@ object Databases extends Logging {
6061
}
6162

6263
case class SqliteDatabases private(network: SqliteNetworkDb,
64+
liquidity: SqliteLiquidityDb,
6365
audit: SqliteAuditDb,
6466
channels: SqliteChannelsDb,
6567
peers: SqlitePeersDb,
@@ -78,6 +80,7 @@ object Databases extends Logging {
7880
jdbcUrlFile_opt.foreach(checkIfDatabaseUrlIsUnchanged("sqlite", _))
7981
SqliteDatabases(
8082
network = new SqliteNetworkDb(networkJdbc),
83+
liquidity = new SqliteLiquidityDb(eclairJdbc),
8184
audit = new SqliteAuditDb(auditJdbc),
8285
channels = new SqliteChannelsDb(eclairJdbc),
8386
peers = new SqlitePeersDb(eclairJdbc),
@@ -89,6 +92,7 @@ object Databases extends Logging {
8992
}
9093

9194
case class PostgresDatabases private(network: PgNetworkDb,
95+
liquidity: PgLiquidityDb,
9296
audit: PgAuditDb,
9397
channels: PgChannelsDb,
9498
peers: PgPeersDb,
@@ -106,8 +110,7 @@ object Databases extends Logging {
106110
auditRelayedMaxAge: FiniteDuration,
107111
localChannelsMinCount: Int,
108112
networkNodesMinCount: Int,
109-
networkChannelsMinCount: Int
110-
)
113+
networkChannelsMinCount: Int)
111114

112115
def apply(hikariConfig: HikariConfig,
113116
instanceId: UUID,
@@ -149,6 +152,7 @@ object Databases extends Logging {
149152

150153
val databases = PostgresDatabases(
151154
network = new PgNetworkDb,
155+
liquidity = new PgLiquidityDb,
152156
audit = new PgAuditDb,
153157
channels = new PgChannelsDb,
154158
peers = new PgPeersDb,
@@ -160,7 +164,7 @@ object Databases extends Logging {
160164
readOnlyUser_opt.foreach { readOnlyUser =>
161165
PgUtils.inTransaction { connection =>
162166
using(connection.createStatement()) { statement =>
163-
val schemas = "public" :: "audit" :: "local" :: "network" :: "payments" :: Nil
167+
val schemas = "public" :: "audit" :: "local" :: "network" :: "payments" :: "liquidity" :: Nil
164168
schemas.foreach { schema =>
165169
logger.info(s"granting read-only access to user=$readOnlyUser schema=$schema")
166170
statement.executeUpdate(s"GRANT USAGE ON SCHEMA $schema TO $readOnlyUser")

eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,15 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
3838

3939
private val auditDb: AuditDb = nodeParams.db.audit
4040
private val channelsDb: ChannelsDb = nodeParams.db.channels
41+
private val liquidityDb: LiquidityDb = nodeParams.db.liquidity
4142

4243
context.spawn(Behaviors.supervise(RevokedHtlcInfoCleaner(channelsDb, nodeParams.revokedHtlcInfoCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "revoked-htlc-info-cleaner")
4344

4445
context.system.eventStream.subscribe(self, classOf[PaymentSent])
4546
context.system.eventStream.subscribe(self, classOf[PaymentFailed])
4647
context.system.eventStream.subscribe(self, classOf[PaymentReceived])
4748
context.system.eventStream.subscribe(self, classOf[PaymentRelayed])
49+
context.system.eventStream.subscribe(self, classOf[ChannelLiquidityPurchased])
4850
context.system.eventStream.subscribe(self, classOf[TransactionPublished])
4951
context.system.eventStream.subscribe(self, classOf[TransactionConfirmed])
5052
context.system.eventStream.subscribe(self, classOf[ChannelErrorOccurred])
@@ -92,11 +94,15 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
9294
}
9395
auditDb.add(e)
9496

97+
case e: ChannelLiquidityPurchased => liquidityDb.addPurchase(e)
98+
9599
case e: TransactionPublished =>
96100
log.info(s"paying mining fee=${e.miningFee} for txid=${e.tx.txid} desc=${e.desc}")
97101
auditDb.add(e)
98102

99-
case e: TransactionConfirmed => auditDb.add(e)
103+
case e: TransactionConfirmed =>
104+
liquidityDb.setConfirmed(e.remoteNodeId, e.tx.txid)
105+
auditDb.add(e)
100106

101107
case e: ChannelErrorOccurred =>
102108
// first pattern matching level is to ignore some errors, second level is to separate between different kind of errors

eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,12 @@ import scala.util.{Failure, Success, Try}
3030
case class DualDatabases(primary: Databases, secondary: Databases) extends Databases with FileBackup {
3131

3232
override val network: NetworkDb = DualNetworkDb(primary.network, secondary.network)
33-
3433
override val audit: AuditDb = DualAuditDb(primary.audit, secondary.audit)
35-
3634
override val channels: ChannelsDb = DualChannelsDb(primary.channels, secondary.channels)
37-
3835
override val peers: PeersDb = DualPeersDb(primary.peers, secondary.peers)
39-
4036
override val payments: PaymentsDb = DualPaymentsDb(primary.payments, secondary.payments)
41-
4237
override val pendingCommands: PendingCommandsDb = DualPendingCommandsDb(primary.pendingCommands, secondary.pendingCommands)
38+
override val liquidity: LiquidityDb = DualLiquidityDb(primary.liquidity, secondary.liquidity)
4339

4440
/** if one of the database supports file backup, we use it */
4541
override def backup(backupFile: File): Unit = (primary, secondary) match {
@@ -411,3 +407,24 @@ case class DualPendingCommandsDb(primary: PendingCommandsDb, secondary: PendingC
411407
primary.listSettlementCommands()
412408
}
413409
}
410+
411+
case class DualLiquidityDb(primary: LiquidityDb, secondary: LiquidityDb) extends LiquidityDb {
412+
413+
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-liquidity").build()))
414+
415+
override def addPurchase(liquidityPurchase: ChannelLiquidityPurchased): Unit = {
416+
runAsync(secondary.addPurchase(liquidityPurchase))
417+
primary.addPurchase(liquidityPurchase)
418+
}
419+
420+
override def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit = {
421+
runAsync(secondary.setConfirmed(remoteNodeId, txId))
422+
primary.setConfirmed(remoteNodeId, txId)
423+
}
424+
425+
override def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase] = {
426+
runAsync(secondary.listPurchases(remoteNodeId))
427+
primary.listPurchases(remoteNodeId)
428+
}
429+
430+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2024 ACINQ SAS
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fr.acinq.eclair.db
18+
19+
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
20+
import fr.acinq.bitcoin.scalacompat.TxId
21+
import fr.acinq.eclair.channel.{ChannelLiquidityPurchased, LiquidityPurchase}
22+
23+
/**
24+
* Created by t-bast on 13/09/2024.
25+
*/
26+
27+
trait LiquidityDb {
28+
29+
def addPurchase(liquidityPurchase: ChannelLiquidityPurchased): Unit
30+
31+
def setConfirmed(remoteNodeId: PublicKey, txId: TxId): Unit
32+
33+
def listPurchases(remoteNodeId: PublicKey): Seq[LiquidityPurchase]
34+
35+
}

0 commit comments

Comments
 (0)