liquidity: add manager with updatable parameters

pull/265/head
carla 4 years ago
parent 2ac5d8defa
commit ce10cc7959
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91

@ -0,0 +1,133 @@
// Package liquidity is responsible for monitoring our node's liquidity. It
// allows setting of a liquidity rule which describes the desired liquidity
// balance on a per-channel basis.
package liquidity
import (
"context"
"fmt"
"strings"
"sync"
"github.com/lightningnetwork/lnd/lnwire"
)
var (
// ErrZeroChannelID is returned if we get a rule for a 0 channel ID.
ErrZeroChannelID = fmt.Errorf("zero channel ID not allowed")
)
// Config contains the external functionality required to run the
// liquidity manager.
type Config struct {
// LoopOutRestrictions returns the restrictions that the server applies
// to loop out swaps.
LoopOutRestrictions func(ctx context.Context) (*Restrictions, error)
}
// Parameters is a set of parameters provided by the user which guide
// how we assess liquidity.
type Parameters struct {
// ChannelRules maps a short channel ID to a rule that describes how we
// would like liquidity to be managed.
ChannelRules map[lnwire.ShortChannelID]*ThresholdRule
}
// newParameters creates an empty set of parameters.
func newParameters() Parameters {
return Parameters{
ChannelRules: make(map[lnwire.ShortChannelID]*ThresholdRule),
}
}
// String returns the string representation of our parameters.
func (p Parameters) String() string {
channelRules := make([]string, 0, len(p.ChannelRules))
for channel, rule := range p.ChannelRules {
channelRules = append(
channelRules, fmt.Sprintf("%v: %v", channel, rule),
)
}
return fmt.Sprintf("channel rules: %v",
strings.Join(channelRules, ","))
}
// validate checks whether a set of parameters is valid.
func (p Parameters) validate() error {
for channel, rule := range p.ChannelRules {
if channel.ToUint64() == 0 {
return ErrZeroChannelID
}
if err := rule.validate(); err != nil {
return fmt.Errorf("channel: %v has invalid rule: %v",
channel.ToUint64(), err)
}
}
return nil
}
// Manager contains a set of desired liquidity rules for our channel
// balances.
type Manager struct {
// cfg contains the external functionality we require to determine our
// current liquidity balance.
cfg *Config
// params is the set of parameters we are currently using. These may be
// updated at runtime.
params Parameters
// paramsLock is a lock for our current set of parameters.
paramsLock sync.Mutex
}
// NewManager creates a liquidity manager which has no rules set.
func NewManager(cfg *Config) *Manager {
return &Manager{
cfg: cfg,
params: newParameters(),
}
}
// GetParameters returns a copy of our current parameters.
func (m *Manager) GetParameters() Parameters {
m.paramsLock.Lock()
defer m.paramsLock.Unlock()
return cloneParameters(m.params)
}
// SetParameters updates our current set of parameters if the new parameters
// provided are valid.
func (m *Manager) SetParameters(params Parameters) error {
if err := params.validate(); err != nil {
return err
}
m.paramsLock.Lock()
defer m.paramsLock.Unlock()
m.params = cloneParameters(params)
return nil
}
// cloneParameters creates a deep clone of a parameters struct so that callers
// cannot mutate our parameters. Although our parameters struct itself is not
// a reference, we still need to clone the contents of maps.
func cloneParameters(params Parameters) Parameters {
paramCopy := Parameters{
ChannelRules: make(map[lnwire.ShortChannelID]*ThresholdRule,
len(params.ChannelRules)),
}
for channel, rule := range params.ChannelRules {
ruleCopy := *rule
paramCopy.ChannelRules[channel] = &ruleCopy
}
return paramCopy
}

