From 7cafbe957d380ea30fcf92f03c9534ffc51eb24b Mon Sep 17 00:00:00 2001 From: sputn1ck Date: Wed, 25 Oct 2023 23:32:28 +0200 Subject: [PATCH] loopd: add instantout handling --- loopd/daemon.go | 80 ++++++++++++++++++++++++++++++++++---- loopd/log.go | 4 ++ loopd/perms/perms.go | 6 ++- loopd/swapclient_server.go | 44 ++++++++++++++++++++- loopd/utils.go | 6 +++ 5 files changed, 130 insertions(+), 10 deletions(-) diff --git a/loopd/daemon.go b/loopd/daemon.go index 747780d..fa30b11 100644 --- a/loopd/daemon.go +++ b/loopd/daemon.go @@ -16,6 +16,7 @@ import ( proxy "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop" + "github.com/lightninglabs/loop/instantout" "github.com/lightninglabs/loop/loopd/perms" "github.com/lightninglabs/loop/loopdb" "github.com/lightninglabs/loop/sweepbatcher" @@ -24,6 +25,7 @@ import ( loop_looprpc "github.com/lightninglabs/loop/looprpc" loop_swaprpc "github.com/lightninglabs/loop/swapserverrpc" + "github.com/lightningnetwork/lnd/clock" "github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/macaroons" "google.golang.org/grpc" @@ -67,10 +69,6 @@ type Daemon struct { // same process. swapClientServer - // reservationManager is the manager that handles all reservation state - // machines. - reservationManager *reservation.Manager - // ErrChan is an error channel that users of the Daemon struct must use // to detect runtime errors and also whether a shutdown is fully // completed. @@ -429,6 +427,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error { swapClient.Conn, ) + // Create an instantout server client. + instantOutClient := loop_swaprpc.NewInstantSwapServerClient( + swapClient.Conn, + ) + // Both the client RPC server and the swap server client should stop // on main context cancel. So we create it early and pass it down. d.mainCtx, d.mainCtxCancel = context.WithCancel(context.Background()) @@ -486,7 +489,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error { } } - // Create the reservation rpc server. + var ( + reservationManager *reservation.Manager + instantOutManager *instantout.Manager + ) + // Create the reservation and instantout managers. if d.cfg.EnableExperimental { reservationStore := reservation.NewSQLStore(baseDb) reservationConfig := &reservation.Config{ @@ -497,9 +504,30 @@ func (d *Daemon) initialize(withMacaroonService bool) error { FetchL402: swapClient.Server.FetchL402, } - d.reservationManager = reservation.NewManager( + reservationManager = reservation.NewManager( reservationConfig, ) + + // Create the instantout services. + instantOutStore := instantout.NewSQLStore( + baseDb, clock.NewDefaultClock(), reservationStore, + d.lnd.ChainParams, + ) + instantOutConfig := &instantout.Config{ + Store: instantOutStore, + LndClient: d.lnd.Client, + RouterClient: d.lnd.Router, + ChainNotifier: d.lnd.ChainNotifier, + Signer: d.lnd.Signer, + Wallet: d.lnd.WalletKit, + ReservationManager: reservationManager, + InstantOutClient: instantOutClient, + Network: d.lnd.ChainParams, + } + + instantOutManager = instantout.NewInstantOutManager( + instantOutConfig, + ) } // Now finally fully initialize the swap client RPC server instance. @@ -513,7 +541,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error { subscribers: make(map[int]chan<- interface{}), statusChan: make(chan loop.SwapInfo), mainCtx: d.mainCtx, - reservationManager: d.reservationManager, + reservationManager: reservationManager, + instantOutManager: instantOutManager, } // Retrieve all currently existing swaps from the database. @@ -606,6 +635,43 @@ func (d *Daemon) initialize(withMacaroonService bool) error { }() } + // Start the instant out manager. + if d.instantOutManager != nil { + d.wg.Add(1) + initChan := make(chan struct{}) + go func() { + defer d.wg.Done() + + getInfo, err := d.lnd.Client.GetInfo(d.mainCtx) + if err != nil { + d.internalErrChan <- err + return + } + + log.Info("Starting instantout manager") + defer log.Info("Instantout manager stopped") + + err = d.instantOutManager.Run( + d.mainCtx, initChan, int32(getInfo.BlockHeight), + ) + if err != nil && !errors.Is(err, context.Canceled) { + d.internalErrChan <- err + } + }() + + // Wait for the instantout server to be ready before starting the + // grpc server. + timeOutCtx, cancel := context.WithTimeout(d.mainCtx, 10*time.Second) + select { + case <-timeOutCtx.Done(): + cancel() + return fmt.Errorf("reservation server not ready: %v", + timeOutCtx.Err()) + case <-initChan: + cancel() + } + } + // Last, start our internal error handler. This will return exactly one // error or nil on the main error channel to inform the caller that // something went wrong or that shutdown is complete. We don't add to diff --git a/loopd/log.go b/loopd/log.go index 873a385..489fa98 100644 --- a/loopd/log.go +++ b/loopd/log.go @@ -6,6 +6,7 @@ import ( "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop" "github.com/lightninglabs/loop/fsm" + "github.com/lightninglabs/loop/instantout" "github.com/lightninglabs/loop/instantout/reservation" "github.com/lightninglabs/loop/liquidity" "github.com/lightninglabs/loop/loopdb" @@ -44,6 +45,9 @@ func SetupLoggers(root *build.RotatingLogWriter, intercept signal.Interceptor) { lnd.AddSubLogger( root, reservation.Subsystem, intercept, reservation.UseLogger, ) + lnd.AddSubLogger( + root, instantout.Subsystem, intercept, instantout.UseLogger, + ) } // genSubLogger creates a logger for a subsystem. We provide an instance of diff --git a/loopd/perms/perms.go b/loopd/perms/perms.go index 6e2d078..7ac85a2 100644 --- a/loopd/perms/perms.go +++ b/loopd/perms/perms.go @@ -97,7 +97,11 @@ var RequiredPermissions = map[string][]bakery.Op{ Action: "in", }}, "/looprpc.SwapClient/ListReservations": {{ - Entity: "reservation", + Entity: "swap", Action: "read", }}, + "/looprpc.SwapClient/InstantOut": {{ + Entity: "swap", + Action: "execute", + }}, } diff --git a/loopd/swapclient_server.go b/loopd/swapclient_server.go index 05f9156..7cec2ad 100644 --- a/loopd/swapclient_server.go +++ b/loopd/swapclient_server.go @@ -18,6 +18,7 @@ import ( "github.com/lightninglabs/aperture/lsat" "github.com/lightninglabs/lndclient" "github.com/lightninglabs/loop" + "github.com/lightninglabs/loop/instantout" "github.com/lightninglabs/loop/instantout/reservation" "github.com/lightninglabs/loop/labels" "github.com/lightninglabs/loop/liquidity" @@ -81,6 +82,7 @@ type swapClientServer struct { liquidityMgr *liquidity.Manager lnd *lndclient.LndServices reservationManager *reservation.Manager + instantOutManager *instantout.Manager swaps map[lntypes.Hash]loop.SwapInfo subscribers map[int]chan<- interface{} statusChan chan loop.SwapInfo @@ -1169,6 +1171,44 @@ func (s *swapClientServer) ListReservations(ctx context.Context, }, nil } +// InstantOut initiates an instant out swap. +func (s *swapClientServer) InstantOut(ctx context.Context, + req *clientrpc.InstantOutRequest) (*clientrpc.InstantOutResponse, + error) { + + reservationIds := make([]reservation.ID, len(req.ReservationIds)) + for i, id := range req.ReservationIds { + if len(id) != reservation.IdLength { + return nil, fmt.Errorf("invalid reservation id: "+ + "expected %v bytes, got %d", + reservation.IdLength, len(id)) + } + + var resId reservation.ID + copy(resId[:], id) + + reservationIds[i] = resId + } + + instantOutFsm, err := s.instantOutManager.NewInstantOut( + ctx, reservationIds, + ) + if err != nil { + return nil, err + } + + res := &clientrpc.InstantOutResponse{ + InstantOutHash: instantOutFsm.InstantOut.SwapHash[:], + State: string(instantOutFsm.InstantOut.State), + } + + if instantOutFsm.InstantOut.SweepTxHash != nil { + res.SweepTxId = instantOutFsm.InstantOut.SweepTxHash.String() + } + + return res, nil +} + func rpcAutoloopReason(reason liquidity.Reason) (clientrpc.AutoReason, error) { switch reason { case liquidity.ReasonNone: @@ -1448,11 +1488,11 @@ func toClientReservation( res *reservation.Reservation) *clientrpc.ClientReservation { var ( - txid []byte + txid string vout uint32 ) if res.Outpoint != nil { - txid = res.Outpoint.Hash[:] + txid = res.Outpoint.Hash.String() vout = res.Outpoint.Index } diff --git a/loopd/utils.go b/loopd/utils.go index ef763e2..acb22f4 100644 --- a/loopd/utils.go +++ b/loopd/utils.go @@ -59,6 +59,9 @@ func openDatabase(cfg *Config, chainParams *chaincfg.Params) (loopdb.SwapStore, db, err = loopdb.NewSqliteStore( cfg.Sqlite, chainParams, ) + if err != nil { + return nil, nil, err + } baseDb = *db.(*loopdb.SqliteSwapStore).BaseDB case DatabaseBackendPostgres: @@ -67,6 +70,9 @@ func openDatabase(cfg *Config, chainParams *chaincfg.Params) (loopdb.SwapStore, db, err = loopdb.NewPostgresStore( cfg.Postgres, chainParams, ) + if err != nil { + return nil, nil, err + } baseDb = *db.(*loopdb.PostgresStore).BaseDB default: