loopin: unblock htlc sweep

pull/625/head
Slyghtning 9 months ago
parent cf59c4856f
commit bf0844e20b
No known key found for this signature in database
GPG Key ID: F82D456EA023C9BF

@ -148,7 +148,7 @@ func (s *executor) run(mainCtx context.Context,
go func() {
defer s.wg.Done()
err := newSwap.execute(mainCtx, &executeConfig{
err := newSwap.execute(mainCtx, &s.wg, &executeConfig{
statusChan: statusChan,
sweeper: s.sweeper,
blockEpochChan: queue.ChanOut(),

@ -69,8 +69,6 @@ type loopInSwap struct {
htlcTxHash *chainhash.Hash
timeoutAddr btcutil.Address
wg sync.WaitGroup
}
// loopInInitResult contains information about a just-initiated loop in swap.
@ -487,9 +485,7 @@ func (s *loopInSwap) sendUpdate(ctx context.Context) error {
// execute starts/resumes the swap. It is a thin wrapper around executeSwap to
// conveniently handle the error case.
func (s *loopInSwap) execute(mainCtx context.Context,
cfg *executeConfig, height int32) error {
defer s.wg.Wait()
wg *sync.WaitGroup, cfg *executeConfig, height int32) error {
s.executeConfig = *cfg
s.height = height
@ -497,14 +493,17 @@ func (s *loopInSwap) execute(mainCtx context.Context,
// Create context for our state subscription which we will cancel once
// swap execution has completed, ensuring that we kill the subscribe
// goroutine.
subCtx, cancel := context.WithCancel(mainCtx)
defer cancel()
swapCtx, swapCancel := context.WithCancel(mainCtx)
s.wg.Add(1)
wg.Add(1)
go func() {
defer s.wg.Done()
defer wg.Done()
logCtx, logCancel := context.WithCancel(swapCtx)
defer logCancel()
subscribeAndLogUpdates(
subCtx, s.hash, s.log, s.server.SubscribeLoopInUpdates,
logCtx, s.hash, s.log, s.server.SubscribeLoopInUpdates,
)
}()
@ -516,11 +515,13 @@ func (s *loopInSwap) execute(mainCtx context.Context,
// Execute the swap until it either reaches a final state or a temporary
// error occurs.
err = s.executeSwap(mainCtx)
err = s.executeSwap(swapCtx, swapCancel, wg)
// Sanity check. If there is no error, the swap must be in a final
// state.
if err == nil && s.state.Type() == loopdb.StateTypePending {
// state. A settled invoice for a loop-in is considered a final state.
if err == nil && s.state.Type() == loopdb.StateTypePending &&
s.state != loopdb.StateInvoiceSettled {
err = fmt.Errorf("swap in non-final state %v", s.state)
}
@ -550,8 +551,17 @@ func (s *loopInSwap) execute(mainCtx context.Context,
}
// executeSwap executes the swap.
func (s *loopInSwap) executeSwap(globalCtx context.Context) error {
func (s *loopInSwap) executeSwap(swapCtx context.Context,
swapCancel context.CancelFunc, wg *sync.WaitGroup) error {
var err error
shouldCancel := true
defer func() {
if shouldCancel {
swapCancel()
}
}()
// For loop in, the client takes the first step by publishing the
// on-chain htlc. Only do this if we haven't already done so in a
@ -562,12 +572,12 @@ func (s *loopInSwap) executeSwap(globalCtx context.Context) error {
// HtlcPublished state directly and wait for
// confirmation.
s.setState(loopdb.StateHtlcPublished)
err = s.persistAndAnnounceState(globalCtx)
err = s.persistAndAnnounceState(swapCtx)
if err != nil {
return err
}
} else {
published, err := s.publishOnChainHtlc(globalCtx)
published, err := s.publishOnChainHtlc(swapCtx)
if err != nil {
return err
}
@ -579,7 +589,7 @@ func (s *loopInSwap) executeSwap(globalCtx context.Context) error {
// Wait for the htlc to confirm. After a restart, this will pick up a
// previously published tx.
conf, err := s.waitForHtlcConf(globalCtx)
conf, err := s.waitForHtlcConf(swapCtx)
if err != nil {
return err
}
@ -596,24 +606,53 @@ func (s *loopInSwap) executeSwap(globalCtx context.Context) error {
// amount. Otherwise, fail the swap immediately.
if htlcValue != s.LoopInContract.AmountRequested {
s.setState(loopdb.StateFailIncorrectHtlcAmt)
return s.persistAndAnnounceState(globalCtx)
return s.persistAndAnnounceState(swapCtx)
}
// The server is expected to see the htlc on-chain and know that it can
// sweep that htlc with the preimage, it should pay our swap invoice,
// receive the preimage and sweep the htlc. We are waiting for this to
// happen and simultaneously watch the htlc expiry height. When the htlc
// receive the preimage and sweep the htlc. We won't wait for the htlc
// sweep to occur but instead consider the swap successful if the
// invoice is settled or cancelled. Therefor we monitor the invoice
// settlement and htlc process in a separate goroutine. If the invoice
// is settled or cancelled we notify the main goroutine on the
// invFinalizedChan to continue processing the next swap. The separate
// goroutine will continue to watch out for a htlc spend while
// simultaneously watching the htlc's expiry height. When the htlc
// expires, we will publish a timeout tx to reclaim the funds.
err = s.waitForSwapComplete(globalCtx, htlcOutpoint, htlcValue)
if err != nil {
return err
}
var (
invFinalizedChan = make(chan bool)
errChan = make(chan error)
)
wg.Add(1)
go func() {
defer wg.Done()
defer swapCancel()
// Persist swap outcome.
if err := s.persistAndAnnounceState(globalCtx); err != nil {
err = s.waitForSwapComplete(
swapCtx, invFinalizedChan, htlcOutpoint, htlcValue,
)
if err != nil {
errChan <- err
}
}()
select {
// If the swap invoice is cancelled or settled we continue with the
// execution of the next swap.
case <-invFinalizedChan:
case err := <-errChan:
return err
case <-swapCtx.Done():
return swapCtx.Err()
}
shouldCancel = false
return nil
}
@ -793,13 +832,16 @@ func getTxFee(tx *wire.MsgTx, fee chainfee.SatPerKVByte) btcutil.Amount {
// waitForSwapComplete waits until a spending tx of the htlc gets confirmed and
// the swap invoice is either settled or canceled. If the htlc times out, the
// timeout tx will be published.
func (s *loopInSwap) waitForSwapComplete(ctx context.Context,
htlcOutpoint *wire.OutPoint, htlcValue btcutil.Amount) error {
func (s *loopInSwap) waitForSwapComplete(swapCtx context.Context,
invFinalizedChan chan<- bool, htlcOutpoint *wire.OutPoint,
htlcValue btcutil.Amount) error {
// Register the htlc spend notification.
rpcCtx, cancel := context.WithCancel(ctx)
rpcCtx, cancel := context.WithCancel(swapCtx)
defer cancel()
spendChan, spendErr, err := s.lnd.ChainNotifier.RegisterSpendNtfn(
notifier := s.lnd.ChainNotifier
spendChan, spendErr, err := notifier.RegisterSpendNtfn(
rpcCtx, htlcOutpoint, s.htlc.PkScript, s.InitiationHeight,
)
if err != nil {
@ -807,10 +849,11 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context,
}
// Register for swap invoice updates.
rpcCtx, cancel = context.WithCancel(ctx)
rpcCtx, cancel = context.WithCancel(swapCtx)
defer cancel()
s.log.Infof("Subscribing to swap invoice %v", s.hash)
swapInvoiceChan, swapInvoiceErr, err := s.lnd.Invoices.SubscribeSingleInvoice(
invoices := s.lnd.Invoices
swapInvoiceChan, swapInvoiceErr, err := invoices.SubscribeSingleInvoice(
rpcCtx, s.hash,
)
if err != nil {
@ -821,7 +864,9 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context,
// expired.
publishTxOnTimeout := func() (btcutil.Amount, error) {
if s.height >= s.LoopInContract.CltvExpiry {
return s.publishTimeoutTx(ctx, htlcOutpoint, htlcValue)
return s.publishTimeoutTx(
swapCtx, htlcOutpoint, htlcValue,
)
}
return 0, nil
@ -829,20 +874,21 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context,
// Check timeout at current height. After a restart we may want to
// publish the tx immediately.
var sweepFee btcutil.Amount
sweepFee, err = publishTxOnTimeout()
sweepFee, err := publishTxOnTimeout()
if err != nil {
return err
return fmt.Errorf("publishing timeout tx error: %v", err)
}
htlcSpend := false
invoiceFinalized := false
htlcKeyRevealed := false
for !htlcSpend || !invoiceFinalized {
var (
htlcSpent = false
invoiceFinalized = false
htlcKeyRevealed = false
)
for !htlcSpent || !invoiceFinalized {
select {
// Spend notification error.
case err := <-spendErr:
return err
case spendError := <-spendErr:
s.log.Errorf("spend error: %v", spendError)
// Receive block epochs and start publishing the timeout tx
// whenever possible.
@ -851,27 +897,33 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context,
sweepFee, err = publishTxOnTimeout()
if err != nil {
return err
s.log.Errorf("publishing timout tx error: %v",
err)
}
if invoiceFinalized && !htlcKeyRevealed {
htlcKeyRevealed = s.tryPushHtlcKey(ctx)
htlcKeyRevealed = s.tryPushHtlcKey(swapCtx)
}
// The htlc spend is confirmed. Inspect the spending tx to
// determine the final swap state.
case spendDetails := <-spendChan:
s.log.Infof("Htlc spend by tx: %v",
s.log.Errorf("htlc spend by tx: %v",
spendDetails.SpenderTxHash)
err := s.processHtlcSpend(
ctx, spendDetails, htlcValue, sweepFee,
swapCtx, spendDetails, htlcValue, sweepFee,
)
if err != nil {
return err
s.log.Errorf("process spend error: %v", err)
}
htlcSpend = true
err = s.persistAndAnnounceState(swapCtx)
if err != nil {
s.log.Errorf("announce spend error: %v", err)
}
htlcSpent = true
// Swap invoice ntfn error.
case err, ok := <-swapInvoiceErr:
@ -910,27 +962,49 @@ func (s *loopInSwap) waitForSwapComplete(ctx context.Context,
// in the expected order, move the swap to an
// intermediate state that indicates that the
// swap is complete from the user point of view,
// but still incomplete with regards to
// accounting data.
// but still incomplete in regard to accounting
// data.
if s.state == loopdb.StateHtlcPublished {
s.setState(loopdb.StateInvoiceSettled)
err := s.persistAndAnnounceState(ctx)
err := s.persistAndAnnounceState(
swapCtx,
)
if err != nil {
return err
}
}
invoiceFinalized = true
htlcKeyRevealed = s.tryPushHtlcKey(ctx)
htlcKeyRevealed = s.tryPushHtlcKey(swapCtx)
// Notify the main goroutine that the swap
// is finalized from the client's perspective. A
// new swap can be started while this goroutine
// is waiting for processing the on-chain spend.
select {
case invFinalizedChan <- invoiceFinalized:
case <-swapCtx.Done():
}
// Canceled invoice has no effect on server cost
// balance.
case invpkg.ContractCanceled:
// The timeout transaction was published and the
// invoice cancelled.
invoiceFinalized = true
// Notify the main goroutine that the swap
// is finalized from the client's perspective. A
// new swap can be started.
select {
case invFinalizedChan <- invoiceFinalized:
case <-swapCtx.Done():
}
}
case <-ctx.Done():
return ctx.Err()
case <-swapCtx.Done():
return swapCtx.Err()
}
}
@ -993,7 +1067,10 @@ func (s *loopInSwap) processHtlcSpend(ctx context.Context,
// already settled. This means that the server didn't succeed in
// sweeping the htlc after paying the invoice.
err := s.lnd.Invoices.CancelInvoice(ctx, s.hash)
if err != nil && err != invpkg.ErrInvoiceAlreadySettled {
isInvoiceAlreadySettled := errors.Is(
err, invpkg.ErrInvoiceAlreadySettled,
)
if err != nil && !isInvoiceAlreadySettled {
return err
}
}

@ -65,7 +65,9 @@ func testLoopInSuccess(t *testing.T) {
errChan := make(chan error)
go func() {
err := inSwap.execute(context.Background(), ctx.cfg, height)
err := inSwap.execute(
context.Background(), ctx.wg, ctx.cfg, height,
)
if err != nil {
log.Error(err)
}
@ -217,7 +219,9 @@ func testLoopInTimeout(t *testing.T, externalValue int64) {
errChan := make(chan error)
go func() {
err := inSwap.execute(context.Background(), ctx.cfg, height)
err := inSwap.execute(
context.Background(), ctx.wg, ctx.cfg, height,
)
if err != nil {
log.Error(err)
}
@ -470,7 +474,9 @@ func testLoopInResume(t *testing.T, state loopdb.SwapState, expired bool,
errChan := make(chan error)
go func() {
err := inSwap.execute(context.Background(), ctx.cfg, height)
err := inSwap.execute(
context.Background(), ctx.wg, ctx.cfg, height,
)
if err != nil {
log.Error(err)
}

@ -1,6 +1,7 @@
package loop
import (
"sync"
"testing"
"time"
@ -23,6 +24,7 @@ type loopInTestContext struct {
cfg *executeConfig
statusChan chan SwapInfo
blockEpochChan chan interface{}
wg *sync.WaitGroup
swapInvoiceSubscription *test.SingleInvoiceSubscription
}
@ -58,6 +60,7 @@ func newLoopInTestContext(t *testing.T) *loopInTestContext {
cfg: &cfg,
statusChan: statusChan,
blockEpochChan: blockEpochChan,
wg: &sync.WaitGroup{},
}
}

@ -346,7 +346,7 @@ func (s *loopOutSwap) sendUpdate(ctx context.Context) error {
// execute starts/resumes the swap. It is a thin wrapper around
// executeAndFinalize to conveniently handle the error case.
func (s *loopOutSwap) execute(mainCtx context.Context,
func (s *loopOutSwap) execute(mainCtx context.Context, _ *sync.WaitGroup,
cfg *executeConfig, height int32) error {
defer s.wg.Wait()

@ -84,7 +84,7 @@ func testLoopOutPaymentParameters(t *testing.T) {
swapCtx, cancel := context.WithCancel(context.Background())
go func() {
err := swap.execute(swapCtx, &executeConfig{
err := swap.execute(swapCtx, nil, &executeConfig{
statusChan: statusChan,
sweeper: sweeper,
blockEpochChan: blockEpochChan,
@ -194,7 +194,7 @@ func testLateHtlcPublish(t *testing.T) {
errChan := make(chan error)
go func() {
err := swap.execute(context.Background(), &executeConfig{
err := swap.execute(context.Background(), nil, &executeConfig{
statusChan: statusChan,
sweeper: sweeper,
blockEpochChan: blockEpochChan,
@ -295,7 +295,7 @@ func testCustomSweepConfTarget(t *testing.T) {
errChan := make(chan error)
go func() {
err := swap.execute(context.Background(), &executeConfig{
err := swap.execute(context.Background(), nil, &executeConfig{
statusChan: statusChan,
blockEpochChan: blockEpochChan,
timerFactory: timerFactory,
@ -513,7 +513,7 @@ func testPreimagePush(t *testing.T) {
errChan := make(chan error)
go func() {
err := swap.execute(context.Background(), &executeConfig{
err := swap.execute(context.Background(), nil, &executeConfig{
statusChan: statusChan,
blockEpochChan: blockEpochChan,
timerFactory: timerFactory,
@ -749,7 +749,9 @@ func testFailedOffChainCancelation(t *testing.T) {
verifySchnorrSig: mockVerifySchnorrSigFail,
}
err := swap.execute(context.Background(), cfg, ctx.Lnd.Height)
err := swap.execute(
context.Background(), nil, cfg, ctx.Lnd.Height,
)
errChan <- err
}()
@ -903,7 +905,7 @@ func TestLoopOutMuSig2Sweep(t *testing.T) {
}
go func() {
err := swap.execute(context.Background(), &executeConfig{
err := swap.execute(context.Background(), nil, &executeConfig{
statusChan: statusChan,
blockEpochChan: blockEpochChan,
timerFactory: timerFactory,

@ -2,6 +2,7 @@ package loop
import (
"context"
"sync"
"time"
"github.com/btcsuite/btcd/chaincfg"
@ -121,7 +122,7 @@ func (s *swapKit) swapInfo() *SwapInfo {
}
type genericSwap interface {
execute(mainCtx context.Context, cfg *executeConfig,
execute(mainCtx context.Context, wg *sync.WaitGroup, cfg *executeConfig,
height int32) error
}

Loading…
Cancel
Save