package sweepbatcher

import (
	"bytes"
	"context"
	"encoding/hex"
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/btcsuite/btcd/btcec/v2"
	"github.com/btcsuite/btcd/btcec/v2/schnorr/musig2"
	"github.com/btcsuite/btcd/btcutil"
	"github.com/btcsuite/btcd/btcutil/psbt"
	"github.com/btcsuite/btcd/chaincfg/chainhash"
	"github.com/btcsuite/btcd/txscript"
	"github.com/btcsuite/btcd/wire"
	"github.com/btcsuite/btclog"
	"github.com/lightninglabs/lndclient"
	"github.com/lightninglabs/loop/labels"
	"github.com/lightninglabs/loop/loopdb"
	"github.com/lightninglabs/loop/swap"
	"github.com/lightningnetwork/lnd/chainntnfs"
	"github.com/lightningnetwork/lnd/input"
	"github.com/lightningnetwork/lnd/keychain"
	"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
	"github.com/lightningnetwork/lnd/lntypes"
	"github.com/lightningnetwork/lnd/lnwallet/chainfee"
)

const (
	// defaultFeeRateStep is the amount (in sat/kw) that is added to the
	// cached fee rate every time the batch transaction is re-published.
	defaultFeeRateStep = chainfee.SatPerKWeight(100)

	// defaultBatchConfTarget is the default confirmation target of the
	// batch transaction.
	defaultBatchConfTarget = 12

	// batchConfHeight is passed as the required number of confirmations
	// when registering for the batch transaction's confirmation
	// notification.
	batchConfHeight = 3

	// maxFeeToSwapAmtRatio is the maximum fraction of the total swept
	// amount that the batch transaction is allowed to pay in fees. Fees
	// above that fraction are clamped.
	maxFeeToSwapAmtRatio = 0.2
)

var (
	// ErrBatchShuttingDown is returned by calls that need to enter the
	// batch's event loop after the batch's quit channel has been closed.
	ErrBatchShuttingDown = fmt.Errorf("batch shutting down")
)

// sweep stores any data related to sweeping a specific outpoint.
type sweep struct {
	// swapHash is the hash of the swap that the sweep belongs to. It is
	// also the key under which the sweep is stored in a batch.
	swapHash lntypes.Hash

	// outpoint is the outpoint being swept.
	outpoint wire.OutPoint

	// value is the value of the outpoint being swept.
	value btcutil.Amount

	// confTarget is the confirmation target of the sweep. The primary
	// sweep's confTarget becomes the confirmation target of the batch.
	confTarget int32

	// timeout is the timeout of the swap that the sweep belongs to.
	timeout int32

	// initiationHeight is the height at which the swap was initiated. It
	// is used as the height hint when registering for spend notifications.
	initiationHeight int32

	// htlc is the HTLC that is being swept.
	htlc swap.Htlc

	// preimage is the preimage of the HTLC that is being swept.
	preimage lntypes.Preimage

	// swapInvoicePaymentAddr is the payment address of the swap invoice.
	swapInvoicePaymentAddr [32]byte

	// htlcKeys is the set of keys used to sign the HTLC.
	htlcKeys loopdb.HtlcKeys

	// htlcSuccessEstimator adds the weight of this sweep's HTLC success
	// spend to the passed weight estimator.
	htlcSuccessEstimator func(*input.TxWeightEstimator) error

	// protocolVersion is the protocol version of the swap that the sweep
	// belongs to.
	protocolVersion loopdb.ProtocolVersion

	// isExternalAddr is true if the sweep spends to a non-wallet address.
	// Such a sweep is never batched together with other sweeps.
	isExternalAddr bool

	// destAddr is the destination address of the sweep. It is only used
	// as the batch output address when isExternalAddr is set.
	destAddr btcutil.Address

	// notifier is a collection of channels used to communicate the status
	// of the sweep back to the swap that requested it.
	notifier *SpendNotifier
}

// batchState is the state of the batch.
type batchState uint8

const (
	// Open is the state in which the batch is able to accept new sweeps.
	Open batchState = 0

	// Closed is the state in which the batch is no longer able to accept
	// new sweeps.
	Closed batchState = 1

	// Confirmed is the state in which the batch transaction has reached the
	// configured conf height.
	Confirmed batchState = 2
)

// batchConfig is the configuration for a batch.
type batchConfig struct {
	// maxTimeoutDistance is the maximum timeout distance that 2 distinct
	// sweeps can have in the same batch.
	maxTimeoutDistance int32

	// batchConfTarget is the confirmation target of the batch transaction.
	// It is overwritten with the primary sweep's confirmation target.
	batchConfTarget int32

	// batchPublishDelay is the delay between receiving a new block and
	// publishing the batch transaction.
	batchPublishDelay time.Duration
}

// rbfCache stores data related to our last fee bump.
type rbfCache struct {
	// LastHeight is the block height at which the fee rate was last
	// initialized or bumped.
	LastHeight int32

	// FeeRate is the last used fee rate we used to publish a batch tx. A
	// zero value means that no fee rate has been chosen yet.
	FeeRate chainfee.SatPerKWeight
}