@ -0,0 +1,66 @@
package liquidity
import (
"context"
"testing"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/stretchr/testify/require"
)
// newTestConfig creates a default test config.
func newTestConfig() *Config {
return &Config{
LoopOutRestrictions: func(_ context.Context) (*Restrictions,
error) {
return NewRestrictions(1, 10000), nil
},
}
}
// TestParameters tests getting and setting of parameters for our manager.
func TestParameters(t *testing.T) {
manager := NewManager(newTestConfig())
chanID := lnwire.NewShortChanIDFromInt(1)
// Start with the case where we have no rules set.
startParams := manager.GetParameters()
require.Equal(t, newParameters(), startParams)
// Mutate the parameters returned by our get function.
startParams.ChannelRules[chanID] = NewThresholdRule(1, 1)
// Make sure that we have not mutated the liquidity manager's params
// by making this change.
params := manager.GetParameters()
require.Equal(t, newParameters(), params)
// Provide a valid set of parameters and validate assert that they are
// set.
originalRule := NewThresholdRule(10, 10)
expected := Parameters{
ChannelRules: map[lnwire.ShortChannelID]*ThresholdRule{
chanID: originalRule,
},
}
err := manager.SetParameters(expected)
require.NoError(t, err)
// Check that changing the parameters we just set does not mutate
// our liquidity manager's parameters.
expected.ChannelRules[chanID] = NewThresholdRule(11, 11)
params = manager.GetParameters()
require.NoError(t, err)
require.Equal(t, originalRule, params.ChannelRules[chanID])
// Set invalid parameters and assert that we fail.
expected.ChannelRules = map[lnwire.ShortChannelID]*ThresholdRule{
lnwire.NewShortChanIDFromInt(0): NewThresholdRule(1, 2),
}
err = manager.SetParameters(expected)
require.Equal(t, ErrZeroChannelID, err)
}

@ -0,0 +1,29 @@
package liquidity
import (
"fmt"
"github.com/btcsuite/btcutil"
)
// Restrictions indicates the restrictions placed on a swap.
type Restrictions struct {
// Minimum is the minimum amount we can swap, inclusive.
Minimum btcutil.Amount
// Maximum is the maximum amount we can swap, inclusive.
Maximum btcutil.Amount
}
// String returns a string representation of a set of restrictions.
func (r *Restrictions) String() string {
return fmt.Sprintf("%v-%v", r.Minimum, r.Maximum)
}
// NewRestrictions creates a set of restrictions.
func NewRestrictions(minimum, maximum btcutil.Amount) *Restrictions {
return &Restrictions{
Minimum: minimum,
Maximum: maximum,
}
}

@ -0,0 +1,61 @@
package liquidity
import (
"errors"
"fmt"
)
var (
// errInvalidLiquidityThreshold is returned when a liquidity threshold
// has an invalid value.
errInvalidLiquidityThreshold = errors.New("liquidity threshold must " +
"be in [0:100)")
// errInvalidThresholdSum is returned when the sum of the percentages
// provided for a threshold rule is >= 100.
errInvalidThresholdSum = errors.New("sum of incoming and outgoing " +
"percentages must be < 100")
)
// ThresholdRule is a liquidity rule that implements minimum incoming and
// outgoing liquidity threshold.
type ThresholdRule struct {
// MinimumIncoming is the percentage of incoming liquidity that we do
// not want to drop below.
MinimumIncoming int
// MinimumOutgoing is the percentage of outgoing liquidity that we do
// not want to drop below.
MinimumOutgoing int
}
// NewThresholdRule returns a new threshold rule.
func NewThresholdRule(minIncoming, minOutgoing int) *ThresholdRule {
return &ThresholdRule{
MinimumIncoming: minIncoming,
MinimumOutgoing: minOutgoing,
}
}
// String returns a string representation of a rule.
func (r *ThresholdRule) String() string {
return fmt.Sprintf("threshold rule: minimum incoming: %v%%, minimum "+
"outgoing: %v%%", r.MinimumIncoming, r.MinimumOutgoing)
}
// validate validates the parameters that a rule was created with.
func (r *ThresholdRule) validate() error {
if r.MinimumIncoming < 0 || r.MinimumIncoming > 100 {
return errInvalidLiquidityThreshold
}
if r.MinimumOutgoing < 0 || r.MinimumOutgoing > 100 {
return errInvalidLiquidityThreshold
}
if r.MinimumIncoming+r.MinimumOutgoing >= 100 {
return errInvalidThresholdSum
}
return nil
}

