Merge pull request #493 from yyforyongyu/db-params

liquidity: persist manager's params to disk
pull/504/head
Yong 2 years ago committed by GitHub
commit dfe50e44ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -223,9 +223,12 @@ func setRule(ctx *cli.Context) error {
}
var setParamsCommand = cli.Command{
Name: "setparams",
Usage: "update the parameters set for the liquidity manager",
Description: "Updates the parameters set for the liquidity manager.",
Name: "setparams",
Usage: "update the parameters set for the liquidity manager",
Description: "Updates the parameters set for the liquidity manager. " +
"Note the parameters are persisted in db to save the trouble " +
"of setting them again upon loopd restart. To get the " +
"default values, use `getparams` before any `setparams`.",
Flags: []cli.Flag{
cli.IntFlag{
Name: "sweeplimit",

@ -166,6 +166,12 @@ func newAutoloopTestCtx(t *testing.T, parameters Parameters,
MinimumConfirmations: loop.DefaultSweepConfTarget,
Lnd: &testCtx.lnd.LndServices,
Clock: testCtx.testClock,
PutLiquidityParams: func(_ []byte) error {
return nil
},
FetchLiquidityParams: func() ([]byte, error) {
return nil, nil
},
}
// SetParameters needs to make a call to our mocked restrictions call,
@ -179,7 +185,7 @@ func newAutoloopTestCtx(t *testing.T, parameters Parameters,
// Create a manager with our test config and set our starting set of
// parameters.
testCtx.manager = NewManager(cfg)
err := testCtx.manager.SetParameters(context.Background(), parameters)
err := testCtx.manager.setParameters(context.Background(), parameters)
assert.NoError(t, err)
<-done
return testCtx

@ -37,7 +37,6 @@ import (
"errors"
"fmt"
"sort"
"strings"
"sync"
"time"
@ -53,6 +52,9 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"google.golang.org/protobuf/proto"
clientrpc "github.com/lightninglabs/loop/looprpc"
)
const (
@ -102,19 +104,6 @@ var (
// funding amount.
defaultBudget = ppmToSat(funding.MaxBtcFundingAmount, defaultFeePPM)
// defaultParameters contains the default parameters that we start our
// liquidity manger with.
defaultParameters = Parameters{
AutoFeeBudget: defaultBudget,
MaxAutoInFlight: defaultMaxInFlight,
ChannelRules: make(map[lnwire.ShortChannelID]*SwapRule),
PeerRules: make(map[route.Vertex]*SwapRule),
FailureBackOff: defaultFailureBackoff,
SweepConfTarget: defaultConfTarget,
HtlcConfTarget: defaultHtlcConfTarget,
FeeLimit: defaultFeePortion(),
}
// ErrZeroChannelID is returned if we get a rule for a 0 channel ID.
ErrZeroChannelID = fmt.Errorf("zero channel ID not allowed")
@ -193,222 +182,18 @@ type Config struct {
// MinimumConfirmations is the minimum number of confirmations we allow
// setting for sweep target.
MinimumConfirmations int32
}
// Parameters is a set of parameters provided by the user which guide
// how we assess liquidity.
type Parameters struct {
// Autoloop enables automatic dispatch of swaps.
Autoloop bool
// AutoFeeBudget is the total amount we allow to be spent on
// automatically dispatched swaps. Once this budget has been used, we
// will stop dispatching swaps until the budget is increased or the
// start date is moved.
AutoFeeBudget btcutil.Amount
// AutoFeeStartDate is the date from which we will include automatically
// dispatched swaps in our current budget, inclusive.
AutoFeeStartDate time.Time
// MaxAutoInFlight is the maximum number of in-flight automatically
// dispatched swaps we allow.
MaxAutoInFlight int
// FailureBackOff is the amount of time that we require passes after a
// channel has been part of a failed loop out swap before we suggest
// using it again.
// TODO(carla): add exponential backoff
FailureBackOff time.Duration
// SweepConfTarget is the number of blocks we aim to confirm our sweep
// transaction in. This value affects the on chain fees we will pay.
SweepConfTarget int32
// HtlcConfTarget is the confirmation target that we use for publishing
// loop in swap htlcs on chain.
HtlcConfTarget int32
// FeeLimit controls the fee limit we place on swaps.
FeeLimit FeeLimit
// ClientRestrictions are the restrictions placed on swap size by the
// client.
ClientRestrictions Restrictions
// ChannelRules maps a short channel ID to a rule that describes how we
// would like liquidity to be managed. These rules and PeerRules are
// exclusively set to prevent overlap between peer and channel rules.
ChannelRules map[lnwire.ShortChannelID]*SwapRule
// PeerRules maps a peer's pubkey to a rule that applies to all the
// channels that we have with the peer collectively. These rules and
// ChannelRules are exclusively set to prevent overlap between peer
// and channel rules map to avoid ambiguity.
PeerRules map[route.Vertex]*SwapRule
}
// String returns the string representation of our parameters.
func (p Parameters) String() string {
ruleList := make([]string, 0, len(p.ChannelRules)+len(p.PeerRules))
for channel, rule := range p.ChannelRules {
ruleList = append(
ruleList, fmt.Sprintf("Channel: %v: %v", channel, rule),
)
}
for peer, rule := range p.PeerRules {
ruleList = append(
ruleList, fmt.Sprintf("Peer: %v: %v", peer, rule),
)
}
return fmt.Sprintf("rules: %v, failure backoff: %v, sweep "+
"sweep conf target: %v, htlc conf target: %v,fees: %v, "+
"auto budget: %v, budget start: %v, max auto in flight: %v, "+
"minimum swap size=%v, maximum swap size=%v",
strings.Join(ruleList, ","), p.FailureBackOff,
p.SweepConfTarget, p.HtlcConfTarget, p.FeeLimit,
p.AutoFeeBudget, p.AutoFeeStartDate, p.MaxAutoInFlight,
p.ClientRestrictions.Minimum, p.ClientRestrictions.Maximum)
}
// haveRules returns a boolean indicating whether we have any rules configured.
func (p Parameters) haveRules() bool {
if len(p.ChannelRules) != 0 {
return true
}
if len(p.PeerRules) != 0 {
return true
}
return false
}
// validate checks whether a set of parameters is valid. Our set of currently
// open channels are required to check that there is no overlap between the
// rules set on a per-peer level, and those set for specific channels. We can't
// allow both, because then we're trying to cater for two separate liquidity
// goals on the same channel. Since we use short channel ID, we don't need to
// worry about pending channels (users would need to work very hard to get the
// short channel ID for a pending channel). Likewise, we don't care about closed
// channels, since there is no action that may occur on them, and we want to
// allow peer-level rules to be set once a channel which had a specific rule
// has been closed. It takes the minimum confirmations we allow for sweep
// confirmation target as a parameter.
// TODO(carla): prune channels that have been closed from rules.
func (p Parameters) validate(minConfs int32, openChans []lndclient.ChannelInfo,
server *Restrictions) error {
// First, we check that the rules on a per peer and per channel do not
// overlap, since this could lead to contractions.
for _, channel := range openChans {
// If we don't have a rule for the peer, there's no way we have
// an overlap between this peer and the channel.
_, ok := p.PeerRules[channel.PubKeyBytes]
if !ok {
continue
}
shortID := lnwire.NewShortChanIDFromInt(channel.ChannelID)
_, ok = p.ChannelRules[shortID]
if ok {
log.Debugf("Rules for peer: %v and its channel: %v "+
"can't both be set", channel.PubKeyBytes, shortID)
return ErrExclusiveRules
}
}
for channel, rule := range p.ChannelRules {
if channel.ToUint64() == 0 {
return ErrZeroChannelID
}
if rule.Type == swap.TypeIn {
return errors.New("channel level rules not supported for " +
"loop in swaps, only peer-level rules allowed")
}
if err := rule.validate(); err != nil {
return fmt.Errorf("channel: %v has invalid rule: %v",
channel.ToUint64(), err)
}
}
for peer, rule := range p.PeerRules {
if err := rule.validate(); err != nil {
return fmt.Errorf("peer: %v has invalid rule: %v",
peer, err)
}
}
// Check that our confirmation target is above our required minimum.
if p.SweepConfTarget < minConfs {
return fmt.Errorf("confirmation target must be at least: %v",
minConfs)
}
if p.HtlcConfTarget < 1 {
return fmt.Errorf("htlc confirmation target must be > 0")
}
if err := p.FeeLimit.validate(); err != nil {
return err
}
if p.AutoFeeBudget < 0 {
return ErrNegativeBudget
}
if p.MaxAutoInFlight <= 0 {
return ErrZeroInFlight
}
err := validateRestrictions(server, &p.ClientRestrictions)
if err != nil {
return err
}
return nil
}
// validateRestrictions checks that client restrictions fall within the server's
// restrictions.
func validateRestrictions(server, client *Restrictions) error {
zeroMin := client.Minimum == 0
zeroMax := client.Maximum == 0
if zeroMin && zeroMax {
return nil
}
// If we have a non-zero maximum, we need to ensure it is greater than
// our minimum (which is fine if min is zero), and does not exceed the
// server's maximum.
if !zeroMax {
if client.Minimum > client.Maximum {
return ErrMinimumExceedsMaximumAmt
}
if client.Maximum > server.Maximum {
return ErrMaxExceedsServer
}
}
if zeroMin {
return nil
}
// If the client set a minimum, ensure it is at least equal to the
// server's limit.
if client.Minimum < server.Minimum {
return ErrMinLessThanServer
}
return nil
// PutLiquidityParams writes the serialized `Parameters` into db.
//
// NOTE: the params are encoded using `proto.Marshal` over an RPC
// request.
PutLiquidityParams func(params []byte) error
// FetchLiquidityParams reads the serialized `Parameters` from db.
//
// NOTE: the params are decoded using `proto.Unmarshal` over a
// serialized RPC request.
FetchLiquidityParams func() ([]byte, error)
}
// Manager contains a set of desired liquidity rules for our channel
@ -433,6 +218,19 @@ func (m *Manager) Run(ctx context.Context) error {
m.cfg.AutoloopTicker.Resume()
defer m.cfg.AutoloopTicker.Stop()
// Before we start the main loop, load the params from db.
req, err := m.loadParams()
if err != nil {
return err
}
// Set the params if there's one.
if req != nil {
if err := m.SetParameters(ctx, req); err != nil {
return err
}
}
for {
select {
case <-m.cfg.AutoloopTicker.Ticks():
@ -469,9 +267,35 @@ func (m *Manager) GetParameters() Parameters {
return cloneParameters(m.params)
}
// SetParameters takes an RPC request and calls the internal method to set
// parameters for the manager.
func (m *Manager) SetParameters(ctx context.Context,
req *clientrpc.LiquidityParameters) error {
params, err := rpcToParameters(req)
if err != nil {
return err
}
if err := m.setParameters(ctx, *params); err != nil {
return err
}
// Save the params on disk.
//
// NOTE: alternatively we can save the bytes in memory and persist them
// on disk during shutdown to save us some IO cost from hitting the db.
// Since setting params is NOT a frequent action, it's should put
// little pressure on our db. Only when performance becomes an issue,
// we can then apply the alternative.
return m.saveParams(req)
}
// SetParameters updates our current set of parameters if the new parameters
// provided are valid.
func (m *Manager) SetParameters(ctx context.Context, params Parameters) error {
func (m *Manager) setParameters(ctx context.Context,
params Parameters) error {
restrictions, err := m.cfg.Restrictions(ctx, swap.TypeOut)
if err != nil {
return err
@ -482,7 +306,9 @@ func (m *Manager) SetParameters(ctx context.Context, params Parameters) error {
return err
}
err = params.validate(m.cfg.MinimumConfirmations, channels, restrictions)
err = params.validate(
m.cfg.MinimumConfirmations, channels, restrictions,
)
if err != nil {
return err
}
@ -491,35 +317,47 @@ func (m *Manager) SetParameters(ctx context.Context, params Parameters) error {
defer m.paramsLock.Unlock()
m.params = cloneParameters(params)
return nil
}
// cloneParameters creates a deep clone of a parameters struct so that callers
// cannot mutate our parameters. Although our parameters struct itself is not
// a reference, we still need to clone the contents of maps.
func cloneParameters(params Parameters) Parameters {
paramCopy := params
paramCopy.ChannelRules = make(
map[lnwire.ShortChannelID]*SwapRule,
len(params.ChannelRules),
)
// saveParams marshals an RPC request and saves it to db.
func (m *Manager) saveParams(req proto.Message) error {
// Marshal the params.
paramsBytes, err := proto.Marshal(req)
if err != nil {
return err
}
for channel, rule := range params.ChannelRules {
ruleCopy := *rule
paramCopy.ChannelRules[channel] = &ruleCopy
// Save the params on disk.
if err := m.cfg.PutLiquidityParams(paramsBytes); err != nil {
return fmt.Errorf("failed to save params: %v", err)
}
paramCopy.PeerRules = make(
map[route.Vertex]*SwapRule,
len(params.PeerRules),
)
return nil
}
for peer, rule := range params.PeerRules {
ruleCopy := *rule
paramCopy.PeerRules[peer] = &ruleCopy
// loadParams unmarshals a serialized RPC request from db and returns the RPC
// request.
func (m *Manager) loadParams() (*clientrpc.LiquidityParameters, error) {
paramsBytes, err := m.cfg.FetchLiquidityParams()
if err != nil {
return nil, fmt.Errorf("failed to read params: %v", err)
}
// Return early if there's nothing saved.
if paramsBytes == nil {
return nil, nil
}
// Unmarshal the params.
req := &clientrpc.LiquidityParameters{}
err = proto.Unmarshal(paramsBytes, req)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal params: %v", err)
}
return paramCopy
return req, nil
}
// autoloop gets a set of suggested swaps and dispatches them automatically if

@ -10,6 +10,7 @@ import (
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/labels"
"github.com/lightninglabs/loop/loopdb"
clientrpc "github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/swap"
"github.com/lightninglabs/loop/test"
"github.com/lightningnetwork/lnd/clock"
@ -221,7 +222,7 @@ func TestParameters(t *testing.T) {
chanID: originalRule,
}
err := manager.SetParameters(context.Background(), expected)
err := manager.setParameters(context.Background(), expected)
require.NoError(t, err)
// Check that changing the parameters we just set does not mutate
@ -242,62 +243,60 @@ func TestParameters(t *testing.T) {
Type: swap.TypeOut,
},
}
err = manager.SetParameters(context.Background(), expected)
err = manager.setParameters(context.Background(), expected)
require.Equal(t, ErrZeroChannelID, err)
}
// TestValidateRestrictions tests validating client restrictions against a set
// of server restrictions.
func TestValidateRestrictions(t *testing.T) {
tests := []struct {
name string
client *Restrictions
server *Restrictions
err error
}{
{
name: "client invalid",
client: &Restrictions{
Minimum: 100,
Maximum: 1,
},
server: testRestrictions,
err: ErrMinimumExceedsMaximumAmt,
},
{
name: "maximum exceeds server",
client: &Restrictions{
Maximum: 2000,
},
server: &Restrictions{
Minimum: 1000,
Maximum: 1500,
},
err: ErrMaxExceedsServer,
},
{
name: "minimum less than server",
client: &Restrictions{
Minimum: 500,
},
server: &Restrictions{
Minimum: 1000,
Maximum: 1500,
},
err: ErrMinLessThanServer,
},
// TestPersistParams tests reading and writing of parameters for our manager.
func TestPersistParams(t *testing.T) {
rpcParams := &clientrpc.LiquidityParameters{
FeePpm: 100,
AutoMaxInFlight: 10,
HtlcConfTarget: 2,
}
cfg, _ := newTestConfig()
manager := NewManager(cfg)
for _, testCase := range tests {
testCase := testCase
var paramsBytes []byte
t.Run(testCase.name, func(t *testing.T) {
err := validateRestrictions(
testCase.server, testCase.client,
)
require.Equal(t, testCase.err, err)
})
// Mock the read method to return empty data.
manager.cfg.FetchLiquidityParams = func() ([]byte, error) {
return paramsBytes, nil
}
// Test the nil params is returned.
req, err := manager.loadParams()
require.Nil(t, req)
require.NoError(t, err)
// Mock the write method to return no error.
manager.cfg.PutLiquidityParams = func(data []byte) error {
paramsBytes = data
return nil
}
// Test save the message.
err = manager.saveParams(rpcParams)
require.NoError(t, err)
// Test the nil params is returned.
req, err = manager.loadParams()
require.NoError(t, err)
// Check the specified fields are set as expected.
require.Equal(t, rpcParams.FeePpm, req.FeePpm)
require.Equal(t, rpcParams.AutoMaxInFlight, req.AutoMaxInFlight)
require.Equal(t, rpcParams.HtlcConfTarget, req.HtlcConfTarget)
// Check the unspecified fields are using empty values.
require.False(t, req.Autoloop)
require.Empty(t, req.Rules)
require.Zero(t, req.AutoloopBudgetSat)
// Finally, check the loaded request can be used to set params without
// error.
err = manager.SetParameters(context.Background(), req)
require.NoError(t, err)
}
// TestRestrictedSuggestions tests getting of swap suggestions when we have
@ -1832,7 +1831,7 @@ func testSuggestSwaps(t *testing.T, setup *testSuggestSwapsSetup,
// them to use the rules set by the test.
manager := NewManager(setup.cfg)
err := manager.SetParameters(context.Background(), setup.params)
err := manager.setParameters(context.Background(), setup.params)
require.NoError(t, err)
actual, err := manager.SuggestSwaps(context.Background(), false)
@ -2014,7 +2013,7 @@ func TestCurrentTraffic(t *testing.T) {
params := m.GetParameters()
params.FailureBackOff = backoff
require.NoError(t, m.SetParameters(context.Background(), params))
require.NoError(t, m.setParameters(context.Background(), params))
actual := m.currentSwapTraffic(testCase.loopOut, testCase.loopIn)
require.Equal(t, testCase.expected, actual)

@ -0,0 +1,423 @@
package liquidity
import (
"errors"
"fmt"
"strings"
"time"
"github.com/btcsuite/btcd/btcutil"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop/swap"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
clientrpc "github.com/lightninglabs/loop/looprpc"
)
var (
// defaultParameters contains the default parameters that we start our
// liquidity manger with.
defaultParameters = Parameters{
AutoFeeBudget: defaultBudget,
MaxAutoInFlight: defaultMaxInFlight,
ChannelRules: make(map[lnwire.ShortChannelID]*SwapRule),
PeerRules: make(map[route.Vertex]*SwapRule),
FailureBackOff: defaultFailureBackoff,
SweepConfTarget: defaultConfTarget,
HtlcConfTarget: defaultHtlcConfTarget,
FeeLimit: defaultFeePortion(),
}
)
// Parameters is a set of parameters provided by the user which guide
// how we assess liquidity.
type Parameters struct {
// Autoloop enables automatic dispatch of swaps.
Autoloop bool
// AutoFeeBudget is the total amount we allow to be spent on
// automatically dispatched swaps. Once this budget has been used, we
// will stop dispatching swaps until the budget is increased or the
// start date is moved.
AutoFeeBudget btcutil.Amount
// AutoFeeStartDate is the date from which we will include automatically
// dispatched swaps in our current budget, inclusive.
AutoFeeStartDate time.Time
// MaxAutoInFlight is the maximum number of in-flight automatically
// dispatched swaps we allow.
MaxAutoInFlight int
// FailureBackOff is the amount of time that we require passes after a
// channel has been part of a failed loop out swap before we suggest
// using it again.
// TODO(carla): add exponential backoff
FailureBackOff time.Duration
// SweepConfTarget is the number of blocks we aim to confirm our sweep
// transaction in. This value affects the on chain fees we will pay.
SweepConfTarget int32
// HtlcConfTarget is the confirmation target that we use for publishing
// loop in swap htlcs on chain.
HtlcConfTarget int32
// FeeLimit controls the fee limit we place on swaps.
FeeLimit FeeLimit
// ClientRestrictions are the restrictions placed on swap size by the
// client.
ClientRestrictions Restrictions
// ChannelRules maps a short channel ID to a rule that describes how we
// would like liquidity to be managed. These rules and PeerRules are
// exclusively set to prevent overlap between peer and channel rules.
ChannelRules map[lnwire.ShortChannelID]*SwapRule
// PeerRules maps a peer's pubkey to a rule that applies to all the
// channels that we have with the peer collectively. These rules and
// ChannelRules are exclusively set to prevent overlap between peer
// and channel rules map to avoid ambiguity.
PeerRules map[route.Vertex]*SwapRule
}
// String returns the string representation of our parameters.
func (p Parameters) String() string {
ruleList := make([]string, 0, len(p.ChannelRules)+len(p.PeerRules))
for channel, rule := range p.ChannelRules {
ruleList = append(
ruleList, fmt.Sprintf("Channel: %v: %v", channel, rule),
)
}
for peer, rule := range p.PeerRules {
ruleList = append(
ruleList, fmt.Sprintf("Peer: %v: %v", peer, rule),
)
}
return fmt.Sprintf("rules: %v, failure backoff: %v, sweep "+
"sweep conf target: %v, htlc conf target: %v,fees: %v, "+
"auto budget: %v, budget start: %v, max auto in flight: %v, "+
"minimum swap size=%v, maximum swap size=%v",
strings.Join(ruleList, ","), p.FailureBackOff,
p.SweepConfTarget, p.HtlcConfTarget, p.FeeLimit,
p.AutoFeeBudget, p.AutoFeeStartDate, p.MaxAutoInFlight,
p.ClientRestrictions.Minimum, p.ClientRestrictions.Maximum)
}
// haveRules returns a boolean indicating whether we have any rules configured.
func (p Parameters) haveRules() bool {
if len(p.ChannelRules) != 0 {
return true
}
if len(p.PeerRules) != 0 {
return true
}
return false
}
// validate checks whether a set of parameters is valid. Our set of currently
// open channels are required to check that there is no overlap between the
// rules set on a per-peer level, and those set for specific channels. We can't
// allow both, because then we're trying to cater for two separate liquidity
// goals on the same channel. Since we use short channel ID, we don't need to
// worry about pending channels (users would need to work very hard to get the
// short channel ID for a pending channel). Likewise, we don't care about closed
// channels, since there is no action that may occur on them, and we want to
// allow peer-level rules to be set once a channel which had a specific rule
// has been closed. It takes the minimum confirmations we allow for sweep
// confirmation target as a parameter.
// TODO(carla): prune channels that have been closed from rules.
func (p Parameters) validate(minConfs int32, openChans []lndclient.ChannelInfo,
server *Restrictions) error {
// First, we check that the rules on a per peer and per channel do not
// overlap, since this could lead to contractions.
for _, channel := range openChans {
// If we don't have a rule for the peer, there's no way we have
// an overlap between this peer and the channel.
_, ok := p.PeerRules[channel.PubKeyBytes]
if !ok {
continue
}
shortID := lnwire.NewShortChanIDFromInt(channel.ChannelID)
_, ok = p.ChannelRules[shortID]
if ok {
log.Debugf("Rules for peer: %v and its channel: %v "+
"can't both be set", channel.PubKeyBytes, shortID)
return ErrExclusiveRules
}
}
for channel, rule := range p.ChannelRules {
if channel.ToUint64() == 0 {
return ErrZeroChannelID
}
if rule.Type == swap.TypeIn {
return errors.New("channel level rules not supported for " +
"loop in swaps, only peer-level rules allowed")
}
if err := rule.validate(); err != nil {
return fmt.Errorf("channel: %v has invalid rule: %v",
channel.ToUint64(), err)
}
}
for peer, rule := range p.PeerRules {
if err := rule.validate(); err != nil {
return fmt.Errorf("peer: %v has invalid rule: %v",
peer, err)
}
}
// Check that our confirmation target is above our required minimum.
if p.SweepConfTarget < minConfs {
return fmt.Errorf("confirmation target must be at least: %v",
minConfs)
}
if p.HtlcConfTarget < 1 {
return fmt.Errorf("htlc confirmation target must be > 0")
}
if err := p.FeeLimit.validate(); err != nil {
return err
}
if p.AutoFeeBudget < 0 {
return ErrNegativeBudget
}
if p.MaxAutoInFlight <= 0 {
return ErrZeroInFlight
}
err := validateRestrictions(server, &p.ClientRestrictions)
if err != nil {
return err
}
return nil
}
// validateRestrictions checks that client restrictions fall within the server's
// restrictions.
func validateRestrictions(server, client *Restrictions) error {
zeroMin := client.Minimum == 0
zeroMax := client.Maximum == 0
if zeroMin && zeroMax {
return nil
}
// If we have a non-zero maximum, we need to ensure it is greater than
// our minimum (which is fine if min is zero), and does not exceed the
// server's maximum.
if !zeroMax {
if client.Minimum > client.Maximum {
return ErrMinimumExceedsMaximumAmt
}
if client.Maximum > server.Maximum {
return ErrMaxExceedsServer
}
}
if zeroMin {
return nil
}
// If the client set a minimum, ensure it is at least equal to the
// server's limit.
if client.Minimum < server.Minimum {
return ErrMinLessThanServer
}
return nil
}
// cloneParameters creates a deep clone of a parameters struct so that callers
// cannot mutate our parameters. Although our parameters struct itself is not
// a reference, we still need to clone the contents of maps.
func cloneParameters(params Parameters) Parameters {
paramCopy := params
paramCopy.ChannelRules = make(
map[lnwire.ShortChannelID]*SwapRule,
len(params.ChannelRules),
)
for channel, rule := range params.ChannelRules {
ruleCopy := *rule
paramCopy.ChannelRules[channel] = &ruleCopy
}
paramCopy.PeerRules = make(
map[route.Vertex]*SwapRule,
len(params.PeerRules),
)
for peer, rule := range params.PeerRules {
ruleCopy := *rule
paramCopy.PeerRules[peer] = &ruleCopy
}
return paramCopy
}
// rpcToFee converts the values provided over rpc to a fee limit interface,
// failing if an inconsistent set of fields are set.
func rpcToFee(req *clientrpc.LiquidityParameters) (FeeLimit, error) {
// Check which fee limit type we have values set for. If any fields
// relevant to our individual categories are set, we count that type
// as set.
isFeePPM := req.FeePpm != 0
isCategories := req.MaxSwapFeePpm != 0 || req.MaxRoutingFeePpm != 0 ||
req.MaxPrepayRoutingFeePpm != 0 || req.MaxMinerFeeSat != 0 ||
req.MaxPrepaySat != 0 || req.SweepFeeRateSatPerVbyte != 0
switch {
case isFeePPM && isCategories:
return nil, errors.New("set either fee ppm, or individual " +
"fee categories")
case isFeePPM:
return NewFeePortion(req.FeePpm), nil
case isCategories:
satPerKVbyte := chainfee.SatPerKVByte(
req.SweepFeeRateSatPerVbyte * 1000,
)
return NewFeeCategoryLimit(
req.MaxSwapFeePpm,
req.MaxRoutingFeePpm,
req.MaxPrepayRoutingFeePpm,
btcutil.Amount(req.MaxMinerFeeSat),
btcutil.Amount(req.MaxPrepaySat),
satPerKVbyte.FeePerKWeight(),
), nil
default:
return nil, errors.New("no fee categories set")
}
}
// rpcToRule switches on rpc rule type to convert to our rule interface.
func rpcToRule(rule *clientrpc.LiquidityRule) (*SwapRule, error) {
swapType := swap.TypeOut
if rule.SwapType == clientrpc.SwapType_LOOP_IN {
swapType = swap.TypeIn
}
switch rule.Type {
case clientrpc.LiquidityRuleType_UNKNOWN:
return nil, fmt.Errorf("rule type field must be set")
case clientrpc.LiquidityRuleType_THRESHOLD:
return &SwapRule{
ThresholdRule: NewThresholdRule(
int(rule.IncomingThreshold),
int(rule.OutgoingThreshold),
),
Type: swapType,
}, nil
default:
return nil, fmt.Errorf("unknown rule: %T", rule)
}
}
// rpcToParameters takes a `LiquidityParameters` and creates a `Parameters`
// from it.
func rpcToParameters(req *clientrpc.LiquidityParameters) (*Parameters,
error) {
feeLimit, err := rpcToFee(req)
if err != nil {
return nil, err
}
params := &Parameters{
FeeLimit: feeLimit,
SweepConfTarget: req.SweepConfTarget,
FailureBackOff: time.Duration(req.FailureBackoffSec) *
time.Second,
Autoloop: req.Autoloop,
AutoFeeBudget: btcutil.Amount(req.AutoloopBudgetSat),
MaxAutoInFlight: int(req.AutoMaxInFlight),
ChannelRules: make(
map[lnwire.ShortChannelID]*SwapRule,
),
PeerRules: make(
map[route.Vertex]*SwapRule,
),
ClientRestrictions: Restrictions{
Minimum: btcutil.Amount(req.MinSwapAmount),
Maximum: btcutil.Amount(req.MaxSwapAmount),
},
HtlcConfTarget: req.HtlcConfTarget,
}
// Zero unix time is different to zero golang time.
if req.AutoloopBudgetStartSec != 0 {
params.AutoFeeStartDate = time.Unix(
int64(req.AutoloopBudgetStartSec), 0,
)
}
for _, rule := range req.Rules {
peerRule := rule.Pubkey != nil
chanRule := rule.ChannelId != 0
liquidityRule, err := rpcToRule(rule)
if err != nil {
return nil, err
}
switch {
case peerRule && chanRule:
return nil, fmt.Errorf("cannot set channel: %v and "+
"peer: %v fields in rule", rule.ChannelId,
rule.Pubkey)
case peerRule:
pubkey, err := route.NewVertexFromBytes(rule.Pubkey)
if err != nil {
return nil, err
}
if _, ok := params.PeerRules[pubkey]; ok {
return nil, fmt.Errorf("multiple rules set "+
"for peer: %v", pubkey)
}
params.PeerRules[pubkey] = liquidityRule
case chanRule:
shortID := lnwire.NewShortChanIDFromInt(rule.ChannelId)
if _, ok := params.ChannelRules[shortID]; ok {
return nil, fmt.Errorf("multiple rules set "+
"for channel: %v", shortID)
}
params.ChannelRules[shortID] = liquidityRule
default:
return nil, errors.New("please set channel id or " +
"pubkey for rule")
}
}
return params, nil
}

@ -0,0 +1,61 @@
package liquidity
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestValidateRestrictions tests validating client restrictions against a set
// of server restrictions.
func TestValidateRestrictions(t *testing.T) {
tests := []struct {
name string
client *Restrictions
server *Restrictions
err error
}{
{
name: "client invalid",
client: &Restrictions{
Minimum: 100,
Maximum: 1,
},
server: testRestrictions,
err: ErrMinimumExceedsMaximumAmt,
},
{
name: "maximum exceeds server",
client: &Restrictions{
Maximum: 2000,
},
server: &Restrictions{
Minimum: 1000,
Maximum: 1500,
},
err: ErrMaxExceedsServer,
},
{
name: "minimum less than server",
client: &Restrictions{
Minimum: 500,
},
server: &Restrictions{
Minimum: 1000,
Maximum: 1500,
},
err: ErrMinLessThanServer,
},
}
for _, testCase := range tests {
testCase := testCase
t.Run(testCase.name, func(t *testing.T) {
err := validateRestrictions(
testCase.server, testCase.client,
)
require.Equal(t, testCase.err, err)
})
}
}

@ -21,8 +21,6 @@ import (
"github.com/lightninglabs/loop/swap"
looprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwallet/chainfee"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/queue"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/zpay32"
@ -806,154 +804,14 @@ func (s *swapClientServer) SetLiquidityParams(ctx context.Context,
in *clientrpc.SetLiquidityParamsRequest) (*clientrpc.SetLiquidityParamsResponse,
error) {
feeLimit, err := rpcToFee(in.Parameters)
err := s.liquidityMgr.SetParameters(ctx, in.Parameters)
if err != nil {
return nil, err
}
params := liquidity.Parameters{
FeeLimit: feeLimit,
SweepConfTarget: in.Parameters.SweepConfTarget,
FailureBackOff: time.Duration(in.Parameters.FailureBackoffSec) *
time.Second,
Autoloop: in.Parameters.Autoloop,
AutoFeeBudget: btcutil.Amount(in.Parameters.AutoloopBudgetSat),
MaxAutoInFlight: int(in.Parameters.AutoMaxInFlight),
ChannelRules: make(
map[lnwire.ShortChannelID]*liquidity.SwapRule,
),
PeerRules: make(
map[route.Vertex]*liquidity.SwapRule,
),
ClientRestrictions: liquidity.Restrictions{
Minimum: btcutil.Amount(in.Parameters.MinSwapAmount),
Maximum: btcutil.Amount(in.Parameters.MaxSwapAmount),
},
HtlcConfTarget: in.Parameters.HtlcConfTarget,
}
// Zero unix time is different to zero golang time.
if in.Parameters.AutoloopBudgetStartSec != 0 {
params.AutoFeeStartDate = time.Unix(
int64(in.Parameters.AutoloopBudgetStartSec), 0,
)
}
for _, rule := range in.Parameters.Rules {
peerRule := rule.Pubkey != nil
chanRule := rule.ChannelId != 0
liquidityRule, err := rpcToRule(rule)
if err != nil {
return nil, err
}
switch {
case peerRule && chanRule:
return nil, fmt.Errorf("cannot set channel: %v and "+
"peer: %v fields in rule", rule.ChannelId,
rule.Pubkey)
case peerRule:
pubkey, err := route.NewVertexFromBytes(rule.Pubkey)
if err != nil {
return nil, err
}
if _, ok := params.PeerRules[pubkey]; ok {
return nil, fmt.Errorf("multiple rules set "+
"for peer: %v", pubkey)
}
params.PeerRules[pubkey] = liquidityRule
case chanRule:
shortID := lnwire.NewShortChanIDFromInt(rule.ChannelId)
if _, ok := params.ChannelRules[shortID]; ok {
return nil, fmt.Errorf("multiple rules set "+
"for channel: %v", shortID)
}
params.ChannelRules[shortID] = liquidityRule
default:
return nil, errors.New("please set channel id or " +
"pubkey for rule")
}
}
if err := s.liquidityMgr.SetParameters(ctx, params); err != nil {
return nil, err
}
return &clientrpc.SetLiquidityParamsResponse{}, nil
}
// rpcToFee converts the values provided over rpc to a fee limit interface,
// failing if an inconsistent set of fields are set.
func rpcToFee(req *clientrpc.LiquidityParameters) (liquidity.FeeLimit,
error) {
// Check which fee limit type we have values set for. If any fields
// relevant to our individual categories are set, we count that type
// as set.
isFeePPM := req.FeePpm != 0
isCategories := req.MaxSwapFeePpm != 0 || req.MaxRoutingFeePpm != 0 ||
req.MaxPrepayRoutingFeePpm != 0 || req.MaxMinerFeeSat != 0 ||
req.MaxPrepaySat != 0 || req.SweepFeeRateSatPerVbyte != 0
switch {
case isFeePPM && isCategories:
return nil, errors.New("set either fee ppm, or individual " +
"fee categories")
case isFeePPM:
return liquidity.NewFeePortion(req.FeePpm), nil
case isCategories:
satPerVbyte := chainfee.SatPerKVByte(
req.SweepFeeRateSatPerVbyte * 1000,
)
return liquidity.NewFeeCategoryLimit(
req.MaxSwapFeePpm,
req.MaxRoutingFeePpm,
req.MaxPrepayRoutingFeePpm,
btcutil.Amount(req.MaxMinerFeeSat),
btcutil.Amount(req.MaxPrepaySat),
satPerVbyte.FeePerKWeight(),
), nil
default:
return nil, errors.New("no fee categories set")
}
}
// rpcToRule switches on rpc rule type to convert to our rule interface.
func rpcToRule(rule *clientrpc.LiquidityRule) (*liquidity.SwapRule, error) {
swapType := swap.TypeOut
if rule.SwapType == clientrpc.SwapType_LOOP_IN {
swapType = swap.TypeIn
}
switch rule.Type {
case clientrpc.LiquidityRuleType_UNKNOWN:
return nil, fmt.Errorf("rule type field must be set")
case clientrpc.LiquidityRuleType_THRESHOLD:
return &liquidity.SwapRule{
ThresholdRule: liquidity.NewThresholdRule(
int(rule.IncomingThreshold),
int(rule.OutgoingThreshold),
),
Type: swapType,
}, nil
default:
return nil, fmt.Errorf("unknown rule: %T", rule)
}
}
// SuggestSwaps provides a list of suggested swaps based on lnd's current
// channel balances and rules set by the liquidity manager.
func (s *swapClientServer) SuggestSwaps(ctx context.Context,

@ -72,6 +72,8 @@ func getLiquidityManager(client *loop.Client) *liquidity.Manager {
ListLoopOut: client.Store.FetchLoopOutSwaps,
ListLoopIn: client.Store.FetchLoopInSwaps,
MinimumConfirmations: minConfTarget,
PutLiquidityParams: client.Store.PutLiquidityParams,
FetchLiquidityParams: client.Store.FetchLiquidityParams,
}
return liquidity.NewManager(mngrCfg)

@ -33,6 +33,20 @@ type SwapStore interface {
UpdateLoopIn(hash lntypes.Hash, time time.Time,
state SwapStateData) error
// PutLiquidityParams writes the serialized `manager.Parameters` bytes
// into the bucket.
//
// NOTE: it's the caller's responsibility to encode the param. Atm,
// it's encoding using the proto package's `Marshal` method.
PutLiquidityParams(params []byte) error
// FetchLiquidityParams reads the serialized `manager.Parameters` bytes
// from the bucket.
//
// NOTE: it's the caller's responsibility to decode the param. Atm,
// it's decoding using the proto package's `Unmarshal` method.
FetchLiquidityParams() ([]byte, error)
// Close closes the underlying database.
Close() error
}

@ -94,6 +94,14 @@ var (
// value: uint32 confirmation value
confirmationsKey = []byte("confirmations")
// liquidtyBucket is a root bucket used to save liquidity manager
// related info.
liquidityBucket = []byte("liquidity")
// liquidtyParamsKey specifies the key used to store the liquidity
// parameters.
liquidtyParamsKey = []byte("params")
byteOrder = binary.BigEndian
keyLength = 33
@ -190,6 +198,12 @@ func NewBoltSwapStore(dbPath string, chainParams *chaincfg.Params) (
return err
}
// Create liquidity manager's bucket.
_, err = tx.CreateBucketIfNotExists(liquidityBucket)
if err != nil {
return err
}
return nil
})
if err != nil {
@ -712,3 +726,41 @@ func (s *boltSwapStore) UpdateLoopIn(hash lntypes.Hash, time time.Time,
func (s *boltSwapStore) Close() error {
return s.db.Close()
}
// PutLiquidityParams writes the serialized `manager.Parameters` bytes into the
// bucket.
//
// NOTE: it's the caller's responsibility to encode the param. Atm, it's
// encoding using the proto package's `Marshal` method.
func (s *boltSwapStore) PutLiquidityParams(params []byte) error {
return s.db.Update(func(tx *bbolt.Tx) error {
// Read the root bucket.
rootBucket := tx.Bucket(liquidityBucket)
if rootBucket == nil {
return errors.New("liquidity bucket does not exist")
}
return rootBucket.Put(liquidtyParamsKey, params)
})
}
// FetchLiquidityParams reads the serialized `manager.Parameters` bytes from
// the bucket.
//
// NOTE: it's the caller's responsibility to decode the param. Atm, it's
// decoding using the proto package's `Unmarshal` method.
func (s *boltSwapStore) FetchLiquidityParams() ([]byte, error) {
var params []byte
err := s.db.View(func(tx *bbolt.Tx) error {
// Read the root bucket.
rootBucket := tx.Bucket(liquidityBucket)
if rootBucket == nil {
return errors.New("liquidity bucket does not exist")
}
params = rootBucket.Get(liquidtyParamsKey)
return nil
})
return params, err
}

@ -471,3 +471,31 @@ func TestLegacyOutgoingChannel(t *testing.T) {
t.Fatal("invalid outgoing channel")
}
}
// TestLiquidityParams checks that reading and writing to liquidty bucket are
// as expected.
func TestLiquidityParams(t *testing.T) {
tempDirName, err := ioutil.TempDir("", "clientstore")
require.NoError(t, err, "failed to db")
defer os.RemoveAll(tempDirName)
store, err := NewBoltSwapStore(tempDirName, &chaincfg.MainNetParams)
require.NoError(t, err, "failed to create store")
// Test when there's no params saved before, an empty bytes is
// returned.
params, err := store.FetchLiquidityParams()
require.NoError(t, err, "failed to fetch params")
require.Empty(t, params, "expect empty bytes")
params = []byte("test")
// Test we can save the params.
err = store.PutLiquidityParams(params)
require.NoError(t, err, "failed to put params")
// Now fetch the db again should return the above saved bytes.
paramsRead, err := store.FetchLiquidityParams()
require.NoError(t, err, "failed to fetch params")
require.Equal(t, params, paramsRead, "unexpected return value")
}

@ -16,6 +16,9 @@ This file tracks release notes for the loop client.
#### New Features
* User-specified liquidity parameters are persisted on disk to enable the
liquidity manager's config to surive after a restart.
#### Breaking Changes
#### Bug Fixes

@ -171,6 +171,22 @@ func (s *storeMock) UpdateLoopIn(hash lntypes.Hash, time time.Time,
return nil
}
// PutLiquidityParams writes the serialized `manager.Parameters` bytes into the
// bucket.
//
// NOTE: Part of the loopdb.SwapStore interface.
func (s *storeMock) PutLiquidityParams(params []byte) error {
return nil
}
// FetchLiquidityParams reads the serialized `manager.Parameters` bytes from
// the bucket.
//
// NOTE: Part of the loopdb.SwapStore interface.
func (s *storeMock) FetchLiquidityParams() ([]byte, error) {
return nil, nil
}
func (s *storeMock) Close() error {
return nil
}

@ -26,7 +26,7 @@ const semanticAlphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqr
const (
// Note: please update release_notes.md when you change these values.
appMajor uint = 0
appMinor uint = 18
appMinor uint = 19
appPatch uint = 0
// appPreRelease MUST only contain characters from semanticAlphabet per

Loading…
Cancel
Save