// batch is a collection of sweeps that are published together.
type batch struct {
	// id is the primary identifier of this batch. A negative value flags a
	// batch that has not been persisted yet.
	id int32

	// state is the current state of the batch.
	state batchState

	// primarySweepID is the swap hash of the primary sweep in the batch.
	// The zero hash means that no primary sweep has been set.
	primarySweepID lntypes.Hash

	// sweeps store the sweeps that this batch currently contains, keyed by
	// swap hash.
	sweeps map[lntypes.Hash]sweep

	// currentHeight is the current block height.
	currentHeight int32

	// blockEpochChan is the channel over which block epoch notifications
	// are received.
	//
	// NOTE(review): in the code visible in this file the channel is only
	// allocated; Run reads block epochs from the channel returned by the
	// chain notifier instead. Confirm whether this field is still needed.
	blockEpochChan chan int32

	// spendChan is the channel over which spend notifications are received.
	spendChan chan *chainntnfs.SpendDetail

	// confChan is the channel over which confirmation notifications are
	// received.
	confChan chan *chainntnfs.TxConfirmation

	// reorgChan is the channel over which reorg notifications are received.
	reorgChan chan struct{}

	// errChan is the channel over which errors of the notification
	// goroutines are received.
	errChan chan error

	// batchTxid is the transaction ID of the batch transaction that is
	// currently being monitored for confirmations.
	batchTxid *chainhash.Hash

	// batchPkScript is the pkScript of the batch transaction's output.
	batchPkScript []byte

	// batchAddress is the address of the batch transaction's output. It is
	// generated once and re-used for later versions of the batch tx.
	batchAddress btcutil.Address

	// rbfCache stores data related to the RBF fee bumping mechanism.
	rbfCache rbfCache

	// callEnter is used to sequentialize calls to the batch handler's
	// main event loop.
	callEnter chan struct{}

	// callLeave is used to resume the execution flow of the batch handler's
	// main event loop.
	callLeave chan struct{}

	// quit signals that the batch must stop. It is closed when Run exits.
	quit chan struct{}

	// wallet is the wallet client used to create and publish the batch
	// transaction.
	wallet lndclient.WalletKitClient

	// chainNotifier is the chain notifier client used to monitor the
	// blockchain for spends and confirmations.
	chainNotifier lndclient.ChainNotifierClient

	// signerClient is the signer client used to sign the batch transaction.
	signerClient lndclient.SignerClient

	// muSig2SignSweep is the function used to request the swap server's
	// public nonce and partial signature for a sweep, which are required
	// to cooperatively sweep funds.
	muSig2SignSweep MuSig2SignSweep

	// verifySchnorrSig is a function that verifies a schnorr signature.
	verifySchnorrSig VerifySchnorrSig

	// purger is a function that can take a sweep which is being purged and
	// hand it over to the batcher for further processing.
	purger Purger

	// store includes all the database interactions that are needed by the
	// batch.
	store BatcherStore

	// cfg is the configuration for this batch.
	cfg *batchConfig

	// log is the logger for this batch.
	log btclog.Logger

	// wg tracks the spend and confirmation monitoring goroutines so that
	// Run and Wait can block until they have exited.
	wg sync.WaitGroup
}

// Purger is a function that takes a sweep request and feeds it back to the
// batcher main entry point. The name is inspired by its purpose, which is to
// purge the batch from sweeps that didn't make it to the confirmed tx.
type Purger func(sweepReq *SweepRequest) error

// batchKit is a kit of dependencies that are used to initialize a batch. This
// struct is only used as a wrapper for the arguments that are required to
// create a new batch.
type batchKit struct {
	// id, batchTxid, batchPkScript, state, primaryID, sweeps and rbfCache
	// carry the persisted state of a batch that is restored from storage.
	id            int32
	batchTxid     *chainhash.Hash
	batchPkScript []byte
	state         batchState
	primaryID     lntypes.Hash
	sweeps        map[lntypes.Hash]sweep
	rbfCache      rbfCache

	// NOTE(review): returnChan is not read by the constructors visible in
	// this file; confirm whether it is still required.
	returnChan chan SweepRequest

	// The remaining fields are the external dependencies that are copied
	// into the batch.
	wallet           lndclient.WalletKitClient
	chainNotifier    lndclient.ChainNotifierClient
	signerClient     lndclient.SignerClient
	musig2SignSweep  MuSig2SignSweep
	verifySchnorrSig VerifySchnorrSig
	purger           Purger
	store            BatcherStore
	log              btclog.Logger
}

