loopd: add instantout handling

pull/651/head
sputn1ck 6 months ago
parent 6c07f88458
commit 7cafbe957d
No known key found for this signature in database
GPG Key ID: 671103D881A5F0E4

@ -16,6 +16,7 @@ import (
proxy "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/instantout"
"github.com/lightninglabs/loop/loopd/perms"
"github.com/lightninglabs/loop/loopdb"
"github.com/lightninglabs/loop/sweepbatcher"
@ -24,6 +25,7 @@ import (
loop_looprpc "github.com/lightninglabs/loop/looprpc"
loop_swaprpc "github.com/lightninglabs/loop/swapserverrpc"
"github.com/lightningnetwork/lnd/clock"
"github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/macaroons"
"google.golang.org/grpc"
@ -67,10 +69,6 @@ type Daemon struct {
// same process.
swapClientServer
// reservationManager is the manager that handles all reservation state
// machines.
reservationManager *reservation.Manager
// ErrChan is an error channel that users of the Daemon struct must use
// to detect runtime errors and also whether a shutdown is fully
// completed.
@ -429,6 +427,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
swapClient.Conn,
)
// Create an instantout server client.
instantOutClient := loop_swaprpc.NewInstantSwapServerClient(
swapClient.Conn,
)
// Both the client RPC server and the swap server client should stop
// on main context cancel. So we create it early and pass it down.
d.mainCtx, d.mainCtxCancel = context.WithCancel(context.Background())
@ -486,7 +489,11 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
}
}
// Create the reservation rpc server.
var (
reservationManager *reservation.Manager
instantOutManager *instantout.Manager
)
// Create the reservation and instantout managers.
if d.cfg.EnableExperimental {
reservationStore := reservation.NewSQLStore(baseDb)
reservationConfig := &reservation.Config{
@ -497,9 +504,30 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
FetchL402: swapClient.Server.FetchL402,
}
d.reservationManager = reservation.NewManager(
reservationManager = reservation.NewManager(
reservationConfig,
)
// Create the instantout services.
instantOutStore := instantout.NewSQLStore(
baseDb, clock.NewDefaultClock(), reservationStore,
d.lnd.ChainParams,
)
instantOutConfig := &instantout.Config{
Store: instantOutStore,
LndClient: d.lnd.Client,
RouterClient: d.lnd.Router,
ChainNotifier: d.lnd.ChainNotifier,
Signer: d.lnd.Signer,
Wallet: d.lnd.WalletKit,
ReservationManager: reservationManager,
InstantOutClient: instantOutClient,
Network: d.lnd.ChainParams,
}
instantOutManager = instantout.NewInstantOutManager(
instantOutConfig,
)
}
// Now finally fully initialize the swap client RPC server instance.
@ -513,7 +541,8 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
subscribers: make(map[int]chan<- interface{}),
statusChan: make(chan loop.SwapInfo),
mainCtx: d.mainCtx,
reservationManager: d.reservationManager,
reservationManager: reservationManager,
instantOutManager: instantOutManager,
}
// Retrieve all currently existing swaps from the database.
@ -606,6 +635,43 @@ func (d *Daemon) initialize(withMacaroonService bool) error {
}()
}
// Start the instant out manager.
if d.instantOutManager != nil {
d.wg.Add(1)
initChan := make(chan struct{})
go func() {
defer d.wg.Done()
getInfo, err := d.lnd.Client.GetInfo(d.mainCtx)
if err != nil {
d.internalErrChan <- err
return
}
log.Info("Starting instantout manager")
defer log.Info("Instantout manager stopped")
err = d.instantOutManager.Run(
d.mainCtx, initChan, int32(getInfo.BlockHeight),
)
if err != nil && !errors.Is(err, context.Canceled) {
d.internalErrChan <- err
}
}()
// Wait for the instantout server to be ready before starting the
// grpc server.
timeOutCtx, cancel := context.WithTimeout(d.mainCtx, 10*time.Second)
select {
case <-timeOutCtx.Done():
cancel()
return fmt.Errorf("reservation server not ready: %v",
timeOutCtx.Err())
case <-initChan:
cancel()
}
}
// Last, start our internal error handler. This will return exactly one
// error or nil on the main error channel to inform the caller that
// something went wrong or that shutdown is complete. We don't add to