@ -0,0 +1,93 @@
package liquidity
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestValidateThreshold tests validation of the values set for a threshold
// rule.
func TestValidateThreshold(t *testing.T) {
tests := []struct {
name string
threshold ThresholdRule
err error
}{
{
name: "values ok",
threshold: ThresholdRule{
MinimumIncoming: 20,
MinimumOutgoing: 20,
},
err: nil,
},
{
name: "negative incoming",
threshold: ThresholdRule{
MinimumIncoming: -1,
MinimumOutgoing: 20,
},
err: errInvalidLiquidityThreshold,
},
{
name: "negative outgoing",
threshold: ThresholdRule{
MinimumIncoming: 20,
MinimumOutgoing: -1,
},
err: errInvalidLiquidityThreshold,
},
{
name: "incoming > 1",
threshold: ThresholdRule{
MinimumIncoming: 120,
MinimumOutgoing: 20,
},
err: errInvalidLiquidityThreshold,
},
{
name: "outgoing >1",
threshold: ThresholdRule{
MinimumIncoming: 20,
MinimumOutgoing: 120,
},
err: errInvalidLiquidityThreshold,
},
{
name: "sum < 100",
threshold: ThresholdRule{
MinimumIncoming: 60,
MinimumOutgoing: 39,
},
err: nil,
},
{
name: "sum = 100",
threshold: ThresholdRule{
MinimumIncoming: 60,
MinimumOutgoing: 40,
},
err: errInvalidThresholdSum,
},
{
name: "sum > 100",
threshold: ThresholdRule{
MinimumIncoming: 60,
MinimumOutgoing: 60,
},
err: errInvalidThresholdSum,
},
}
for _, testCase := range tests {
testCase := testCase
t.Run(testCase.name, func(t *testing.T) {
t.Parallel()
err := testCase.threshold.validate()
require.Equal(t, testCase.err, err)
})
}
}

@ -300,12 +300,13 @@ func (d *Daemon) initialize() error {
// Now finally fully initialize the swap client RPC server instance. // Now finally fully initialize the swap client RPC server instance.
d.swapClientServer = swapClientServer{ d.swapClientServer = swapClientServer{
impl: swapclient, impl: swapclient,
lnd: &d.lnd.LndServices, liquidityMgr: getLiquidityManager(swapclient),
swaps: make(map[lntypes.Hash]loop.SwapInfo), lnd: &d.lnd.LndServices,
subscribers: make(map[int]chan<- interface{}), swaps: make(map[lntypes.Hash]loop.SwapInfo),
statusChan: make(chan loop.SwapInfo), subscribers: make(map[int]chan<- interface{}),
mainCtx: d.mainCtx, statusChan: make(chan loop.SwapInfo),
mainCtx: d.mainCtx,
} }
// Retrieve all currently existing swaps from the database. // Retrieve all currently existing swaps from the database.

@ -8,6 +8,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/lightninglabs/loop/liquidity"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/routing/route"
@ -33,6 +34,7 @@ const (
// swapClientServer implements the grpc service exposed by loopd. // swapClientServer implements the grpc service exposed by loopd.
type swapClientServer struct { type swapClientServer struct {
impl *loop.Client impl *loop.Client
liquidityMgr *liquidity.Manager
lnd *lndclient.LndServices lnd *lndclient.LndServices
swaps map[lntypes.Hash]loop.SwapInfo swaps map[lntypes.Hash]loop.SwapInfo
subscribers map[int]chan<- interface{} subscribers map[int]chan<- interface{}

@ -1,9 +1,12 @@
package loopd package loopd
import ( import (
"context"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/lightninglabs/lndclient" "github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop" "github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/liquidity"
) )
// getClient returns an instance of the swap client. // getClient returns an instance of the swap client.
@ -28,3 +31,22 @@ func getClient(config *Config, lnd *lndclient.LndServices) (*loop.Client,
return swapClient, cleanUp, nil return swapClient, cleanUp, nil
} }
func getLiquidityManager(client *loop.Client) *liquidity.Manager {
mngrCfg := &liquidity.Config{
LoopOutRestrictions: func(ctx context.Context) (
*liquidity.Restrictions, error) {
outTerms, err := client.Server.GetLoopOutTerms(ctx)
if err != nil {
return nil, err
}
return liquidity.NewRestrictions(
outTerms.MinSwapAmount, outTerms.MaxSwapAmount,
), nil
},
}
return liquidity.NewManager(mngrCfg)
}

Loading…
Cancel
Save