Browse Source

liquidity: make swap suggestions aware of ongoing swaps

pull/289/head
carla 3 months ago
parent
commit
7740231bac
No known key found for this signature in database GPG Key ID: 4CA7FE54A6213C91
5 changed files with 301 additions and 6 deletions
  1. +118
    -4
      liquidity/liquidity.go
  2. +151
    -0
      liquidity/liquidity_test.go
  3. +26
    -0
      liquidity/log.go
  4. +2
    -0
      loopd/log.go
  5. +4
    -2
      loopd/utils.go

+ 118
- 4
liquidity/liquidity.go View File

@ -1,6 +1,12 @@
// Package liquidity is responsible for monitoring our node's liquidity. It
// allows setting of a liquidity rule which describes the desired liquidity
// balance on a per-channel basis.
//
// Swap suggestions are limited to channels that are not currently being used
// for a pending swap. If we are currently processing an unrestricted swap (ie,
// a loop out with no outgoing channel targets set or a loop in with no last
// hop set), we will not suggest any swaps because these swaps will shift the
// balances of our channels in ways we can't predict.
package liquidity
import (
@ -15,6 +21,7 @@ import (
"github.com/lightninglabs/loop/loopdb"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
const (
@ -65,6 +72,12 @@ type Config struct {
// Lnd provides us with access to lnd's rpc servers.
Lnd *lndclient.LndServices
// ListLoopOut returns all of the loop our swaps stored on disk.
ListLoopOut func() ([]*loopdb.LoopOut, error)
// ListLoopIn returns all of the loop in swaps stored on disk.
ListLoopIn func() ([]*loopdb.LoopIn, error)
// Clock allows easy mocking of time in unit tests.
Clock clock.Clock
}
@ -184,19 +197,32 @@ func (m *Manager) SuggestSwaps(ctx context.Context) (
return nil, nil
}
channels, err := m.cfg.Lnd.Client.ListChannels(ctx)
// Get the current server side restrictions.
outRestrictions, err := m.cfg.LoopOutRestrictions(ctx)
if err != nil {
return nil, err
}
// Get the current server side restrictions.
outRestrictions, err := m.cfg.LoopOutRestrictions(ctx)
// List our current set of swaps so that we can determine which channels
// are already being utilized by swaps. Note that these calls may race
// with manual initiation of swaps.
loopOut, err := m.cfg.ListLoopOut()
if err != nil {
return nil, err
}
loopIn, err := m.cfg.ListLoopIn()
if err != nil {
return nil, err
}
eligible, err := m.getEligibleChannels(ctx, loopOut, loopIn)
if err != nil {
return nil, err
}
var suggestions []loop.OutRequest
for _, channel := range channels {
for _, channel := range eligible {
channelID := lnwire.NewShortChanIDFromInt(channel.ChannelID)
rule, ok := m.params.ChannelRules[channelID]
if !ok {
@ -242,6 +268,94 @@ func makeLoopOutRequest(suggestion *LoopOutRecommendation) loop.OutRequest {
}
}
// getEligibleChannels takes lists of our existing loop out and in swaps, and
// gets a list of channels that are not currently being utilized for a swap.
// If an unrestricted swap is ongoing, we return an empty set of channels
// because we don't know which channels balances it will affect.
func (m *Manager) getEligibleChannels(ctx context.Context,
loopOut []*loopdb.LoopOut, loopIn []*loopdb.LoopIn) (
[]lndclient.ChannelInfo, error) {
var (
existingOut = make(map[lnwire.ShortChannelID]bool)
existingIn = make(map[route.Vertex]bool)
)
for _, out := range loopOut {
var (
state = out.State().State
chanSet = out.Contract.OutgoingChanSet
)
// Skip completed swaps, they can't affect our channel balances.
if state.Type() != loopdb.StateTypePending {
continue
}
if len(chanSet) == 0 {
log.Debugf("Ongoing unrestricted loop out: "+
"%v, no suggestions at present", out.Hash)
return nil, nil
}
for _, id := range chanSet {
chanID := lnwire.NewShortChanIDFromInt(id)
existingOut[chanID] = true
}
}
for _, in := range loopIn {
// Skip completed swaps, they can't affect our channel balances.
if in.State().State.Type() != loopdb.StateTypePending {
continue
}
if in.Contract.LastHop == nil {
log.Debugf("Ongoing unrestricted loop in: "+
"%v, no suggestions at present", in.Hash)
return nil, nil
}
existingIn[*in.Contract.LastHop] = true
}
channels, err := m.cfg.Lnd.Client.ListChannels(ctx)
if err != nil {
return nil, err
}
// Run through our set of channels and skip over any channels that
// are currently being utilized by a restricted swap (where restricted
// means that a loop out limited channels, or a loop in limited last
// hop).
var eligible []lndclient.ChannelInfo
for _, channel := range channels {
shortID := lnwire.NewShortChanIDFromInt(channel.ChannelID)
if existingOut[shortID] {
log.Debugf("Channel: %v not eligible for "+
"suggestions, ongoing loop out utilizing "+
"channel", channel.ChannelID)
continue
}
if existingIn[channel.PubKeyBytes] {
log.Debugf("Channel: %v not eligible for "+
"suggestions, ongoing loop in utilizing "+
"peer", channel.ChannelID)
continue
}
eligible = append(eligible, channel)
}
return eligible, nil
}
// ppmToSat takes an amount and a measure of parts per million for the amount
// and returns the amount that the ppm represents.
func ppmToSat(amount btcutil.Amount, ppm int) btcutil.Amount {

+ 151
- 0
liquidity/liquidity_test.go View File

@ -11,6 +11,7 @@ import (
"github.com/lightninglabs/loop/test"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/stretchr/testify/require"
)
@ -20,8 +21,20 @@ var (
chanID1 = lnwire.NewShortChanIDFromInt(1)
chanID2 = lnwire.NewShortChanIDFromInt(2)
peer1 = route.Vertex{1}
peer2 = route.Vertex{2}
channel1 = lndclient.ChannelInfo{
ChannelID: chanID1.ToUint64(),
PubKeyBytes: peer1,
LocalBalance: 10000,
RemoteBalance: 0,
Capacity: 10000,
}
channel2 = lndclient.ChannelInfo{
ChannelID: chanID2.ToUint64(),
PubKeyBytes: peer2,
LocalBalance: 10000,
RemoteBalance: 0,
Capacity: 10000,
@ -47,6 +60,28 @@ var (
MaxPrepayAmount: defaultMaximumPrepay,
SweepConfTarget: loop.DefaultSweepConfTarget,
}
// chan2Rec is the suggested swap for channel 2 when we use chanRule.
chan2Rec = loop.OutRequest{
Amount: 7500,
OutgoingChanSet: loopdb.ChannelSet{chanID2.ToUint64()},
MaxPrepayRoutingFee: prepayFee,
MaxSwapRoutingFee: routingFee,
MaxMinerFee: defaultMaximumMinerFee,
MaxSwapFee: swapFee,
MaxPrepayAmount: defaultMaximumPrepay,
SweepConfTarget: loop.DefaultSweepConfTarget,
}
// chan1Out is a contract that uses channel 1, used to represent on
// disk swap using chan 1.
chan1Out = &loopdb.LoopOutContract{
OutgoingChanSet: loopdb.ChannelSet(
[]uint64{
chanID1.ToUint64(),
},
),
}
)
// newTestConfig creates a default test config.
@ -61,6 +96,12 @@ func newTestConfig() (*Config, *test.LndMockServices) {
},
Lnd: &lnd.LndServices,
Clock: clock.NewTestClock(testTime),
ListLoopOut: func() ([]*loopdb.LoopOut, error) {
return nil, nil
},
ListLoopIn: func() ([]*loopdb.LoopIn, error) {
return nil, nil
},
}, lnd
}
@ -110,6 +151,116 @@ func TestParameters(t *testing.T) {
require.Equal(t, ErrZeroChannelID, err)
}
// TestRestrictedSuggestions tests getting of swap suggestions when we have
// other in-flight swaps. We setup our manager with a set of channels and rules
// that require a loop out swap, focusing on the filtering our of channels that
// are in use for in-flight swaps.
func TestRestrictedSuggestions(t *testing.T) {
tests := []struct {
name string
channels []lndclient.ChannelInfo
loopOut []*loopdb.LoopOut
loopIn []*loopdb.LoopIn
expected []loop.OutRequest
}{
{
name: "no existing swaps",
channels: []lndclient.ChannelInfo{
channel1,
},
loopOut: nil,
loopIn: nil,
expected: []loop.OutRequest{
chan1Rec,
},
},
{
name: "unrestricted loop out",
channels: []lndclient.ChannelInfo{
channel1, channel2,
},
loopOut: []*loopdb.LoopOut{
{
Contract: &loopdb.LoopOutContract{
OutgoingChanSet: nil,
},
},
},
expected: nil,
},
{
name: "unrestricted loop in",
channels: []lndclient.ChannelInfo{
channel1, channel2,
},
loopIn: []*loopdb.LoopIn{
{
Contract: &loopdb.LoopInContract{
LastHop: nil,
},
},
},
expected: nil,
},
{
name: "restricted loop out",
channels: []lndclient.ChannelInfo{
channel1, channel2,
},
loopOut: []*loopdb.LoopOut{
{
Contract: chan1Out,
},
},
expected: []loop.OutRequest{
chan2Rec,
},
},
{
name: "restricted loop in",
channels: []lndclient.ChannelInfo{
channel1, channel2,
},
loopIn: []*loopdb.LoopIn{
{
Contract: &loopdb.LoopInContract{
LastHop: &peer2,
},
},
},
expected: []loop.OutRequest{
chan1Rec,
},
},
}
for _, testCase := range tests {
testCase := testCase
t.Run(testCase.name, func(t *testing.T) {
// Create a manager config which will return the test
// case's set of existing swaps.
cfg, lnd := newTestConfig()
cfg.ListLoopOut = func() ([]*loopdb.LoopOut, error) {
return testCase.loopOut, nil
}
cfg.ListLoopIn = func() ([]*loopdb.LoopIn, error) {
return testCase.loopIn, nil
}
rules := map[lnwire.ShortChannelID]*ThresholdRule{
chanID1: chanRule,
chanID2: chanRule,
}
testSuggestSwaps(
t, cfg, lnd, testCase.channels, rules,
testCase.expected,
)
})
}
}
// TestSuggestSwaps tests getting of swap suggestions based on the rules set for
// the liquidity manager and the current set of channel balances.
func TestSuggestSwaps(t *testing.T) {

+ 26
- 0
liquidity/log.go View File

@ -0,0 +1,26 @@
package liquidity
import (
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/build"
)
// Subsystem defines the sub system name of this package.
const Subsystem = "LQDY"
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var log btclog.Logger
// The default amount of logging is none.
func init() {
UseLogger(build.NewSubLogger(Subsystem, nil))
}
// UseLogger uses a specified Logger to output package logging info.
// This should be used in preference to SetLogWriter if the caller is also
// using btclog.
func UseLogger(logger btclog.Logger) {
log = logger
}

+ 2
- 0
loopd/log.go View File

@ -4,6 +4,7 @@ import (
"github.com/btcsuite/btclog"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/liquidity"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/lsat"
"github.com/lightningnetwork/lnd/build"
@ -21,6 +22,7 @@ func init() {
addSubLogger("LNDC", lndclient.UseLogger)
addSubLogger("STORE", loopdb.UseLogger)
addSubLogger(lsat.Subsystem, lsat.UseLogger)
addSubLogger(liquidity.Subsystem, liquidity.UseLogger)
}
// addSubLogger is a helper method to conveniently create and register the

+ 4
- 2
loopd/utils.go View File

@ -47,8 +47,10 @@ func getLiquidityManager(client *loop.Client) *liquidity.Manager {
outTerms.MinSwapAmount, outTerms.MaxSwapAmount,
), nil
},
Lnd: client.LndServices,
Clock: clock.NewDefaultClock(),
Lnd: client.LndServices,
Clock: clock.NewDefaultClock(),
ListLoopOut: client.Store.FetchLoopOutSwaps,
ListLoopIn: client.Store.FetchLoopInSwaps,
}
return liquidity.NewManager(mngrCfg)

Loading…
Cancel
Save