You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loop/loopd/swapclient_server.go

1571 lines
42 KiB
Go

package loopd
import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"reflect"
"sort"
"strings"
"sync"
"time"
"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/btcutil"
"github.com/btcsuite/btcd/chaincfg"
"github.com/lightninglabs/aperture/lsat"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/instantout"
"github.com/lightninglabs/loop/instantout/reservation"
"github.com/lightninglabs/loop/labels"
"github.com/lightninglabs/loop/liquidity"
"github.com/lightninglabs/loop/loopdb"
clientrpc "github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/swap"
looprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/zpay32"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
// completedSwapsCount is the maximum number of most recently updated
// non-pending swaps that Monitor replays to a newly connected
// subscriber.
completedSwapsCount = 5
// minConfTarget is the minimum confirmation target we'll allow clients
// to specify. This is driven by the minimum confirmation target allowed
// by the backing fee estimator.
minConfTarget = 2
// defaultLoopdInitiator is the initiator string passed along with the
// terms and quote requests issued by this RPC server.
defaultLoopdInitiator = "loopd"
)
// Sentinel errors returned by the request validation of the swap client RPC
// server.
var (
// errIncorrectChain is returned when the format of the
// destination address provided does not match the active chain.
errIncorrectChain = errors.New("invalid address format for the " +
"active chain")
// errConfTargetTooLow is returned when the chosen confirmation target
// is below the allowed minimum.
errConfTargetTooLow = errors.New("confirmation target too low")
// errBalanceTooLow is returned when the loop out amount can't be
// satisfied given total balance of the selection of channels to loop
// out on.
errBalanceTooLow = errors.New(
"channel balance too low for loop out amount",
)
// errInvalidAddress is returned when the destination address is of
// an unsupported format such as P2PK or P2TR addresses.
//
// NOTE(review): the list of unsupported formats is taken from the
// original comment; confirm it against the address validation code.
errInvalidAddress = errors.New(
"invalid or unsupported address",
)
)
// swapClientServer implements the grpc service exposed by loopd.
type swapClientServer struct {
// Required by the grpc-gateway/v2 library for forward compatibility.
clientrpc.UnimplementedSwapClientServer
clientrpc.UnimplementedDebugServer
// config is the daemon configuration; GetInfo reports its network,
// listener addresses and macaroon/TLS paths.
config *Config
network lndclient.Network
// impl is the loop client that executes the swap, quote, terms and
// probe operations requested over RPC.
impl *loop.Client
// liquidityMgr backs the liquidity parameter RPCs and SuggestSwaps.
liquidityMgr *liquidity.Manager
// lnd gives access to the services of the backing lnd node.
lnd *lndclient.LndServices
// reservationManager may be nil, in which case ListReservations
// returns an Unimplemented error asking for --experimental.
reservationManager *reservation.Manager
// instantOutManager backs the instant out RPCs.
instantOutManager *instantout.Manager
// swaps is the in-memory cache of known swaps, keyed by swap hash.
swaps map[lntypes.Hash]loop.SwapInfo
// subscribers maps a Monitor subscriber ID to the input channel of
// that subscriber's notification queue.
subscribers map[int]chan<- interface{}
// statusChan carries swap status updates.
// NOTE(review): presumably consumed by processStatusUpdates — confirm.
statusChan chan loop.SwapInfo
// nextSubscriberID is the ID assigned to the next Monitor subscriber.
nextSubscriberID int
// swapsLock guards swaps, subscribers and nextSubscriberID.
swapsLock sync.Mutex
// mainCtx is done once the server is shutting down; Monitor streams
// terminate when that happens.
mainCtx context.Context
}
// LoopOut initiates a loop out swap with the given parameters. The call returns
// after the swap has been set up with the swap server. From that point onwards,
// progress can be tracked via the LoopOutStatus stream that is returned from
// Monitor().
//
// The sweep destination is selected as follows:
//   - in.Dest set: the address is decoded against the active chain params.
//   - in.Account set (an explicit address type is required): a new address is
//     derived from that account after checking that the account exists.
//   - neither set: a new P2WPKH address is requested from lnd using an empty
//     account name.
//
// Setting both in.Dest and in.Account is rejected. Only the first two cases
// flag the sweep address as external. All lnd calls are made with the request
// context so that cancellation and deadlines of the RPC are honored.
func (s *swapClientServer) LoopOut(ctx context.Context,
	in *clientrpc.LoopOutRequest) (
	*clientrpc.SwapResponse, error) {

	log.Infof("Loop out request received")

	var (
		sweepAddr      btcutil.Address
		isExternalAddr bool
		err            error
	)

	//nolint:lll
	switch {
	case in.Dest != "" && in.Account != "":
		return nil, fmt.Errorf("destination address and external " +
			"account address cannot be set at the same time")

	case in.Dest != "":
		// Decode the client provided destination address for the loop
		// out sweep.
		sweepAddr, err = btcutil.DecodeAddress(
			in.Dest, s.lnd.ChainParams,
		)
		if err != nil {
			return nil, fmt.Errorf("decode address: %w", err)
		}

		isExternalAddr = true

	case in.Account != "" && in.AccountAddrType == clientrpc.AddressType_ADDRESS_TYPE_UNKNOWN:
		return nil, liquidity.ErrAccountAndAddrType

	case in.Account != "":
		// Derive a new receiving address from the stated account.
		addrType, typeErr := toWalletAddrType(in.AccountAddrType)
		if typeErr != nil {
			return nil, typeErr
		}

		// Check if account with address type exists.
		if !s.accountExists(ctx, in.Account, addrType) {
			return nil, fmt.Errorf("the provided account does " +
				"not exist")
		}

		sweepAddr, err = s.lnd.WalletKit.NextAddr(
			ctx, in.Account, addrType, false,
		)
		if err != nil {
			return nil, fmt.Errorf("NextAddr from account error: "+
				"%w", err)
		}

		isExternalAddr = true

	default:
		// Generate sweep address if none specified. The request context
		// is used (rather than a background context) so the call is
		// aborted together with the RPC.
		sweepAddr, err = s.lnd.WalletKit.NextAddr(
			ctx, "",
			walletrpc.AddressType_WITNESS_PUBKEY_HASH, false,
		)
		if err != nil {
			return nil, fmt.Errorf("NextAddr error: %w", err)
		}
	}

	sweepConfTarget, err := validateLoopOutRequest(
		ctx, s.lnd.Client, s.lnd.ChainParams, in, sweepAddr,
		s.impl.LoopOutMaxParts,
	)
	if err != nil {
		return nil, err
	}

	// Infer if the publication deadline is set in milliseconds.
	publicationDeadline := getPublicationDeadline(in.SwapPublicationDeadline)

	req := &loop.OutRequest{
		Amount:                  btcutil.Amount(in.Amt),
		DestAddr:                sweepAddr,
		IsExternalAddr:          isExternalAddr,
		MaxMinerFee:             btcutil.Amount(in.MaxMinerFee),
		MaxPrepayAmount:         btcutil.Amount(in.MaxPrepayAmt),
		MaxPrepayRoutingFee:     btcutil.Amount(in.MaxPrepayRoutingFee),
		MaxSwapRoutingFee:       btcutil.Amount(in.MaxSwapRoutingFee),
		MaxSwapFee:              btcutil.Amount(in.MaxSwapFee),
		SweepConfTarget:         sweepConfTarget,
		HtlcConfirmations:       in.HtlcConfirmations,
		SwapPublicationDeadline: publicationDeadline,
		Label:                   in.Label,
		Initiator:               in.Initiator,
	}

	// The deprecated single channel field and the channel set are mutually
	// exclusive; whichever is set determines the outgoing channel set.
	switch {
	case in.LoopOutChannel != 0 && len(in.OutgoingChanSet) > 0: // nolint:staticcheck
		return nil, errors.New("loop_out_channel and outgoing_" +
			"chan_ids are mutually exclusive")

	case in.LoopOutChannel != 0: // nolint:staticcheck
		req.OutgoingChanSet = loopdb.ChannelSet{in.LoopOutChannel} // nolint:staticcheck

	default:
		req.OutgoingChanSet = in.OutgoingChanSet
	}

	info, err := s.impl.LoopOut(ctx, req)
	if err != nil {
		log.Errorf("LoopOut: %v", err)
		return nil, err
	}

	htlcAddress := info.HtlcAddress.String()
	resp := &clientrpc.SwapResponse{
		Id:            info.SwapHash.String(),
		IdBytes:       info.SwapHash[:],
		HtlcAddress:   htlcAddress,
		ServerMessage: info.ServerMessage,
	}

	// Besides the generic address field, also populate the script type
	// specific field that corresponds to the current protocol version.
	if loopdb.CurrentProtocolVersion() < loopdb.ProtocolVersionHtlcV3 {
		resp.HtlcAddressP2Wsh = htlcAddress
	} else {
		resp.HtlcAddressP2Tr = htlcAddress
	}

	return resp, nil
}
// accountExists reports whether lnd knows an account with the given name for
// the given address type. A failure to list the accounts is treated the same
// as the account not existing.
func (s *swapClientServer) accountExists(ctx context.Context, account string,
	addrType walletrpc.AddressType) bool {

	found, err := s.lnd.WalletKit.ListAccounts(ctx, account, addrType)
	if err != nil {
		return false
	}

	for _, candidate := range found {
		if candidate.Name != account {
			continue
		}

		return true
	}

	return false
}
// toWalletAddrType converts an RPC address type into lnd's wallet address
// type. Only the taproot pubkey type is mapped; every other value yields an
// error together with the wallet's unknown address type.
func toWalletAddrType(addrType clientrpc.AddressType) (walletrpc.AddressType,
	error) {

	if addrType == clientrpc.AddressType_TAPROOT_PUBKEY {
		return walletrpc.AddressType_TAPROOT_PUBKEY, nil
	}

	return walletrpc.AddressType_UNKNOWN,
		fmt.Errorf("unknown address type")
}
// marshallSwap converts an in-memory swap into its RPC representation. It
// returns an error if the swap's state or type is unknown.
func (s *swapClientServer) marshallSwap(loopSwap *loop.SwapInfo) (
	*clientrpc.SwapStatus, error) {

	// To remain backwards compatible with previous versions where all
	// failure reasons were squashed into a single failure state, every
	// failure state maps to the catchall FAILED state plus a specific
	// failure reason. We therefore start out with FAILED and only
	// overwrite the state for the non-failure cases.
	rpcState := clientrpc.SwapState_FAILED
	reason := clientrpc.FailureReason_FAILURE_REASON_NONE

	switch loopSwap.State {
	case loopdb.StateInitiated:
		rpcState = clientrpc.SwapState_INITIATED

	case loopdb.StatePreimageRevealed:
		rpcState = clientrpc.SwapState_PREIMAGE_REVEALED

	case loopdb.StateHtlcPublished:
		rpcState = clientrpc.SwapState_HTLC_PUBLISHED

	case loopdb.StateInvoiceSettled:
		rpcState = clientrpc.SwapState_INVOICE_SETTLED

	case loopdb.StateSuccess:
		rpcState = clientrpc.SwapState_SUCCESS

	case loopdb.StateFailOffchainPayments:
		reason = clientrpc.FailureReason_FAILURE_REASON_OFFCHAIN

	case loopdb.StateFailTimeout:
		reason = clientrpc.FailureReason_FAILURE_REASON_TIMEOUT

	case loopdb.StateFailSweepTimeout:
		reason = clientrpc.FailureReason_FAILURE_REASON_SWEEP_TIMEOUT

	case loopdb.StateFailInsufficientValue:
		reason = clientrpc.FailureReason_FAILURE_REASON_INSUFFICIENT_VALUE

	case loopdb.StateFailTemporary:
		reason = clientrpc.FailureReason_FAILURE_REASON_TEMPORARY

	case loopdb.StateFailIncorrectHtlcAmt:
		reason = clientrpc.FailureReason_FAILURE_REASON_INCORRECT_AMOUNT

	case loopdb.StateFailAbandoned:
		reason = clientrpc.FailureReason_FAILURE_REASON_ABANDONED

	case loopdb.StateFailInsufficientConfirmedBalance:
		reason = clientrpc.FailureReason_FAILURE_REASON_INSUFFICIENT_CONFIRMED_BALANCE

	case loopdb.StateFailIncorrectHtlcAmtSwept:
		reason = clientrpc.FailureReason_FAILURE_REASON_INCORRECT_HTLC_AMT_SWEPT

	default:
		return nil, fmt.Errorf("unknown swap state: %v", loopSwap.State)
	}

	var (
		rpcType  clientrpc.SwapType
		addr     string
		addrP2TR string
		addrWSH  string
		chanSet  []uint64
		lastHop  []byte
	)

	switch loopSwap.SwapType {
	// Loop in swaps prefer the taproot htlc address and carry the last
	// hop restriction, if any.
	case swap.TypeIn:
		rpcType = clientrpc.SwapType_LOOP_IN

		if loopSwap.HtlcAddressP2TR == nil {
			addrWSH = loopSwap.HtlcAddressP2WSH.EncodeAddress()
			addr = addrWSH
		} else {
			addrP2TR = loopSwap.HtlcAddressP2TR.EncodeAddress()
			addr = addrP2TR
		}

		if loopSwap.LastHop != nil {
			lastHop = loopSwap.LastHop[:]
		}

	// Loop out swaps prefer the P2WSH htlc address and carry the outgoing
	// channel set.
	case swap.TypeOut:
		rpcType = clientrpc.SwapType_LOOP_OUT

		if loopSwap.HtlcAddressP2WSH == nil {
			addrP2TR = loopSwap.HtlcAddressP2TR.EncodeAddress()
			addr = addrP2TR
		} else {
			addrWSH = loopSwap.HtlcAddressP2WSH.EncodeAddress()
			addr = addrWSH
		}

		chanSet = loopSwap.OutgoingChanSet

	default:
		return nil, errors.New("unknown swap type")
	}

	rpcSwap := &clientrpc.SwapStatus{
		Amt:              int64(loopSwap.AmountRequested),
		Id:               loopSwap.SwapHash.String(),
		IdBytes:          loopSwap.SwapHash[:],
		State:            rpcState,
		FailureReason:    reason,
		InitiationTime:   loopSwap.InitiationTime.UnixNano(),
		LastUpdateTime:   loopSwap.LastUpdate.UnixNano(),
		HtlcAddress:      addr,
		HtlcAddressP2Tr:  addrP2TR,
		HtlcAddressP2Wsh: addrWSH,
		Type:             rpcType,
		CostServer:       int64(loopSwap.Cost.Server),
		CostOnchain:      int64(loopSwap.Cost.Onchain),
		CostOffchain:     int64(loopSwap.Cost.Offchain),
		Label:            loopSwap.Label,
		LastHop:          lastHop,
		OutgoingChanSet:  chanSet,
	}

	return rpcSwap, nil
}
// Monitor streams swap updates to the client. It first replays all pending
// swaps plus the most recently updated completedSwapsCount non-pending swaps
// (ordered old to new) and then forwards live updates until the client
// disconnects or the server shuts down.
func (s *swapClientServer) Monitor(in *clientrpc.MonitorRequest,
	server clientrpc.SwapClient_MonitorServer) error {

	log.Infof("Monitor request received")

	// forward marshals a single swap and writes it to the client stream.
	forward := func(info loop.SwapInfo) error {
		rpcSwap, err := s.marshallSwap(&info)
		if err != nil {
			return err
		}

		return server.Send(rpcSwap)
	}

	// Start a notification queue for this subscriber.
	updates := queue.NewConcurrentQueue(20)
	updates.Start()

	// Register the subscriber and snapshot the known swaps while holding
	// the lock, so that the subscriber doesn't receive duplicate updates.
	s.swapsLock.Lock()
	id := s.nextSubscriberID
	s.nextSubscriberID++
	s.subscribers[id] = updates.ChanIn()

	var pending, completed []loop.SwapInfo
	for _, swp := range s.swaps {
		if swp.State.Type() == loopdb.StateTypePending {
			pending = append(pending, swp)
			continue
		}

		completed = append(completed, swp)
	}
	s.swapsLock.Unlock()

	// On exit, unregister the subscriber before stopping its queue.
	defer func() {
		s.swapsLock.Lock()
		delete(s.subscribers, id)
		s.swapsLock.Unlock()

		updates.Stop()
	}()

	// Order the completed swaps new to old and keep only the latest ones.
	sort.Slice(completed, func(a, b int) bool {
		return completed[a].LastUpdate.After(completed[b].LastUpdate)
	})
	if len(completed) > completedSwapsCount {
		completed = completed[:completedSwapsCount]
	}

	// Merge both sets and order them old to new for the initial replay.
	replay := append(pending, completed...) // nolint: gocritic
	sort.Slice(replay, func(a, b int) bool {
		return replay[a].LastUpdate.Before(replay[b].LastUpdate)
	})

	for _, swp := range replay {
		if err := forward(swp); err != nil {
			return err
		}
	}

	// As long as the client is connected, keep passing through swap
	// updates.
	for {
		select {
		case item, open := <-updates.ChanOut():
			if !open {
				return nil
			}

			if err := forward(item.(loop.SwapInfo)); err != nil {
				return err
			}

		// The client cancels the subscription.
		case <-server.Context().Done():
			return nil

		// The server is shutting down.
		case <-s.mainCtx.Done():
			return fmt.Errorf("server is shutting down")
		}
	}
}
// ListSwaps returns a list of all currently known swaps and their current
// status. Swaps that don't pass the request's filter are omitted. Because the
// swaps are read from a map, the order of the returned list is not defined.
func (s *swapClientServer) ListSwaps(_ context.Context,
	req *clientrpc.ListSwapsRequest) (*clientrpc.ListSwapsResponse, error) {

	rpcSwaps := []*clientrpc.SwapStatus{}

	s.swapsLock.Lock()
	defer s.swapsLock.Unlock()

	// We can just use the server's in-memory cache as that contains the
	// most up-to-date state including temporary failures which aren't
	// persisted to disk.
	for _, swp := range s.swaps {
		// Take a per-iteration copy, as pointers to it are handed out
		// below.
		swp := swp

		// Filter the swap based on the provided filter.
		if !filterSwap(&swp, req.ListSwapFilter) {
			continue
		}

		rpcSwap, err := s.marshallSwap(&swp)
		if err != nil {
			return nil, err
		}

		rpcSwaps = append(rpcSwaps, rpcSwap)
	}

	return &clientrpc.ListSwapsResponse{Swaps: rpcSwaps}, nil
}
// filterSwap reports whether the given swap passes the provided filter. A nil
// filter matches every swap. The individual criteria are combined with a
// logical AND:
//   - swap type (unless the filter is set to ANY),
//   - pending state, if PendingOnly is set,
//   - outgoing channel set (loop out swaps only, compared order independent),
//   - last hop (loop in swaps only),
//   - label, matched as a substring.
//
// Neither the swap nor the filter is modified.
func filterSwap(swapInfo *loop.SwapInfo, filter *clientrpc.ListSwapsFilter) bool {
	if filter == nil {
		return true
	}

	// If the swap type filter is set, we only return swaps that match the
	// filter. Any other value, including ANY, doesn't restrict the type.
	switch filter.SwapType {
	case clientrpc.ListSwapsFilter_LOOP_IN:
		if swapInfo.SwapType != swap.TypeIn {
			return false
		}

	case clientrpc.ListSwapsFilter_LOOP_OUT:
		if swapInfo.SwapType != swap.TypeOut {
			return false
		}
	}

	// If the pending only filter is set, we only return pending swaps.
	if filter.PendingOnly && !swapInfo.State.IsPending() {
		return false
	}

	// If the swap is of type loop out and the outgoing channel filter is
	// set, we only return swaps that match the filter.
	if swapInfo.SwapType == swap.TypeOut && filter.OutgoingChanSet != nil {
		// Work on plain []uint64 copies: sorting in place would
		// reorder the cached swap's channel set as well as the
		// caller's request, and reflect.DeepEqual never considers
		// values of different types equal, which would be the case if
		// the swap's channel set is a named slice type.
		swapChans := append([]uint64(nil), swapInfo.OutgoingChanSet...)
		filterChans := append([]uint64(nil), filter.OutgoingChanSet...)

		sort.Slice(swapChans, func(i, j int) bool {
			return swapChans[i] < swapChans[j]
		})
		sort.Slice(filterChans, func(i, j int) bool {
			return filterChans[i] < filterChans[j]
		})

		if !reflect.DeepEqual(swapChans, filterChans) {
			return false
		}
	}

	// If the swap is of type loop in and the last hop filter is set, we
	// only return swaps that match the filter. The comparison is done on
	// the raw bytes, as the swap's last hop and the filter's byte slice
	// may be of different types, which reflect.DeepEqual would always
	// report as unequal. A swap without a last hop never matches.
	if swapInfo.SwapType == swap.TypeIn && filter.LoopInLastHop != nil {
		if swapInfo.LastHop == nil ||
			!bytes.Equal(swapInfo.LastHop[:], filter.LoopInLastHop) {

			return false
		}
	}

	// If a label filter is set, we only return swaps that softly match the
	// filter.
	if filter.Label != "" && !strings.Contains(swapInfo.Label, filter.Label) {
		return false
	}

	return true
}
// SwapInfo returns all known details about a single swap, identified by the
// swap hash in the request. An error is returned if the hash can't be parsed
// or no swap with that hash is known.
func (s *swapClientServer) SwapInfo(_ context.Context,
	req *clientrpc.SwapInfoRequest) (*clientrpc.SwapStatus, error) {

	swapHash, err := lntypes.MakeHash(req.Id)
	if err != nil {
		return nil, fmt.Errorf("error parsing swap hash: %v", err)
	}

	// Just return the server's in-memory cache here too as we also want to
	// return temporary failures to the client. The cache is shared with
	// the other RPCs, so it must only be read while holding the lock.
	s.swapsLock.Lock()
	swp, ok := s.swaps[swapHash]
	s.swapsLock.Unlock()

	if !ok {
		return nil, fmt.Errorf("swap with hash %s not found", req.Id)
	}

	return s.marshallSwap(&swp)
}
// AbandonSwap requests the server to abandon a swap with the given hash. The
// caller has to explicitly acknowledge the risk through IKnowWhatIAmDoing.
// Loop out swaps and swaps that already reached a final state are rejected.
func (s *swapClientServer) AbandonSwap(ctx context.Context,
	req *clientrpc.AbandonSwapRequest) (*clientrpc.AbandonSwapResponse,
	error) {

	if !req.IKnowWhatIAmDoing {
		return nil, fmt.Errorf("please read the AbandonSwap API " +
			"documentation")
	}

	hash, err := lntypes.MakeHash(req.Id)
	if err != nil {
		return nil, fmt.Errorf("error parsing swap hash: %v", err)
	}

	// Look the swap up in the in-memory cache.
	s.swapsLock.Lock()
	swp, known := s.swaps[hash]
	s.swapsLock.Unlock()

	switch {
	case !known:
		return nil, fmt.Errorf("swap with hash %s not found", req.Id)

	case swp.SwapType.IsOut():
		return nil, fmt.Errorf("abandoning loop out swaps is not " +
			"supported yet")

	// If the swap is in a final state, we cannot abandon it.
	case swp.State.IsFinal():
		return nil, fmt.Errorf("cannot abandon swap in final state, "+
			"state = %s, hash = %s", swp.State.String(), hash)
	}

	abandonReq := &loop.AbandonSwapRequest{
		SwapHash: hash,
	}
	if err = s.impl.AbandonSwap(ctx, abandonReq); err != nil {
		return nil, fmt.Errorf("error abandoning swap: %v", err)
	}

	return &clientrpc.AbandonSwapResponse{}, nil
}
// LoopOutTerms returns the terms that the server enforces for loop out swaps:
// the swap amount limits and the CLTV delta limits.
func (s *swapClientServer) LoopOutTerms(ctx context.Context,
	_ *clientrpc.TermsRequest) (*clientrpc.OutTermsResponse, error) {

	log.Infof("Loop out terms request received")

	outTerms, err := s.impl.LoopOutTerms(ctx, defaultLoopdInitiator)
	if err != nil {
		log.Errorf("Terms request: %v", err)
		return nil, err
	}

	resp := &clientrpc.OutTermsResponse{
		MinSwapAmount: int64(outTerms.MinSwapAmount),
		MaxSwapAmount: int64(outTerms.MaxSwapAmount),
		MinCltvDelta:  outTerms.MinCltvDelta,
		MaxCltvDelta:  outTerms.MaxCltvDelta,
	}

	return resp, nil
}
// LoopOutQuote returns a quote for a loop out swap with the provided
// parameters. The requested confirmation target is validated first; the
// validated target is echoed back in the response.
func (s *swapClientServer) LoopOutQuote(ctx context.Context,
	req *clientrpc.QuoteRequest) (*clientrpc.OutQuoteResponse, error) {

	sweepTarget, err := validateConfTarget(
		req.ConfTarget, loop.DefaultSweepConfTarget,
	)
	if err != nil {
		return nil, err
	}

	quoteReq := &loop.LoopOutQuoteRequest{
		Amount:          btcutil.Amount(req.Amt),
		SweepConfTarget: sweepTarget,
		SwapPublicationDeadline: getPublicationDeadline(
			req.SwapPublicationDeadline,
		),
		Initiator: defaultLoopdInitiator,
	}

	outQuote, err := s.impl.LoopOutQuote(ctx, quoteReq)
	if err != nil {
		return nil, err
	}

	resp := &clientrpc.OutQuoteResponse{
		HtlcSweepFeeSat: int64(outQuote.MinerFee),
		PrepayAmtSat:    int64(outQuote.PrepayAmount),
		SwapFeeSat:      int64(outQuote.SwapFee),
		SwapPaymentDest: outQuote.SwapPaymentDest[:],
		ConfTarget:      sweepTarget,
	}

	return resp, nil
}
// GetLoopInTerms returns the swap amount limits that the server enforces for
// loop in swaps.
func (s *swapClientServer) GetLoopInTerms(ctx context.Context,
	_ *clientrpc.TermsRequest) (*clientrpc.InTermsResponse, error) {

	log.Infof("Loop in terms request received")

	inTerms, err := s.impl.LoopInTerms(ctx, defaultLoopdInitiator)
	if err != nil {
		log.Errorf("Terms request: %v", err)
		return nil, err
	}

	resp := &clientrpc.InTermsResponse{
		MinSwapAmount: int64(inTerms.MinSwapAmount),
		MaxSwapAmount: int64(inTerms.MaxSwapAmount),
	}

	return resp, nil
}
// GetLoopInQuote returns a quote for a loop in swap with the provided
// parameters. The optional last hop and route hints of the request are parsed
// and passed along to the quote call.
func (s *swapClientServer) GetLoopInQuote(ctx context.Context,
	req *clientrpc.QuoteRequest) (*clientrpc.InQuoteResponse, error) {

	log.Infof("Loop in quote request received")

	confTarget, err := validateLoopInRequest(
		req.ConfTarget, req.ExternalHtlc,
	)
	if err != nil {
		return nil, err
	}

	quoteReq := &loop.LoopInQuoteRequest{
		Amount:         btcutil.Amount(req.Amt),
		HtlcConfTarget: confTarget,
		ExternalHtlc:   req.ExternalHtlc,
		Private:        req.Private,
		Initiator:      defaultLoopdInitiator,
	}

	// Only restrict the last hop if the client asked for it.
	if req.LoopInLastHop != nil {
		vertex, err := route.NewVertexFromBytes(req.LoopInLastHop)
		if err != nil {
			return nil, err
		}

		quoteReq.LastHop = &vertex
	}

	// Without any rpc route hints, the hints of the request stay nil.
	if len(req.LoopInRouteHints) != 0 {
		hints, err := unmarshallRouteHints(req.LoopInRouteHints)
		if err != nil {
			return nil, err
		}

		quoteReq.RouteHints = hints
	}

	inQuote, err := s.impl.LoopInQuote(ctx, quoteReq)
	if err != nil {
		return nil, err
	}

	resp := &clientrpc.InQuoteResponse{
		HtlcPublishFeeSat: int64(inQuote.MinerFee),
		SwapFeeSat:        int64(inQuote.SwapFee),
		ConfTarget:        confTarget,
	}

	return resp, nil
}
// unmarshallRouteHints converts a list of rpc route hints into their zpay32
// counterpart. The first hop hint that fails to convert aborts the conversion.
func unmarshallRouteHints(rpcRouteHints []*looprpc.RouteHint) (
	[][]zpay32.HopHint, error) {

	result := make([][]zpay32.HopHint, 0, len(rpcRouteHints))

	for i := range rpcRouteHints {
		rpcHops := rpcRouteHints[i].HopHints

		hops := make([]zpay32.HopHint, 0, len(rpcHops))
		for j := range rpcHops {
			hop, err := unmarshallHopHint(rpcHops[j])
			if err != nil {
				return nil, err
			}

			hops = append(hops, hop)
		}

		result = append(result, hops)
	}

	return result, nil
}
// unmarshallHopHint converts a single rpc hop hint. The node ID is expected
// to be a hex encoded public key; decoding or parsing failures are returned
// together with an empty hop hint.
func unmarshallHopHint(rpcHint *looprpc.HopHint) (zpay32.HopHint, error) {
	var empty zpay32.HopHint

	rawKey, err := hex.DecodeString(rpcHint.NodeId)
	if err != nil {
		return empty, err
	}

	nodeKey, err := btcec.ParsePubKey(rawKey)
	if err != nil {
		return empty, err
	}

	hint := zpay32.HopHint{
		NodeID:                    nodeKey,
		ChannelID:                 rpcHint.ChanId,
		FeeBaseMSat:               rpcHint.FeeBaseMsat,
		FeeProportionalMillionths: rpcHint.FeeProportionalMillionths,
		CLTVExpiryDelta:           uint16(rpcHint.CltvExpiryDelta),
	}

	return hint, nil
}
// Probe requests the server to probe the client's node to test inbound
// liquidity. The optional last hop and the route hints of the request are
// parsed and passed along.
func (s *swapClientServer) Probe(ctx context.Context,
	req *clientrpc.ProbeRequest) (*clientrpc.ProbeResponse, error) {

	log.Infof("Probe request received")

	probeReq := &loop.ProbeRequest{
		Amount: btcutil.Amount(req.Amt),
	}

	if req.LastHop != nil {
		vertex, err := route.NewVertexFromBytes(req.LastHop)
		if err != nil {
			return nil, err
		}

		probeReq.LastHop = &vertex
	}

	hints, err := unmarshallRouteHints(req.RouteHints)
	if err != nil {
		return nil, err
	}
	probeReq.RouteHints = hints

	if err := s.impl.Probe(ctx, probeReq); err != nil {
		return nil, err
	}

	return &clientrpc.ProbeResponse{}, nil
}
// LoopIn initiates a loop in swap with the given parameters. The request's
// confirmation target, label, route hints and optional last hop are checked
// before the swap is handed to the loop client. The response carries the htlc
// address that corresponds to the current protocol version.
func (s *swapClientServer) LoopIn(ctx context.Context,
	in *clientrpc.LoopInRequest) (*clientrpc.SwapResponse, error) {

	log.Infof("Loop in request received")

	confTarget, err := validateLoopInRequest(
		in.HtlcConfTarget, in.ExternalHtlc,
	)
	if err != nil {
		return nil, err
	}

	// Check that the label is valid.
	err = labels.Validate(in.Label)
	if err != nil {
		return nil, err
	}

	hints, err := unmarshallRouteHints(in.RouteHints)
	if err != nil {
		return nil, err
	}

	// Only restrict the last hop if the client asked for it.
	var lastHop *route.Vertex
	if in.LastHop != nil {
		vertex, err := route.NewVertexFromBytes(in.LastHop)
		if err != nil {
			return nil, err
		}

		lastHop = &vertex
	}

	swapInfo, err := s.impl.LoopIn(ctx, &loop.LoopInRequest{
		Amount:         btcutil.Amount(in.Amt),
		MaxMinerFee:    btcutil.Amount(in.MaxMinerFee),
		MaxSwapFee:     btcutil.Amount(in.MaxSwapFee),
		HtlcConfTarget: confTarget,
		ExternalHtlc:   in.ExternalHtlc,
		Label:          in.Label,
		Initiator:      in.Initiator,
		Private:        in.Private,
		RouteHints:     hints,
		LastHop:        lastHop,
	})
	if err != nil {
		log.Errorf("Loop in: %v", err)
		return nil, err
	}

	resp := &clientrpc.SwapResponse{
		Id:            swapInfo.SwapHash.String(),
		IdBytes:       swapInfo.SwapHash[:],
		ServerMessage: swapInfo.ServerMessage,
	}

	// Before the v3 htlc protocol version the P2WSH address is reported,
	// from then on the P2TR address.
	if loopdb.CurrentProtocolVersion() >= loopdb.ProtocolVersionHtlcV3 {
		addr := swapInfo.HtlcAddressP2TR.String()
		resp.HtlcAddress = addr
		resp.HtlcAddressP2Tr = addr
	} else {
		addr := swapInfo.HtlcAddressP2WSH.String()
		resp.HtlcAddress = addr
		resp.HtlcAddressP2Wsh = addr
	}

	return resp, nil
}
// GetLsatTokens returns all tokens that are contained in the LSAT token store.
// Each token is returned together with its serialized base macaroon, the
// token ID decoded from the macaroon's identifier and its storage name.
func (s *swapClientServer) GetLsatTokens(ctx context.Context,
	_ *clientrpc.TokensRequest) (*clientrpc.TokensResponse, error) {

	log.Infof("Get LSAT tokens request received")

	stored, err := s.impl.LsatStore.AllTokens()
	if err != nil {
		return nil, err
	}

	rpcTokens := make([]*clientrpc.LsatToken, 0, len(stored))
	for name, token := range stored {
		rawMac, err := token.BaseMacaroon().MarshalBinary()
		if err != nil {
			return nil, err
		}

		idReader := bytes.NewReader(token.BaseMacaroon().Id())
		tokenID, err := lsat.DecodeIdentifier(idReader)
		if err != nil {
			return nil, err
		}

		rpcTokens = append(rpcTokens, &clientrpc.LsatToken{
			BaseMacaroon:       rawMac,
			PaymentHash:        token.PaymentHash[:],
			PaymentPreimage:    token.Preimage[:],
			AmountPaidMsat:     int64(token.AmountPaid),
			RoutingFeePaidMsat: int64(token.RoutingFeePaid),
			TimeCreated:        token.TimeCreated.Unix(),
			Expired:            !token.IsValid(),
			StorageName:        name,
			Id:                 hex.EncodeToString(tokenID.TokenID[:]),
		})
	}

	return &clientrpc.TokensResponse{Tokens: rpcTokens}, nil
}
// GetInfo returns basic information about the loop daemon together with
// statistics of the swaps found in the swap store: the number of successful,
// pending and failed swaps as well as the summed amounts of the successful
// and pending ones, separately for loop out and loop in.
func (s *swapClientServer) GetInfo(ctx context.Context,
	_ *clientrpc.GetInfoRequest) (*clientrpc.GetInfoResponse, error) {

	var (
		outStats = &clientrpc.LoopStats{}
		inStats  = &clientrpc.LoopStats{}
	)

	// Fetch the loop-outs from the loop db and collect their stats.
	loopOuts, err := s.impl.Store.FetchLoopOutSwaps(ctx)
	if err != nil {
		return nil, err
	}

	for _, loopOut := range loopOuts {
		switch loopOut.State().State.Type() {
		case loopdb.StateTypeSuccess:
			outStats.SuccessCount++
			outStats.SumSucceededAmt += int64(
				loopOut.Contract.AmountRequested,
			)

		case loopdb.StateTypePending:
			outStats.PendingCount++
			outStats.SumPendingAmt += int64(
				loopOut.Contract.AmountRequested,
			)

		case loopdb.StateTypeFail:
			outStats.FailCount++
		}
	}

	// Fetch the loop-ins from the loop db and collect their stats.
	loopIns, err := s.impl.Store.FetchLoopInSwaps(ctx)
	if err != nil {
		return nil, err
	}

	for _, loopIn := range loopIns {
		switch loopIn.State().State.Type() {
		case loopdb.StateTypeSuccess:
			inStats.SuccessCount++
			inStats.SumSucceededAmt += int64(
				loopIn.Contract.AmountRequested,
			)

		case loopdb.StateTypePending:
			inStats.PendingCount++
			inStats.SumPendingAmt += int64(
				loopIn.Contract.AmountRequested,
			)

		case loopdb.StateTypeFail:
			inStats.FailCount++
		}
	}

	info := &clientrpc.GetInfoResponse{
		Version:      loop.Version(),
		Network:      s.config.Network,
		RpcListen:    s.config.RPCListen,
		RestListen:   s.config.RESTListen,
		MacaroonPath: s.config.MacaroonPath,
		TlsCertPath:  s.config.TLSCertPath,
		LoopOutStats: outStats,
		LoopInStats:  inStats,
	}

	return info, nil
}
// GetLiquidityParams returns the liquidity manager's current parameters in
// their RPC form.
func (s *swapClientServer) GetLiquidityParams(_ context.Context,
	_ *clientrpc.GetLiquidityParamsRequest) (*clientrpc.LiquidityParameters,
	error) {

	params, err := liquidity.ParametersToRpc(
		s.liquidityMgr.GetParameters(),
	)
	if err != nil {
		return nil, err
	}

	return params, nil
}
// SetLiquidityParams attempts to replace the liquidity manager's parameters
// with the ones from the request. An empty response is returned on success.
func (s *swapClientServer) SetLiquidityParams(ctx context.Context,
	in *clientrpc.SetLiquidityParamsRequest) (*clientrpc.SetLiquidityParamsResponse,
	error) {

	if err := s.liquidityMgr.SetParameters(ctx, in.Parameters); err != nil {
		return nil, err
	}

	return &clientrpc.SetLiquidityParamsResponse{}, nil
}
// SuggestSwaps provides a list of suggested swaps based on lnd's current
// channel balances and rules set by the liquidity manager. The response also
// lists the channels and peers that were disqualified, along with the reason.
// If no rules are set, a FailedPrecondition status error is returned.
func (s *swapClientServer) SuggestSwaps(ctx context.Context,
	_ *clientrpc.SuggestSwapsRequest) (*clientrpc.SuggestSwapsResponse, error) {

	suggestions, err := s.liquidityMgr.SuggestSwaps(ctx)
	if err == liquidity.ErrNoRules {
		return nil, status.Error(codes.FailedPrecondition, err.Error())
	}
	if err != nil {
		return nil, err
	}

	outReqs := make(
		[]*clientrpc.LoopOutRequest, len(suggestions.OutSwaps),
	)
	for i, out := range suggestions.OutSwaps {
		outReqs[i] = &clientrpc.LoopOutRequest{
			Amt:                 int64(out.Amount),
			OutgoingChanSet:     out.OutgoingChanSet,
			MaxSwapFee:          int64(out.MaxSwapFee),
			MaxMinerFee:         int64(out.MaxMinerFee),
			MaxPrepayAmt:        int64(out.MaxPrepayAmount),
			MaxSwapRoutingFee:   int64(out.MaxSwapRoutingFee),
			MaxPrepayRoutingFee: int64(out.MaxPrepayRoutingFee),
			SweepConfTarget:     out.SweepConfTarget,
		}
	}

	inReqs := make(
		[]*clientrpc.LoopInRequest, len(suggestions.InSwaps),
	)
	for i, in := range suggestions.InSwaps {
		inReqs[i] = &clientrpc.LoopInRequest{
			Amt:            int64(in.Amount),
			MaxSwapFee:     int64(in.MaxSwapFee),
			MaxMinerFee:    int64(in.MaxMinerFee),
			HtlcConfTarget: in.HtlcConfTarget,
		}

		if in.LastHop != nil {
			inReqs[i].LastHop = in.LastHop[:]
		}
	}

	resp := &clientrpc.SuggestSwapsResponse{
		LoopOut: outReqs,
		LoopIn:  inReqs,
	}

	// Report the disqualified channels.
	for chanID, reason := range suggestions.DisqualifiedChans {
		rpcReason, err := rpcAutoloopReason(reason)
		if err != nil {
			return nil, err
		}

		resp.Disqualified = append(
			resp.Disqualified, &clientrpc.Disqualified{
				Reason:    rpcReason,
				ChannelId: chanID.ToUint64(),
			},
		)
	}

	// Report the disqualified peers.
	for peer, reason := range suggestions.DisqualifiedPeers {
		rpcReason, err := rpcAutoloopReason(reason)
		if err != nil {
			return nil, err
		}

		// Copy the key, so that the slice stored in the response
		// doesn't reference the loop variable.
		var peerKey route.Vertex
		copy(peerKey[:], peer[:])

		resp.Disqualified = append(
			resp.Disqualified, &clientrpc.Disqualified{
				Reason: rpcReason,
				Pubkey: peerKey[:],
			},
		)
	}

	return resp, nil
}
// ListReservations lists all existing reservations the client has ever made.
// If no reservation manager is available, an Unimplemented status error is
// returned that asks to restart loop with --experimental.
func (s *swapClientServer) ListReservations(ctx context.Context,
	_ *clientrpc.ListReservationsRequest) (
	*clientrpc.ListReservationsResponse, error) {

	if s.reservationManager == nil {
		return nil, status.Error(codes.Unimplemented,
			"Restart loop with --experimental")
	}

	all, err := s.reservationManager.GetReservations(ctx)
	if err != nil {
		return nil, err
	}

	resp := &clientrpc.ListReservationsResponse{
		Reservations: ToClientReservations(all),
	}

	return resp, nil
}
// InstantOut initiates an instant out swap for the given reservations. Every
// reservation ID has to be exactly reservation.IdLength bytes long. The
// response carries the swap hash, the state and, if already known, the sweep
// transaction ID.
func (s *swapClientServer) InstantOut(ctx context.Context,
	req *clientrpc.InstantOutRequest) (*clientrpc.InstantOutResponse,
	error) {

	ids := make([]reservation.ID, len(req.ReservationIds))
	for i, rawID := range req.ReservationIds {
		if len(rawID) != reservation.IdLength {
			return nil, fmt.Errorf("invalid reservation id: "+
				"expected %v bytes, got %d",
				reservation.IdLength, len(rawID))
		}

		copy(ids[i][:], rawID)
	}

	fsm, err := s.instantOutManager.NewInstantOut(
		ctx, ids, req.DestAddr,
	)
	if err != nil {
		return nil, err
	}

	resp := &clientrpc.InstantOutResponse{
		InstantOutHash: fsm.InstantOut.SwapHash[:],
		State:          string(fsm.InstantOut.State),
	}

	if sweepHash := fsm.InstantOut.SweepTxHash; sweepHash != nil {
		resp.SweepTxId = sweepHash.String()
	}

	return resp, nil
}
// InstantOutQuote returns a quote for an instant out swap with the provided
// amount and number of reservations. The quote consists of the service fee
// and the on-chain sweep fee.
func (s *swapClientServer) InstantOutQuote(ctx context.Context,
	req *clientrpc.InstantOutQuoteRequest) (
	*clientrpc.InstantOutQuoteResponse, error) {

	amt := btcutil.Amount(req.Amt)
	numReservations := int(req.NumReservations)

	instantQuote, err := s.instantOutManager.GetInstantOutQuote(
		ctx, amt, numReservations,
	)
	if err != nil {
		return nil, err
	}

	resp := &clientrpc.InstantOutQuoteResponse{
		ServiceFeeSat: int64(instantQuote.ServiceFee),
		SweepFeeSat:   int64(instantQuote.OnChainFee),
	}

	return resp, nil
}
// ListInstantOuts returns a list of all currently known instant out swaps and
// their current status.
func (s *swapClientServer) ListInstantOuts(ctx context.Context,
	_ *clientrpc.ListInstantOutsRequest) (
	*clientrpc.ListInstantOutsResponse, error) {

	known, err := s.instantOutManager.ListInstantOuts(ctx)
	if err != nil {
		return nil, err
	}

	resp := &clientrpc.ListInstantOutsResponse{
		Swaps: make([]*clientrpc.InstantOut, 0, len(known)),
	}
	for _, instantOut := range known {
		resp.Swaps = append(resp.Swaps, rpcInstantOut(instantOut))
	}

	return resp, nil
}
// rpcInstantOut converts an instant out swap into its RPC representation. The
// sweep transaction ID stays empty as long as no sweep tx hash is set.
func rpcInstantOut(instantOut *instantout.InstantOut) *clientrpc.InstantOut {
	rpcSwap := &clientrpc.InstantOut{
		SwapHash:       instantOut.SwapHash[:],
		State:          string(instantOut.State),
		Amount:         uint64(instantOut.Value),
		ReservationIds: make([][]byte, len(instantOut.Reservations)),
	}

	for i, res := range instantOut.Reservations {
		rpcSwap.ReservationIds[i] = res.ID[:]
	}

	if instantOut.SweepTxHash != nil {
		rpcSwap.SweepTxId = instantOut.SweepTxHash.String()
	}

	return rpcSwap
}
// rpcAutoloopReason translates a liquidity manager reason into the matching
// rpc enum value. An error is returned for reasons that have no rpc mapping.
func rpcAutoloopReason(reason liquidity.Reason) (clientrpc.AutoReason, error) {
	var rpcReason clientrpc.AutoReason

	switch reason {
	case liquidity.ReasonNone:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_UNKNOWN

	case liquidity.ReasonBudgetNotStarted:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_BUDGET_NOT_STARTED

	case liquidity.ReasonSweepFees:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_SWEEP_FEES

	case liquidity.ReasonBudgetElapsed:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_BUDGET_ELAPSED

	case liquidity.ReasonInFlight:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_IN_FLIGHT

	// Both the swap fee reason and the fee ppm insufficient reason are
	// reported to rpc clients as a swap fee reason.
	case liquidity.ReasonSwapFee, liquidity.ReasonFeePPMInsufficient:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_SWAP_FEE

	case liquidity.ReasonMinerFee:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_MINER_FEE

	case liquidity.ReasonPrepay:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_PREPAY

	case liquidity.ReasonFailureBackoff:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_FAILURE_BACKOFF

	case liquidity.ReasonLoopOut:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_LOOP_OUT

	case liquidity.ReasonLoopIn:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_LOOP_IN

	case liquidity.ReasonLiquidityOk:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_LIQUIDITY_OK

	case liquidity.ReasonBudgetInsufficient:
		rpcReason = clientrpc.AutoReason_AUTO_REASON_BUDGET_INSUFFICIENT

	default:
		return 0, fmt.Errorf("unknown autoloop reason: %v", reason)
	}

	return rpcReason, nil
}
// processStatusUpdates reads updates on the status channel and processes them.
// Every update is stored in the server's swaps map under its swap hash and is
// then sent to each entry of s.subscribers.
//
// NOTE: This must run inside a goroutine as it blocks until the main context
// shuts down.
func (s *swapClientServer) processStatusUpdates(mainCtx context.Context) {
	for {
		select {
		// On updates, refresh the server's in-memory state and inform
		// subscribers about the changes.
		case swp := <-s.statusChan:
			s.swapsLock.Lock()
			s.swaps[swp.SwapHash] = swp

			// swapsLock stays held while the update is delivered.
			// A send is only abandoned when mainCtx is done, so a
			// subscriber channel that cannot accept the update
			// keeps this loop (and the lock) waiting until then.
			for _, subscriber := range s.subscribers {
				select {
				case subscriber <- swp:

				// Shutdown while delivering: release the lock
				// before leaving so it is not left locked.
				case <-mainCtx.Done():
					s.swapsLock.Unlock()
					return
				}
			}
			s.swapsLock.Unlock()

		// Server is shutting down.
		case <-mainCtx.Done():
			return
		}
	}
}
// validateConfTarget checks a confirmation target supplied by a client. A
// target of zero means none was specified, in which case defaultTarget is
// returned. Targets below minConfTarget are rejected with an error wrapping
// errConfTargetTooLow; any other target is returned unchanged.
func validateConfTarget(target, defaultTarget int32) (int32, error) {
	// No target given, fall back to the default.
	if target == 0 {
		return defaultTarget, nil
	}

	// Enforce the minimum threshold.
	if target < minConfTarget {
		return 0, fmt.Errorf("%w: A confirmation target of at least "+
			"%v must be provided", errConfTargetTooLow,
			minConfTarget)
	}

	return target, nil
}
// validateLoopInRequest returns the htlc confirmation target to use for a
// loop in. The conf target and the external flag are mutually exclusive: an
// error is returned when both are set. For an externally published htlc the
// returned target is zero; otherwise the target is validated, with
// loop.DefaultHtlcConfTarget used when none was given.
func validateLoopInRequest(htlcConfTarget int32, external bool) (int32, error) {
	// We publish the htlc ourselves, so a valid confirmation target is
	// required.
	if !external {
		return validateConfTarget(
			htlcConfTarget, loop.DefaultHtlcConfTarget,
		)
	}

	// The htlc is published externally. A conf target has no relevance
	// in that case, so it must not be set.
	if htlcConfTarget != 0 {
		return 0, errors.New("external and htlc conf target cannot " +
			"both be set")
	}

	return 0, nil
}
// validateLoopOutRequest checks a loop out request before it is executed. In
// order, it verifies that the sweep address belongs to the active network and
// has a supported type, that the label is valid, and that the requested amount
// plus the maximum swap routing fee passes the hasBandwidth check over the
// active channels (restricted to the outgoing channel set when one is given).
// On success the validated sweep confirmation target is returned.
func validateLoopOutRequest(ctx context.Context, lnd lndclient.LightningClient,
	chainParams *chaincfg.Params, req *clientrpc.LoopOutRequest,
	sweepAddr btcutil.Address, maxParts uint32) (int32, error) {

	// The destination address must be encoded for the network we are
	// currently running on.
	if onActiveNet := sweepAddr.IsForNet(chainParams); !onActiveNet {
		return 0, fmt.Errorf("%w: Current active network is %s",
			errIncorrectChain, chainParams.Name)
	}

	// Only the address types listed below are accepted, every other type
	// is rejected.
	switch sweepAddr.(type) {
	case *btcutil.AddressPubKeyHash,
		*btcutil.AddressScriptHash,
		*btcutil.AddressWitnessPubKeyHash,
		*btcutil.AddressWitnessScriptHash,
		*btcutil.AddressTaproot:

	default:
		return 0, errInvalidAddress
	}

	// The label has to pass validation.
	err := labels.Validate(req.Label)
	if err != nil {
		return 0, err
	}

	allChannels, err := lnd.ListChannels(ctx, false, false)
	if err != nil {
		return 0, err
	}

	// Index the requested outgoing channels for quick lookups. An empty
	// set means that the request is not restricted to specific channels.
	anyChannel := len(req.OutgoingChanSet) == 0
	requested := make(map[uint64]struct{}, len(req.OutgoingChanSet))
	for _, id := range req.OutgoingChanSet {
		requested[id] = struct{}{}
	}

	// Gather the active channels that may carry the payment. Inactive
	// channels are never considered.
	var candidates []lndclient.ChannelInfo
	for _, ch := range allChannels {
		if !ch.Active {
			continue
		}

		_, inSet := requested[ch.ChannelID]
		if anyChannel || inSet {
			candidates = append(candidates, ch)
		}
	}

	// Check whether the swap is theoretically possible given the amount,
	// the maximum possible routing fees, the candidate channels and the
	// fact that equal splitting is used for MPP. The shard count reported
	// by hasBandwidth is not needed here.
	needed := btcutil.Amount(req.Amt + req.MaxSwapRoutingFee)
	if routable, _ := hasBandwidth(
		candidates, needed, int(maxParts),
	); !routable {
		return 0, fmt.Errorf("%w: Requested swap amount of %d sats "+
			"along with the maximum routing fee of %d sats is "+
			"more than what can be routed given current state of "+
			"the channel set", errBalanceTooLow, req.Amt,
			req.MaxSwapRoutingFee)
	}

	return validateConfTarget(
		req.SweepConfTarget, loop.DefaultSweepConfTarget,
	)
}
// hasBandwidth simulates the MPP splitting logic that will be used by LND when
// attempting to route the payment. This function is used to evaluate if a
// payment will be routable given the splitting logic used by LND.
//
// It returns true if the amount is routable given the channel set and the
// maximum number of shards allowed. If the amount is routable then the number
// of shards used is also returned, and that number never exceeds maxParts. A
// maxParts of zero is treated as a single allowed shard, which is what this
// function has always done for that value.
//
// The simulation starts with one shard for the full amount. Whenever no
// channel can carry the current shard, the shard size is halved; whenever a
// shard is placed, the next shard is sized to the remaining amount.
//
// This function makes an assumption that the minimum loop amount divided by
// max parts will not be less than the minimum shard amount. If the MPP logic
// changes, then this function should be updated.
func hasBandwidth(channels []lndclient.ChannelInfo, amt btcutil.Amount,
	maxParts int) (bool, int) {

	// Work on a copy of the local balances so that shards can be deducted
	// without modifying the caller's channel data.
	scratch := make([]btcutil.Amount, len(channels))
	var totalBandwidth btcutil.Amount
	for i, channel := range channels {
		scratch[i] = channel.LocalBalance
		totalBandwidth += channel.LocalBalance
	}

	// If the sum of all balances can't cover the amount, no split can.
	if totalBandwidth < amt {
		return false, 0
	}

	// Keep allowing one shard when no limit was configured.
	if maxParts == 0 {
		maxParts = 1
	}

	split := amt

	// Only start another shard while fewer than maxParts have been placed,
	// so that a successful result never uses more shards than allowed.
	for shard := 0; shard < maxParts; {
		// Place the current shard on the first channel able to carry it.
		paid := false
		for i := range scratch {
			if scratch[i] >= split {
				scratch[i] -= split
				amt -= split
				paid = true
				shard++

				break
			}
		}

		if amt == 0 {
			return true, shard
		}

		switch {
		// The next shard attempts to carry everything that is left.
		case paid:
			split = amt

		// Not even an empty shard could be placed, so halving further
		// cannot make progress. Bail out instead of looping forever.
		case split == 0:
			return false, 0

		// No channel can carry this shard, try half of it.
		default:
			split /= 2
		}
	}

	return false, 0
}
// getPublicationDeadline returns the publication deadline for a swap given the
// unix timestamp. If the timestamp is believed to be in milliseconds, then it
// is converted to seconds.
//
// A timestamp with 13 or more decimal digits (a value of at least 1e12) is
// interpreted as milliseconds, and its sub-second part is preserved as
// nanoseconds. Any smaller value is interpreted as whole seconds.
func getPublicationDeadline(unixTimestamp uint64) time.Time {
	// The smallest 13-digit number. Comparing against it is equivalent to
	// counting the decimal digits of the timestamp, without formatting it.
	const minMillisTimestamp = 1_000_000_000_000

	if unixTimestamp >= minMillisTimestamp {
		// Likely a millisecond timestamp.
		secs := unixTimestamp / 1000
		nsecs := (unixTimestamp % 1000) * 1e6

		return time.Unix(int64(secs), int64(nsecs))
	}

	// Likely a second timestamp.
	return time.Unix(int64(unixTimestamp), 0)
}
// ToClientReservations converts a slice of server reservations to a slice of
// client reservations, keeping the input order. A nil slice is returned when
// there is nothing to convert.
func ToClientReservations(
	res []*reservation.Reservation) []*clientrpc.ClientReservation {

	if len(res) == 0 {
		return nil
	}

	converted := make([]*clientrpc.ClientReservation, 0, len(res))
	for i := range res {
		converted = append(converted, toClientReservation(res[i]))
	}

	return converted
}
// toClientReservation converts a server reservation to a client reservation.
// The transaction id and output index are only populated when the reservation
// has an outpoint; otherwise they keep their zero values.
func toClientReservation(
	res *reservation.Reservation) *clientrpc.ClientReservation {

	clientRes := &clientrpc.ClientReservation{
		ReservationId: res.ID[:],
		State:         string(res.State),
		Amount:        uint64(res.Value),
		Expiry:        res.Expiry,
	}

	if outpoint := res.Outpoint; outpoint != nil {
		clientRes.TxId = outpoint.Hash.String()
		clientRes.Vout = outpoint.Index
	}

	return clientRes
}