// scheduleNextCall schedules the next call to the batch handler's main event
// loop. It returns a function that must be called when the call is finished.
func (b *batch) scheduleNextCall() (func(), error) { select { case b.callEnter <- struct{}{}: case <-b.quit: return func() {}, ErrBatchShuttingDown } return func() { b.callLeave <- struct{}{} }, nil } // NewBatch creates a new batch. func NewBatch(cfg batchConfig, bk batchKit) *batch { return &batch{ // We set the ID to a negative value to flag that this batch has // never been persisted, so it needs to be assigned a new ID. id: -1, state: Open, sweeps: make(map[lntypes.Hash]sweep), blockEpochChan: make(chan int32), spendChan: make(chan *chainntnfs.SpendDetail), confChan: make(chan *chainntnfs.TxConfirmation, 1), reorgChan: make(chan struct{}, 1), errChan: make(chan error, 1), callEnter: make(chan struct{}), callLeave: make(chan struct{}), quit: make(chan struct{}), batchTxid: bk.batchTxid, wallet: bk.wallet, chainNotifier: bk.chainNotifier, signerClient: bk.signerClient, muSig2SignSweep: bk.musig2SignSweep, verifySchnorrSig: bk.verifySchnorrSig, purger: bk.purger, store: bk.store, cfg: &cfg, } } // NewBatchFromDB creates a new batch that already existed in storage. func NewBatchFromDB(cfg batchConfig, bk batchKit) *batch { return &batch{ id: bk.id, state: bk.state, primarySweepID: bk.primaryID, sweeps: bk.sweeps, blockEpochChan: make(chan int32), spendChan: make(chan *chainntnfs.SpendDetail), confChan: make(chan *chainntnfs.TxConfirmation, 1), reorgChan: make(chan struct{}, 1), errChan: make(chan error, 1), callEnter: make(chan struct{}), callLeave: make(chan struct{}), quit: make(chan struct{}), batchTxid: bk.batchTxid, batchPkScript: bk.batchPkScript, rbfCache: bk.rbfCache, wallet: bk.wallet, chainNotifier: bk.chainNotifier, signerClient: bk.signerClient, muSig2SignSweep: bk.musig2SignSweep, verifySchnorrSig: bk.verifySchnorrSig, purger: bk.purger, store: bk.store, log: bk.log, cfg: &cfg, } } // addSweep tries to add a sweep to the batch. If this is the first sweep being // added to the batch then it also sets the primary sweep ID. 
func (b *batch) addSweep(ctx context.Context, sweep *sweep) (bool, error) { done, err := b.scheduleNextCall() defer done() if err != nil { return false, err } // If the provided sweep is nil, we can't proceed with any checks, so // we just return early. if sweep == nil { return false, nil } // Before we run through the acceptance checks, let's just see if this // sweep is already in our batch. In that case, just update the sweep. _, ok := b.sweeps[sweep.swapHash] if ok { // If the sweep was resumed from storage, and the swap requested // to sweep again, a new sweep notifier will be created by the // swap. By re-assigning to the batch's sweep we make sure that // everything, including the notifier, is up to date. b.sweeps[sweep.swapHash] = *sweep // If this is the primary sweep, we also need to update the // batch's confirmation target. if b.primarySweepID == sweep.swapHash { b.cfg.batchConfTarget = sweep.confTarget } return true, nil } // Since all the actions of the batch happen sequentially, we could // arrive here after the batch got closed because of a spend. In this // case we cannot add the sweep to this batch. if b.state != Open { return false, nil } // If this batch contains a single sweep that spends to a non-wallet // address, or the incoming sweep is spending to non-wallet address, // we cannot add this sweep to the batch. for _, s := range b.sweeps { if s.isExternalAddr || sweep.isExternalAddr { return false, nil } } // Check the timeout of the incoming sweep against the timeout of all // already contained sweeps. If that difference exceeds the configured // maximum we cannot add this sweep. for _, s := range b.sweeps { timeoutDistance := int32(math.Abs(float64(sweep.timeout - s.timeout))) if timeoutDistance > b.cfg.maxTimeoutDistance { return false, nil } } // Past this point we know that a new incoming sweep passes the // acceptance criteria and is now ready to be added to this batch. 
// If this is the first sweep being added to the batch, make it the // primary sweep. if b.primarySweepID == lntypes.ZeroHash { b.primarySweepID = sweep.swapHash b.cfg.batchConfTarget = sweep.confTarget // We also need to start the spend monitor for this new primary // sweep. err := b.monitorSpend(ctx, *sweep) if err != nil { return false, err } } // Add the sweep to the batch's sweeps. b.log.Infof("adding sweep %x", sweep.swapHash[:6]) b.sweeps[sweep.swapHash] = *sweep return true, b.persistSweep(ctx, *sweep, false) } // sweepExists returns true if the batch contains the sweep with the given hash. func (b *batch) sweepExists(hash lntypes.Hash) bool { done, err := b.scheduleNextCall() defer done() if err != nil { return false } _, ok := b.sweeps[hash] return ok } // Wait waits for the batch to gracefully stop. func (b *batch) Wait() { b.log.Infof("Stopping") b.wg.Wait() } // Run is the batch's main event loop. func (b *batch) Run(ctx context.Context) error { runCtx, cancel := context.WithCancel(ctx) defer func() { cancel() close(b.quit) b.wg.Wait() }() if b.muSig2SignSweep == nil { return fmt.Errorf("no musig2 signer available") } blockChan, blockErrChan, err := b.chainNotifier.RegisterBlockEpochNtfn(runCtx) if err != nil { return err } // If a primary sweep exists we immediately start monitoring for its // spend. if b.primarySweepID != lntypes.ZeroHash { sweep := b.sweeps[b.primarySweepID] err := b.monitorSpend(runCtx, sweep) if err != nil { return err } } // We use a timer in order to not publish new transactions at the same // time as the block epoch notification. This is done to prevent // unnecessary transaction publishments when a spend is detected on that // block. 
var timerChan <-chan time.Time b.log.Infof("started, primary %x, total sweeps %v", b.primarySweepID[0:6], len(b.sweeps)) for { select { case <-b.callEnter: <-b.callLeave case height := <-blockChan: b.log.Debugf("received block %v", height) // Set the timer to publish the batch transaction after // the configured delay. timerChan = time.After(b.cfg.batchPublishDelay) b.currentHeight = height case <-timerChan: if b.state == Open { err := b.publish(ctx) if err != nil { return err } } case spend := <-b.spendChan: err := b.handleSpend(runCtx, spend.SpendingTx) if err != nil { return err } case <-b.confChan: return b.handleConf(runCtx) case <-b.reorgChan: b.state = Open b.log.Warnf("reorg detected, batch is able to accept " + "new sweeps") err := b.monitorSpend(ctx, b.sweeps[b.primarySweepID]) if err != nil { return err } case err := <-blockErrChan: return err case err := <-b.errChan: return err case <-runCtx.Done(): return runCtx.Err() } } } // publish creates and publishes the latest batch transaction to the network. func (b *batch) publish(ctx context.Context) error { var ( err error fee btcutil.Amount coopSuccess bool ) // Run the RBF rate update. err = b.updateRbfRate(ctx) if err != nil { return err } fee, err, coopSuccess = b.publishBatchCoop(ctx) if err != nil { b.log.Warnf("co-op publish error: %v", err) } if !coopSuccess { fee, err = b.publishBatch(ctx) } if err != nil { b.log.Warnf("publish error: %v", err) return nil } b.log.Infof("published, total sweeps: %v, fees: %v", len(b.sweeps), fee) for _, sweep := range b.sweeps { b.log.Infof("published sweep %x, value: %v", sweep.swapHash[:6], sweep.value) } return b.persist(ctx) } // publishBatch creates and publishes the batch transaction. It will consult the // RBFCache to determine the fee rate to use. func (b *batch) publishBatch(ctx context.Context) (btcutil.Amount, error) { // Create the batch transaction. 
batchTx := wire.NewMsgTx(2) batchTx.LockTime = uint32(b.currentHeight) var ( batchAmt btcutil.Amount prevOuts = make([]*wire.TxOut, 0, len(b.sweeps)) signDescs = make([]*lndclient.SignDescriptor, 0, len(b.sweeps)) sweeps = make([]sweep, 0, len(b.sweeps)) fee btcutil.Amount inputCounter int addrOverride bool ) var weightEstimate input.TxWeightEstimator // Add all the sweeps to the batch transaction. for _, sweep := range b.sweeps { if sweep.isExternalAddr { addrOverride = true } batchAmt += sweep.value batchTx.AddTxIn(&wire.TxIn{ PreviousOutPoint: sweep.outpoint, Sequence: sweep.htlc.SuccessSequence(), }) err := sweep.htlcSuccessEstimator(&weightEstimate) if err != nil { return 0, err } // Append this sweep to an array of sweeps. This is needed to // keep the order of sweeps stored, as iterating the sweeps map // does not guarantee same order. sweeps = append(sweeps, sweep) // Create and store the previous outpoint for this sweep. prevOuts = append(prevOuts, &wire.TxOut{ Value: int64(sweep.value), PkScript: sweep.htlc.PkScript, }) key, err := btcec.ParsePubKey( sweep.htlcKeys.ReceiverScriptKey[:], ) if err != nil { return fee, err } // Create and store the sign descriptor for this sweep. signDesc := lndclient.SignDescriptor{ WitnessScript: sweep.htlc.SuccessScript(), Output: prevOuts[len(prevOuts)-1], HashType: sweep.htlc.SigHash(), InputIndex: inputCounter, KeyDesc: keychain.KeyDescriptor{ PubKey: key, }, } inputCounter++ if sweep.htlc.Version == swap.HtlcV3 { signDesc.SignMethod = input.TaprootScriptSpendSignMethod } signDescs = append(signDescs, &signDesc) } var address btcutil.Address if addrOverride { // Sanity check, there should be exactly 1 sweep in this batch. 
if len(sweeps) != 1 { return 0, fmt.Errorf("external address sweep batched " + "with other sweeps") } address = sweeps[0].destAddr } else { var err error address, err = b.getBatchDestAddr(ctx) if err != nil { return fee, err } } batchPkScript, err := txscript.PayToAddrScript(address) if err != nil { return fee, err } weightEstimate.AddP2TROutput() totalWeight := int64(weightEstimate.Weight()) fee = b.rbfCache.FeeRate.FeeForWeight(totalWeight) // Clamp the calculated fee to the max allowed fee amount for the batch. fee = clampBatchFee(fee, batchAmt) // Add the batch transaction output, which excludes the fees paid to // miners. batchTx.AddTxOut(&wire.TxOut{ PkScript: batchPkScript, Value: int64(batchAmt - fee), }) // Collect the signatures for our inputs. rawSigs, err := b.signerClient.SignOutputRaw( ctx, batchTx, signDescs, prevOuts, ) if err != nil { return fee, err } for i, sweep := range sweeps { // Generate the success witness for the sweep. witness, err := sweep.htlc.GenSuccessWitness( rawSigs[i], sweep.preimage, ) if err != nil { return fee, err } // Add the success witness to our batch transaction's inputs. batchTx.TxIn[i].Witness = witness } b.log.Infof("attempting to publish non-coop tx=%v with feerate=%v, "+ "totalfee=%v, sweeps=%d, destAddr=%s", batchTx.TxHash(), b.rbfCache.FeeRate, fee, len(batchTx.TxIn), address) b.debugLogTx("serialized non-coop sweep", batchTx) err = b.wallet.PublishTransaction( ctx, batchTx, labels.LoopOutBatchSweepSuccess(b.id), ) if err != nil { return fee, err } // Store the batch transaction's txid and pkScript, for monitoring // purposes. txHash := batchTx.TxHash() b.batchTxid = &txHash b.batchPkScript = batchPkScript return fee, nil } // publishBatchCoop attempts to construct and publish a batch transaction that // collects all the required signatures interactively from the server. This // helps with collecting the funds immediately without revealing any information // related to the HTLC script. 
func (b *batch) publishBatchCoop(ctx context.Context) (btcutil.Amount, error, bool) { var ( batchAmt = btcutil.Amount(0) sweeps = make([]sweep, 0, len(b.sweeps)) fee = btcutil.Amount(0) weightEstimate input.TxWeightEstimator addrOverride bool ) // Sanity check, there should be at least 1 sweep in this batch. if len(b.sweeps) == 0 { return 0, fmt.Errorf("no sweeps in batch"), false } // Create the batch transaction. batchTx := &wire.MsgTx{ Version: 2, LockTime: uint32(b.currentHeight), } for _, sweep := range b.sweeps { // Append this sweep to an array of sweeps. This is needed to // keep the order of sweeps stored, as iterating the sweeps map // does not guarantee same order. sweeps = append(sweeps, sweep) } // Add all the sweeps to the batch transaction. for _, sweep := range sweeps { if sweep.isExternalAddr { addrOverride = true } // Keep track of the total amount this batch is sweeping back. batchAmt += sweep.value // Add this sweep's input to the transaction. batchTx.AddTxIn(&wire.TxIn{ PreviousOutPoint: sweep.outpoint, }) weightEstimate.AddTaprootKeySpendInput(txscript.SigHashAll) } var address btcutil.Address if addrOverride { // Sanity check, there should be exactly 1 sweep in this batch. if len(sweeps) != 1 { return 0, fmt.Errorf("external address sweep batched " + "with other sweeps"), false } address = sweeps[0].destAddr } else { var err error address, err = b.getBatchDestAddr(ctx) if err != nil { return fee, err, false } } batchPkScript, err := txscript.PayToAddrScript(address) if err != nil { return fee, err, false } weightEstimate.AddP2TROutput() totalWeight := int64(weightEstimate.Weight()) fee = b.rbfCache.FeeRate.FeeForWeight(totalWeight) // Clamp the calculated fee to the max allowed fee amount for the batch. fee = clampBatchFee(fee, batchAmt) // Add the batch transaction output, which excludes the fees paid to // miners. 
batchTx.AddTxOut(&wire.TxOut{ PkScript: batchPkScript, Value: int64(batchAmt - fee), }) packet, err := psbt.NewFromUnsignedTx(batchTx) if err != nil { return fee, err, false } if len(packet.Inputs) != len(sweeps) { return fee, fmt.Errorf("invalid number of packet inputs"), false } prevOuts := make(map[wire.OutPoint]*wire.TxOut) for i, sweep := range sweeps { txOut := &wire.TxOut{ Value: int64(sweep.value), PkScript: sweep.htlc.PkScript, } prevOuts[sweep.outpoint] = txOut packet.Inputs[i].WitnessUtxo = txOut } var psbtBuf bytes.Buffer err = packet.Serialize(&psbtBuf) if err != nil { return fee, err, false } prevOutputFetcher := txscript.NewMultiPrevOutFetcher(prevOuts) // Attempt to cooperatively sign the batch tx with the server. err = b.coopSignBatchTx( ctx, packet, sweeps, prevOutputFetcher, prevOuts, psbtBuf, ) if err != nil { return fee, err, false } b.log.Infof("attempting to publish coop tx=%v with feerate=%v, "+ "totalfee=%v, sweeps=%d, destAddr=%s", batchTx.TxHash(), b.rbfCache.FeeRate, fee, len(batchTx.TxIn), address) b.debugLogTx("serialized coop sweep", batchTx) err = b.wallet.PublishTransaction( ctx, batchTx, labels.LoopOutBatchSweepSuccess(b.id), ) if err != nil { return fee, err, true } // Store the batch transaction's txid and pkScript, for monitoring // purposes. txHash := batchTx.TxHash() b.batchTxid = &txHash b.batchPkScript = batchPkScript return fee, nil, true } func (b *batch) debugLogTx(msg string, tx *wire.MsgTx) { // Serialize the transaction and convert to hex string. buf := bytes.NewBuffer(make([]byte, 0, tx.SerializeSize())) if err := tx.Serialize(buf); err != nil { b.log.Errorf("failed to serialize tx for debug log: %v", err) return } b.log.Debugf("%s: %s", msg, hex.EncodeToString(buf.Bytes())) } // coopSignBatchTx collects the necessary signatures from the server in order // to cooperatively sweep the funds. 
func (b *batch) coopSignBatchTx(ctx context.Context, packet *psbt.Packet,
	sweeps []sweep, prevOutputFetcher *txscript.MultiPrevOutFetcher,
	prevOuts map[wire.OutPoint]*wire.TxOut, psbtBuf bytes.Buffer) error {

	// Every input is signed in its own MuSig2 session. The index of a sweep
	// in the sweeps slice must match the index of its input in the packet's
	// unsigned transaction.
	for i, sweep := range sweeps {
		sweep := sweep

		// Compute the taproot key spend sig hash for input i.
		sigHashes := txscript.NewTxSigHashes(
			packet.UnsignedTx, prevOutputFetcher,
		)

		sigHash, err := txscript.CalcTaprootSignatureHash(
			sigHashes, txscript.SigHashDefault, packet.UnsignedTx,
			i, prevOutputFetcher,
		)
		if err != nil {
			return err
		}

		var (
			signers       [][]byte
			muSig2Version input.MuSig2Version
		)

		// Depending on the MuSig2 version we either pass the internal
		// public keys as stored, or with their first byte stripped off.
		if sweep.protocolVersion >= loopdb.ProtocolVersionMuSig2 {
			muSig2Version = input.MuSig2Version100RC2
			signers = [][]byte{
				sweep.htlcKeys.SenderInternalPubKey[:],
				sweep.htlcKeys.ReceiverInternalPubKey[:],
			}
		} else {
			muSig2Version = input.MuSig2Version040
			signers = [][]byte{
				sweep.htlcKeys.SenderInternalPubKey[1:],
				sweep.htlcKeys.ReceiverInternalPubKey[1:],
			}
		}

		// The cooperative path is only available for V3 HTLC scripts.
		htlcScript, ok := sweep.htlc.HtlcScript.(*swap.HtlcScriptV3)
		if !ok {
			return fmt.Errorf("invalid htlc script version")
		}

		// Now we're creating a local MuSig2 session using the client
		// script key's key locator and the htlc's root hash.
		musig2SessionInfo, err := b.signerClient.MuSig2CreateSession(
			ctx, muSig2Version,
			&sweep.htlcKeys.ClientScriptKeyLocator, signers,
			lndclient.MuSig2TaprootTweakOpt(
				htlcScript.RootHash[:], false,
			),
		)
		if err != nil {
			return err
		}

		// With the session active, we can now send the server our
		// public nonce and the serialized PSBT, so that it can create
		// its own MuSig2 session and return the server side nonce and
		// partial signature.
		serverNonce, serverSig, err := b.muSig2SignSweep(
			ctx, sweep.protocolVersion, sweep.swapHash,
			sweep.swapInvoicePaymentAddr,
			musig2SessionInfo.PublicNonce[:], psbtBuf.Bytes(),
			prevOuts,
		)
		if err != nil {
			return err
		}

		// Copy the nonce into a fixed size array. Bytes beyond
		// musig2.PubNonceSize are dropped, missing bytes stay zero.
		var serverPublicNonce [musig2.PubNonceSize]byte
		copy(serverPublicNonce[:], serverNonce)

		// Register the server's nonce before attempting to create our
		// partial signature.
		haveAllNonces, err := b.signerClient.MuSig2RegisterNonces(
			ctx, musig2SessionInfo.SessionID,
			[][musig2.PubNonceSize]byte{serverPublicNonce},
		)
		if err != nil {
			return err
		}

		// Sanity check that we have all the nonces.
		if !haveAllNonces {
			return fmt.Errorf("invalid MuSig2 session: " +
				"nonces missing")
		}

		var digest [32]byte
		copy(digest[:], sigHash)

		// Since our MuSig2 session has all nonces, we can now create
		// the local partial signature by signing the sig hash. The
		// returned partial signature is not needed here, as it is
		// combined inside the session below.
		//
		// NOTE(review): the trailing false is presumably the session
		// cleanup flag, which must stay unset for the combine call
		// that follows; confirm against the lndclient API.
		_, err = b.signerClient.MuSig2Sign(
			ctx, musig2SessionInfo.SessionID, digest, false,
		)
		if err != nil {
			return err
		}

		// Now combine the partial signatures to use the final combined
		// signature in the sweep transaction's witness.
		haveAllSigs, finalSig, err := b.signerClient.MuSig2CombineSig(
			ctx, musig2SessionInfo.SessionID, [][]byte{serverSig},
		)
		if err != nil {
			return err
		}

		if !haveAllSigs {
			return fmt.Errorf("failed to combine signatures")
		}

		// To be sure that we're good, parse and validate that the
		// combined signature is indeed valid for the sig hash and the
		// htlc script's taproot key.
		err = b.verifySchnorrSig(
			htlcScript.TaprootKey, sigHash, finalSig,
		)
		if err != nil {
			return err
		}

		// A key spend witness consists of the signature only.
		packet.UnsignedTx.TxIn[i].Witness = wire.TxWitness{
			finalSig,
		}
	}

	return nil
}

// updateRbfRate updates the fee rate we should use for the new batch
// transaction. This fee rate does not guarantee RBF success, but the continuous
// increase leads to an eventual successful RBF replacement.
func (b *batch) updateRbfRate(ctx context.Context) error { // If the feeRate is unset then we never published before, so we // retrieve the fee estimate from our wallet. if b.rbfCache.FeeRate == 0 { b.log.Infof("initializing rbf fee rate for conf target=%v", b.cfg.batchConfTarget) rate, err := b.wallet.EstimateFeeRate( ctx, b.cfg.batchConfTarget, ) if err != nil { return err } // Set the initial value for our fee rate. b.rbfCache.FeeRate = rate } else { // Bump the fee rate by the configured step. b.rbfCache.FeeRate += defaultFeeRateStep } b.rbfCache.LastHeight = b.currentHeight return b.persist(ctx) } // monitorSpend monitors the primary sweep's outpoint for spends. The reason we // monitor the primary sweep's outpoint is because the primary sweep was the // first sweep that entered this batch, therefore it is present in all the // versions of the batch transaction. This means that even if an older version // of the batch transaction gets confirmed, due to the uncertainty of RBF // replacements and network propagation, we can always detect the transaction. func (b *batch) monitorSpend(ctx context.Context, primarySweep sweep) error { spendCtx, cancel := context.WithCancel(ctx) spendChan, spendErr, err := b.chainNotifier.RegisterSpendNtfn( spendCtx, &primarySweep.outpoint, primarySweep.htlc.PkScript, primarySweep.initiationHeight, ) if err != nil { cancel() return err } b.wg.Add(1) go func() { defer cancel() defer b.wg.Done() b.log.Infof("monitoring spend for outpoint %s", primarySweep.outpoint.String()) for { select { case spend := <-spendChan: select { case b.spendChan <- spend: case <-ctx.Done(): } return case err := <-spendErr: b.writeToErrChan(err) return case <-ctx.Done(): return } } }() return nil } // monitorConfirmations monitors the batch transaction for confirmations. 
func (b *batch) monitorConfirmations(ctx context.Context) error { reorgChan := make(chan struct{}) confCtx, cancel := context.WithCancel(ctx) confChan, errChan, err := b.chainNotifier.RegisterConfirmationsNtfn( confCtx, b.batchTxid, b.batchPkScript, batchConfHeight, b.currentHeight, lndclient.WithReOrgChan(reorgChan), ) if err != nil { cancel() return err } b.wg.Add(1) go func() { defer cancel() defer b.wg.Done() for { select { case conf := <-confChan: select { case b.confChan <- conf: case <-ctx.Done(): } return case err := <-errChan: b.writeToErrChan(err) return case <-reorgChan: // A re-org has been detected. We set the batch // state back to open since our batch // transaction is no longer present in any // block. We can accept more sweeps and try to // publish new transactions, at this point we // need to monitor again for a new spend. select { case b.reorgChan <- struct{}{}: case <-ctx.Done(): } return case <-ctx.Done(): return } } }() return nil } // getFeePortionForSweep calculates the fee portion that each sweep should pay // for the batch transaction. The fee is split evenly among the sweeps, If the // fee cannot be split evenly, the remainder is paid by the first sweep. func getFeePortionForSweep(spendTx *wire.MsgTx, numSweeps int, totalSweptAmt btcutil.Amount) (btcutil.Amount, btcutil.Amount) { totalFee := spendTx.TxOut[0].Value - int64(totalSweptAmt) feePortionPerSweep := (int64(totalSweptAmt) - spendTx.TxOut[0].Value) / int64(numSweeps) roundingDiff := totalFee - (int64(numSweeps) * feePortionPerSweep) return btcutil.Amount(feePortionPerSweep), btcutil.Amount(roundingDiff) } // getFeePortionPaidBySweep returns the fee portion that the sweep should pay // for the batch transaction. If the sweep is the first sweep in the batch, it // pays the rounding difference. 
func getFeePortionPaidBySweep(spendTx *wire.MsgTx, feePortionPerSweep, roundingDiff btcutil.Amount, sweep *sweep) btcutil.Amount { if bytes.Equal(spendTx.TxIn[0].SignatureScript, sweep.htlc.SigScript) { return feePortionPerSweep + roundingDiff } return feePortionPerSweep } // handleSpend handles a spend notification. func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error { var ( txHash = spendTx.TxHash() purgeList = make([]SweepRequest, 0, len(b.sweeps)) notifyList = make([]sweep, 0, len(b.sweeps)) ) b.batchTxid = &txHash b.batchPkScript = spendTx.TxOut[0].PkScript // As a previous version of the batch transaction may get confirmed, // which does not contain the latest sweeps, we need to detect the // sweeps that did not make it to the confirmed transaction and feed // them back to the batcher. This will ensure that the sweeps will enter // a new batch instead of remaining dangling. var totalSweptAmt btcutil.Amount for _, sweep := range b.sweeps { found := false for _, txIn := range spendTx.TxIn { if txIn.PreviousOutPoint == sweep.outpoint { found = true totalSweptAmt += sweep.value notifyList = append(notifyList, sweep) } } // If the sweep's outpoint was not found in the transaction's // inputs this means it was left out. So we delete it from this // batch and feed it back to the batcher. if !found { newSweep := sweep delete(b.sweeps, sweep.swapHash) purgeList = append(purgeList, SweepRequest{ SwapHash: newSweep.swapHash, Outpoint: newSweep.outpoint, Value: newSweep.value, Notifier: newSweep.notifier, }) } } // Calculate the fee portion that each sweep should pay for the batch. feePortionPaidPerSweep, roundingDifference := getFeePortionForSweep( spendTx, len(notifyList), totalSweptAmt, ) for _, sweep := range notifyList { sweep := sweep // Save the sweep as completed. 
err := b.persistSweep(ctx, sweep, true) if err != nil { return err } // If the sweep's notifier is empty then this means that a swap // is not waiting to read an update from it, so we can skip // the notification part. if sweep.notifier == nil || *sweep.notifier == (SpendNotifier{}) { continue } spendDetail := SpendDetail{ Tx: spendTx, OnChainFeePortion: getFeePortionPaidBySweep( spendTx, feePortionPaidPerSweep, roundingDifference, &sweep, ), } // Dispatch the sweep notifier, we don't care about the outcome // of this action so we don't wait for it. go sweep.notifySweepSpend(ctx, &spendDetail) } // Proceed with purging the sweeps. This will feed the sweeps that // didn't make it to the confirmed batch transaction back to the batcher // for re-entry. This batch doesn't care for the outcome of this // operation so we don't wait for it. go func() { // Iterate over the purge list and feed the sweeps back to the // batcher. for _, sweep := range purgeList { sweep := sweep err := b.purger(&sweep) if err != nil { b.log.Errorf("unable to purge sweep %x: %v", sweep.SwapHash[:6], err) } } }() b.log.Infof("spent, total sweeps: %v, purged sweeps: %v", len(notifyList), len(purgeList)) err := b.monitorConfirmations(ctx) if err != nil { return err } // We are no longer able to accept new sweeps, so we mark the batch as // closed and persist on storage. b.state = Closed return b.persist(ctx) } // handleConf handles a confirmation notification. This is the final step of the // batch. Here we signal to the batcher that this batch was completed. func (b *batch) handleConf(ctx context.Context) error { b.log.Infof("confirmed") b.state = Confirmed return b.store.ConfirmBatch(ctx, b.id) } // isComplete returns true if the batch is completed. This method is used by the // batcher for lazy deletion of batches. 
func (b *batch) isComplete() bool { done, err := b.scheduleNextCall() defer done() // We override the ErrBatchShuttingDown error as that is the expected // error to be returned by the scheduler once the batch's main run loop // has exited. if err != nil && err != ErrBatchShuttingDown { return false } return b.state == Confirmed } // persist updates the batch in the database. func (b *batch) persist(ctx context.Context) error { bch := &dbBatch{} bch.ID = b.id bch.State = stateEnumToString(b.state) if b.batchTxid != nil { bch.BatchTxid = *b.batchTxid } bch.BatchPkScript = b.batchPkScript bch.LastRbfHeight = b.rbfCache.LastHeight bch.LastRbfSatPerKw = int32(b.rbfCache.FeeRate) bch.MaxTimeoutDistance = b.cfg.maxTimeoutDistance return b.store.UpdateSweepBatch(ctx, bch) } // getBatchDestAddr returns the batch's destination address. If the batch // has already generated an address then the same one will be returned. func (b *batch) getBatchDestAddr(ctx context.Context) (btcutil.Address, error) { var address btcutil.Address // If a batch address is set, use that. Otherwise, generate a // new address. if b.batchAddress != nil { address = b.batchAddress } else { var err error // Generate a wallet address for the batch transaction's output. address, err = b.wallet.NextAddr( ctx, "", walletrpc.AddressType_TAPROOT_PUBKEY, false, ) if err != nil { return address, err } // Save that new address in order to re-use in future // versions of the batch tx. b.batchAddress = address } return address, nil } func (b *batch) insertAndAcquireID(ctx context.Context) (int32, error) { bch := &dbBatch{} bch.State = stateEnumToString(b.state) bch.MaxTimeoutDistance = b.cfg.maxTimeoutDistance id, err := b.store.InsertSweepBatch(ctx, bch) if err != nil { return 0, err } b.id = id b.log = batchPrefixLogger(fmt.Sprintf("%d", b.id)) return id, nil } // notifySweepSpend writes the spendTx to the sweep's notifier channel. 
func (s *sweep) notifySweepSpend(ctx context.Context, spendDetail *SpendDetail) { select { // Try to write the update to the notification channel. case s.notifier.SpendChan <- spendDetail: // If a quit signal was provided by the swap, continue. case <-s.notifier.QuitChan: // If the context was canceled, return. case <-ctx.Done(): } } func (b *batch) writeToErrChan(err error) { select { case b.errChan <- err: default: } } func (b *batch) persistSweep(ctx context.Context, sweep sweep, completed bool) error { return b.store.UpsertSweep(ctx, &dbSweep{ BatchID: b.id, SwapHash: sweep.swapHash, Outpoint: sweep.outpoint, Amount: sweep.value, Completed: completed, }) } // clampBatchFee takes the fee amount and total amount of the sweeps in the // batch and makes sure the fee is not too high. If the fee is too high, it is // clamped to the maximum allowed fee. func clampBatchFee(fee btcutil.Amount, totalAmount btcutil.Amount) btcutil.Amount { maxFeeAmount := btcutil.Amount(float64(totalAmount) * maxFeeToSwapAmtRatio) if fee > maxFeeAmount { return maxFeeAmount } return fee } func stateEnumToString(state batchState) string { switch state { case Open: return batchOpen case Closed: return batchClosed case Confirmed: return batchConfirmed } return "" }