You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loop/client.go

476 lines
12 KiB
Go

package loop
import (
"context"
"encoding/hex"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/btcsuite/btcutil"
"github.com/lightninglabs/loop/lndclient"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/swap"
"github.com/lightninglabs/loop/sweep"
"github.com/lightningnetwork/lnd/lntypes"
)
var (
// ErrSwapFeeTooHigh is returned when the swap invoice amount is too
// high.
ErrSwapFeeTooHigh = errors.New("swap fee too high")
// ErrPrepayAmountTooHigh is returned when the prepay invoice amount is
// too high.
ErrPrepayAmountTooHigh = errors.New("prepay amount too high")
// ErrSwapAmountTooLow is returned when the requested swap amount is
// less than the server minimum.
ErrSwapAmountTooLow = errors.New("swap amount too low")
// ErrSwapAmountTooHigh is returned when the requested swap amount is
// more than the server maximum.
ErrSwapAmountTooHigh = errors.New("swap amount too high")
// ErrExpiryTooSoon is returned when the server proposes an expiry that
// is too soon for us.
ErrExpiryTooSoon = errors.New("swap expiry too soon")
// ErrExpiryTooFar is returned when the server proposes an expiry that
// is too soon for us.
ErrExpiryTooFar = errors.New("swap expiry too far")
serverRPCTimeout = 30 * time.Second
republishDelay = 10 * time.Second
)
// Client performs the client side part of swaps. This interface exists to be
// able to implement a stub.
type Client struct {
started uint32 // To be used atomically.
errChan chan error
lndServices *lndclient.LndServices
sweeper *sweep.Sweeper
executor *executor
resumeReady chan struct{}
wg sync.WaitGroup
clientConfig
}
// NewClient returns a new instance to initiate swaps with.
func NewClient(dbDir string, serverAddress string, insecure bool,
lnd *lndclient.LndServices) (*Client, func(), error) {
store, err := loopdb.NewBoltSwapStore(dbDir, lnd.ChainParams)
if err != nil {
return nil, nil, err
}
swapServerClient, err := newSwapServerClient(serverAddress, insecure)
if err != nil {
return nil, nil, err
}
config := &clientConfig{
LndServices: lnd,
Server: swapServerClient,
Store: store,
CreateExpiryTimer: func(d time.Duration) <-chan time.Time {
return time.NewTimer(d).C
},
}
sweeper := &sweep.Sweeper{
Lnd: lnd,
}
executor := newExecutor(&executorConfig{
lnd: lnd,
store: store,
sweeper: sweeper,
createExpiryTimer: config.CreateExpiryTimer,
})
client := &Client{
errChan: make(chan error),
clientConfig: *config,
lndServices: lnd,
sweeper: sweeper,
executor: executor,
resumeReady: make(chan struct{}),
}
cleanup := func() {
swapServerClient.Close()
}
return client, cleanup, nil
}
// FetchSwaps returns all loop in and out swaps currently in the database.
func (s *Client) FetchSwaps() ([]*SwapInfo, error) {
loopOutSwaps, err := s.Store.FetchLoopOutSwaps()
if err != nil {
return nil, err
}
loopInSwaps, err := s.Store.FetchLoopInSwaps()
if err != nil {
return nil, err
}
swaps := make([]*SwapInfo, 0, len(loopInSwaps)+len(loopOutSwaps))
for _, swp := range loopOutSwaps {
htlc, err := swap.NewHtlc(
swp.Contract.CltvExpiry, swp.Contract.SenderKey,
swp.Contract.ReceiverKey, swp.Hash, swap.HtlcP2WSH,
s.lndServices.ChainParams,
)
if err != nil {
return nil, err
}
swaps = append(swaps, &SwapInfo{
SwapType: TypeOut,
SwapContract: swp.Contract.SwapContract,
SwapStateData: swp.State(),
SwapHash: swp.Hash,
LastUpdate: swp.LastUpdateTime(),
HtlcAddress: htlc.Address,
})
}
for _, swp := range loopInSwaps {
htlc, err := swap.NewHtlc(
swp.Contract.CltvExpiry, swp.Contract.SenderKey,
swp.Contract.ReceiverKey, swp.Hash, swap.HtlcNP2WSH,
s.lndServices.ChainParams,
)
if err != nil {
return nil, err
}
swaps = append(swaps, &SwapInfo{
SwapType: TypeIn,
SwapContract: swp.Contract.SwapContract,
SwapStateData: swp.State(),
SwapHash: swp.Hash,
LastUpdate: swp.LastUpdateTime(),
HtlcAddress: htlc.Address,
})
}
return swaps, nil
}
// Run is a blocking call that executes all swaps. Any pending swaps are
// restored from persistent storage and resumed. Subsequent updates will be
// sent through the passed in statusChan. The function can be terminated by
// cancelling the context.
func (s *Client) Run(ctx context.Context,
statusChan chan<- SwapInfo) error {
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
return errors.New("swap client can only be started once")
}
// Log connected node.
info, err := s.lndServices.Client.GetInfo(ctx)
if err != nil {
return fmt.Errorf("GetInfo error: %v", err)
}
logger.Infof("Connected to lnd node %v with pubkey %v",
info.Alias, hex.EncodeToString(info.IdentityPubkey[:]),
)
// Setup main context used for cancelation.
mainCtx, mainCancel := context.WithCancel(ctx)
defer mainCancel()
// Query store before starting event loop to prevent new swaps from
// being treated as swaps that need to be resumed.
pendingLoopOutSwaps, err := s.Store.FetchLoopOutSwaps()
if err != nil {
return err
}
pendingLoopInSwaps, err := s.Store.FetchLoopInSwaps()
if err != nil {
return err
}
// Start goroutine to deliver all pending swaps to the main loop.
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.resumeSwaps(mainCtx, pendingLoopOutSwaps, pendingLoopInSwaps)
// Signal that new requests can be accepted. Otherwise the new
// swap could already have been added to the store and read in
// this goroutine as being a swap that needs to be resumed.
// Resulting in two goroutines executing the same swap.
close(s.resumeReady)
}()
// Main event loop.
err = s.executor.run(mainCtx, statusChan)
// Consider canceled as happy flow.
if err == context.Canceled {
err = nil
}
if err != nil {
logger.Errorf("Swap client terminating: %v", err)
} else {
logger.Info("Swap client terminating")
}
// Cancel all remaining active goroutines.
mainCancel()
// Wait for all to finish.
logger.Debug("Wait for executor to finish")
s.executor.waitFinished()
logger.Debug("Wait for goroutines to finish")
s.wg.Wait()
logger.Info("Swap client terminated")
return err
}
// resumeSwaps restarts all pending swaps from the provided list.
func (s *Client) resumeSwaps(ctx context.Context,
loopOutSwaps []*loopdb.LoopOut, loopInSwaps []*loopdb.LoopIn) {
swapCfg := &swapConfig{
lnd: s.lndServices,
store: s.Store,
}
for _, pend := range loopOutSwaps {
if pend.State().State.Type() != loopdb.StateTypePending {
continue
}
swap, err := resumeLoopOutSwap(ctx, swapCfg, pend)
if err != nil {
logger.Errorf("resuming loop out swap: %v", err)
continue
}
s.executor.initiateSwap(ctx, swap)
}
for _, pend := range loopInSwaps {
if pend.State().State.Type() != loopdb.StateTypePending {
continue
}
swap, err := resumeLoopInSwap(ctx, swapCfg, pend)
if err != nil {
logger.Errorf("resuming loop in swap: %v", err)
continue
}
s.executor.initiateSwap(ctx, swap)
}
}
// LoopOut initiates a loop out swap. It blocks until the swap is initiation
// with the swap server is completed (typically this takes only a short amount
// of time). From there on further status information can be acquired through
// the status channel returned from the Run call.
//
// When the call returns, the swap has been persisted and will be resumed
// automatically after restarts.
//
// The return value is a hash that uniquely identifies the new swap.
func (s *Client) LoopOut(globalCtx context.Context,
request *OutRequest) (*lntypes.Hash, btcutil.Address, error) {
logger.Infof("LoopOut %v to %v (channel: %v)",
request.Amount, request.DestAddr,
request.LoopOutChannel,
)
if err := s.waitForInitialized(globalCtx); err != nil {
return nil, nil, err
}
// Create a new swap object for this swap.
initiationHeight := s.executor.height()
swapCfg := &swapConfig{
lnd: s.lndServices,
store: s.Store,
server: s.Server,
}
swap, err := newLoopOutSwap(
globalCtx, swapCfg, initiationHeight, request,
)
if err != nil {
return nil, nil, err
}
// Post swap to the main loop.
s.executor.initiateSwap(globalCtx, swap)
// Return hash so that the caller can identify this swap in the updates
// stream.
return &swap.hash, swap.htlc.Address, nil
}
// LoopOutQuote takes a LoopOut amount and returns a break down of estimated
// costs for the client. Both the swap server and the on-chain fee estimator
// are queried to get to build the quote response.
func (s *Client) LoopOutQuote(ctx context.Context,
request *LoopOutQuoteRequest) (*LoopOutQuote, error) {
terms, err := s.Server.GetLoopOutTerms(ctx)
if err != nil {
return nil, err
}
if request.Amount < terms.MinSwapAmount {
return nil, ErrSwapAmountTooLow
}
if request.Amount > terms.MaxSwapAmount {
return nil, ErrSwapAmountTooHigh
}
logger.Infof("Offchain swap destination: %x", terms.SwapPaymentDest)
swapFee := swap.CalcFee(
request.Amount, terms.SwapFeeBase, terms.SwapFeeRate,
)
minerFee, err := s.sweeper.GetSweepFee(
ctx, swap.QuoteHtlc.AddSuccessToEstimator,
request.SweepConfTarget,
)
if err != nil {
return nil, err
}
return &LoopOutQuote{
SwapFee: swapFee,
MinerFee: minerFee,
PrepayAmount: btcutil.Amount(terms.PrepayAmt),
}, nil
}
// LoopOutTerms returns the terms on which the server executes swaps.
func (s *Client) LoopOutTerms(ctx context.Context) (
*LoopOutTerms, error) {
return s.Server.GetLoopOutTerms(ctx)
}
// waitForInitialized for swaps to be resumed and executor ready.
func (s *Client) waitForInitialized(ctx context.Context) error {
select {
case <-s.executor.ready:
case <-ctx.Done():
return ctx.Err()
}
select {
case <-s.resumeReady:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// LoopIn initiates a loop in swap.
func (s *Client) LoopIn(globalCtx context.Context,
request *LoopInRequest) (*lntypes.Hash, btcutil.Address, error) {
logger.Infof("Loop in %v (channel: %v)",
request.Amount,
request.LoopInChannel,
)
if err := s.waitForInitialized(globalCtx); err != nil {
return nil, nil, err
}
// Create a new swap object for this swap.
initiationHeight := s.executor.height()
swapCfg := swapConfig{
lnd: s.lndServices,
store: s.Store,
server: s.Server,
}
swap, err := newLoopInSwap(
globalCtx, &swapCfg, initiationHeight, request,
)
if err != nil {
return nil, nil, err
}
// Post swap to the main loop.
s.executor.initiateSwap(globalCtx, swap)
// Return hash so that the caller can identify this swap in the updates
// stream.
return &swap.hash, swap.htlc.Address, nil
}
// LoopInQuote takes an amount and returns a break down of estimated
// costs for the client. Both the swap server and the on-chain fee estimator are
// queried to get to build the quote response.
func (s *Client) LoopInQuote(ctx context.Context,
request *LoopInQuoteRequest) (*LoopInQuote, error) {
// Retrieve current server terms to calculate swap fee.
terms, err := s.Server.GetLoopInTerms(ctx)
if err != nil {
return nil, err
}
// Check amount limits.
if request.Amount < terms.MinSwapAmount {
return nil, ErrSwapAmountTooLow
}
if request.Amount > terms.MaxSwapAmount {
return nil, ErrSwapAmountTooHigh
}
// Calculate swap fee.
swapFee := terms.SwapFeeBase +
request.Amount*btcutil.Amount(terms.SwapFeeRate)/
btcutil.Amount(swap.FeeRateTotalParts)
// Get estimate for miner fee.
minerFee, err := s.lndServices.Client.EstimateFeeToP2WSH(
ctx, request.Amount, request.HtlcConfTarget,
)
if err != nil {
return nil, err
}
return &LoopInQuote{
SwapFee: swapFee,
MinerFee: minerFee,
}, nil
}
// LoopInTerms returns the terms on which the server executes swaps.
func (s *Client) LoopInTerms(ctx context.Context) (
*LoopInTerms, error) {
return s.Server.GetLoopInTerms(ctx)
}