From 99608ad51561112670ffe148ba3b2dd76b17dcb1 Mon Sep 17 00:00:00 2001 From: George Tsagkarelis Date: Tue, 5 Sep 2023 13:48:25 -0400 Subject: [PATCH] loopdb+sweepbatcher: add sweepbatcher store --- loopdb/loopout.go | 4 + loopdb/sql_store.go | 22 +- loopdb/sqlc/batch.sql.go | 317 +++++++++++++++ .../migrations/000004_is_wallet_addr.down.sql | 1 + .../migrations/000004_is_wallet_addr.up.sql | 4 + .../migrations/000005_sweep_batcher.down.sql | 2 + .../migrations/000005_sweep_batcher.up.sql | 58 +++ loopdb/sqlc/models.go | 21 + loopdb/sqlc/querier.go | 7 + loopdb/sqlc/queries/batch.sql | 91 +++++ loopdb/sqlc/queries/swaps.sql | 5 +- loopdb/sqlc/swaps.sql.go | 15 +- loopdb/store.go | 9 +- sweepbatcher/store.go | 371 ++++++++++++++++++ sweepbatcher/store_mock.go | 125 ++++++ 15 files changed, 1035 insertions(+), 17 deletions(-) create mode 100644 loopdb/sqlc/batch.sql.go create mode 100644 loopdb/sqlc/migrations/000004_is_wallet_addr.down.sql create mode 100644 loopdb/sqlc/migrations/000004_is_wallet_addr.up.sql create mode 100644 loopdb/sqlc/migrations/000005_sweep_batcher.down.sql create mode 100644 loopdb/sqlc/migrations/000005_sweep_batcher.up.sql create mode 100644 loopdb/sqlc/queries/batch.sql create mode 100644 sweepbatcher/store.go create mode 100644 sweepbatcher/store_mock.go diff --git a/loopdb/loopout.go b/loopdb/loopout.go index 9ebb858..2507784 100644 --- a/loopdb/loopout.go +++ b/loopdb/loopout.go @@ -24,6 +24,10 @@ type LoopOutContract struct { // DestAddr is the destination address of the loop out swap. DestAddr btcutil.Address + // IsExternalAddr indicates whether the destination address does not + // belong to the backing lnd node. + IsExternalAddr bool + // SwapInvoice is the invoice that is to be paid by the client to // initiate the loop out swap. SwapInvoice string diff --git a/loopdb/sql_store.go b/loopdb/sql_store.go index 9613ec5..83de95c 100644 --- a/loopdb/sql_store.go +++ b/loopdb/sql_store.go @@ -9,6 +9,7 @@ import ( "time" "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/lightninglabs/loop/loopdb/sqlc" "github.com/lightningnetwork/lnd/keychain" @@ -31,13 +32,16 @@ func (s *BaseDB) FetchLoopOutSwaps(ctx context.Context) ([]*LoopOut, loopOuts = make([]*LoopOut, len(swaps)) for i, swap := range swaps { - updates, err := s.Queries.GetSwapUpdates(ctx, swap.SwapHash) + updates, err := s.Queries.GetSwapUpdates( + ctx, swap.SwapHash, + ) if err != nil { return err } - loopOut, err := s.convertLoopOutRow( - sqlc.GetLoopOutSwapRow(swap), updates, + loopOut, err := ConvertLoopOutRow( + s.network, sqlc.GetLoopOutSwapRow(swap), + updates, ) if err != nil { return err @@ -72,8 +76,8 @@ func (s *BaseDB) FetchLoopOutSwap(ctx context.Context, return err } - loopOut, err = s.convertLoopOutRow( - swap, updates, + loopOut, err = ConvertLoopOutRow( + s.network, swap, updates, ) if err != nil { return err @@ -430,6 +434,7 @@ func loopOutToInsertArgs(hash lntypes.Hash, return sqlc.InsertLoopOutParams{ SwapHash: hash[:], DestAddress: loopOut.DestAddr.String(), + SingleSweep: loopOut.IsExternalAddr, SwapInvoice: loopOut.SwapInvoice, MaxSwapRoutingFee: int64(loopOut.MaxSwapRoutingFee), SweepConfTarget: loopOut.SweepConfTarget, @@ -479,9 +484,9 @@ func swapToHtlcKeysInsertArgs(hash lntypes.Hash, } } -// convertLoopOutRow converts a database row containing a loop out swap to a +// ConvertLoopOutRow converts a database row containing a loop out swap to a // LoopOut struct. -func (s *BaseDB) convertLoopOutRow(row sqlc.GetLoopOutSwapRow, +func ConvertLoopOutRow(network *chaincfg.Params, row sqlc.GetLoopOutSwapRow, updates []sqlc.SwapUpdate) (*LoopOut, error) { htlcKeys, err := fetchHtlcKeys( @@ -498,7 +503,7 @@ func (s *BaseDB) convertLoopOutRow(row sqlc.GetLoopOutSwapRow, return nil, err } - destAddress, err := btcutil.DecodeAddress(row.DestAddress, s.network) + destAddress, err := btcutil.DecodeAddress(row.DestAddress, network) if err != nil { return nil, err } @@ -523,6 +528,7 @@ func (s *BaseDB) convertLoopOutRow(row sqlc.GetLoopOutSwapRow, ProtocolVersion: ProtocolVersion(row.ProtocolVersion), }, DestAddr: destAddress, + IsExternalAddr: row.SingleSweep, SwapInvoice: row.SwapInvoice, MaxSwapRoutingFee: btcutil.Amount(row.MaxSwapRoutingFee), SweepConfTarget: row.SweepConfTarget, diff --git a/loopdb/sqlc/batch.sql.go b/loopdb/sqlc/batch.sql.go new file mode 100644 index 0000000..9469292 --- /dev/null +++ b/loopdb/sqlc/batch.sql.go @@ -0,0 +1,317 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.17.2 +// source: batch.sql + +package sqlc + +import ( + "context" + "database/sql" + "time" +) + +const confirmBatch = `-- name: ConfirmBatch :exec +UPDATE + sweep_batches +SET + confirmed = TRUE +WHERE + id = $1 +` + +func (q *Queries) ConfirmBatch(ctx context.Context, id int32) error { + _, err := q.db.ExecContext(ctx, confirmBatch, id) + return err +} + +const getBatchSweeps = `-- name: GetBatchSweeps :many +SELECT + sweeps.id, sweeps.swap_hash, sweeps.batch_id, sweeps.outpoint_txid, sweeps.outpoint_index, sweeps.amt, sweeps.completed, + swaps.id, swaps.swap_hash, swaps.preimage, swaps.initiation_time, swaps.amount_requested, swaps.cltv_expiry, swaps.max_miner_fee, swaps.max_swap_fee, swaps.initiation_height, swaps.protocol_version, swaps.label, + loopout_swaps.swap_hash, loopout_swaps.dest_address, loopout_swaps.swap_invoice, loopout_swaps.max_swap_routing_fee, loopout_swaps.sweep_conf_target, loopout_swaps.htlc_confirmations, loopout_swaps.outgoing_chan_set, loopout_swaps.prepay_invoice, loopout_swaps.max_prepay_routing_fee, loopout_swaps.publication_deadline, loopout_swaps.single_sweep, + htlc_keys.swap_hash, htlc_keys.sender_script_pubkey, htlc_keys.receiver_script_pubkey, htlc_keys.sender_internal_pubkey, htlc_keys.receiver_internal_pubkey, htlc_keys.client_key_family, htlc_keys.client_key_index +FROM + sweeps +JOIN + swaps ON sweeps.swap_hash = swaps.swap_hash +JOIN + loopout_swaps ON sweeps.swap_hash = loopout_swaps.swap_hash +JOIN + htlc_keys ON sweeps.swap_hash = htlc_keys.swap_hash +WHERE + sweeps.batch_id = $1 +ORDER BY + sweeps.id ASC +` + +type GetBatchSweepsRow struct { + ID int32 + SwapHash []byte + BatchID int32 + OutpointTxid []byte + OutpointIndex int32 + Amt int64 + Completed bool + ID_2 int32 + SwapHash_2 []byte + Preimage []byte + InitiationTime time.Time + AmountRequested int64 + CltvExpiry int32 + MaxMinerFee int64 + MaxSwapFee int64 + InitiationHeight int32 + ProtocolVersion int32 + Label string + SwapHash_3 []byte + DestAddress string + SwapInvoice string + MaxSwapRoutingFee int64 + SweepConfTarget int32 + HtlcConfirmations int32 + OutgoingChanSet string + PrepayInvoice string + MaxPrepayRoutingFee int64 + PublicationDeadline time.Time + SingleSweep bool + SwapHash_4 []byte + SenderScriptPubkey []byte + ReceiverScriptPubkey []byte + SenderInternalPubkey []byte + ReceiverInternalPubkey []byte + ClientKeyFamily int32 + ClientKeyIndex int32 +} + +func (q *Queries) GetBatchSweeps(ctx context.Context, batchID int32) ([]GetBatchSweepsRow, error) { + rows, err := q.db.QueryContext(ctx, getBatchSweeps, batchID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []GetBatchSweepsRow + for rows.Next() { + var i GetBatchSweepsRow + if err := rows.Scan( + &i.ID, + &i.SwapHash, + &i.BatchID, + &i.OutpointTxid, + &i.OutpointIndex, + &i.Amt, + &i.Completed, + &i.ID_2, + &i.SwapHash_2, + &i.Preimage, + &i.InitiationTime, + &i.AmountRequested, + &i.CltvExpiry, + &i.MaxMinerFee, + &i.MaxSwapFee, + &i.InitiationHeight, + &i.ProtocolVersion, + &i.Label, + &i.SwapHash_3, + &i.DestAddress, + &i.SwapInvoice, + &i.MaxSwapRoutingFee, + &i.SweepConfTarget, + &i.HtlcConfirmations, + &i.OutgoingChanSet, + &i.PrepayInvoice, + &i.MaxPrepayRoutingFee, + &i.PublicationDeadline, + &i.SingleSweep, + &i.SwapHash_4, + &i.SenderScriptPubkey, + &i.ReceiverScriptPubkey, + &i.SenderInternalPubkey, + &i.ReceiverInternalPubkey, + &i.ClientKeyFamily, + &i.ClientKeyIndex, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const getSweepStatus = `-- name: GetSweepStatus :one +SELECT + COALESCE(s.completed, f.false_value) AS completed +FROM + (SELECT false AS false_value) AS f +LEFT JOIN + sweeps s ON s.swap_hash = $1 +` + +func (q *Queries) GetSweepStatus(ctx context.Context, swapHash []byte) (bool, error) { + row := q.db.QueryRowContext(ctx, getSweepStatus, swapHash) + var completed bool + err := row.Scan(&completed) + return completed, err +} + +const getUnconfirmedBatches = `-- name: GetUnconfirmedBatches :many +SELECT + id, confirmed, batch_tx_id, batch_pk_script, last_rbf_height, last_rbf_sat_per_kw, max_timeout_distance +FROM + sweep_batches +WHERE + confirmed = FALSE +` + +func (q *Queries) GetUnconfirmedBatches(ctx context.Context) ([]SweepBatch, error) { + rows, err := q.db.QueryContext(ctx, getUnconfirmedBatches) + if err != nil { + return nil, err + } + defer rows.Close() + var items []SweepBatch + for rows.Next() { + var i SweepBatch + if err := rows.Scan( + &i.ID, + &i.Confirmed, + &i.BatchTxID, + &i.BatchPkScript, + &i.LastRbfHeight, + &i.LastRbfSatPerKw, + &i.MaxTimeoutDistance, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const insertBatch = `-- name: InsertBatch :one +INSERT INTO sweep_batches ( + confirmed, + batch_tx_id, + batch_pk_script, + last_rbf_height, + last_rbf_sat_per_kw, + max_timeout_distance +) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6 +) RETURNING id +` + +type InsertBatchParams struct { + Confirmed bool + BatchTxID sql.NullString + BatchPkScript []byte + LastRbfHeight sql.NullInt32 + LastRbfSatPerKw sql.NullInt32 + MaxTimeoutDistance int32 +} + +func (q *Queries) InsertBatch(ctx context.Context, arg InsertBatchParams) (int32, error) { + row := q.db.QueryRowContext(ctx, insertBatch, + arg.Confirmed, + arg.BatchTxID, + arg.BatchPkScript, + arg.LastRbfHeight, + arg.LastRbfSatPerKw, + arg.MaxTimeoutDistance, + ) + var id int32 + err := row.Scan(&id) + return id, err +} + +const updateBatch = `-- name: UpdateBatch :exec +UPDATE sweep_batches SET + confirmed = $2, + batch_tx_id = $3, + batch_pk_script = $4, + last_rbf_height = $5, + last_rbf_sat_per_kw = $6 +WHERE id = $1 +` + +type UpdateBatchParams struct { + ID int32 + Confirmed bool + BatchTxID sql.NullString + BatchPkScript []byte + LastRbfHeight sql.NullInt32 + LastRbfSatPerKw sql.NullInt32 +} + +func (q *Queries) UpdateBatch(ctx context.Context, arg UpdateBatchParams) error { + _, err := q.db.ExecContext(ctx, updateBatch, + arg.ID, + arg.Confirmed, + arg.BatchTxID, + arg.BatchPkScript, + arg.LastRbfHeight, + arg.LastRbfSatPerKw, + ) + return err +} + +const upsertSweep = `-- name: UpsertSweep :exec +INSERT INTO sweeps ( + swap_hash, + batch_id, + outpoint_txid, + outpoint_index, + amt, + completed +) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6 +) ON CONFLICT (swap_hash) DO UPDATE SET + batch_id = $2, + outpoint_txid = $3, + outpoint_index = $4, + amt = $5, + completed = $6 +` + +type UpsertSweepParams struct { + SwapHash []byte + BatchID int32 + OutpointTxid []byte + OutpointIndex int32 + Amt int64 + Completed bool +} + +func (q *Queries) UpsertSweep(ctx context.Context, arg UpsertSweepParams) error { + _, err := q.db.ExecContext(ctx, upsertSweep, + arg.SwapHash, + arg.BatchID, + arg.OutpointTxid, + arg.OutpointIndex, + arg.Amt, + arg.Completed, + ) + return err +} diff --git a/loopdb/sqlc/migrations/000004_is_wallet_addr.down.sql b/loopdb/sqlc/migrations/000004_is_wallet_addr.down.sql new file mode 100644 index 0000000..e2e9d3f --- /dev/null +++ b/loopdb/sqlc/migrations/000004_is_wallet_addr.down.sql @@ -0,0 +1 @@ +ALTER TABLE loopout_swaps DROP COLUMN single_sweep; \ No newline at end of file diff --git a/loopdb/sqlc/migrations/000004_is_wallet_addr.up.sql b/loopdb/sqlc/migrations/000004_is_wallet_addr.up.sql new file mode 100644 index 0000000..26940fd --- /dev/null +++ b/loopdb/sqlc/migrations/000004_is_wallet_addr.up.sql @@ -0,0 +1,4 @@ +-- is_external_addr indicates whether the destination address of the swap is not +-- a wallet address. The default value used is TRUE in order to maintain the old +-- behavior of swaps which doesn't override the destination address. +ALTER TABLE loopout_swaps ADD single_sweep BOOLEAN NOT NULL DEFAULT TRUE; \ No newline at end of file diff --git a/loopdb/sqlc/migrations/000005_sweep_batcher.down.sql b/loopdb/sqlc/migrations/000005_sweep_batcher.down.sql new file mode 100644 index 0000000..bf27a39 --- /dev/null +++ b/loopdb/sqlc/migrations/000005_sweep_batcher.down.sql @@ -0,0 +1,2 @@ +DROP TABLE sweep_batches; +DROP TABLE sweeps; \ No newline at end of file diff --git a/loopdb/sqlc/migrations/000005_sweep_batcher.up.sql b/loopdb/sqlc/migrations/000005_sweep_batcher.up.sql new file mode 100644 index 0000000..c8b0ed6 --- /dev/null +++ b/loopdb/sqlc/migrations/000005_sweep_batcher.up.sql @@ -0,0 +1,58 @@ +-- sweep_batches stores the on-going swaps that are batched together. +CREATE TABLE sweep_batches ( + -- id is the autoincrementing primary key of the batch. + id INTEGER PRIMARY KEY, + + -- confirmed indicates whether this batch is confirmed. + confirmed BOOLEAN NOT NULL DEFAULT FALSE, + + -- batch_tx_id is the transaction id of the batch transaction. + batch_tx_id TEXT, + + -- batch_pk_script is the pkscript of the batch transaction's output. + batch_pk_script BLOB, + + -- last_rbf_height was the last height at which we attempted to publish + -- an rbf replacement transaction. + last_rbf_height INTEGER, + + -- last_rbf_sat_per_kw was the last sat per kw fee rate we used for the + -- last published transaction. + last_rbf_sat_per_kw INTEGER, + + -- max_timeout_distance is the maximum distance the timeouts of the + -- sweeps can have in the batch. + max_timeout_distance INTEGER NOT NULL +); + +-- sweeps stores the individual sweeps that are part of a batch. +CREATE TABLE sweeps ( + -- id is the autoincrementing primary key. + id INTEGER PRIMARY KEY, + + -- swap_hash is the hash of the swap that is being swept. + swap_hash BLOB NOT NULL UNIQUE, + + -- batch_id is the id of the batch this swap is part of. + batch_id INTEGER NOT NULL, + + -- outpoint_txid is the transaction id of the output being swept. + outpoint_txid BLOB NOT NULL, + + -- outpoint_index is the index of the output being swept. + outpoint_index INTEGER NOT NULL, + + -- amt is the amount of the output being swept. + amt BIGINT NOT NULL, + + -- completed indicates whether the sweep has been completed. + completed BOOLEAN NOT NULL DEFAULT FALSE, + + -- Foreign key constraint to ensure that we reference an existing batch + -- id. + FOREIGN KEY (batch_id) REFERENCES sweep_batches(id), + + -- Foreign key constraint to ensure that swap_hash references an + -- existing swap. + FOREIGN KEY (swap_hash) REFERENCES swaps(swap_hash) +); diff --git a/loopdb/sqlc/models.go b/loopdb/sqlc/models.go index 413b146..d10b67d 100644 --- a/loopdb/sqlc/models.go +++ b/loopdb/sqlc/models.go @@ -42,6 +42,7 @@ type LoopoutSwap struct { PrepayInvoice string MaxPrepayRoutingFee int64 PublicationDeadline time.Time + SingleSweep bool } type Reservation struct { @@ -90,3 +91,23 @@ type SwapUpdate struct { OnchainCost int64 OffchainCost int64 } + +type Sweep struct { + ID int32 + SwapHash []byte + BatchID int32 + OutpointTxid []byte + OutpointIndex int32 + Amt int64 + Completed bool +} + +type SweepBatch struct { + ID int32 + Confirmed bool + BatchTxID sql.NullString + BatchPkScript []byte + LastRbfHeight sql.NullInt32 + LastRbfSatPerKw sql.NullInt32 + MaxTimeoutDistance int32 +} diff --git a/loopdb/sqlc/querier.go b/loopdb/sqlc/querier.go index 69ae559..a5578ba 100644 --- a/loopdb/sqlc/querier.go +++ b/loopdb/sqlc/querier.go @@ -9,8 +9,10 @@ import ( ) type Querier interface { + ConfirmBatch(ctx context.Context, id int32) error CreateReservation(ctx context.Context, arg CreateReservationParams) error FetchLiquidityParams(ctx context.Context) ([]byte, error) + GetBatchSweeps(ctx context.Context, batchID int32) ([]GetBatchSweepsRow, error) GetLoopInSwap(ctx context.Context, swapHash []byte) (GetLoopInSwapRow, error) GetLoopInSwaps(ctx context.Context) ([]GetLoopInSwapsRow, error) GetLoopOutSwap(ctx context.Context, swapHash []byte) (GetLoopOutSwapRow, error) @@ -19,14 +21,19 @@ type Querier interface { GetReservationUpdates(ctx context.Context, reservationID []byte) ([]ReservationUpdate, error) GetReservations(ctx context.Context) ([]Reservation, error) GetSwapUpdates(ctx context.Context, swapHash []byte) ([]SwapUpdate, error) + GetSweepStatus(ctx context.Context, swapHash []byte) (bool, error) + GetUnconfirmedBatches(ctx context.Context) ([]SweepBatch, error) + InsertBatch(ctx context.Context, arg InsertBatchParams) (int32, error) InsertHtlcKeys(ctx context.Context, arg InsertHtlcKeysParams) error InsertLoopIn(ctx context.Context, arg InsertLoopInParams) error InsertLoopOut(ctx context.Context, arg InsertLoopOutParams) error InsertReservationUpdate(ctx context.Context, arg InsertReservationUpdateParams) error InsertSwap(ctx context.Context, arg InsertSwapParams) error InsertSwapUpdate(ctx context.Context, arg InsertSwapUpdateParams) error + UpdateBatch(ctx context.Context, arg UpdateBatchParams) error UpdateReservation(ctx context.Context, arg UpdateReservationParams) error UpsertLiquidityParams(ctx context.Context, params []byte) error + UpsertSweep(ctx context.Context, arg UpsertSweepParams) error } var _ Querier = (*Queries)(nil) diff --git a/loopdb/sqlc/queries/batch.sql b/loopdb/sqlc/queries/batch.sql new file mode 100644 index 0000000..336ccf1 --- /dev/null +++ b/loopdb/sqlc/queries/batch.sql @@ -0,0 +1,91 @@ +-- name: GetUnconfirmedBatches :many +SELECT + * +FROM + sweep_batches +WHERE + confirmed = FALSE; + +-- name: InsertBatch :one +INSERT INTO sweep_batches ( + confirmed, + batch_tx_id, + batch_pk_script, + last_rbf_height, + last_rbf_sat_per_kw, + max_timeout_distance +) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6 +) RETURNING id; + +-- name: UpdateBatch :exec +UPDATE sweep_batches SET + confirmed = $2, + batch_tx_id = $3, + batch_pk_script = $4, + last_rbf_height = $5, + last_rbf_sat_per_kw = $6 +WHERE id = $1; + +-- name: ConfirmBatch :exec +UPDATE + sweep_batches +SET + confirmed = TRUE +WHERE + id = $1; + +-- name: UpsertSweep :exec +INSERT INTO sweeps ( + swap_hash, + batch_id, + outpoint_txid, + outpoint_index, + amt, + completed +) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6 +) ON CONFLICT (swap_hash) DO UPDATE SET + batch_id = $2, + outpoint_txid = $3, + outpoint_index = $4, + amt = $5, + completed = $6; + + +-- name: GetBatchSweeps :many +SELECT + sweeps.*, + swaps.*, + loopout_swaps.*, + htlc_keys.* +FROM + sweeps +JOIN + swaps ON sweeps.swap_hash = swaps.swap_hash +JOIN + loopout_swaps ON sweeps.swap_hash = loopout_swaps.swap_hash +JOIN + htlc_keys ON sweeps.swap_hash = htlc_keys.swap_hash +WHERE + sweeps.batch_id = $1 +ORDER BY + sweeps.id ASC; + +-- name: GetSweepStatus :one +SELECT + COALESCE(s.completed, f.false_value) AS completed +FROM + (SELECT false AS false_value) AS f +LEFT JOIN + sweeps s ON s.swap_hash = $1; diff --git a/loopdb/sqlc/queries/swaps.sql b/loopdb/sqlc/queries/swaps.sql index b1c8da0..925ede6 100644 --- a/loopdb/sqlc/queries/swaps.sql +++ b/loopdb/sqlc/queries/swaps.sql @@ -104,9 +104,10 @@ INSERT INTO loopout_swaps ( outgoing_chan_set, prepay_invoice, max_prepay_routing_fee, - publication_deadline + publication_deadline, + single_sweep ) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 ); -- name: InsertLoopIn :exec diff --git a/loopdb/sqlc/swaps.sql.go b/loopdb/sqlc/swaps.sql.go index 793dfa9..8249942 100644 --- a/loopdb/sqlc/swaps.sql.go +++ b/loopdb/sqlc/swaps.sql.go @@ -169,7 +169,7 @@ func (q *Queries) GetLoopInSwaps(ctx context.Context) ([]GetLoopInSwapsRow, erro const getLoopOutSwap = `-- name: GetLoopOutSwap :one SELECT swaps.id, swaps.swap_hash, swaps.preimage, swaps.initiation_time, swaps.amount_requested, swaps.cltv_expiry, swaps.max_miner_fee, swaps.max_swap_fee, swaps.initiation_height, swaps.protocol_version, swaps.label, - loopout_swaps.swap_hash, loopout_swaps.dest_address, loopout_swaps.swap_invoice, loopout_swaps.max_swap_routing_fee, loopout_swaps.sweep_conf_target, loopout_swaps.htlc_confirmations, loopout_swaps.outgoing_chan_set, loopout_swaps.prepay_invoice, loopout_swaps.max_prepay_routing_fee, loopout_swaps.publication_deadline, + loopout_swaps.swap_hash, loopout_swaps.dest_address, loopout_swaps.swap_invoice, loopout_swaps.max_swap_routing_fee, loopout_swaps.sweep_conf_target, loopout_swaps.htlc_confirmations, loopout_swaps.outgoing_chan_set, loopout_swaps.prepay_invoice, loopout_swaps.max_prepay_routing_fee, loopout_swaps.publication_deadline, loopout_swaps.single_sweep, htlc_keys.swap_hash, htlc_keys.sender_script_pubkey, htlc_keys.receiver_script_pubkey, htlc_keys.sender_internal_pubkey, htlc_keys.receiver_internal_pubkey, htlc_keys.client_key_family, htlc_keys.client_key_index FROM swaps @@ -203,6 +203,7 @@ type GetLoopOutSwapRow struct { PrepayInvoice string MaxPrepayRoutingFee int64 PublicationDeadline time.Time + SingleSweep bool SwapHash_3 []byte SenderScriptPubkey []byte ReceiverScriptPubkey []byte @@ -237,6 +238,7 @@ func (q *Queries) GetLoopOutSwap(ctx context.Context, swapHash []byte) (GetLoopO &i.PrepayInvoice, &i.MaxPrepayRoutingFee, &i.PublicationDeadline, + &i.SingleSweep, &i.SwapHash_3, &i.SenderScriptPubkey, &i.ReceiverScriptPubkey, @@ -251,7 +253,7 @@ func (q *Queries) GetLoopOutSwap(ctx context.Context, swapHash []byte) (GetLoopO const getLoopOutSwaps = `-- name: GetLoopOutSwaps :many SELECT swaps.id, swaps.swap_hash, swaps.preimage, swaps.initiation_time, swaps.amount_requested, swaps.cltv_expiry, swaps.max_miner_fee, swaps.max_swap_fee, swaps.initiation_height, swaps.protocol_version, swaps.label, - loopout_swaps.swap_hash, loopout_swaps.dest_address, loopout_swaps.swap_invoice, loopout_swaps.max_swap_routing_fee, loopout_swaps.sweep_conf_target, loopout_swaps.htlc_confirmations, loopout_swaps.outgoing_chan_set, loopout_swaps.prepay_invoice, loopout_swaps.max_prepay_routing_fee, loopout_swaps.publication_deadline, + loopout_swaps.swap_hash, loopout_swaps.dest_address, loopout_swaps.swap_invoice, loopout_swaps.max_swap_routing_fee, loopout_swaps.sweep_conf_target, loopout_swaps.htlc_confirmations, loopout_swaps.outgoing_chan_set, loopout_swaps.prepay_invoice, loopout_swaps.max_prepay_routing_fee, loopout_swaps.publication_deadline, loopout_swaps.single_sweep, htlc_keys.swap_hash, htlc_keys.sender_script_pubkey, htlc_keys.receiver_script_pubkey, htlc_keys.sender_internal_pubkey, htlc_keys.receiver_internal_pubkey, htlc_keys.client_key_family, htlc_keys.client_key_index FROM swaps @@ -285,6 +287,7 @@ type GetLoopOutSwapsRow struct { PrepayInvoice string MaxPrepayRoutingFee int64 PublicationDeadline time.Time + SingleSweep bool SwapHash_3 []byte SenderScriptPubkey []byte ReceiverScriptPubkey []byte @@ -325,6 +328,7 @@ func (q *Queries) GetLoopOutSwaps(ctx context.Context) ([]GetLoopOutSwapsRow, er &i.PrepayInvoice, &i.MaxPrepayRoutingFee, &i.PublicationDeadline, + &i.SingleSweep, &i.SwapHash_3, &i.SenderScriptPubkey, &i.ReceiverScriptPubkey, @@ -465,9 +469,10 @@ INSERT INTO loopout_swaps ( outgoing_chan_set, prepay_invoice, max_prepay_routing_fee, - publication_deadline + publication_deadline, + single_sweep ) VALUES ( - $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 + $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 ) ` @@ -482,6 +487,7 @@ type InsertLoopOutParams struct { PrepayInvoice string MaxPrepayRoutingFee int64 PublicationDeadline time.Time + SingleSweep bool } func (q *Queries) InsertLoopOut(ctx context.Context, arg InsertLoopOutParams) error { @@ -496,6 +502,7 @@ func (q *Queries) InsertLoopOut(ctx context.Context, arg InsertLoopOutParams) er arg.PrepayInvoice, arg.MaxPrepayRoutingFee, arg.PublicationDeadline, + arg.SingleSweep, ) return err } diff --git a/loopdb/store.go b/loopdb/store.go index a52ef89..3726ea2 100644 --- a/loopdb/store.go +++ b/loopdb/store.go @@ -137,6 +137,9 @@ var ( // errInvalidKey is returned when a serialized key is not the expected // length. errInvalidKey = fmt.Errorf("invalid serialized key") + + // errUnimplemented is returned when a method is not implemented. + errUnimplemented = fmt.Errorf("unimplemented method") ) const ( @@ -990,19 +993,19 @@ func (s *boltSwapStore) fetchLoopInSwap(rootBucket *bbolt.Bucket, func (b *boltSwapStore) BatchCreateLoopOut(ctx context.Context, swaps map[lntypes.Hash]*LoopOutContract) error { - return errors.New("not implemented") + return errUnimplemented } // BatchCreateLoopIn creates a batch of loop in swaps to the store. func (b *boltSwapStore) BatchCreateLoopIn(ctx context.Context, swaps map[lntypes.Hash]*LoopInContract) error { - return errors.New("not implemented") + return errUnimplemented } // BatchInsertUpdate inserts batch of swap updates to the store. func (b *boltSwapStore) BatchInsertUpdate(ctx context.Context, updateData map[lntypes.Hash][]BatchInsertUpdateData) error { - return errors.New("not implemented") + return errUnimplemented } diff --git a/sweepbatcher/store.go b/sweepbatcher/store.go new file mode 100644 index 0000000..2bd3ea2 --- /dev/null +++ b/sweepbatcher/store.go @@ -0,0 +1,371 @@ +package sweepbatcher + +import ( + "context" + "database/sql" + + "github.com/btcsuite/btcd/btcutil" + "github.com/btcsuite/btcd/chaincfg" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/loop/loopdb" + "github.com/lightninglabs/loop/loopdb/sqlc" + "github.com/lightningnetwork/lnd/clock" + "github.com/lightningnetwork/lnd/lntypes" +) + +type BaseDB interface { + // ConfirmBatch confirms a batch by setting the state to confirmed. + ConfirmBatch(ctx context.Context, id int32) error + + // GetBatchSweeps fetches all the sweeps that are part a batch. + GetBatchSweeps(ctx context.Context, batchID int32) ( + []sqlc.GetBatchSweepsRow, error) + + // GetSweepStatus returns true if the sweep has been completed. + GetSweepStatus(ctx context.Context, swapHash []byte) (bool, error) + + // GetSwapUpdates fetches all the updates for a swap. + GetSwapUpdates(ctx context.Context, swapHash []byte) ( + []sqlc.SwapUpdate, error) + + // FetchUnconfirmedSweepBatches fetches all the batches from the + // database that are not in a confirmed state. + GetUnconfirmedBatches(ctx context.Context) ([]sqlc.SweepBatch, error) + + // InsertBatch inserts a batch into the database, returning the id of + // the inserted batch. + InsertBatch(ctx context.Context, arg sqlc.InsertBatchParams) ( + int32, error) + + // UpdateBatch updates a batch in the database. + UpdateBatch(ctx context.Context, arg sqlc.UpdateBatchParams) error + + // UpsertSweep inserts a sweep into the database, or updates an existing + // sweep if it already exists. + UpsertSweep(ctx context.Context, arg sqlc.UpsertSweepParams) error + + // ExecTx allows for executing a function in the context of a database + // transaction. + ExecTx(ctx context.Context, txOptions loopdb.TxOptions, + txBody func(*sqlc.Queries) error) error +} + +// SQLStore manages the reservations in the database. +type SQLStore struct { + baseDb BaseDB + + network *chaincfg.Params + clock clock.Clock +} + +// NewSQLStore creates a new SQLStore. +func NewSQLStore(db BaseDB, network *chaincfg.Params) *SQLStore { + return &SQLStore{ + baseDb: db, + network: network, + clock: clock.NewDefaultClock(), + } +} + +// FetchUnconfirmedSweepBatches fetches all the batches from the database that +// are not in a confirmed state. +func (s *SQLStore) FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch, + error) { + + var batches []*dbBatch + + dbBatches, err := s.baseDb.GetUnconfirmedBatches(ctx) + if err != nil { + return nil, err + } + + for _, dbBatch := range dbBatches { + batch := convertBatchRow(dbBatch) + if err != nil { + return nil, err + } + + batches = append(batches, batch) + } + + return batches, err +} + +// InsertSweepBatch inserts a batch into the database, returning the id of the +// inserted batch. +func (s *SQLStore) InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32, + error) { + + return s.baseDb.InsertBatch(ctx, batchToInsertArgs(*batch)) +} + +// UpdateSweepBatch updates a batch in the database. +func (s *SQLStore) UpdateSweepBatch(ctx context.Context, batch *dbBatch) error { + return s.baseDb.UpdateBatch(ctx, batchToUpdateArgs(*batch)) +} + +// ConfirmBatch confirms a batch by setting the state to confirmed. +func (s *SQLStore) ConfirmBatch(ctx context.Context, id int32) error { + return s.baseDb.ConfirmBatch(ctx, id) +} + +// FetchBatchSweeps fetches all the sweeps that are part a batch. +func (s *SQLStore) FetchBatchSweeps(ctx context.Context, id int32) ( + []*dbSweep, error) { + + readOpts := loopdb.NewSqlReadOpts() + var sweeps []*dbSweep + + err := s.baseDb.ExecTx(ctx, readOpts, func(tx *sqlc.Queries) error { + dbSweeps, err := tx.GetBatchSweeps(ctx, id) + if err != nil { + return err + } + + for _, dbSweep := range dbSweeps { + updates, err := s.baseDb.GetSwapUpdates( + ctx, dbSweep.SwapHash, + ) + if err != nil { + return err + } + + sweep, err := s.convertSweepRow(dbSweep, updates) + if err != nil { + return err + } + + sweeps = append(sweeps, &sweep) + } + + return nil + }) + if err != nil { + return nil, err + } + + return sweeps, nil +} + +// UpsertSweep inserts a sweep into the database, or updates an existing sweep +// if it already exists. +func (s *SQLStore) UpsertSweep(ctx context.Context, sweep *dbSweep) error { + return s.baseDb.UpsertSweep(ctx, sweepToUpsertArgs(*sweep)) +} + +// GetSweepStatus returns true if the sweep has been completed. +func (s *SQLStore) GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) ( + bool, error) { + + return s.baseDb.GetSweepStatus(ctx, swapHash[:]) +} + +type dbBatch struct { + // ID is the unique identifier of the batch. + ID int32 + + // State is the current state of the batch. + State string + + // BatchTxid is the txid of the batch transaction. + BatchTxid chainhash.Hash + + // BatchPkScript is the pkscript of the batch transaction. + BatchPkScript []byte + + // LastRbfHeight is the height at which the last RBF attempt was made. + LastRbfHeight int32 + + // LastRbfSatPerKw is the sat per kw of the last RBF attempt. + LastRbfSatPerKw int32 + + // MaxTimeoutDistance is the maximum timeout distance of the batch. + MaxTimeoutDistance int32 +} + +type dbSweep struct { + // ID is the unique identifier of the sweep. + ID int32 + + // BatchID is the ID of the batch that the sweep belongs to. + BatchID int32 + + // SwapHash is the hash of the swap that the sweep belongs to. + SwapHash lntypes.Hash + + // Outpoint is the outpoint of the sweep. + Outpoint wire.OutPoint + + // Amount is the amount of the sweep. + Amount btcutil.Amount + + // Completed indicates whether this sweep is completed. + Completed bool + + // LoopOut is the loop out that the sweep belongs to. + LoopOut *loopdb.LoopOut +} + +// convertBatchRow converts a batch row from db to a sweepbatcher.Batch struct. +func convertBatchRow(row sqlc.SweepBatch) *dbBatch { + batch := dbBatch{ + ID: row.ID, + } + + if row.Confirmed { + batch.State = batchOpen + } + + if row.BatchTxID.Valid { + err := chainhash.Decode(&batch.BatchTxid, row.BatchTxID.String) + if err != nil { + return nil + } + } + + batch.BatchPkScript = row.BatchPkScript + + if row.LastRbfHeight.Valid { + batch.LastRbfHeight = row.LastRbfHeight.Int32 + } + + if row.LastRbfSatPerKw.Valid { + batch.LastRbfSatPerKw = row.LastRbfSatPerKw.Int32 + } + + batch.MaxTimeoutDistance = row.MaxTimeoutDistance + + return &batch +} + +// BatchToUpsertArgs converts a Batch struct to the arguments needed to insert +// it into the database. +func batchToInsertArgs(batch dbBatch) sqlc.InsertBatchParams { + args := sqlc.InsertBatchParams{ + Confirmed: false, + BatchTxID: sql.NullString{ + Valid: true, + String: batch.BatchTxid.String(), + }, + BatchPkScript: batch.BatchPkScript, + LastRbfHeight: sql.NullInt32{ + Valid: true, + Int32: batch.LastRbfHeight, + }, + LastRbfSatPerKw: sql.NullInt32{ + Valid: true, + Int32: batch.LastRbfSatPerKw, + }, + MaxTimeoutDistance: batch.MaxTimeoutDistance, + } + + if batch.State == batchConfirmed { + args.Confirmed = true + } + + return args +} + +// BatchToUpsertArgs converts a Batch struct to the arguments needed to insert +// it into the database. +func batchToUpdateArgs(batch dbBatch) sqlc.UpdateBatchParams { + args := sqlc.UpdateBatchParams{ + ID: batch.ID, + Confirmed: false, + BatchTxID: sql.NullString{ + Valid: true, + String: batch.BatchTxid.String(), + }, + BatchPkScript: batch.BatchPkScript, + LastRbfHeight: sql.NullInt32{ + Valid: true, + Int32: batch.LastRbfHeight, + }, + LastRbfSatPerKw: sql.NullInt32{ + Valid: true, + Int32: batch.LastRbfSatPerKw, + }, + } + + if batch.State == batchConfirmed { + args.Confirmed = true + } + + return args +} + +// convertSweepRow converts a sweep row from db to a sweep struct. +func (s *SQLStore) convertSweepRow(row sqlc.GetBatchSweepsRow, + updates []sqlc.SwapUpdate) (dbSweep, error) { + + sweep := dbSweep{ + ID: row.ID, + BatchID: row.BatchID, + Amount: btcutil.Amount(row.Amt), + } + + swapHash, err := lntypes.MakeHash(row.SwapHash) + if err != nil { + return sweep, err + } + + sweep.SwapHash = swapHash + + hash, err := chainhash.NewHash(row.OutpointTxid) + if err != nil { + return sweep, err + } + + sweep.Outpoint = wire.OutPoint{ + Hash: *hash, + Index: uint32(row.OutpointIndex), + } + + sweep.LoopOut, err = loopdb.ConvertLoopOutRow( + s.network, + sqlc.GetLoopOutSwapRow{ + ID: row.ID, + SwapHash: row.SwapHash, + Preimage: row.Preimage, + InitiationTime: row.InitiationTime, + AmountRequested: row.AmountRequested, + CltvExpiry: row.CltvExpiry, + MaxMinerFee: row.MaxMinerFee, + MaxSwapFee: row.MaxSwapFee, + InitiationHeight: row.InitiationHeight, + ProtocolVersion: row.ProtocolVersion, + Label: row.Label, + DestAddress: row.DestAddress, + SwapInvoice: row.SwapInvoice, + MaxSwapRoutingFee: row.MaxSwapRoutingFee, + SweepConfTarget: row.SweepConfTarget, + HtlcConfirmations: row.HtlcConfirmations, + OutgoingChanSet: row.OutgoingChanSet, + PrepayInvoice: row.PrepayInvoice, + MaxPrepayRoutingFee: row.MaxPrepayRoutingFee, + PublicationDeadline: row.PublicationDeadline, + SingleSweep: row.SingleSweep, + SenderScriptPubkey: row.SenderScriptPubkey, + ReceiverScriptPubkey: row.ReceiverScriptPubkey, + SenderInternalPubkey: row.SenderInternalPubkey, + ReceiverInternalPubkey: row.ReceiverInternalPubkey, + ClientKeyFamily: row.ClientKeyFamily, + ClientKeyIndex: row.ClientKeyIndex, + }, updates, + ) + + return sweep, err +} + +// sweepToUpsertArgs converts a Sweep struct to the arguments needed to insert. +func sweepToUpsertArgs(sweep dbSweep) sqlc.UpsertSweepParams { + return sqlc.UpsertSweepParams{ + SwapHash: sweep.SwapHash[:], + BatchID: sweep.BatchID, + OutpointTxid: sweep.Outpoint.Hash[:], + OutpointIndex: int32(sweep.Outpoint.Index), + Amt: int64(sweep.Amount), + Completed: sweep.Completed, + } +} diff --git a/sweepbatcher/store_mock.go b/sweepbatcher/store_mock.go new file mode 100644 index 0000000..fef88c7 --- /dev/null +++ b/sweepbatcher/store_mock.go @@ -0,0 +1,125 @@ +package sweepbatcher + +import ( + "context" + "errors" + "sort" + + "github.com/lightningnetwork/lnd/lntypes" +) + +// StoreMock implements a mock client swap store. +type StoreMock struct { + batches map[int32]dbBatch + sweeps map[lntypes.Hash]dbSweep +} + +// NewStoreMock instantiates a new mock store. +func NewStoreMock() *StoreMock { + return &StoreMock{ + batches: make(map[int32]dbBatch), + sweeps: make(map[lntypes.Hash]dbSweep), + } +} + +// FetchUnconfirmedBatches fetches all the loop out sweep batches from the +// database that are not in a confirmed state. +func (s *StoreMock) FetchUnconfirmedSweepBatches(ctx context.Context) ( + []*dbBatch, error) { + + result := []*dbBatch{} + for _, batch := range s.batches { + batch := batch + if batch.State != "confirmed" { + result = append(result, &batch) + } + } + + return result, nil +} + +// InsertSweepBatch inserts a batch into the database, returning the id of the +// inserted batch. +func (s *StoreMock) InsertSweepBatch(ctx context.Context, + batch *dbBatch) (int32, error) { + + var id int32 + + if len(s.batches) == 0 { + id = 0 + } else { + id = int32(len(s.batches)) + } + + s.batches[id] = *batch + return id, nil +} + +// UpdateSweepBatch updates a batch in the database. +func (s *StoreMock) UpdateSweepBatch(ctx context.Context, + batch *dbBatch) error { + + s.batches[batch.ID] = *batch + return nil +} + +// ConfirmBatch confirms a batch. +func (s *StoreMock) ConfirmBatch(ctx context.Context, id int32) error { + batch, ok := s.batches[id] + if !ok { + return errors.New("batch not found") + } + + batch.State = "confirmed" + s.batches[batch.ID] = batch + + return nil +} + +// FetchBatchSweeps fetches all the sweeps that belong to a batch. +func (s *StoreMock) FetchBatchSweeps(ctx context.Context, + id int32) ([]*dbSweep, error) { + + result := []*dbSweep{} + for _, sweep := range s.sweeps { + sweep := sweep + if sweep.BatchID == id { + result = append(result, &sweep) + } + } + + sort.Slice(result, func(i, j int) bool { + return result[i].ID < result[j].ID + }) + + return result, nil +} + +// UpsertSweep inserts a sweep into the database, or updates an existing sweep. +func (s *StoreMock) UpsertSweep(ctx context.Context, sweep *dbSweep) error { + s.sweeps[sweep.SwapHash] = *sweep + return nil +} + +// GetSweepStatus returns the status of a sweep. +func (s *StoreMock) GetSweepStatus(ctx context.Context, + swapHash lntypes.Hash) (bool, error) { + + sweep, ok := s.sweeps[swapHash] + if !ok { + return false, nil + } + + return sweep.Completed, nil +} + +// Close closes the store. +func (s *StoreMock) Close() error { + return nil +} + +// AssertSweepStored asserts that a sweep is stored. +func (s *StoreMock) AssertSweepStored(id lntypes.Hash) bool { + _, ok := s.sweeps[id] + return ok +}