diff --git a/executor.go b/executor.go index 1707342..3d91748 100644 --- a/executor.go +++ b/executor.go @@ -148,7 +148,7 @@ func (s *executor) run(mainCtx context.Context, go func() { defer s.wg.Done() - err := newSwap.execute(mainCtx, &executeConfig{ + err := newSwap.execute(mainCtx, &s.wg, &executeConfig{ statusChan: statusChan, sweeper: s.sweeper, blockEpochChan: queue.ChanOut(), diff --git a/loopin.go b/loopin.go index 2bd2c32..c925702 100644 --- a/loopin.go +++ b/loopin.go @@ -69,8 +69,6 @@ type loopInSwap struct { htlcTxHash *chainhash.Hash timeoutAddr btcutil.Address - - wg sync.WaitGroup } // loopInInitResult contains information about a just-initiated loop in swap. @@ -487,9 +485,7 @@ func (s *loopInSwap) sendUpdate(ctx context.Context) error { // execute starts/resumes the swap. It is a thin wrapper around executeSwap to // conveniently handle the error case. func (s *loopInSwap) execute(mainCtx context.Context, - cfg *executeConfig, height int32) error { - - defer s.wg.Wait() + wg *sync.WaitGroup, cfg *executeConfig, height int32) error { s.executeConfig = *cfg s.height = height @@ -497,14 +493,17 @@ func (s *loopInSwap) execute(mainCtx context.Context, // Create context for our state subscription which we will cancel once // swap execution has completed, ensuring that we kill the subscribe // goroutine. - subCtx, cancel := context.WithCancel(mainCtx) - defer cancel() + swapCtx, swapCancel := context.WithCancel(mainCtx) - s.wg.Add(1) + wg.Add(1) go func() { - defer s.wg.Done() + defer wg.Done() + + logCtx, logCancel := context.WithCancel(swapCtx) + defer logCancel() + subscribeAndLogUpdates( - subCtx, s.hash, s.log, s.server.SubscribeLoopInUpdates, + logCtx, s.hash, s.log, s.server.SubscribeLoopInUpdates, ) }() @@ -516,11 +515,13 @@ func (s *loopInSwap) execute(mainCtx context.Context, // Execute the swap until it either reaches a final state or a temporary // error occurs. - err = s.executeSwap(mainCtx) + err = s.executeSwap(swapCtx, swapCancel, wg) // Sanity check. If there is no error, the swap must be in a final - // state. - if err == nil && s.state.Type() == loopdb.StateTypePending { + // state. A settled invoice for a loop-in is considered a final state. + if err == nil && s.state.Type() == loopdb.StateTypePending && + s.state != loopdb.StateInvoiceSettled { + err = fmt.Errorf("swap in non-final state %v", s.state) } @@ -550,8 +551,17 @@ func (s *loopInSwap) execute(mainCtx context.Context, } // executeSwap executes the swap. -func (s *loopInSwap) executeSwap(globalCtx context.Context) error { +func (s *loopInSwap) executeSwap(swapCtx context.Context, + swapCancel context.CancelFunc, wg *sync.WaitGroup) error { + var err error + shouldCancel := true + + defer func() { + if shouldCancel { + swapCancel() + } + }() // For loop in, the client takes the first step by publishing the // on-chain htlc. Only do this if we haven't already done so in a @@ -562,12 +572,12 @@ func (s *loopInSwap) executeSwap(globalCtx context.Context) error { // HtlcPublished state directly and wait for // confirmation. s.setState(loopdb.StateHtlcPublished) - err = s.persistAndAnnounceState(globalCtx) + err = s.persistAndAnnounceState(swapCtx) if err != nil { return err } } else { - published, err := s.publishOnChainHtlc(globalCtx) + published, err := s.publishOnChainHtlc(swapCtx) if err != nil { return err } @@ -579,7 +589,7 @@ func (s *loopInSwap) executeSwap(globalCtx context.Context) error { // Wait for the htlc to confirm. After a restart, this will pick up a // previously published tx. - conf, err := s.waitForHtlcConf(globalCtx) + conf, err := s.waitForHtlcConf(swapCtx) if err != nil { return err } @@ -596,24 +606,53 @@ func (s *loopInSwap) executeSwap(globalCtx context.Context) error { // amount. Otherwise, fail the swap immediately. if htlcValue != s.LoopInContract.AmountRequested { s.setState(loopdb.StateFailIncorrectHtlcAmt) - return s.persistAndAnnounceState(globalCtx) + + return s.persistAndAnnounceState(swapCtx) } // The server is expected to see the htlc on-chain and know that it can // sweep that htlc with the preimage, it should pay our swap invoice, - // receive the preimage and sweep the htlc. We are waiting for this to - // happen and simultaneously watch the htlc expiry height. When the htlc + // receive the preimage and sweep the htlc. We won't wait for the htlc + // sweep to occur but instead consider the swap successful if the + // invoice is settled or cancelled. Therefor we monitor the invoice + // settlement and htlc process in a separate goroutine. If the invoice + // is settled or cancelled we notify the main goroutine on the + // invFinalizedChan to continue processing the next swap. The separate + // goroutine will continue to watch out for a htlc spend while + // simultaneously watching the htlc's expiry height. When the htlc // expires, we will publish a timeout tx to reclaim the funds. - err = s.waitForSwapComplete(globalCtx, htlcOutpoint, htlcValue) - if err != nil { - return err - } + var ( + invFinalizedChan = make(chan bool) + errChan = make(chan error) + ) + wg.Add(1) + go func() { + defer wg.Done() + defer swapCancel() - // Persist swap outcome. - if err := s.persistAndAnnounceState(globalCtx); err != nil { + err = s.waitForSwapComplete( + swapCtx, invFinalizedChan, htlcOutpoint, htlcValue, + ) + if err != nil { + errChan <- err + } + }() + + select { + // If the swap invoice is cancelled or settled we continue with the + // execution of the next swap. + case <-invFinalizedChan: + + case err := <-errChan: return err + + case <-swapCtx.Done(): + return swapCtx.Err() + } + shouldCancel = false + return nil } @@ -793,13 +832,16 @@ func getTxFee(tx *wire.MsgTx, fee chainfee.SatPerKVByte) btcutil.Amount { // waitForSwapComplete waits until a spending tx of the htlc gets confirmed and // the swap invoice is either settled or canceled. If the htlc times out, the // timeout tx will be published. -func (s *loopInSwap) waitForSwapComplete(ctx context.Context, - htlcOutpoint *wire.OutPoint, htlcValue btcutil.Amount) error { +func (s *loopInSwap) waitForSwapComplete(swapCtx context.Context, + invFinalizedChan chan<- bool, htlcOutpoint *wire.OutPoint, + htlcValue btcutil.Amount) error { // Register the htlc spend notification. - rpcCtx, cancel := context.WithCancel(ctx) + rpcCtx, cancel := context.WithCancel(swapCtx) defer cancel() - spendChan, spendErr, err := s.lnd.ChainNotifier.RegisterSpendNtfn( + + notifier := s.lnd.ChainNotifier + spendChan, spendErr, err := notifier.RegisterSpendNtfn( rpcCtx, htlcOutpoint, s.htlc.PkScript, s.InitiationHeight, ) if err != nil { @@ -807,10 +849,11 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context, } // Register for swap invoice updates. - rpcCtx, cancel = context.WithCancel(ctx) + rpcCtx, cancel = context.WithCancel(swapCtx) defer cancel() s.log.Infof("Subscribing to swap invoice %v", s.hash) - swapInvoiceChan, swapInvoiceErr, err := s.lnd.Invoices.SubscribeSingleInvoice( + invoices := s.lnd.Invoices + swapInvoiceChan, swapInvoiceErr, err := invoices.SubscribeSingleInvoice( rpcCtx, s.hash, ) if err != nil { @@ -821,7 +864,9 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context, // expired. publishTxOnTimeout := func() (btcutil.Amount, error) { if s.height >= s.LoopInContract.CltvExpiry { - return s.publishTimeoutTx(ctx, htlcOutpoint, htlcValue) + return s.publishTimeoutTx( + swapCtx, htlcOutpoint, htlcValue, + ) } return 0, nil @@ -829,20 +874,21 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context, // Check timeout at current height. After a restart we may want to // publish the tx immediately. - var sweepFee btcutil.Amount - sweepFee, err = publishTxOnTimeout() + sweepFee, err := publishTxOnTimeout() if err != nil { - return err + return fmt.Errorf("publishing timeout tx error: %v", err) } - htlcSpend := false - invoiceFinalized := false - htlcKeyRevealed := false - for !htlcSpend || !invoiceFinalized { + var ( + htlcSpent = false + invoiceFinalized = false + htlcKeyRevealed = false + ) + for !htlcSpent || !invoiceFinalized { select { // Spend notification error. - case err := <-spendErr: - return err + case spendError := <-spendErr: + s.log.Errorf("spend error: %v", spendError) // Receive block epochs and start publishing the timeout tx // whenever possible. @@ -851,27 +897,33 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context, sweepFee, err = publishTxOnTimeout() if err != nil { - return err + s.log.Errorf("publishing timout tx error: %v", + err) } if invoiceFinalized && !htlcKeyRevealed { - htlcKeyRevealed = s.tryPushHtlcKey(ctx) + htlcKeyRevealed = s.tryPushHtlcKey(swapCtx) } // The htlc spend is confirmed. Inspect the spending tx to // determine the final swap state. case spendDetails := <-spendChan: - s.log.Infof("Htlc spend by tx: %v", + s.log.Errorf("htlc spend by tx: %v", spendDetails.SpenderTxHash) err := s.processHtlcSpend( - ctx, spendDetails, htlcValue, sweepFee, + swapCtx, spendDetails, htlcValue, sweepFee, ) if err != nil { - return err + s.log.Errorf("process spend error: %v", err) } - htlcSpend = true + err = s.persistAndAnnounceState(swapCtx) + if err != nil { + s.log.Errorf("announce spend error: %v", err) + } + + htlcSpent = true // Swap invoice ntfn error. case err, ok := <-swapInvoiceErr: @@ -910,27 +962,49 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context, // in the expected order, move the swap to an // intermediate state that indicates that the // swap is complete from the user point of view, - // but still incomplete with regards to - // accounting data. + // but still incomplete in regard to accounting + // data. if s.state == loopdb.StateHtlcPublished { s.setState(loopdb.StateInvoiceSettled) - err := s.persistAndAnnounceState(ctx) + err := s.persistAndAnnounceState( + swapCtx, + ) if err != nil { return err } } invoiceFinalized = true - htlcKeyRevealed = s.tryPushHtlcKey(ctx) + htlcKeyRevealed = s.tryPushHtlcKey(swapCtx) + + // Notify the main goroutine that the swap + // is finalized from the client's perspective. A + // new swap can be started while this goroutine + // is waiting for processing the on-chain spend. + select { + case invFinalizedChan <- invoiceFinalized: + case <-swapCtx.Done(): + } // Canceled invoice has no effect on server cost // balance. case invpkg.ContractCanceled: + // The timeout transaction was published and the + // invoice cancelled. invoiceFinalized = true + + // Notify the main goroutine that the swap + // is finalized from the client's perspective. A + // new swap can be started. + select { + case invFinalizedChan <- invoiceFinalized: + case <-swapCtx.Done(): + } } - case <-ctx.Done(): - return ctx.Err() + case <-swapCtx.Done(): + return swapCtx.Err() + } } @@ -993,7 +1067,10 @@ func (s *loopInSwap) processHtlcSpend(ctx context.Context, // already settled. This means that the server didn't succeed in // sweeping the htlc after paying the invoice. err := s.lnd.Invoices.CancelInvoice(ctx, s.hash) - if err != nil && err != invpkg.ErrInvoiceAlreadySettled { + isInvoiceAlreadySettled := errors.Is( + err, invpkg.ErrInvoiceAlreadySettled, + ) + if err != nil && !isInvoiceAlreadySettled { return err } } diff --git a/loopin_test.go b/loopin_test.go index 52dee18..4f37a6f 100644 --- a/loopin_test.go +++ b/loopin_test.go @@ -65,7 +65,9 @@ func testLoopInSuccess(t *testing.T) { errChan := make(chan error) go func() { - err := inSwap.execute(context.Background(), ctx.cfg, height) + err := inSwap.execute( + context.Background(), ctx.wg, ctx.cfg, height, + ) if err != nil { log.Error(err) } @@ -217,7 +219,9 @@ func testLoopInTimeout(t *testing.T, externalValue int64) { errChan := make(chan error) go func() { - err := inSwap.execute(context.Background(), ctx.cfg, height) + err := inSwap.execute( + context.Background(), ctx.wg, ctx.cfg, height, + ) if err != nil { log.Error(err) } @@ -470,7 +474,9 @@ func testLoopInResume(t *testing.T, state loopdb.SwapState, expired bool, errChan := make(chan error) go func() { - err := inSwap.execute(context.Background(), ctx.cfg, height) + err := inSwap.execute( + context.Background(), ctx.wg, ctx.cfg, height, + ) if err != nil { log.Error(err) } diff --git a/loopin_testcontext_test.go b/loopin_testcontext_test.go index 7d3ffcd..fddd363 100644 --- a/loopin_testcontext_test.go +++ b/loopin_testcontext_test.go @@ -1,6 +1,7 @@ package loop import ( + "sync" "testing" "time" @@ -23,6 +24,7 @@ type loopInTestContext struct { cfg *executeConfig statusChan chan SwapInfo blockEpochChan chan interface{} + wg *sync.WaitGroup swapInvoiceSubscription *test.SingleInvoiceSubscription } @@ -58,6 +60,7 @@ func newLoopInTestContext(t *testing.T) *loopInTestContext { cfg: &cfg, statusChan: statusChan, blockEpochChan: blockEpochChan, + wg: &sync.WaitGroup{}, } } diff --git a/loopout.go b/loopout.go index f9f372a..82c9356 100644 --- a/loopout.go +++ b/loopout.go @@ -346,7 +346,7 @@ func (s *loopOutSwap) sendUpdate(ctx context.Context) error { // execute starts/resumes the swap. It is a thin wrapper around // executeAndFinalize to conveniently handle the error case. -func (s *loopOutSwap) execute(mainCtx context.Context, +func (s *loopOutSwap) execute(mainCtx context.Context, _ *sync.WaitGroup, cfg *executeConfig, height int32) error { defer s.wg.Wait() diff --git a/loopout_test.go b/loopout_test.go index 31a3ed5..103582e 100644 --- a/loopout_test.go +++ b/loopout_test.go @@ -84,7 +84,7 @@ func testLoopOutPaymentParameters(t *testing.T) { swapCtx, cancel := context.WithCancel(context.Background()) go func() { - err := swap.execute(swapCtx, &executeConfig{ + err := swap.execute(swapCtx, nil, &executeConfig{ statusChan: statusChan, sweeper: sweeper, blockEpochChan: blockEpochChan, @@ -194,7 +194,7 @@ func testLateHtlcPublish(t *testing.T) { errChan := make(chan error) go func() { - err := swap.execute(context.Background(), &executeConfig{ + err := swap.execute(context.Background(), nil, &executeConfig{ statusChan: statusChan, sweeper: sweeper, blockEpochChan: blockEpochChan, @@ -295,7 +295,7 @@ func testCustomSweepConfTarget(t *testing.T) { errChan := make(chan error) go func() { - err := swap.execute(context.Background(), &executeConfig{ + err := swap.execute(context.Background(), nil, &executeConfig{ statusChan: statusChan, blockEpochChan: blockEpochChan, timerFactory: timerFactory, @@ -513,7 +513,7 @@ func testPreimagePush(t *testing.T) { errChan := make(chan error) go func() { - err := swap.execute(context.Background(), &executeConfig{ + err := swap.execute(context.Background(), nil, &executeConfig{ statusChan: statusChan, blockEpochChan: blockEpochChan, timerFactory: timerFactory, @@ -749,7 +749,9 @@ func testFailedOffChainCancelation(t *testing.T) { verifySchnorrSig: mockVerifySchnorrSigFail, } - err := swap.execute(context.Background(), cfg, ctx.Lnd.Height) + err := swap.execute( + context.Background(), nil, cfg, ctx.Lnd.Height, + ) errChan <- err }() @@ -903,7 +905,7 @@ func TestLoopOutMuSig2Sweep(t *testing.T) { } go func() { - err := swap.execute(context.Background(), &executeConfig{ + err := swap.execute(context.Background(), nil, &executeConfig{ statusChan: statusChan, blockEpochChan: blockEpochChan, timerFactory: timerFactory, diff --git a/swap.go b/swap.go index 61db839..b0ab69c 100644 --- a/swap.go +++ b/swap.go @@ -2,6 +2,7 @@ package loop import ( "context" + "sync" "time" "github.com/btcsuite/btcd/chaincfg" @@ -121,7 +122,7 @@ func (s *swapKit) swapInfo() *SwapInfo { } type genericSwap interface { - execute(mainCtx context.Context, cfg *executeConfig, + execute(mainCtx context.Context, wg *sync.WaitGroup, cfg *executeConfig, height int32) error }