@ -6,6 +6,7 @@ import (
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/fsm"
"github.com/lightninglabs/loop/instantout"
"github.com/lightninglabs/loop/instantout/reservation"
"github.com/lightninglabs/loop/liquidity"
"github.com/lightninglabs/loop/loopdb"
@ -44,6 +45,9 @@ func SetupLoggers(root *build.RotatingLogWriter, intercept signal.Interceptor) {
lnd.AddSubLogger(
root, reservation.Subsystem, intercept, reservation.UseLogger,
)
lnd.AddSubLogger(
root, instantout.Subsystem, intercept, instantout.UseLogger,
)
}
// genSubLogger creates a logger for a subsystem. We provide an instance of

@ -97,7 +97,11 @@ var RequiredPermissions = map[string][]bakery.Op{
Action: "in",
}},
"/looprpc.SwapClient/ListReservations": {{
Entity: "reservation",
Entity: "swap",
Action: "read",
}},
"/looprpc.SwapClient/InstantOut": {{
Entity: "swap",
Action: "execute",
}},
}

@ -18,6 +18,7 @@ import (
"github.com/lightninglabs/aperture/lsat"
"github.com/lightninglabs/lndclient"
"github.com/lightninglabs/loop"
"github.com/lightninglabs/loop/instantout"
"github.com/lightninglabs/loop/instantout/reservation"
"github.com/lightninglabs/loop/labels"
"github.com/lightninglabs/loop/liquidity"
@ -81,6 +82,7 @@ type swapClientServer struct {
liquidityMgr *liquidity.Manager
lnd *lndclient.LndServices
reservationManager *reservation.Manager
instantOutManager *instantout.Manager
swaps map[lntypes.Hash]loop.SwapInfo
subscribers map[int]chan<- interface{}
statusChan chan loop.SwapInfo
@ -1169,6 +1171,44 @@ func (s *swapClientServer) ListReservations(ctx context.Context,
}, nil
}
// InstantOut initiates an instant out swap.
func (s *swapClientServer) InstantOut(ctx context.Context,
req *clientrpc.InstantOutRequest) (*clientrpc.InstantOutResponse,
error) {
reservationIds := make([]reservation.ID, len(req.ReservationIds))
for i, id := range req.ReservationIds {
if len(id) != reservation.IdLength {
return nil, fmt.Errorf("invalid reservation id: "+
"expected %v bytes, got %d",
reservation.IdLength, len(id))
}
var resId reservation.ID
copy(resId[:], id)
reservationIds[i] = resId
}
instantOutFsm, err := s.instantOutManager.NewInstantOut(
ctx, reservationIds,
)
if err != nil {
return nil, err
}
res := &clientrpc.InstantOutResponse{
InstantOutHash: instantOutFsm.InstantOut.SwapHash[:],
State: string(instantOutFsm.InstantOut.State),
}
if instantOutFsm.InstantOut.SweepTxHash != nil {
res.SweepTxId = instantOutFsm.InstantOut.SweepTxHash.String()
}
return res, nil
}
func rpcAutoloopReason(reason liquidity.Reason) (clientrpc.AutoReason, error) {
switch reason {
case liquidity.ReasonNone:
@ -1448,11 +1488,11 @@ func toClientReservation(
res *reservation.Reservation) *clientrpc.ClientReservation {
var (
txid []byte
txid string
vout uint32
)
if res.Outpoint != nil {
txid = res.Outpoint.Hash[:]
txid = res.Outpoint.Hash.String()
vout = res.Outpoint.Index
}

@ -59,6 +59,9 @@ func openDatabase(cfg *Config, chainParams *chaincfg.Params) (loopdb.SwapStore,
db, err = loopdb.NewSqliteStore(
cfg.Sqlite, chainParams,
)
if err != nil {
return nil, nil, err
}
baseDb = *db.(*loopdb.SqliteSwapStore).BaseDB
case DatabaseBackendPostgres:
@ -67,6 +70,9 @@ func openDatabase(cfg *Config, chainParams *chaincfg.Params) (loopdb.SwapStore,
db, err = loopdb.NewPostgresStore(
cfg.Postgres, chainParams,
)
if err != nil {
return nil, nil, err
}
baseDb = *db.(*loopdb.PostgresStore).BaseDB
default:

Loading…
Cancel
Save