diff --git a/cmd/loop/liquidity.go b/cmd/loop/liquidity.go index a494190..b0f4b08 100644 --- a/cmd/loop/liquidity.go +++ b/cmd/loop/liquidity.go @@ -223,9 +223,12 @@ func setRule(ctx *cli.Context) error { } var setParamsCommand = cli.Command{ - Name: "setparams", - Usage: "update the parameters set for the liquidity manager", - Description: "Updates the parameters set for the liquidity manager.", + Name: "setparams", + Usage: "update the parameters set for the liquidity manager", + Description: "Updates the parameters set for the liquidity manager. " + + "Note the parameters are persisted in db to save the trouble " + + "of setting them again upon loopd restart. To get the " + + "default values, use `getparams` before any `setparams`.", Flags: []cli.Flag{ cli.IntFlag{ Name: "sweeplimit", diff --git a/liquidity/autoloop_testcontext_test.go b/liquidity/autoloop_testcontext_test.go index ef48bdf..a26300a 100644 --- a/liquidity/autoloop_testcontext_test.go +++ b/liquidity/autoloop_testcontext_test.go @@ -166,6 +166,12 @@ func newAutoloopTestCtx(t *testing.T, parameters Parameters, MinimumConfirmations: loop.DefaultSweepConfTarget, Lnd: &testCtx.lnd.LndServices, Clock: testCtx.testClock, + PutLiquidityParams: func(_ []byte) error { + return nil + }, + FetchLiquidityParams: func() ([]byte, error) { + return nil, nil + }, } // SetParameters needs to make a call to our mocked restrictions call, @@ -179,7 +185,7 @@ func newAutoloopTestCtx(t *testing.T, parameters Parameters, // Create a manager with our test config and set our starting set of // parameters. testCtx.manager = NewManager(cfg) - err := testCtx.manager.SetParameters(context.Background(), parameters) + err := testCtx.manager.setParameters(context.Background(), parameters) assert.NoError(t, err) <-done return testCtx diff --git a/liquidity/liquidity.go b/liquidity/liquidity.go index 0b3f0d4..2721c71 100644 --- a/liquidity/liquidity.go +++ b/liquidity/liquidity.go @@ -37,7 +37,6 @@ import ( "errors" "fmt" "sort" - "strings" "sync" "time" @@ -53,6 +52,9 @@ import ( "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/ticker" + "google.golang.org/protobuf/proto" + + clientrpc "github.com/lightninglabs/loop/looprpc" ) const ( @@ -102,19 +104,6 @@ var ( // funding amount. defaultBudget = ppmToSat(funding.MaxBtcFundingAmount, defaultFeePPM) - // defaultParameters contains the default parameters that we start our - // liquidity manger with. - defaultParameters = Parameters{ - AutoFeeBudget: defaultBudget, - MaxAutoInFlight: defaultMaxInFlight, - ChannelRules: make(map[lnwire.ShortChannelID]*SwapRule), - PeerRules: make(map[route.Vertex]*SwapRule), - FailureBackOff: defaultFailureBackoff, - SweepConfTarget: defaultConfTarget, - HtlcConfTarget: defaultHtlcConfTarget, - FeeLimit: defaultFeePortion(), - } - // ErrZeroChannelID is returned if we get a rule for a 0 channel ID. ErrZeroChannelID = fmt.Errorf("zero channel ID not allowed") @@ -193,222 +182,18 @@ type Config struct { // MinimumConfirmations is the minimum number of confirmations we allow // setting for sweep target. MinimumConfirmations int32 -} - -// Parameters is a set of parameters provided by the user which guide -// how we assess liquidity. -type Parameters struct { - // Autoloop enables automatic dispatch of swaps. - Autoloop bool - - // AutoFeeBudget is the total amount we allow to be spent on - // automatically dispatched swaps. Once this budget has been used, we - // will stop dispatching swaps until the budget is increased or the - // start date is moved. - AutoFeeBudget btcutil.Amount - - // AutoFeeStartDate is the date from which we will include automatically - // dispatched swaps in our current budget, inclusive. - AutoFeeStartDate time.Time - - // MaxAutoInFlight is the maximum number of in-flight automatically - // dispatched swaps we allow. - MaxAutoInFlight int - - // FailureBackOff is the amount of time that we require passes after a - // channel has been part of a failed loop out swap before we suggest - // using it again. - // TODO(carla): add exponential backoff - FailureBackOff time.Duration - - // SweepConfTarget is the number of blocks we aim to confirm our sweep - // transaction in. This value affects the on chain fees we will pay. - SweepConfTarget int32 - - // HtlcConfTarget is the confirmation target that we use for publishing - // loop in swap htlcs on chain. - HtlcConfTarget int32 - - // FeeLimit controls the fee limit we place on swaps. - FeeLimit FeeLimit - - // ClientRestrictions are the restrictions placed on swap size by the - // client. - ClientRestrictions Restrictions - - // ChannelRules maps a short channel ID to a rule that describes how we - // would like liquidity to be managed. These rules and PeerRules are - // exclusively set to prevent overlap between peer and channel rules. - ChannelRules map[lnwire.ShortChannelID]*SwapRule - - // PeerRules maps a peer's pubkey to a rule that applies to all the - // channels that we have with the peer collectively. These rules and - // ChannelRules are exclusively set to prevent overlap between peer - // and channel rules map to avoid ambiguity. - PeerRules map[route.Vertex]*SwapRule -} - -// String returns the string representation of our parameters. -func (p Parameters) String() string { - ruleList := make([]string, 0, len(p.ChannelRules)+len(p.PeerRules)) - - for channel, rule := range p.ChannelRules { - ruleList = append( - ruleList, fmt.Sprintf("Channel: %v: %v", channel, rule), - ) - } - - for peer, rule := range p.PeerRules { - ruleList = append( - ruleList, fmt.Sprintf("Peer: %v: %v", peer, rule), - ) - } - - return fmt.Sprintf("rules: %v, failure backoff: %v, sweep "+ - "sweep conf target: %v, htlc conf target: %v,fees: %v, "+ - "auto budget: %v, budget start: %v, max auto in flight: %v, "+ - "minimum swap size=%v, maximum swap size=%v", - strings.Join(ruleList, ","), p.FailureBackOff, - p.SweepConfTarget, p.HtlcConfTarget, p.FeeLimit, - p.AutoFeeBudget, p.AutoFeeStartDate, p.MaxAutoInFlight, - p.ClientRestrictions.Minimum, p.ClientRestrictions.Maximum) -} - -// haveRules returns a boolean indicating whether we have any rules configured. -func (p Parameters) haveRules() bool { - if len(p.ChannelRules) != 0 { - return true - } - - if len(p.PeerRules) != 0 { - return true - } - - return false -} - -// validate checks whether a set of parameters is valid. Our set of currently -// open channels are required to check that there is no overlap between the -// rules set on a per-peer level, and those set for specific channels. We can't -// allow both, because then we're trying to cater for two separate liquidity -// goals on the same channel. Since we use short channel ID, we don't need to -// worry about pending channels (users would need to work very hard to get the -// short channel ID for a pending channel). Likewise, we don't care about closed -// channels, since there is no action that may occur on them, and we want to -// allow peer-level rules to be set once a channel which had a specific rule -// has been closed. It takes the minimum confirmations we allow for sweep -// confirmation target as a parameter. -// TODO(carla): prune channels that have been closed from rules. -func (p Parameters) validate(minConfs int32, openChans []lndclient.ChannelInfo, - server *Restrictions) error { - - // First, we check that the rules on a per peer and per channel do not - // overlap, since this could lead to contractions. - for _, channel := range openChans { - // If we don't have a rule for the peer, there's no way we have - // an overlap between this peer and the channel. - _, ok := p.PeerRules[channel.PubKeyBytes] - if !ok { - continue - } - - shortID := lnwire.NewShortChanIDFromInt(channel.ChannelID) - _, ok = p.ChannelRules[shortID] - if ok { - log.Debugf("Rules for peer: %v and its channel: %v "+ - "can't both be set", channel.PubKeyBytes, shortID) - - return ErrExclusiveRules - } - } - - for channel, rule := range p.ChannelRules { - if channel.ToUint64() == 0 { - return ErrZeroChannelID - } - - if rule.Type == swap.TypeIn { - return errors.New("channel level rules not supported for " + - "loop in swaps, only peer-level rules allowed") - } - - if err := rule.validate(); err != nil { - return fmt.Errorf("channel: %v has invalid rule: %v", - channel.ToUint64(), err) - } - } - - for peer, rule := range p.PeerRules { - if err := rule.validate(); err != nil { - return fmt.Errorf("peer: %v has invalid rule: %v", - peer, err) - } - } - - // Check that our confirmation target is above our required minimum. - if p.SweepConfTarget < minConfs { - return fmt.Errorf("confirmation target must be at least: %v", - minConfs) - } - - if p.HtlcConfTarget < 1 { - return fmt.Errorf("htlc confirmation target must be > 0") - } - - if err := p.FeeLimit.validate(); err != nil { - return err - } - - if p.AutoFeeBudget < 0 { - return ErrNegativeBudget - } - - if p.MaxAutoInFlight <= 0 { - return ErrZeroInFlight - } - - err := validateRestrictions(server, &p.ClientRestrictions) - if err != nil { - return err - } - return nil -} - -// validateRestrictions checks that client restrictions fall within the server's -// restrictions. -func validateRestrictions(server, client *Restrictions) error { - zeroMin := client.Minimum == 0 - zeroMax := client.Maximum == 0 - - if zeroMin && zeroMax { - return nil - } - - // If we have a non-zero maximum, we need to ensure it is greater than - // our minimum (which is fine if min is zero), and does not exceed the - // server's maximum. - if !zeroMax { - if client.Minimum > client.Maximum { - return ErrMinimumExceedsMaximumAmt - } - - if client.Maximum > server.Maximum { - return ErrMaxExceedsServer - } - } - - if zeroMin { - return nil - } - - // If the client set a minimum, ensure it is at least equal to the - // server's limit. - if client.Minimum < server.Minimum { - return ErrMinLessThanServer - } - - return nil + // PutLiquidityParams writes the serialized `Parameters` into db. + // + // NOTE: the params are encoded using `proto.Marshal` over an RPC + // request. + PutLiquidityParams func(params []byte) error + + // FetchLiquidityParams reads the serialized `Parameters` from db. + // + // NOTE: the params are decoded using `proto.Unmarshal` over a + // serialized RPC request. + FetchLiquidityParams func() ([]byte, error) } // Manager contains a set of desired liquidity rules for our channel @@ -433,6 +218,19 @@ func (m *Manager) Run(ctx context.Context) error { m.cfg.AutoloopTicker.Resume() defer m.cfg.AutoloopTicker.Stop() + // Before we start the main loop, load the params from db. + req, err := m.loadParams() + if err != nil { + return err + } + + // Set the params if there's one. + if req != nil { + if err := m.SetParameters(ctx, req); err != nil { + return err + } + } + for { select { case <-m.cfg.AutoloopTicker.Ticks(): @@ -469,9 +267,35 @@ func (m *Manager) GetParameters() Parameters { return cloneParameters(m.params) } +// SetParameters takes an RPC request and calls the internal method to set +// parameters for the manager. +func (m *Manager) SetParameters(ctx context.Context, + req *clientrpc.LiquidityParameters) error { + + params, err := rpcToParameters(req) + if err != nil { + return err + } + + if err := m.setParameters(ctx, *params); err != nil { + return err + } + + // Save the params on disk. + // + // NOTE: alternatively we can save the bytes in memory and persist them + // on disk during shutdown to save us some IO cost from hitting the db. + // Since setting params is NOT a frequent action, it's should put + // little pressure on our db. Only when performance becomes an issue, + // we can then apply the alternative. + return m.saveParams(req) +} + // SetParameters updates our current set of parameters if the new parameters // provided are valid. -func (m *Manager) SetParameters(ctx context.Context, params Parameters) error { +func (m *Manager) setParameters(ctx context.Context, + params Parameters) error { + restrictions, err := m.cfg.Restrictions(ctx, swap.TypeOut) if err != nil { return err @@ -482,7 +306,9 @@ func (m *Manager) SetParameters(ctx context.Context, params Parameters) error { return err } - err = params.validate(m.cfg.MinimumConfirmations, channels, restrictions) + err = params.validate( + m.cfg.MinimumConfirmations, channels, restrictions, + ) if err != nil { return err } @@ -491,35 +317,47 @@ func (m *Manager) SetParameters(ctx context.Context, params Parameters) error { defer m.paramsLock.Unlock() m.params = cloneParameters(params) + return nil } -// cloneParameters creates a deep clone of a parameters struct so that callers -// cannot mutate our parameters. Although our parameters struct itself is not -// a reference, we still need to clone the contents of maps. -func cloneParameters(params Parameters) Parameters { - paramCopy := params - paramCopy.ChannelRules = make( - map[lnwire.ShortChannelID]*SwapRule, - len(params.ChannelRules), - ) +// saveParams marshals an RPC request and saves it to db. +func (m *Manager) saveParams(req proto.Message) error { + // Marshal the params. + paramsBytes, err := proto.Marshal(req) + if err != nil { + return err + } - for channel, rule := range params.ChannelRules { - ruleCopy := *rule - paramCopy.ChannelRules[channel] = &ruleCopy + // Save the params on disk. + if err := m.cfg.PutLiquidityParams(paramsBytes); err != nil { + return fmt.Errorf("failed to save params: %v", err) } - paramCopy.PeerRules = make( - map[route.Vertex]*SwapRule, - len(params.PeerRules), - ) + return nil +} - for peer, rule := range params.PeerRules { - ruleCopy := *rule - paramCopy.PeerRules[peer] = &ruleCopy +// loadParams unmarshals a serialized RPC request from db and returns the RPC +// request. +func (m *Manager) loadParams() (*clientrpc.LiquidityParameters, error) { + paramsBytes, err := m.cfg.FetchLiquidityParams() + if err != nil { + return nil, fmt.Errorf("failed to read params: %v", err) + } + + // Return early if there's nothing saved. + if paramsBytes == nil { + return nil, nil + } + + // Unmarshal the params. + req := &clientrpc.LiquidityParameters{} + err = proto.Unmarshal(paramsBytes, req) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal params: %v", err) } - return paramCopy + return req, nil } // autoloop gets a set of suggested swaps and dispatches them automatically if diff --git a/liquidity/liquidity_test.go b/liquidity/liquidity_test.go index c1fc41d..76343bf 100644 --- a/liquidity/liquidity_test.go +++ b/liquidity/liquidity_test.go @@ -10,6 +10,7 @@ import ( "github.com/lightninglabs/loop" "github.com/lightninglabs/loop/labels" "github.com/lightninglabs/loop/loopdb" + clientrpc "github.com/lightninglabs/loop/looprpc" "github.com/lightninglabs/loop/swap" "github.com/lightninglabs/loop/test" "github.com/lightningnetwork/lnd/clock" @@ -221,7 +222,7 @@ func TestParameters(t *testing.T) { chanID: originalRule, } - err := manager.SetParameters(context.Background(), expected) + err := manager.setParameters(context.Background(), expected) require.NoError(t, err) // Check that changing the parameters we just set does not mutate @@ -242,62 +243,60 @@ func TestParameters(t *testing.T) { Type: swap.TypeOut, }, } - err = manager.SetParameters(context.Background(), expected) + err = manager.setParameters(context.Background(), expected) require.Equal(t, ErrZeroChannelID, err) } -// TestValidateRestrictions tests validating client restrictions against a set -// of server restrictions. -func TestValidateRestrictions(t *testing.T) { - tests := []struct { - name string - client *Restrictions - server *Restrictions - err error - }{ - { - name: "client invalid", - client: &Restrictions{ - Minimum: 100, - Maximum: 1, - }, - server: testRestrictions, - err: ErrMinimumExceedsMaximumAmt, - }, - { - name: "maximum exceeds server", - client: &Restrictions{ - Maximum: 2000, - }, - server: &Restrictions{ - Minimum: 1000, - Maximum: 1500, - }, - err: ErrMaxExceedsServer, - }, - { - name: "minimum less than server", - client: &Restrictions{ - Minimum: 500, - }, - server: &Restrictions{ - Minimum: 1000, - Maximum: 1500, - }, - err: ErrMinLessThanServer, - }, +// TestPersistParams tests reading and writing of parameters for our manager. +func TestPersistParams(t *testing.T) { + rpcParams := &clientrpc.LiquidityParameters{ + FeePpm: 100, + AutoMaxInFlight: 10, + HtlcConfTarget: 2, } + cfg, _ := newTestConfig() + manager := NewManager(cfg) - for _, testCase := range tests { - testCase := testCase + var paramsBytes []byte - t.Run(testCase.name, func(t *testing.T) { - err := validateRestrictions( - testCase.server, testCase.client, - ) - require.Equal(t, testCase.err, err) - }) + // Mock the read method to return empty data. + manager.cfg.FetchLiquidityParams = func() ([]byte, error) { + return paramsBytes, nil + } + + // Test the nil params is returned. + req, err := manager.loadParams() + require.Nil(t, req) + require.NoError(t, err) + + // Mock the write method to return no error. + manager.cfg.PutLiquidityParams = func(data []byte) error { + paramsBytes = data + return nil } + + // Test save the message. + err = manager.saveParams(rpcParams) + require.NoError(t, err) + + // Test the nil params is returned. + req, err = manager.loadParams() + require.NoError(t, err) + + // Check the specified fields are set as expected. + require.Equal(t, rpcParams.FeePpm, req.FeePpm) + require.Equal(t, rpcParams.AutoMaxInFlight, req.AutoMaxInFlight) + require.Equal(t, rpcParams.HtlcConfTarget, req.HtlcConfTarget) + + // Check the unspecified fields are using empty values. + require.False(t, req.Autoloop) + require.Empty(t, req.Rules) + require.Zero(t, req.AutoloopBudgetSat) + + // Finally, check the loaded request can be used to set params without + // error. + err = manager.SetParameters(context.Background(), req) + require.NoError(t, err) } // TestRestrictedSuggestions tests getting of swap suggestions when we have @@ -1832,7 +1831,7 @@ func testSuggestSwaps(t *testing.T, setup *testSuggestSwapsSetup, // them to use the rules set by the test. manager := NewManager(setup.cfg) - err := manager.SetParameters(context.Background(), setup.params) + err := manager.setParameters(context.Background(), setup.params) require.NoError(t, err) actual, err := manager.SuggestSwaps(context.Background(), false) @@ -2014,7 +2013,7 @@ func TestCurrentTraffic(t *testing.T) { params := m.GetParameters() params.FailureBackOff = backoff - require.NoError(t, m.SetParameters(context.Background(), params)) + require.NoError(t, m.setParameters(context.Background(), params)) actual := m.currentSwapTraffic(testCase.loopOut, testCase.loopIn) require.Equal(t, testCase.expected, actual) diff --git a/liquidity/parameters.go b/liquidity/parameters.go new file mode 100644 index 0000000..35cfb67 --- /dev/null +++ b/liquidity/parameters.go @@ -0,0 +1,423 @@ +package liquidity + +import ( + "errors" + "fmt" + "strings" + "time" + + "github.com/btcsuite/btcd/btcutil" + "github.com/lightninglabs/lndclient" + "github.com/lightninglabs/loop/swap" + "github.com/lightningnetwork/lnd/lnwallet/chainfee" + "github.com/lightningnetwork/lnd/lnwire" + "github.com/lightningnetwork/lnd/routing/route" + + clientrpc "github.com/lightninglabs/loop/looprpc" +) + +var ( + // defaultParameters contains the default parameters that we start our + // liquidity manger with. + defaultParameters = Parameters{ + AutoFeeBudget: defaultBudget, + MaxAutoInFlight: defaultMaxInFlight, + ChannelRules: make(map[lnwire.ShortChannelID]*SwapRule), + PeerRules: make(map[route.Vertex]*SwapRule), + FailureBackOff: defaultFailureBackoff, + SweepConfTarget: defaultConfTarget, + HtlcConfTarget: defaultHtlcConfTarget, + FeeLimit: defaultFeePortion(), + } +) + +// Parameters is a set of parameters provided by the user which guide +// how we assess liquidity. +type Parameters struct { + // Autoloop enables automatic dispatch of swaps. + Autoloop bool + + // AutoFeeBudget is the total amount we allow to be spent on + // automatically dispatched swaps. Once this budget has been used, we + // will stop dispatching swaps until the budget is increased or the + // start date is moved. + AutoFeeBudget btcutil.Amount + + // AutoFeeStartDate is the date from which we will include automatically + // dispatched swaps in our current budget, inclusive. + AutoFeeStartDate time.Time + + // MaxAutoInFlight is the maximum number of in-flight automatically + // dispatched swaps we allow. + MaxAutoInFlight int + + // FailureBackOff is the amount of time that we require passes after a + // channel has been part of a failed loop out swap before we suggest + // using it again. + // TODO(carla): add exponential backoff + FailureBackOff time.Duration + + // SweepConfTarget is the number of blocks we aim to confirm our sweep + // transaction in. This value affects the on chain fees we will pay. + SweepConfTarget int32 + + // HtlcConfTarget is the confirmation target that we use for publishing + // loop in swap htlcs on chain. + HtlcConfTarget int32 + + // FeeLimit controls the fee limit we place on swaps. + FeeLimit FeeLimit + + // ClientRestrictions are the restrictions placed on swap size by the + // client. + ClientRestrictions Restrictions + + // ChannelRules maps a short channel ID to a rule that describes how we + // would like liquidity to be managed. These rules and PeerRules are + // exclusively set to prevent overlap between peer and channel rules. + ChannelRules map[lnwire.ShortChannelID]*SwapRule + + // PeerRules maps a peer's pubkey to a rule that applies to all the + // channels that we have with the peer collectively. These rules and + // ChannelRules are exclusively set to prevent overlap between peer + // and channel rules map to avoid ambiguity. + PeerRules map[route.Vertex]*SwapRule +} + +// String returns the string representation of our parameters. +func (p Parameters) String() string { + ruleList := make([]string, 0, len(p.ChannelRules)+len(p.PeerRules)) + + for channel, rule := range p.ChannelRules { + ruleList = append( + ruleList, fmt.Sprintf("Channel: %v: %v", channel, rule), + ) + } + + for peer, rule := range p.PeerRules { + ruleList = append( + ruleList, fmt.Sprintf("Peer: %v: %v", peer, rule), + ) + } + + return fmt.Sprintf("rules: %v, failure backoff: %v, sweep "+ + "sweep conf target: %v, htlc conf target: %v,fees: %v, "+ + "auto budget: %v, budget start: %v, max auto in flight: %v, "+ + "minimum swap size=%v, maximum swap size=%v", + strings.Join(ruleList, ","), p.FailureBackOff, + p.SweepConfTarget, p.HtlcConfTarget, p.FeeLimit, + p.AutoFeeBudget, p.AutoFeeStartDate, p.MaxAutoInFlight, + p.ClientRestrictions.Minimum, p.ClientRestrictions.Maximum) +} + +// haveRules returns a boolean indicating whether we have any rules configured. +func (p Parameters) haveRules() bool { + if len(p.ChannelRules) != 0 { + return true + } + + if len(p.PeerRules) != 0 { + return true + } + + return false +} + +// validate checks whether a set of parameters is valid. Our set of currently +// open channels are required to check that there is no overlap between the +// rules set on a per-peer level, and those set for specific channels. We can't +// allow both, because then we're trying to cater for two separate liquidity +// goals on the same channel. Since we use short channel ID, we don't need to +// worry about pending channels (users would need to work very hard to get the +// short channel ID for a pending channel). Likewise, we don't care about closed +// channels, since there is no action that may occur on them, and we want to +// allow peer-level rules to be set once a channel which had a specific rule +// has been closed. It takes the minimum confirmations we allow for sweep +// confirmation target as a parameter. +// TODO(carla): prune channels that have been closed from rules. +func (p Parameters) validate(minConfs int32, openChans []lndclient.ChannelInfo, + server *Restrictions) error { + + // First, we check that the rules on a per peer and per channel do not + // overlap, since this could lead to contractions. + for _, channel := range openChans { + // If we don't have a rule for the peer, there's no way we have + // an overlap between this peer and the channel. + _, ok := p.PeerRules[channel.PubKeyBytes] + if !ok { + continue + } + + shortID := lnwire.NewShortChanIDFromInt(channel.ChannelID) + _, ok = p.ChannelRules[shortID] + if ok { + log.Debugf("Rules for peer: %v and its channel: %v "+ + "can't both be set", channel.PubKeyBytes, shortID) + + return ErrExclusiveRules + } + } + + for channel, rule := range p.ChannelRules { + if channel.ToUint64() == 0 { + return ErrZeroChannelID + } + + if rule.Type == swap.TypeIn { + return errors.New("channel level rules not supported for " + + "loop in swaps, only peer-level rules allowed") + } + + if err := rule.validate(); err != nil { + return fmt.Errorf("channel: %v has invalid rule: %v", + channel.ToUint64(), err) + } + } + + for peer, rule := range p.PeerRules { + if err := rule.validate(); err != nil { + return fmt.Errorf("peer: %v has invalid rule: %v", + peer, err) + } + } + + // Check that our confirmation target is above our required minimum. + if p.SweepConfTarget < minConfs { + return fmt.Errorf("confirmation target must be at least: %v", + minConfs) + } + + if p.HtlcConfTarget < 1 { + return fmt.Errorf("htlc confirmation target must be > 0") + } + + if err := p.FeeLimit.validate(); err != nil { + return err + } + + if p.AutoFeeBudget < 0 { + return ErrNegativeBudget + } + + if p.MaxAutoInFlight <= 0 { + return ErrZeroInFlight + } + + err := validateRestrictions(server, &p.ClientRestrictions) + if err != nil { + return err + } + + return nil +} + +// validateRestrictions checks that client restrictions fall within the server's +// restrictions. +func validateRestrictions(server, client *Restrictions) error { + zeroMin := client.Minimum == 0 + zeroMax := client.Maximum == 0 + + if zeroMin && zeroMax { + return nil + } + + // If we have a non-zero maximum, we need to ensure it is greater than + // our minimum (which is fine if min is zero), and does not exceed the + // server's maximum. + if !zeroMax { + if client.Minimum > client.Maximum { + return ErrMinimumExceedsMaximumAmt + } + + if client.Maximum > server.Maximum { + return ErrMaxExceedsServer + } + } + + if zeroMin { + return nil + } + + // If the client set a minimum, ensure it is at least equal to the + // server's limit. + if client.Minimum < server.Minimum { + return ErrMinLessThanServer + } + + return nil +} + +// cloneParameters creates a deep clone of a parameters struct so that callers +// cannot mutate our parameters. Although our parameters struct itself is not +// a reference, we still need to clone the contents of maps. +func cloneParameters(params Parameters) Parameters { + paramCopy := params + paramCopy.ChannelRules = make( + map[lnwire.ShortChannelID]*SwapRule, + len(params.ChannelRules), + ) + + for channel, rule := range params.ChannelRules { + ruleCopy := *rule + paramCopy.ChannelRules[channel] = &ruleCopy + } + + paramCopy.PeerRules = make( + map[route.Vertex]*SwapRule, + len(params.PeerRules), + ) + + for peer, rule := range params.PeerRules { + ruleCopy := *rule + paramCopy.PeerRules[peer] = &ruleCopy + } + + return paramCopy +} + +// rpcToFee converts the values provided over rpc to a fee limit interface, +// failing if an inconsistent set of fields are set. +func rpcToFee(req *clientrpc.LiquidityParameters) (FeeLimit, error) { + // Check which fee limit type we have values set for. If any fields + // relevant to our individual categories are set, we count that type + // as set. + isFeePPM := req.FeePpm != 0 + isCategories := req.MaxSwapFeePpm != 0 || req.MaxRoutingFeePpm != 0 || + req.MaxPrepayRoutingFeePpm != 0 || req.MaxMinerFeeSat != 0 || + req.MaxPrepaySat != 0 || req.SweepFeeRateSatPerVbyte != 0 + + switch { + case isFeePPM && isCategories: + return nil, errors.New("set either fee ppm, or individual " + + "fee categories") + case isFeePPM: + return NewFeePortion(req.FeePpm), nil + + case isCategories: + satPerKVbyte := chainfee.SatPerKVByte( + req.SweepFeeRateSatPerVbyte * 1000, + ) + + return NewFeeCategoryLimit( + req.MaxSwapFeePpm, + req.MaxRoutingFeePpm, + req.MaxPrepayRoutingFeePpm, + btcutil.Amount(req.MaxMinerFeeSat), + btcutil.Amount(req.MaxPrepaySat), + satPerKVbyte.FeePerKWeight(), + ), nil + + default: + return nil, errors.New("no fee categories set") + } +} + +// rpcToRule switches on rpc rule type to convert to our rule interface. +func rpcToRule(rule *clientrpc.LiquidityRule) (*SwapRule, error) { + swapType := swap.TypeOut + if rule.SwapType == clientrpc.SwapType_LOOP_IN { + swapType = swap.TypeIn + } + + switch rule.Type { + case clientrpc.LiquidityRuleType_UNKNOWN: + return nil, fmt.Errorf("rule type field must be set") + + case clientrpc.LiquidityRuleType_THRESHOLD: + return &SwapRule{ + ThresholdRule: NewThresholdRule( + int(rule.IncomingThreshold), + int(rule.OutgoingThreshold), + ), + Type: swapType, + }, nil + + default: + return nil, fmt.Errorf("unknown rule: %T", rule) + } +} + +// rpcToParameters takes a `LiquidityParameters` and creates a `Parameters` +// from it. +func rpcToParameters(req *clientrpc.LiquidityParameters) (*Parameters, + error) { + + feeLimit, err := rpcToFee(req) + if err != nil { + return nil, err + } + + params := &Parameters{ + FeeLimit: feeLimit, + SweepConfTarget: req.SweepConfTarget, + FailureBackOff: time.Duration(req.FailureBackoffSec) * + time.Second, + Autoloop: req.Autoloop, + AutoFeeBudget: btcutil.Amount(req.AutoloopBudgetSat), + MaxAutoInFlight: int(req.AutoMaxInFlight), + ChannelRules: make( + map[lnwire.ShortChannelID]*SwapRule, + ), + PeerRules: make( + map[route.Vertex]*SwapRule, + ), + ClientRestrictions: Restrictions{ + Minimum: btcutil.Amount(req.MinSwapAmount), + Maximum: btcutil.Amount(req.MaxSwapAmount), + }, + HtlcConfTarget: req.HtlcConfTarget, + } + + // Zero unix time is different to zero golang time. + if req.AutoloopBudgetStartSec != 0 { + params.AutoFeeStartDate = time.Unix( + int64(req.AutoloopBudgetStartSec), 0, + ) + } + + for _, rule := range req.Rules { + peerRule := rule.Pubkey != nil + chanRule := rule.ChannelId != 0 + + liquidityRule, err := rpcToRule(rule) + if err != nil { + return nil, err + } + + switch { + case peerRule && chanRule: + return nil, fmt.Errorf("cannot set channel: %v and "+ + "peer: %v fields in rule", rule.ChannelId, + rule.Pubkey) + + case peerRule: + pubkey, err := route.NewVertexFromBytes(rule.Pubkey) + if err != nil { + return nil, err + } + + if _, ok := params.PeerRules[pubkey]; ok { + return nil, fmt.Errorf("multiple rules set "+ + "for peer: %v", pubkey) + } + + params.PeerRules[pubkey] = liquidityRule + + case chanRule: + shortID := lnwire.NewShortChanIDFromInt(rule.ChannelId) + + if _, ok := params.ChannelRules[shortID]; ok { + return nil, fmt.Errorf("multiple rules set "+ + "for channel: %v", shortID) + } + + params.ChannelRules[shortID] = liquidityRule + + default: + return nil, errors.New("please set channel id or " + + "pubkey for rule") + } + } + + return params, nil +} diff --git a/liquidity/parameters_test.go b/liquidity/parameters_test.go new file mode 100644 index 0000000..46faa3b --- /dev/null +++ b/liquidity/parameters_test.go @@ -0,0 +1,61 @@ +package liquidity + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +// TestValidateRestrictions tests validating client restrictions against a set +// of server restrictions. +func TestValidateRestrictions(t *testing.T) { + tests := []struct { + name string + client *Restrictions + server *Restrictions + err error + }{ + { + name: "client invalid", + client: &Restrictions{ + Minimum: 100, + Maximum: 1, + }, + server: testRestrictions, + err: ErrMinimumExceedsMaximumAmt, + }, + { + name: "maximum exceeds server", + client: &Restrictions{ + Maximum: 2000, + }, + server: &Restrictions{ + Minimum: 1000, + Maximum: 1500, + }, + err: ErrMaxExceedsServer, + }, + { + name: "minimum less than server", + client: &Restrictions{ + Minimum: 500, + }, + server: &Restrictions{ + Minimum: 1000, + Maximum: 1500, + }, + err: ErrMinLessThanServer, + }, + } + + for _, testCase := range tests { + testCase := testCase + + t.Run(testCase.name, func(t *testing.T) { + err := validateRestrictions( + testCase.server, testCase.client, + ) + require.Equal(t, testCase.err, err) + }) + } +} diff --git a/loopd/swapclient_server.go b/loopd/swapclient_server.go index 3e643ad..564d84a 100644 --- a/loopd/swapclient_server.go +++ b/loopd/swapclient_server.go @@ -21,8 +21,6 @@ import ( "github.com/lightninglabs/loop/swap" looprpc "github.com/lightninglabs/loop/swapserverrpc" "github.com/lightningnetwork/lnd/lntypes" - "github.com/lightningnetwork/lnd/lnwallet/chainfee" - "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/queue" "github.com/lightningnetwork/lnd/routing/route" "github.com/lightningnetwork/lnd/zpay32" @@ -806,154 +804,14 @@ func (s *swapClientServer) SetLiquidityParams(ctx context.Context, in *clientrpc.SetLiquidityParamsRequest) (*clientrpc.SetLiquidityParamsResponse, error) { - feeLimit, err := rpcToFee(in.Parameters) + err := s.liquidityMgr.SetParameters(ctx, in.Parameters) if err != nil { return nil, err } - params := liquidity.Parameters{ - FeeLimit: feeLimit, - SweepConfTarget: in.Parameters.SweepConfTarget, - FailureBackOff: time.Duration(in.Parameters.FailureBackoffSec) * - time.Second, - Autoloop: in.Parameters.Autoloop, - AutoFeeBudget: btcutil.Amount(in.Parameters.AutoloopBudgetSat), - MaxAutoInFlight: int(in.Parameters.AutoMaxInFlight), - ChannelRules: make( - map[lnwire.ShortChannelID]*liquidity.SwapRule, - ), - PeerRules: make( - map[route.Vertex]*liquidity.SwapRule, - ), - ClientRestrictions: liquidity.Restrictions{ - Minimum: btcutil.Amount(in.Parameters.MinSwapAmount), - Maximum: btcutil.Amount(in.Parameters.MaxSwapAmount), - }, - HtlcConfTarget: in.Parameters.HtlcConfTarget, - } - - // Zero unix time is different to zero golang time. - if in.Parameters.AutoloopBudgetStartSec != 0 { - params.AutoFeeStartDate = time.Unix( - int64(in.Parameters.AutoloopBudgetStartSec), 0, - ) - } - - for _, rule := range in.Parameters.Rules { - peerRule := rule.Pubkey != nil - chanRule := rule.ChannelId != 0 - - liquidityRule, err := rpcToRule(rule) - if err != nil { - return nil, err - } - - switch { - case peerRule && chanRule: - return nil, fmt.Errorf("cannot set channel: %v and "+ - "peer: %v fields in rule", rule.ChannelId, - rule.Pubkey) - - case peerRule: - pubkey, err := route.NewVertexFromBytes(rule.Pubkey) - if err != nil { - return nil, err - } - - if _, ok := params.PeerRules[pubkey]; ok { - return nil, fmt.Errorf("multiple rules set "+ - "for peer: %v", pubkey) - } - - params.PeerRules[pubkey] = liquidityRule - - case chanRule: - shortID := lnwire.NewShortChanIDFromInt(rule.ChannelId) - - if _, ok := params.ChannelRules[shortID]; ok { - return nil, fmt.Errorf("multiple rules set "+ - "for channel: %v", shortID) - } - - params.ChannelRules[shortID] = liquidityRule - - default: - return nil, errors.New("please set channel id or " + - "pubkey for rule") - } - } - - if err := s.liquidityMgr.SetParameters(ctx, params); err != nil { - return nil, err - } - return &clientrpc.SetLiquidityParamsResponse{}, nil } -// rpcToFee converts the values provided over rpc to a fee limit interface, -// failing if an inconsistent set of fields are set. -func rpcToFee(req *clientrpc.LiquidityParameters) (liquidity.FeeLimit, - error) { - - // Check which fee limit type we have values set for. If any fields - // relevant to our individual categories are set, we count that type - // as set. - isFeePPM := req.FeePpm != 0 - isCategories := req.MaxSwapFeePpm != 0 || req.MaxRoutingFeePpm != 0 || - req.MaxPrepayRoutingFeePpm != 0 || req.MaxMinerFeeSat != 0 || - req.MaxPrepaySat != 0 || req.SweepFeeRateSatPerVbyte != 0 - - switch { - case isFeePPM && isCategories: - return nil, errors.New("set either fee ppm, or individual " + - "fee categories") - case isFeePPM: - return liquidity.NewFeePortion(req.FeePpm), nil - - case isCategories: - satPerVbyte := chainfee.SatPerKVByte( - req.SweepFeeRateSatPerVbyte * 1000, - ) - - return liquidity.NewFeeCategoryLimit( - req.MaxSwapFeePpm, - req.MaxRoutingFeePpm, - req.MaxPrepayRoutingFeePpm, - btcutil.Amount(req.MaxMinerFeeSat), - btcutil.Amount(req.MaxPrepaySat), - satPerVbyte.FeePerKWeight(), - ), nil - - default: - return nil, errors.New("no fee categories set") - } -} - -// rpcToRule switches on rpc rule type to convert to our rule interface. -func rpcToRule(rule *clientrpc.LiquidityRule) (*liquidity.SwapRule, error) { - swapType := swap.TypeOut - if rule.SwapType == clientrpc.SwapType_LOOP_IN { - swapType = swap.TypeIn - } - - switch rule.Type { - case clientrpc.LiquidityRuleType_UNKNOWN: - return nil, fmt.Errorf("rule type field must be set") - - case clientrpc.LiquidityRuleType_THRESHOLD: - return &liquidity.SwapRule{ - ThresholdRule: liquidity.NewThresholdRule( - int(rule.IncomingThreshold), - int(rule.OutgoingThreshold), - ), - Type: swapType, - }, nil - - default: - return nil, fmt.Errorf("unknown rule: %T", rule) - } -} - // SuggestSwaps provides a list of suggested swaps based on lnd's current // channel balances and rules set by the liquidity manager. func (s *swapClientServer) SuggestSwaps(ctx context.Context, diff --git a/loopd/utils.go b/loopd/utils.go index ebf349a..d0f827a 100644 --- a/loopd/utils.go +++ b/loopd/utils.go @@ -72,6 +72,8 @@ func getLiquidityManager(client *loop.Client) *liquidity.Manager { ListLoopOut: client.Store.FetchLoopOutSwaps, ListLoopIn: client.Store.FetchLoopInSwaps, MinimumConfirmations: minConfTarget, + PutLiquidityParams: client.Store.PutLiquidityParams, + FetchLiquidityParams: client.Store.FetchLiquidityParams, } return liquidity.NewManager(mngrCfg) diff --git a/loopdb/interface.go b/loopdb/interface.go index 86dd853..41172da 100644 --- a/loopdb/interface.go +++ b/loopdb/interface.go @@ -33,6 +33,20 @@ type SwapStore interface { UpdateLoopIn(hash lntypes.Hash, time time.Time, state SwapStateData) error + // PutLiquidityParams writes the serialized `manager.Parameters` bytes + // into the bucket. + // + // NOTE: it's the caller's responsibility to encode the param. Atm, + // it's encoding using the proto package's `Marshal` method. + PutLiquidityParams(params []byte) error + + // FetchLiquidityParams reads the serialized `manager.Parameters` bytes + // from the bucket. + // + // NOTE: it's the caller's responsibility to decode the param. Atm, + // it's decoding using the proto package's `Unmarshal` method. + FetchLiquidityParams() ([]byte, error) + // Close closes the underlying database. Close() error } diff --git a/loopdb/store.go b/loopdb/store.go index 420a00c..67fdcd6 100644 --- a/loopdb/store.go +++ b/loopdb/store.go @@ -94,6 +94,14 @@ var ( // value: uint32 confirmation value confirmationsKey = []byte("confirmations") + // liquidtyBucket is a root bucket used to save liquidity manager + // related info. + liquidityBucket = []byte("liquidity") + + // liquidtyParamsKey specifies the key used to store the liquidity + // parameters. + liquidtyParamsKey = []byte("params") + byteOrder = binary.BigEndian keyLength = 33 @@ -190,6 +198,12 @@ func NewBoltSwapStore(dbPath string, chainParams *chaincfg.Params) ( return err } + // Create liquidity manager's bucket. + _, err = tx.CreateBucketIfNotExists(liquidityBucket) + if err != nil { + return err + } + return nil }) if err != nil { @@ -712,3 +726,41 @@ func (s *boltSwapStore) UpdateLoopIn(hash lntypes.Hash, time time.Time, func (s *boltSwapStore) Close() error { return s.db.Close() } + +// PutLiquidityParams writes the serialized `manager.Parameters` bytes into the +// bucket. +// +// NOTE: it's the caller's responsibility to encode the param. Atm, it's +// encoding using the proto package's `Marshal` method. +func (s *boltSwapStore) PutLiquidityParams(params []byte) error { + return s.db.Update(func(tx *bbolt.Tx) error { + // Read the root bucket. + rootBucket := tx.Bucket(liquidityBucket) + if rootBucket == nil { + return errors.New("liquidity bucket does not exist") + } + return rootBucket.Put(liquidtyParamsKey, params) + }) +} + +// FetchLiquidityParams reads the serialized `manager.Parameters` bytes from +// the bucket. +// +// NOTE: it's the caller's responsibility to decode the param. Atm, it's +// decoding using the proto package's `Unmarshal` method. +func (s *boltSwapStore) FetchLiquidityParams() ([]byte, error) { + var params []byte + + err := s.db.View(func(tx *bbolt.Tx) error { + // Read the root bucket. + rootBucket := tx.Bucket(liquidityBucket) + if rootBucket == nil { + return errors.New("liquidity bucket does not exist") + } + + params = rootBucket.Get(liquidtyParamsKey) + return nil + }) + + return params, err +} diff --git a/loopdb/store_test.go b/loopdb/store_test.go index 3f7e11b..ea349d6 100644 --- a/loopdb/store_test.go +++ b/loopdb/store_test.go @@ -471,3 +471,31 @@ func TestLegacyOutgoingChannel(t *testing.T) { t.Fatal("invalid outgoing channel") } } + +// TestLiquidityParams checks that reading and writing to liquidty bucket are +// as expected. +func TestLiquidityParams(t *testing.T) { + tempDirName, err := ioutil.TempDir("", "clientstore") + require.NoError(t, err, "failed to db") + defer os.RemoveAll(tempDirName) + + store, err := NewBoltSwapStore(tempDirName, &chaincfg.MainNetParams) + require.NoError(t, err, "failed to create store") + + // Test when there's no params saved before, an empty bytes is + // returned. + params, err := store.FetchLiquidityParams() + require.NoError(t, err, "failed to fetch params") + require.Empty(t, params, "expect empty bytes") + + params = []byte("test") + + // Test we can save the params. + err = store.PutLiquidityParams(params) + require.NoError(t, err, "failed to put params") + + // Now fetch the db again should return the above saved bytes. + paramsRead, err := store.FetchLiquidityParams() + require.NoError(t, err, "failed to fetch params") + require.Equal(t, params, paramsRead, "unexpected return value") +} diff --git a/release_notes.md b/release_notes.md index c71d135..d9d4564 100644 --- a/release_notes.md +++ b/release_notes.md @@ -16,6 +16,9 @@ This file tracks release notes for the loop client. #### New Features +* User-specified liquidity parameters are persisted on disk to enable the + liquidity manager's config to surive after a restart. + #### Breaking Changes #### Bug Fixes diff --git a/store_mock_test.go b/store_mock_test.go index 77f36c3..a366fcd 100644 --- a/store_mock_test.go +++ b/store_mock_test.go @@ -171,6 +171,22 @@ func (s *storeMock) UpdateLoopIn(hash lntypes.Hash, time time.Time, return nil } +// PutLiquidityParams writes the serialized `manager.Parameters` bytes into the +// bucket. +// +// NOTE: Part of the loopdb.SwapStore interface. +func (s *storeMock) PutLiquidityParams(params []byte) error { + return nil +} + +// FetchLiquidityParams reads the serialized `manager.Parameters` bytes from +// the bucket. +// +// NOTE: Part of the loopdb.SwapStore interface. +func (s *storeMock) FetchLiquidityParams() ([]byte, error) { + return nil, nil +} + func (s *storeMock) Close() error { return nil } diff --git a/version.go b/version.go index c79f9ec..2768ed4 100644 --- a/version.go +++ b/version.go @@ -26,7 +26,7 @@ const semanticAlphabet = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqr const ( // Note: please update release_notes.md when you change these values. appMajor uint = 0 - appMinor uint = 18 + appMinor uint = 19 appPatch uint = 0 // appPreRelease MUST only contain characters from semanticAlphabet per