loopd+liquidity: persist parameters on disk

This commit saves the RPC request used to construct the `Parameters` on
disk. Since it's a proto message, an easy way to read/write it is to
rely on the proto marshal/unmarshal methods. A side effect is that
migration also becomes easy as proto message have its own internal
mechanism to keep track of the compatibility.
pull/493/head
yyforyongyu 2 years ago
parent 26edd21889
commit 425a007aaf
No known key found for this signature in database
GPG Key ID: 9BCD95C4FF296868

@ -223,9 +223,12 @@ func setRule(ctx *cli.Context) error {
}
var setParamsCommand = cli.Command{
Name: "setparams",
Usage: "update the parameters set for the liquidity manager",
Description: "Updates the parameters set for the liquidity manager.",
Name: "setparams",
Usage: "update the parameters set for the liquidity manager",
Description: "Updates the parameters set for the liquidity manager. " +
"Note the parameters are persisted in db to save the trouble " +
"of setting them again upon loopd restart. To get the " +
"default values, use `getparams` before any `setparams`.",
Flags: []cli.Flag{
cli.IntFlag{
Name: "sweeplimit",

@ -166,6 +166,12 @@ func newAutoloopTestCtx(t *testing.T, parameters Parameters,
MinimumConfirmations: loop.DefaultSweepConfTarget,
Lnd: &testCtx.lnd.LndServices,
Clock: testCtx.testClock,
PutLiquidityParams: func(_ []byte) error {
return nil
},
FetchLiquidityParams: func() ([]byte, error) {
return nil, nil
},
}
// SetParameters needs to make a call to our mocked restrictions call,

@ -52,6 +52,7 @@ import (
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
"github.com/lightningnetwork/lnd/ticker"
"google.golang.org/protobuf/proto"
clientrpc "github.com/lightninglabs/loop/looprpc"
)
@ -181,6 +182,18 @@ type Config struct {
// MinimumConfirmations is the minimum number of confirmations we allow
// setting for sweep target.
MinimumConfirmations int32
// PutLiquidityParams writes the serialized `Parameters` into db.
//
// NOTE: the params are encoded using `proto.Marshal` over an RPC
// request.
PutLiquidityParams func(params []byte) error
// FetchLiquidityParams reads the serialized `Parameters` from db.
//
// NOTE: the params are decoded using `proto.Unmarshal` over a
// serialized RPC request.
FetchLiquidityParams func() ([]byte, error)
}
// Manager contains a set of desired liquidity rules for our channel
@ -205,6 +218,19 @@ func (m *Manager) Run(ctx context.Context) error {
m.cfg.AutoloopTicker.Resume()
defer m.cfg.AutoloopTicker.Stop()
// Before we start the main loop, load the params from db.
req, err := m.loadParams()
if err != nil {
return err
}
// Set the params if there's one.
if req != nil {
if err := m.SetParameters(ctx, req); err != nil {
return err
}
}
for {
select {
case <-m.cfg.AutoloopTicker.Ticks():
@ -251,7 +277,18 @@ func (m *Manager) SetParameters(ctx context.Context,
return err
}
return m.setParameters(ctx, *params)
if err := m.setParameters(ctx, *params); err != nil {
return err
}
// Save the params on disk.
//
// NOTE: alternatively we can save the bytes in memory and persist them
// on disk during shutdown to save us some IO cost from hitting the db.
// Since setting params is NOT a frequent action, it's should put
// little pressure on our db. Only when performance becomes an issue,
// we can then apply the alternative.
return m.saveParams(req)
}
// SetParameters updates our current set of parameters if the new parameters
@ -280,9 +317,49 @@ func (m *Manager) setParameters(ctx context.Context,
defer m.paramsLock.Unlock()
m.params = cloneParameters(params)
return nil
}
// saveParams marshals an RPC request and saves it to db.
func (m *Manager) saveParams(req proto.Message) error {
// Marshal the params.
paramsBytes, err := proto.Marshal(req)
if err != nil {
return err
}
// Save the params on disk.
if err := m.cfg.PutLiquidityParams(paramsBytes); err != nil {
return fmt.Errorf("failed to save params: %v", err)
}
return nil
}
// loadParams unmarshals a serialized RPC request from db and returns the RPC
// request.
func (m *Manager) loadParams() (*clientrpc.LiquidityParameters, error) {
paramsBytes, err := m.cfg.FetchLiquidityParams()
if err != nil {
return nil, fmt.Errorf("failed to read params: %v", err)
}
// Return early if there's nothing saved.
if paramsBytes == nil {
return nil, nil
}
// Unmarshal the params.
req := &clientrpc.LiquidityParameters{}
err = proto.Unmarshal(paramsBytes, req)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal params: %v", err)
}
return req, nil
}
// autoloop gets a set of suggested swaps and dispatches them automatically if
// we have automated looping enabled.
func (m *Manager) autoloop(ctx context.Context) error {

@ -10,6 +10,7 @@ import (
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/labels"
"github.com/lightninglabs/loop/loopdb"
clientrpc "github.com/lightninglabs/loop/looprpc"
"github.com/lightninglabs/loop/swap"
"github.com/lightninglabs/loop/test"
"github.com/lightningnetwork/lnd/clock"
@ -246,6 +247,58 @@ func TestParameters(t *testing.T) {
require.Equal(t, ErrZeroChannelID, err)
}
// TestPersistParams tests reading and writing of parameters for our manager.
func TestPersistParams(t *testing.T) {
rpcParams := &clientrpc.LiquidityParameters{
FeePpm: 100,
AutoMaxInFlight: 10,
HtlcConfTarget: 2,
}
cfg, _ := newTestConfig()
manager := NewManager(cfg)
var paramsBytes []byte
// Mock the read method to return empty data.
manager.cfg.FetchLiquidityParams = func() ([]byte, error) {
return paramsBytes, nil
}
// Test the nil params is returned.
req, err := manager.loadParams()
require.Nil(t, req)
require.NoError(t, err)
// Mock the write method to return no error.
manager.cfg.PutLiquidityParams = func(data []byte) error {
paramsBytes = data
return nil
}
// Test save the message.
err = manager.saveParams(rpcParams)
require.NoError(t, err)
// Test the nil params is returned.
req, err = manager.loadParams()
require.NoError(t, err)
// Check the specified fields are set as expected.
require.Equal(t, rpcParams.FeePpm, req.FeePpm)
require.Equal(t, rpcParams.AutoMaxInFlight, req.AutoMaxInFlight)
require.Equal(t, rpcParams.HtlcConfTarget, req.HtlcConfTarget)
// Check the unspecified fields are using empty values.
require.False(t, req.Autoloop)
require.Empty(t, req.Rules)
require.Zero(t, req.AutoloopBudgetSat)
// Finally, check the loaded request can be used to set params without
// error.
err = manager.SetParameters(context.Background(), req)
require.NoError(t, err)
}
// TestRestrictedSuggestions tests getting of swap suggestions when we have
// other in-flight swaps. We setup our manager with a set of channels and rules
// that require a loop out swap, focusing on the filtering our of channels that

@ -72,6 +72,8 @@ func getLiquidityManager(client *loop.Client) *liquidity.Manager {
ListLoopOut: client.Store.FetchLoopOutSwaps,
ListLoopIn: client.Store.FetchLoopInSwaps,
MinimumConfirmations: minConfTarget,
PutLiquidityParams: client.Store.PutLiquidityParams,
FetchLiquidityParams: client.Store.FetchLiquidityParams,
}
return liquidity.NewManager(mngrCfg)

Loading…
Cancel
Save