package loopd
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/btcsuite/btcutil"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/labels"
"github.com/lightninglabs/loop/liquidity"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/swap"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/routing/route"
)
const (
// completedSwapsCount is the maximum number of completed swaps (the
// most recently updated ones) that Monitor includes in the initial
// snapshot it sends to a new subscriber.
completedSwapsCount = 5
// minConfTarget is the minimum confirmation target we'll allow clients
// to specify. This is driven by the minimum confirmation target allowed
// by the backing fee estimator.
minConfTarget = 2
)
// swapClientServer implements the grpc service exposed by loopd.
type swapClientServer struct {
// network is the chain network the server was configured with.
// NOTE(review): not referenced by any method visible in this file
// section; confirm where it is consumed.
network lndclient . Network
// impl is the loop client that executes swaps and fetches terms, quotes
// and LSAT tokens on behalf of the rpc handlers.
impl * loop . Client
// liquidityMgr holds the liquidity parameters and produces swap
// suggestions.
liquidityMgr * liquidity . Manager
// lnd provides access to the backing lnd node; LoopOut uses its
// WalletKit to derive addresses and its ChainParams to decode them.
lnd * lndclient . LndServices
// swaps is the in-memory cache of the latest known info per swap, keyed
// by swap hash. It is written by processStatusUpdates.
swaps map [ lntypes . Hash ] loop . SwapInfo
// subscribers maps a subscriber ID to the channel on which that
// Monitor call receives swap updates.
subscribers map [ int ] chan <- interface { }
// statusChan delivers swap updates that processStatusUpdates consumes.
statusChan chan loop . SwapInfo
// nextSubscriberID is the ID handed to the next Monitor subscriber.
nextSubscriberID int
// swapsLock is held by Monitor and processStatusUpdates while they
// access swaps, subscribers and nextSubscriberID.
swapsLock sync . Mutex
// mainCtx is the server's main context; Monitor streams terminate once
// it is done.
mainCtx context . Context
}
// LoopOut initiates a loop out swap with the given parameters. The call
// returns after the swap has been set up with the swap server. From that point
// onwards, progress can be tracked via the LoopOutStatus stream that is
// returned from Monitor().
//
// All side-effect free validation (confirmation target, amounts, label and
// outgoing channel restrictions) is performed before any wallet address is
// derived, so that an invalid request does not consume addresses from the
// backing lnd wallet. An error is returned if validation fails, if an address
// cannot be decoded or generated, or if the swap cannot be initiated.
func (s *swapClientServer) LoopOut(ctx context.Context,
	in *looprpc.LoopOutRequest) (
	*looprpc.SwapResponse, error) {

	log.Infof("Loop out request received")

	sweepConfTarget, err := validateConfTarget(
		in.SweepConfTarget, loop.DefaultSweepConfTarget,
	)
	if err != nil {
		return nil, err
	}

	// When a fixed destination amount is requested, the swap amount must
	// cover both that amount and the maximum miner fee. The arguments are
	// passed in the same order as the placeholders in the message.
	if in.DestAmount != 0 && in.Amt < in.DestAmount+in.MaxMinerFee {
		return nil, fmt.Errorf("amt is too small; the following rule is not satisfied: amt >= max_miner_fee + dest_amt; got values: amt=%d, max_miner_fee=%d, dest_amt=%d", in.Amt, in.MaxMinerFee, in.DestAmount)
	}

	// Check that the label is valid.
	if err := labels.Validate(in.Label); err != nil {
		return nil, err
	}

	// Determine the outgoing channel restriction. The deprecated single
	// channel field and the channel set are mutually exclusive.
	var outgoingChanSet loopdb.ChannelSet
	switch {
	case in.LoopOutChannel != 0 && len(in.OutgoingChanSet) > 0:
		return nil, errors.New("loop_out_channel and outgoing_" +
			"chan_ids are mutually exclusive")

	case in.LoopOutChannel != 0:
		outgoingChanSet = loopdb.ChannelSet{in.LoopOutChannel}

	default:
		outgoingChanSet = in.OutgoingChanSet
	}

	var sweepAddr btcutil.Address
	if in.Dest == "" {
		// Generate sweep address if none specified. The request
		// context is used so that a cancelled call does not keep
		// waiting on lnd.
		sweepAddr, err = s.lnd.WalletKit.NextAddr(ctx)
		if err != nil {
			return nil, fmt.Errorf("NextAddr error: %v", err)
		}
	} else {
		sweepAddr, err = btcutil.DecodeAddress(
			in.Dest, s.lnd.ChainParams,
		)
		if err != nil {
			return nil, fmt.Errorf("decode address: %v", err)
		}
	}

	// A change address is only needed when one was given explicitly or
	// when a fixed destination amount leaves change behind.
	var changeAddr btcutil.Address
	switch {
	case in.ChangeAddr != "":
		changeAddr, err = btcutil.DecodeAddress(
			in.ChangeAddr, s.lnd.ChainParams,
		)
		if err != nil {
			return nil, fmt.Errorf("decode change address: %v", err)
		}

	case in.DestAmount != 0:
		// Generate change address.
		changeAddr, err = s.lnd.WalletKit.NextAddr(ctx)
		if err != nil {
			return nil, fmt.Errorf("NextAddr error: %v", err)
		}
	}

	req := &loop.OutRequest{
		Amount:              btcutil.Amount(in.Amt),
		DestAddr:            sweepAddr,
		DestAmount:          btcutil.Amount(in.DestAmount),
		ChangeAddr:          changeAddr,
		MaxMinerFee:         btcutil.Amount(in.MaxMinerFee),
		MaxPrepayAmount:     btcutil.Amount(in.MaxPrepayAmt),
		MaxPrepayRoutingFee: btcutil.Amount(in.MaxPrepayRoutingFee),
		MaxSwapRoutingFee:   btcutil.Amount(in.MaxSwapRoutingFee),
		MaxSwapFee:          btcutil.Amount(in.MaxSwapFee),
		SweepConfTarget:     sweepConfTarget,
		HtlcConfirmations:   in.HtlcConfirmations,
		SwapPublicationDeadline: time.Unix(
			int64(in.SwapPublicationDeadline), 0,
		),
		Label:           in.Label,
		Initiator:       in.Initiator,
		OutgoingChanSet: outgoingChanSet,
	}

	info, err := s.impl.LoopOut(ctx, req)
	if err != nil {
		log.Errorf("LoopOut: %v", err)
		return nil, err
	}

	// HtlcAddress is populated with the same P2WSH address as
	// HtlcAddressP2Wsh.
	htlcAddr := info.HtlcAddressP2WSH.String()

	return &looprpc.SwapResponse{
		Id:               info.SwapHash.String(),
		IdBytes:          info.SwapHash[:],
		HtlcAddress:      htlcAddr,
		HtlcAddressP2Wsh: htlcAddr,
		ServerMessage:    info.ServerMessage,
	}, nil
}
// marshallSwap converts the given swap info into its rpc representation. It
// returns an error if the swap's state or type is not known.
func (s *swapClientServer) marshallSwap(loopSwap *loop.SwapInfo) (
	*looprpc.SwapStatus, error) {

	// To remain backwards compatible with previous versions where all
	// failure reasons were squashed into a single failure state, every
	// failure state is reported as the catch-all FAILED state together
	// with a specific failure reason. We therefore start out with the
	// FAILED state and only overwrite it for non-failure states, while
	// the failure states below only need to fill in their reason.
	rpcState := looprpc.SwapState_FAILED
	reason := looprpc.FailureReason_FAILURE_REASON_NONE

	switch loopSwap.State {
	case loopdb.StateInitiated:
		rpcState = looprpc.SwapState_INITIATED

	case loopdb.StatePreimageRevealed:
		rpcState = looprpc.SwapState_PREIMAGE_REVEALED

	case loopdb.StateHtlcPublished:
		rpcState = looprpc.SwapState_HTLC_PUBLISHED

	case loopdb.StateInvoiceSettled:
		rpcState = looprpc.SwapState_INVOICE_SETTLED

	case loopdb.StateSuccess:
		rpcState = looprpc.SwapState_SUCCESS

	case loopdb.StateFailOffchainPayments:
		reason = looprpc.FailureReason_FAILURE_REASON_OFFCHAIN

	case loopdb.StateFailTimeout:
		reason = looprpc.FailureReason_FAILURE_REASON_TIMEOUT

	case loopdb.StateFailSweepTimeout:
		reason = looprpc.FailureReason_FAILURE_REASON_SWEEP_TIMEOUT

	case loopdb.StateFailInsufficientValue:
		reason = looprpc.FailureReason_FAILURE_REASON_INSUFFICIENT_VALUE

	case loopdb.StateFailTemporary:
		reason = looprpc.FailureReason_FAILURE_REASON_TEMPORARY

	case loopdb.StateFailIncorrectHtlcAmt:
		reason = looprpc.FailureReason_FAILURE_REASON_INCORRECT_AMOUNT

	default:
		return nil, fmt.Errorf("unknown swap state: %v", loopSwap.State)
	}

	var (
		rpcType                    looprpc.SwapType
		primaryAddr, p2wsh, np2wsh string
	)

	switch loopSwap.SwapType {
	case swap.TypeIn:
		rpcType = looprpc.SwapType_LOOP_IN

		// The P2WSH address is always reported. The nested address is
		// only reported, and used as the primary htlc address, when
		// the htlc is published externally.
		p2wsh = loopSwap.HtlcAddressP2WSH.EncodeAddress()
		primaryAddr = p2wsh
		if loopSwap.ExternalHtlc {
			np2wsh = loopSwap.HtlcAddressNP2WSH.EncodeAddress()
			primaryAddr = np2wsh
		}

	case swap.TypeOut:
		rpcType = looprpc.SwapType_LOOP_OUT

		p2wsh = loopSwap.HtlcAddressP2WSH.EncodeAddress()
		primaryAddr = p2wsh

	default:
		return nil, errors.New("unknown swap type")
	}

	rpcSwap := &looprpc.SwapStatus{
		Amt:               int64(loopSwap.AmountRequested),
		Id:                loopSwap.SwapHash.String(),
		IdBytes:           loopSwap.SwapHash[:],
		State:             rpcState,
		FailureReason:     reason,
		InitiationTime:    loopSwap.InitiationTime.UnixNano(),
		LastUpdateTime:    loopSwap.LastUpdate.UnixNano(),
		HtlcAddress:       primaryAddr,
		HtlcAddressP2Wsh:  p2wsh,
		HtlcAddressNp2Wsh: np2wsh,
		Type:              rpcType,
		CostServer:        int64(loopSwap.Cost.Server),
		CostOnchain:       int64(loopSwap.Cost.Onchain),
		CostOffchain:      int64(loopSwap.Cost.Offchain),
		Label:             loopSwap.Label,
	}

	return rpcSwap, nil
}
// Monitor streams swap updates to the caller. It first sends a snapshot
// consisting of all pending swaps plus at most completedSwapsCount of the most
// recently updated completed swaps, ordered from oldest to newest update, and
// then forwards every update published to this subscriber for as long as the
// stream stays open.
//
// The in request is not inspected. The call returns nil when the client
// cancels the stream or the subscriber queue's output channel is closed, and a
// non-nil error when marshalling or sending an update fails or when the server
// is shutting down.
func ( s * swapClientServer ) Monitor ( in * looprpc . MonitorRequest ,
server looprpc . SwapClient_MonitorServer ) error {
log . Infof ( "Monitor request received" )
// send marshals a single swap and writes it to the client stream.
send := func ( info loop . SwapInfo ) error {
rpcSwap , err := s . marshallSwap ( & info )
if err != nil {
return err
}
return server . Send ( rpcSwap )
}
// Start a notification queue for this subscriber. Note that the local
// variable shadows the imported queue package for the remainder of this
// function.
queue := queue . NewConcurrentQueue ( 20 )
queue . Start ( )
// Add this subscriber to the global subscriber list. Also create a
// snapshot of all pending and completed swaps within the lock, to
// prevent subscribers from receiving duplicate updates.
s . swapsLock . Lock ( )
id := s . nextSubscriberID
s . nextSubscriberID ++
s . subscribers [ id ] = queue . ChanIn ( )
var pendingSwaps , completedSwaps [ ] loop . SwapInfo
for _ , swap := range s . swaps {
if swap . State . Type ( ) == loopdb . StateTypePending {
pendingSwaps = append ( pendingSwaps , swap )
} else {
completedSwaps = append ( completedSwaps , swap )
}
}
s . swapsLock . Unlock ( )
// On exit, unregister the subscriber under the lock before stopping its
// queue. NOTE(review): presumably this ordering ensures that
// processStatusUpdates never sends to a queue that was already stopped;
// confirm against the queue implementation.
defer func ( ) {
s . swapsLock . Lock ( )
delete ( s . subscribers , id )
s . swapsLock . Unlock ( )
queue . Stop ( )
} ( )
// Sort completed swaps new to old.
sort . Slice ( completedSwaps , func ( i , j int ) bool {
return completedSwaps [ i ] . LastUpdate . After (
completedSwaps [ j ] . LastUpdate ,
)
} )
// Discard all but top x latest.
if len ( completedSwaps ) > completedSwapsCount {
completedSwaps = completedSwaps [ : completedSwapsCount ]
}
// Concatenate both sets.
filteredSwaps := append ( pendingSwaps , completedSwaps ... )
// Sort again, but this time old to new.
sort . Slice ( filteredSwaps , func ( i , j int ) bool {
return filteredSwaps [ i ] . LastUpdate . Before (
filteredSwaps [ j ] . LastUpdate ,
)
} )
// Return swaps to caller.
for _ , swap := range filteredSwaps {
if err := send ( swap ) ; err != nil {
return err
}
}
// As long as the client is connected, keep passing through swap
// updates.
for {
select {
case queueItem , ok := <- queue . ChanOut ( ) :
// A closed output channel ends the stream without an error.
if ! ok {
return nil
}
// Only loop.SwapInfo values are published to subscribers by
// processStatusUpdates, so the unchecked type assertion relies on
// that invariant.
swap := queueItem . ( loop . SwapInfo )
if err := send ( swap ) ; err != nil {
return err
}
// The client cancels the subscription.
case <- server . Context ( ) . Done ( ) :
return nil
// The server is shutting down.
case <- s . mainCtx . Done ( ) :
return fmt . Errorf ( "server is shutting down" )
}
}
}
// ListSwaps returns a list of all currently known swaps and their current
// status. An error is returned if any of the swaps cannot be converted to its
// rpc representation.
func (s *swapClientServer) ListSwaps(_ context.Context,
	_ *looprpc.ListSwapsRequest) (*looprpc.ListSwapsResponse, error) {

	// The lock must be held before the swaps map is touched at all,
	// including when reading its length. Sizing the result before taking
	// the lock would race with processStatusUpdates and could leave the
	// slice too short for the subsequent iteration.
	s.swapsLock.Lock()
	defer s.swapsLock.Unlock()

	// We can just use the server's in-memory cache as that contains the
	// most up-to-date state including temporary failures which aren't
	// persisted to disk.
	rpcSwaps := make([]*looprpc.SwapStatus, 0, len(s.swaps))
	for _, swp := range s.swaps {
		// Copy the loop variable so that taking its address is safe
		// regardless of the Go version's loop variable semantics.
		swp := swp

		rpcSwap, err := s.marshallSwap(&swp)
		if err != nil {
			return nil, err
		}

		rpcSwaps = append(rpcSwaps, rpcSwap)
	}

	return &looprpc.ListSwapsResponse{Swaps: rpcSwaps}, nil
}
// SwapInfo returns all known details about a single swap. The swap is
// identified by the raw swap hash bytes in the request. An error is returned
// if the hash cannot be parsed, if no swap with that hash is known, or if the
// swap cannot be converted to its rpc representation.
func (s *swapClientServer) SwapInfo(_ context.Context,
	req *looprpc.SwapInfoRequest) (*looprpc.SwapStatus, error) {

	swapHash, err := lntypes.MakeHash(req.Id)
	if err != nil {
		return nil, fmt.Errorf("error parsing swap hash: %v", err)
	}

	// Just return the server's in-memory cache here too as we also want to
	// return temporary failures to the client. The cache is written by
	// processStatusUpdates, so it must only be read with the lock held.
	s.swapsLock.Lock()
	swp, ok := s.swaps[swapHash]
	s.swapsLock.Unlock()

	if !ok {
		// Format the parsed hash rather than the raw request bytes so
		// that the message contains the readable hex encoding.
		return nil, fmt.Errorf("swap with hash %s not found", swapHash)
	}

	return s.marshallSwap(&swp)
}
// LoopOutTerms queries the loop client for the loop out terms and returns the
// swap amount and cltv delta limits to the caller.
func (s *swapClientServer) LoopOutTerms(ctx context.Context,
	req *looprpc.TermsRequest) (*looprpc.OutTermsResponse, error) {

	log.Infof("Loop out terms request received")

	outTerms, err := s.impl.LoopOutTerms(ctx)
	if err != nil {
		log.Errorf("Terms request: %v", err)
		return nil, err
	}

	resp := &looprpc.OutTermsResponse{}
	resp.MinSwapAmount = int64(outTerms.MinSwapAmount)
	resp.MaxSwapAmount = int64(outTerms.MaxSwapAmount)
	resp.MinCltvDelta = outTerms.MinCltvDelta
	resp.MaxCltvDelta = outTerms.MaxCltvDelta

	return resp, nil
}
// LoopOutQuote requests a loop out quote for the amount, sweep confirmation
// target and publication deadline given in the request. An unset confirmation
// target falls back to the default sweep confirmation target.
func (s *swapClientServer) LoopOutQuote(ctx context.Context,
	req *looprpc.QuoteRequest) (*looprpc.OutQuoteResponse, error) {

	confTarget, err := validateConfTarget(
		req.ConfTarget, loop.DefaultSweepConfTarget,
	)
	if err != nil {
		return nil, err
	}

	// The rpc deadline is expressed in unix seconds.
	deadline := time.Unix(int64(req.SwapPublicationDeadline), 0)

	quoteReq := &loop.LoopOutQuoteRequest{
		Amount:                  btcutil.Amount(req.Amt),
		WithChange:              req.WithChange,
		SweepConfTarget:         confTarget,
		SwapPublicationDeadline: deadline,
	}

	quote, err := s.impl.LoopOutQuote(ctx, quoteReq)
	if err != nil {
		return nil, err
	}

	resp := &looprpc.OutQuoteResponse{
		HtlcSweepFeeSat: int64(quote.MinerFee),
		PrepayAmtSat:    int64(quote.PrepayAmount),
		SwapFeeSat:      int64(quote.SwapFee),
		SwapPaymentDest: quote.SwapPaymentDest[:],
	}

	return resp, nil
}
// GetLoopInTerms queries the loop client for the loop in terms and returns the
// minimum and maximum swap amount to the caller.
func (s *swapClientServer) GetLoopInTerms(ctx context.Context,
	req *looprpc.TermsRequest) (*looprpc.InTermsResponse, error) {

	log.Infof("Loop in terms request received")

	inTerms, err := s.impl.LoopInTerms(ctx)
	if err != nil {
		log.Errorf("Terms request: %v", err)
		return nil, err
	}

	resp := &looprpc.InTermsResponse{}
	resp.MinSwapAmount = int64(inTerms.MinSwapAmount)
	resp.MaxSwapAmount = int64(inTerms.MaxSwapAmount)

	return resp, nil
}
// GetLoopInQuote requests a loop in quote for the amount given in the request.
// The confirmation target and the external htlc flag are mutually exclusive;
// the request is rejected if both are set.
func (s *swapClientServer) GetLoopInQuote(ctx context.Context,
	req *looprpc.QuoteRequest) (*looprpc.InQuoteResponse, error) {

	log.Infof("Loop in quote request received")

	htlcConfTarget, err := validateLoopInRequest(
		req.ConfTarget, req.ExternalHtlc,
	)
	if err != nil {
		return nil, err
	}

	quoteReq := &loop.LoopInQuoteRequest{
		Amount:         btcutil.Amount(req.Amt),
		HtlcConfTarget: htlcConfTarget,
		ExternalHtlc:   req.ExternalHtlc,
	}

	quote, err := s.impl.LoopInQuote(ctx, quoteReq)
	if err != nil {
		return nil, err
	}

	resp := &looprpc.InQuoteResponse{
		HtlcPublishFeeSat: int64(quote.MinerFee),
		SwapFeeSat:        int64(quote.SwapFee),
	}

	return resp, nil
}
// LoopIn initiates a loop in swap with the given parameters. The request is
// rejected if the htlc confirmation target and the external htlc flag are both
// set, if the label is invalid, or if the last hop cannot be parsed. The
// response carries the swap hash, the htlc address to use and any message
// returned by the server.
func (s *swapClientServer) LoopIn(ctx context.Context,
	in *looprpc.LoopInRequest) (
	*looprpc.SwapResponse, error) {

	log.Infof("Loop in request received")

	htlcConfTarget, err := validateLoopInRequest(
		in.HtlcConfTarget, in.ExternalHtlc,
	)
	if err != nil {
		return nil, err
	}

	// Check that the label is valid.
	err = labels.Validate(in.Label)
	if err != nil {
		return nil, err
	}

	req := &loop.LoopInRequest{
		Amount:         btcutil.Amount(in.Amt),
		MaxMinerFee:    btcutil.Amount(in.MaxMinerFee),
		MaxSwapFee:     btcutil.Amount(in.MaxSwapFee),
		HtlcConfTarget: htlcConfTarget,
		ExternalHtlc:   in.ExternalHtlc,
		Label:          in.Label,
		Initiator:      in.Initiator,
	}

	// Only restrict the last hop if the caller provided one.
	if in.LastHop != nil {
		vertex, err := route.NewVertexFromBytes(in.LastHop)
		if err != nil {
			return nil, err
		}
		req.LastHop = &vertex
	}

	swapInfo, err := s.impl.LoopIn(ctx, req)
	if err != nil {
		log.Errorf("Loop in: %v", err)
		return nil, err
	}

	resp := &looprpc.SwapResponse{
		Id:               swapInfo.SwapHash.String(),
		IdBytes:          swapInfo.SwapHash[:],
		HtlcAddressP2Wsh: swapInfo.HtlcAddressP2WSH.String(),
		ServerMessage:    swapInfo.ServerMessage,
	}

	// The nested address is only reported for externally published htlcs,
	// in which case it also becomes the primary htlc address.
	if !req.ExternalHtlc {
		resp.HtlcAddress = resp.HtlcAddressP2Wsh
		return resp, nil
	}

	resp.HtlcAddressNp2Wsh = swapInfo.HtlcAddressNP2WSH.String()
	resp.HtlcAddress = resp.HtlcAddressNp2Wsh

	return resp, nil
}
// GetLsatTokens lists every token held by the client's LSAT token store,
// including the serialized base macaroon and the storage name of each token.
func (s *swapClientServer) GetLsatTokens(ctx context.Context,
	_ *looprpc.TokensRequest) (*looprpc.TokensResponse, error) {

	log.Infof("Get LSAT tokens request received")

	tokens, err := s.impl.LsatStore.AllTokens()
	if err != nil {
		return nil, err
	}

	rpcTokens := make([]*looprpc.LsatToken, 0, len(tokens))
	for name, tok := range tokens {
		macBytes, err := tok.BaseMacaroon().MarshalBinary()
		if err != nil {
			return nil, err
		}

		rpcToken := &looprpc.LsatToken{
			BaseMacaroon:       macBytes,
			PaymentHash:        tok.PaymentHash[:],
			PaymentPreimage:    tok.Preimage[:],
			AmountPaidMsat:     int64(tok.AmountPaid),
			RoutingFeePaidMsat: int64(tok.RoutingFeePaid),
			TimeCreated:        tok.TimeCreated.Unix(),
			Expired:            !tok.IsValid(),
			StorageName:        name,
		}
		rpcTokens = append(rpcTokens, rpcToken)
	}

	return &looprpc.TokensResponse{Tokens: rpcTokens}, nil
}
// GetLiquidityParams returns the liquidity manager's current parameters,
// converted to their rpc representation. Every channel rule is reported as a
// threshold rule.
func (s *swapClientServer) GetLiquidityParams(_ context.Context,
	_ *looprpc.GetLiquidityParamsRequest) (*looprpc.LiquidityParameters,
	error) {

	cfg := s.liquidityMgr.GetParameters()

	// Convert every channel rule into its rpc counterpart.
	rpcRules := make([]*looprpc.LiquidityRule, 0, len(cfg.ChannelRules))
	for chanID, chanRule := range cfg.ChannelRules {
		rpcRules = append(rpcRules, &looprpc.LiquidityRule{
			ChannelId:         chanID.ToUint64(),
			Type:              looprpc.LiquidityRuleType_THRESHOLD,
			IncomingThreshold: uint32(chanRule.MinimumIncoming),
			OutgoingThreshold: uint32(chanRule.MinimumOutgoing),
		})
	}

	// The rpc reports the fee rate limit per vbyte rather than per
	// kilo-vbyte.
	feePerVByte := cfg.SweepFeeRateLimit.FeePerKVByte() / 1000

	resp := &looprpc.LiquidityParameters{
		MaxMinerFeeSat:          uint64(cfg.MaximumMinerFee),
		MaxSwapFeePpm:           uint64(cfg.MaximumSwapFeePPM),
		MaxRoutingFeePpm:        uint64(cfg.MaximumRoutingFeePPM),
		MaxPrepayRoutingFeePpm:  uint64(cfg.MaximumPrepayRoutingFeePPM),
		MaxPrepaySat:            uint64(cfg.MaximumPrepay),
		SweepFeeRateSatPerVbyte: uint64(feePerVByte),
		SweepConfTarget:         cfg.SweepConfTarget,
		FailureBackoffSec:       uint64(cfg.FailureBackOff.Seconds()),
		AutoLoopOut:             cfg.AutoOut,
		AutoOutBudgetSat:        uint64(cfg.AutoFeeBudget),
		AutoMaxInFlight:         uint64(cfg.MaxAutoInFlight),
		Rules:                   rpcRules,
	}

	// Zero golang time is different to a zero unix time, so we only set
	// our start date if it is non-zero.
	if startDate := cfg.AutoFeeStartDate; !startDate.IsZero() {
		resp.AutoOutBudgetStartSec = uint64(startDate.Unix())
	}

	return resp, nil
}
// SetLiquidityParams attempts to set our current liquidity manager's
// parameters. The request is rejected if it carries no parameters, if more
// than one rule is set for the same channel, if a rule cannot be converted, or
// if the liquidity manager refuses the resulting parameters.
func (s *swapClientServer) SetLiquidityParams(_ context.Context,
	in *looprpc.SetLiquidityParamsRequest) (*looprpc.SetLiquidityParamsResponse,
	error) {

	// The parameters message is optional on the wire, so a client may
	// omit it entirely. Reject such requests instead of dereferencing a
	// nil pointer below.
	if in.Parameters == nil {
		return nil, fmt.Errorf("parameters must be set")
	}
	rpcParams := in.Parameters

	// The rpc expresses the fee rate limit per vbyte, our fee rate type
	// is per kilo-vbyte.
	satPerKVByte := chainfee.SatPerKVByte(
		rpcParams.SweepFeeRateSatPerVbyte * 1000,
	)

	params := liquidity.Parameters{
		MaximumMinerFee:            btcutil.Amount(rpcParams.MaxMinerFeeSat),
		MaximumSwapFeePPM:          int(rpcParams.MaxSwapFeePpm),
		MaximumRoutingFeePPM:       int(rpcParams.MaxRoutingFeePpm),
		MaximumPrepayRoutingFeePPM: int(rpcParams.MaxPrepayRoutingFeePpm),
		MaximumPrepay:              btcutil.Amount(rpcParams.MaxPrepaySat),
		SweepFeeRateLimit:          satPerKVByte.FeePerKWeight(),
		SweepConfTarget:            rpcParams.SweepConfTarget,
		FailureBackOff: time.Duration(rpcParams.FailureBackoffSec) *
			time.Second,
		AutoOut:         rpcParams.AutoLoopOut,
		AutoFeeBudget:   btcutil.Amount(rpcParams.AutoOutBudgetSat),
		MaxAutoInFlight: int(rpcParams.AutoMaxInFlight),
		ChannelRules: make(
			map[lnwire.ShortChannelID]*liquidity.ThresholdRule,
			len(rpcParams.Rules),
		),
	}

	// Zero unix time is different to zero golang time.
	if rpcParams.AutoOutBudgetStartSec != 0 {
		params.AutoFeeStartDate = time.Unix(
			int64(rpcParams.AutoOutBudgetStartSec), 0,
		)
	}

	for _, rule := range rpcParams.Rules {
		var (
			shortID = lnwire.NewShortChanIDFromInt(rule.ChannelId)
			err     error
		)

		// Make sure that there are not multiple rules set for a single
		// channel.
		if _, ok := params.ChannelRules[shortID]; ok {
			return nil, fmt.Errorf("multiple rules set for "+
				"channel: %v", shortID)
		}

		params.ChannelRules[shortID], err = rpcToRule(rule)
		if err != nil {
			return nil, err
		}
	}

	if err := s.liquidityMgr.SetParameters(params); err != nil {
		return nil, err
	}

	return &looprpc.SetLiquidityParamsResponse{}, nil
}
// rpcToRule converts an rpc liquidity rule into a threshold rule. Rules whose
// type is unset or not recognized are rejected with an error.
func rpcToRule(rule *looprpc.LiquidityRule) (*liquidity.ThresholdRule, error) {
	ruleType := rule.Type

	if ruleType == looprpc.LiquidityRuleType_THRESHOLD {
		thresholdRule := liquidity.NewThresholdRule(
			int(rule.IncomingThreshold),
			int(rule.OutgoingThreshold),
		)
		return thresholdRule, nil
	}

	if ruleType == looprpc.LiquidityRuleType_UNKNOWN {
		return nil, fmt.Errorf("rule type field must be set")
	}

	return nil, fmt.Errorf("unknown rule: %T", rule)
}
// SuggestSwaps asks the liquidity manager for its current swap suggestions and
// returns them as a list of loop out requests.
func (s *swapClientServer) SuggestSwaps(ctx context.Context,
	_ *looprpc.SuggestSwapsRequest) (*looprpc.SuggestSwapsResponse, error) {

	suggestions, err := s.liquidityMgr.SuggestSwaps(ctx, false)
	if err != nil {
		return nil, err
	}

	var loopOut []*looprpc.LoopOutRequest
	for _, suggestion := range suggestions {
		rpcReq := &looprpc.LoopOutRequest{
			Amt:                 int64(suggestion.Amount),
			OutgoingChanSet:     suggestion.OutgoingChanSet,
			MaxSwapFee:          int64(suggestion.MaxSwapFee),
			MaxMinerFee:         int64(suggestion.MaxMinerFee),
			MaxPrepayAmt:        int64(suggestion.MaxPrepayAmount),
			MaxSwapRoutingFee:   int64(suggestion.MaxSwapRoutingFee),
			MaxPrepayRoutingFee: int64(suggestion.MaxPrepayRoutingFee),
			SweepConfTarget:     suggestion.SweepConfTarget,
		}
		loopOut = append(loopOut, rpcReq)
	}

	resp := &looprpc.SuggestSwapsResponse{
		LoopOut: loopOut,
	}

	return resp, nil
}
// processStatusUpdates reads updates on the status channel and processes them.
// Every update replaces the cached entry for its swap hash and is then sent to
// each registered subscriber.
//
// NOTE: This must run inside a goroutine as it blocks until the main context
// shuts down.
func ( s * swapClientServer ) processStatusUpdates ( mainCtx context . Context ) {
for {
select {
// On updates, refresh the server's in-memory state and inform
// subscribers about the changes.
case swp := <- s . statusChan :
// The lock is held for the whole fan-out, so the cache update and the
// notifications appear atomic to a Monitor call that registers itself
// and snapshots the cache under the same lock.
s . swapsLock . Lock ( )
s . swaps [ swp . SwapHash ] = swp
for _ , subscriber := range s . subscribers {
// The send blocks until the subscriber's channel accepts the update or
// the server shuts down; in the latter case the lock is released
// before returning.
select {
case subscriber <- swp :
case <- mainCtx . Done ( ) :
s . swapsLock . Unlock ( )
return
}
}
s . swapsLock . Unlock ( )
// Server is shutting down.
case <- mainCtx . Done ( ) :
return
}
}
}
// validateConfTarget checks the given confirmation target. A zero target means
// that none was specified, in which case the default target is returned.
// Targets below minConfTarget are rejected with an error.
func validateConfTarget(target, defaultTarget int32) (int32, error) {
	// Fall back to the default if no target was specified.
	if target == 0 {
		return defaultTarget, nil
	}

	// Ensure the target respects our minimum threshold.
	if target < minConfTarget {
		return 0, fmt.Errorf("a confirmation target of at least %v "+
			"must be provided", minConfTarget)
	}

	return target, nil
}
// validateLoopInRequest returns the htlc confirmation target to use for a loop
// in request. It fails if the mutually exclusive conf target and external
// parameters are both set.
func validateLoopInRequest(htlcConfTarget int32, external bool) (int32, error) {
	// An internally published htlc needs a valid confirmation target.
	if !external {
		return validateConfTarget(
			htlcConfTarget, loop.DefaultHtlcConfTarget,
		)
	}

	// If the htlc is going to be externally set, the htlcConfTarget should
	// not be set, because it has no relevance when the htlc is external.
	if htlcConfTarget != 0 {
		return 0, errors.New("external and htlc conf target cannot " +
			"both be set")
	}

	// If the htlc is being externally published, we do not need to set a
	// confirmation target.
	return 0, nil
}