mirror of https://github.com/lightninglabs/loop
loopdb+sweepbatcher: add sweepbatcher store
parent
e9d374a341
commit
99608ad515
@ -0,0 +1,317 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.17.2
|
||||
// source: batch.sql
|
||||
|
||||
package sqlc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"time"
|
||||
)
|
||||
|
||||
const confirmBatch = `-- name: ConfirmBatch :exec
|
||||
UPDATE
|
||||
sweep_batches
|
||||
SET
|
||||
confirmed = TRUE
|
||||
WHERE
|
||||
id = $1
|
||||
`
|
||||
|
||||
func (q *Queries) ConfirmBatch(ctx context.Context, id int32) error {
|
||||
_, err := q.db.ExecContext(ctx, confirmBatch, id)
|
||||
return err
|
||||
}
|
||||
|
||||
const getBatchSweeps = `-- name: GetBatchSweeps :many
|
||||
SELECT
|
||||
sweeps.id, sweeps.swap_hash, sweeps.batch_id, sweeps.outpoint_txid, sweeps.outpoint_index, sweeps.amt, sweeps.completed,
|
||||
swaps.id, swaps.swap_hash, swaps.preimage, swaps.initiation_time, swaps.amount_requested, swaps.cltv_expiry, swaps.max_miner_fee, swaps.max_swap_fee, swaps.initiation_height, swaps.protocol_version, swaps.label,
|
||||
loopout_swaps.swap_hash, loopout_swaps.dest_address, loopout_swaps.swap_invoice, loopout_swaps.max_swap_routing_fee, loopout_swaps.sweep_conf_target, loopout_swaps.htlc_confirmations, loopout_swaps.outgoing_chan_set, loopout_swaps.prepay_invoice, loopout_swaps.max_prepay_routing_fee, loopout_swaps.publication_deadline, loopout_swaps.single_sweep,
|
||||
htlc_keys.swap_hash, htlc_keys.sender_script_pubkey, htlc_keys.receiver_script_pubkey, htlc_keys.sender_internal_pubkey, htlc_keys.receiver_internal_pubkey, htlc_keys.client_key_family, htlc_keys.client_key_index
|
||||
FROM
|
||||
sweeps
|
||||
JOIN
|
||||
swaps ON sweeps.swap_hash = swaps.swap_hash
|
||||
JOIN
|
||||
loopout_swaps ON sweeps.swap_hash = loopout_swaps.swap_hash
|
||||
JOIN
|
||||
htlc_keys ON sweeps.swap_hash = htlc_keys.swap_hash
|
||||
WHERE
|
||||
sweeps.batch_id = $1
|
||||
ORDER BY
|
||||
sweeps.id ASC
|
||||
`
|
||||
|
||||
type GetBatchSweepsRow struct {
|
||||
ID int32
|
||||
SwapHash []byte
|
||||
BatchID int32
|
||||
OutpointTxid []byte
|
||||
OutpointIndex int32
|
||||
Amt int64
|
||||
Completed bool
|
||||
ID_2 int32
|
||||
SwapHash_2 []byte
|
||||
Preimage []byte
|
||||
InitiationTime time.Time
|
||||
AmountRequested int64
|
||||
CltvExpiry int32
|
||||
MaxMinerFee int64
|
||||
MaxSwapFee int64
|
||||
InitiationHeight int32
|
||||
ProtocolVersion int32
|
||||
Label string
|
||||
SwapHash_3 []byte
|
||||
DestAddress string
|
||||
SwapInvoice string
|
||||
MaxSwapRoutingFee int64
|
||||
SweepConfTarget int32
|
||||
HtlcConfirmations int32
|
||||
OutgoingChanSet string
|
||||
PrepayInvoice string
|
||||
MaxPrepayRoutingFee int64
|
||||
PublicationDeadline time.Time
|
||||
SingleSweep bool
|
||||
SwapHash_4 []byte
|
||||
SenderScriptPubkey []byte
|
||||
ReceiverScriptPubkey []byte
|
||||
SenderInternalPubkey []byte
|
||||
ReceiverInternalPubkey []byte
|
||||
ClientKeyFamily int32
|
||||
ClientKeyIndex int32
|
||||
}
|
||||
|
||||
func (q *Queries) GetBatchSweeps(ctx context.Context, batchID int32) ([]GetBatchSweepsRow, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getBatchSweeps, batchID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []GetBatchSweepsRow
|
||||
for rows.Next() {
|
||||
var i GetBatchSweepsRow
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.SwapHash,
|
||||
&i.BatchID,
|
||||
&i.OutpointTxid,
|
||||
&i.OutpointIndex,
|
||||
&i.Amt,
|
||||
&i.Completed,
|
||||
&i.ID_2,
|
||||
&i.SwapHash_2,
|
||||
&i.Preimage,
|
||||
&i.InitiationTime,
|
||||
&i.AmountRequested,
|
||||
&i.CltvExpiry,
|
||||
&i.MaxMinerFee,
|
||||
&i.MaxSwapFee,
|
||||
&i.InitiationHeight,
|
||||
&i.ProtocolVersion,
|
||||
&i.Label,
|
||||
&i.SwapHash_3,
|
||||
&i.DestAddress,
|
||||
&i.SwapInvoice,
|
||||
&i.MaxSwapRoutingFee,
|
||||
&i.SweepConfTarget,
|
||||
&i.HtlcConfirmations,
|
||||
&i.OutgoingChanSet,
|
||||
&i.PrepayInvoice,
|
||||
&i.MaxPrepayRoutingFee,
|
||||
&i.PublicationDeadline,
|
||||
&i.SingleSweep,
|
||||
&i.SwapHash_4,
|
||||
&i.SenderScriptPubkey,
|
||||
&i.ReceiverScriptPubkey,
|
||||
&i.SenderInternalPubkey,
|
||||
&i.ReceiverInternalPubkey,
|
||||
&i.ClientKeyFamily,
|
||||
&i.ClientKeyIndex,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const getSweepStatus = `-- name: GetSweepStatus :one
|
||||
SELECT
|
||||
COALESCE(s.completed, f.false_value) AS completed
|
||||
FROM
|
||||
(SELECT false AS false_value) AS f
|
||||
LEFT JOIN
|
||||
sweeps s ON s.swap_hash = $1
|
||||
`
|
||||
|
||||
func (q *Queries) GetSweepStatus(ctx context.Context, swapHash []byte) (bool, error) {
|
||||
row := q.db.QueryRowContext(ctx, getSweepStatus, swapHash)
|
||||
var completed bool
|
||||
err := row.Scan(&completed)
|
||||
return completed, err
|
||||
}
|
||||
|
||||
const getUnconfirmedBatches = `-- name: GetUnconfirmedBatches :many
|
||||
SELECT
|
||||
id, confirmed, batch_tx_id, batch_pk_script, last_rbf_height, last_rbf_sat_per_kw, max_timeout_distance
|
||||
FROM
|
||||
sweep_batches
|
||||
WHERE
|
||||
confirmed = FALSE
|
||||
`
|
||||
|
||||
func (q *Queries) GetUnconfirmedBatches(ctx context.Context) ([]SweepBatch, error) {
|
||||
rows, err := q.db.QueryContext(ctx, getUnconfirmedBatches)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []SweepBatch
|
||||
for rows.Next() {
|
||||
var i SweepBatch
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.Confirmed,
|
||||
&i.BatchTxID,
|
||||
&i.BatchPkScript,
|
||||
&i.LastRbfHeight,
|
||||
&i.LastRbfSatPerKw,
|
||||
&i.MaxTimeoutDistance,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, i)
|
||||
}
|
||||
if err := rows.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const insertBatch = `-- name: InsertBatch :one
|
||||
INSERT INTO sweep_batches (
|
||||
confirmed,
|
||||
batch_tx_id,
|
||||
batch_pk_script,
|
||||
last_rbf_height,
|
||||
last_rbf_sat_per_kw,
|
||||
max_timeout_distance
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
$4,
|
||||
$5,
|
||||
$6
|
||||
) RETURNING id
|
||||
`
|
||||
|
||||
type InsertBatchParams struct {
|
||||
Confirmed bool
|
||||
BatchTxID sql.NullString
|
||||
BatchPkScript []byte
|
||||
LastRbfHeight sql.NullInt32
|
||||
LastRbfSatPerKw sql.NullInt32
|
||||
MaxTimeoutDistance int32
|
||||
}
|
||||
|
||||
func (q *Queries) InsertBatch(ctx context.Context, arg InsertBatchParams) (int32, error) {
|
||||
row := q.db.QueryRowContext(ctx, insertBatch,
|
||||
arg.Confirmed,
|
||||
arg.BatchTxID,
|
||||
arg.BatchPkScript,
|
||||
arg.LastRbfHeight,
|
||||
arg.LastRbfSatPerKw,
|
||||
arg.MaxTimeoutDistance,
|
||||
)
|
||||
var id int32
|
||||
err := row.Scan(&id)
|
||||
return id, err
|
||||
}
|
||||
|
||||
const updateBatch = `-- name: UpdateBatch :exec
|
||||
UPDATE sweep_batches SET
|
||||
confirmed = $2,
|
||||
batch_tx_id = $3,
|
||||
batch_pk_script = $4,
|
||||
last_rbf_height = $5,
|
||||
last_rbf_sat_per_kw = $6
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
type UpdateBatchParams struct {
|
||||
ID int32
|
||||
Confirmed bool
|
||||
BatchTxID sql.NullString
|
||||
BatchPkScript []byte
|
||||
LastRbfHeight sql.NullInt32
|
||||
LastRbfSatPerKw sql.NullInt32
|
||||
}
|
||||
|
||||
func (q *Queries) UpdateBatch(ctx context.Context, arg UpdateBatchParams) error {
|
||||
_, err := q.db.ExecContext(ctx, updateBatch,
|
||||
arg.ID,
|
||||
arg.Confirmed,
|
||||
arg.BatchTxID,
|
||||
arg.BatchPkScript,
|
||||
arg.LastRbfHeight,
|
||||
arg.LastRbfSatPerKw,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const upsertSweep = `-- name: UpsertSweep :exec
|
||||
INSERT INTO sweeps (
|
||||
swap_hash,
|
||||
batch_id,
|
||||
outpoint_txid,
|
||||
outpoint_index,
|
||||
amt,
|
||||
completed
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
$4,
|
||||
$5,
|
||||
$6
|
||||
) ON CONFLICT (swap_hash) DO UPDATE SET
|
||||
batch_id = $2,
|
||||
outpoint_txid = $3,
|
||||
outpoint_index = $4,
|
||||
amt = $5,
|
||||
completed = $6
|
||||
`
|
||||
|
||||
type UpsertSweepParams struct {
|
||||
SwapHash []byte
|
||||
BatchID int32
|
||||
OutpointTxid []byte
|
||||
OutpointIndex int32
|
||||
Amt int64
|
||||
Completed bool
|
||||
}
|
||||
|
||||
func (q *Queries) UpsertSweep(ctx context.Context, arg UpsertSweepParams) error {
|
||||
_, err := q.db.ExecContext(ctx, upsertSweep,
|
||||
arg.SwapHash,
|
||||
arg.BatchID,
|
||||
arg.OutpointTxid,
|
||||
arg.OutpointIndex,
|
||||
arg.Amt,
|
||||
arg.Completed,
|
||||
)
|
||||
return err
|
||||
}
|
@ -0,0 +1 @@
|
||||
ALTER TABLE loopout_swaps DROP COLUMN single_sweep;
|
@ -0,0 +1,4 @@
|
||||
-- is_external_addr indicates whether the destination address of the swap is not
|
||||
-- a wallet address. The default value used is TRUE in order to maintain the old
|
||||
-- behavior of swaps which doesn't override the destination address.
|
||||
ALTER TABLE loopout_swaps ADD single_sweep BOOLEAN NOT NULL DEFAULT TRUE;
|
@ -0,0 +1,2 @@
|
||||
DROP TABLE sweep_batches;
|
||||
DROP TABLE sweeps;
|
@ -0,0 +1,58 @@
|
||||
-- sweep_batches stores the on-going swaps that are batched together.
|
||||
CREATE TABLE sweep_batches (
|
||||
-- id is the autoincrementing primary key of the batch.
|
||||
id INTEGER PRIMARY KEY,
|
||||
|
||||
-- confirmed indicates whether this batch is confirmed.
|
||||
confirmed BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
|
||||
-- batch_tx_id is the transaction id of the batch transaction.
|
||||
batch_tx_id TEXT,
|
||||
|
||||
-- batch_pk_script is the pkscript of the batch transaction's output.
|
||||
batch_pk_script BLOB,
|
||||
|
||||
-- last_rbf_height was the last height at which we attempted to publish
|
||||
-- an rbf replacement transaction.
|
||||
last_rbf_height INTEGER,
|
||||
|
||||
-- last_rbf_sat_per_kw was the last sat per kw fee rate we used for the
|
||||
-- last published transaction.
|
||||
last_rbf_sat_per_kw INTEGER,
|
||||
|
||||
-- max_timeout_distance is the maximum distance the timeouts of the
|
||||
-- sweeps can have in the batch.
|
||||
max_timeout_distance INTEGER NOT NULL
|
||||
);
|
||||
|
||||
-- sweeps stores the individual sweeps that are part of a batch.
|
||||
CREATE TABLE sweeps (
|
||||
-- id is the autoincrementing primary key.
|
||||
id INTEGER PRIMARY KEY,
|
||||
|
||||
-- swap_hash is the hash of the swap that is being swept.
|
||||
swap_hash BLOB NOT NULL UNIQUE,
|
||||
|
||||
-- batch_id is the id of the batch this swap is part of.
|
||||
batch_id INTEGER NOT NULL,
|
||||
|
||||
-- outpoint_txid is the transaction id of the output being swept.
|
||||
outpoint_txid BLOB NOT NULL,
|
||||
|
||||
-- outpoint_index is the index of the output being swept.
|
||||
outpoint_index INTEGER NOT NULL,
|
||||
|
||||
-- amt is the amount of the output being swept.
|
||||
amt BIGINT NOT NULL,
|
||||
|
||||
-- completed indicates whether the sweep has been completed.
|
||||
completed BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
|
||||
-- Foreign key constraint to ensure that we reference an existing batch
|
||||
-- id.
|
||||
FOREIGN KEY (batch_id) REFERENCES sweep_batches(id),
|
||||
|
||||
-- Foreign key constraint to ensure that swap_hash references an
|
||||
-- existing swap.
|
||||
FOREIGN KEY (swap_hash) REFERENCES swaps(swap_hash)
|
||||
);
|
@ -0,0 +1,91 @@
|
||||
-- name: GetUnconfirmedBatches :many
|
||||
SELECT
|
||||
*
|
||||
FROM
|
||||
sweep_batches
|
||||
WHERE
|
||||
confirmed = FALSE;
|
||||
|
||||
-- name: InsertBatch :one
|
||||
INSERT INTO sweep_batches (
|
||||
confirmed,
|
||||
batch_tx_id,
|
||||
batch_pk_script,
|
||||
last_rbf_height,
|
||||
last_rbf_sat_per_kw,
|
||||
max_timeout_distance
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
$4,
|
||||
$5,
|
||||
$6
|
||||
) RETURNING id;
|
||||
|
||||
-- name: UpdateBatch :exec
|
||||
UPDATE sweep_batches SET
|
||||
confirmed = $2,
|
||||
batch_tx_id = $3,
|
||||
batch_pk_script = $4,
|
||||
last_rbf_height = $5,
|
||||
last_rbf_sat_per_kw = $6
|
||||
WHERE id = $1;
|
||||
|
||||
-- name: ConfirmBatch :exec
|
||||
UPDATE
|
||||
sweep_batches
|
||||
SET
|
||||
confirmed = TRUE
|
||||
WHERE
|
||||
id = $1;
|
||||
|
||||
-- name: UpsertSweep :exec
|
||||
INSERT INTO sweeps (
|
||||
swap_hash,
|
||||
batch_id,
|
||||
outpoint_txid,
|
||||
outpoint_index,
|
||||
amt,
|
||||
completed
|
||||
) VALUES (
|
||||
$1,
|
||||
$2,
|
||||
$3,
|
||||
$4,
|
||||
$5,
|
||||
$6
|
||||
) ON CONFLICT (swap_hash) DO UPDATE SET
|
||||
batch_id = $2,
|
||||
outpoint_txid = $3,
|
||||
outpoint_index = $4,
|
||||
amt = $5,
|
||||
completed = $6;
|
||||
|
||||
|
||||
-- name: GetBatchSweeps :many
|
||||
SELECT
|
||||
sweeps.*,
|
||||
swaps.*,
|
||||
loopout_swaps.*,
|
||||
htlc_keys.*
|
||||
FROM
|
||||
sweeps
|
||||
JOIN
|
||||
swaps ON sweeps.swap_hash = swaps.swap_hash
|
||||
JOIN
|
||||
loopout_swaps ON sweeps.swap_hash = loopout_swaps.swap_hash
|
||||
JOIN
|
||||
htlc_keys ON sweeps.swap_hash = htlc_keys.swap_hash
|
||||
WHERE
|
||||
sweeps.batch_id = $1
|
||||
ORDER BY
|
||||
sweeps.id ASC;
|
||||
|
||||
-- name: GetSweepStatus :one
|
||||
SELECT
|
||||
COALESCE(s.completed, f.false_value) AS completed
|
||||
FROM
|
||||
(SELECT false AS false_value) AS f
|
||||
LEFT JOIN
|
||||
sweeps s ON s.swap_hash = $1;
|
@ -0,0 +1,371 @@
|
||||
package sweepbatcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
|
||||
"github.com/btcsuite/btcd/btcutil"
|
||||
"github.com/btcsuite/btcd/chaincfg"
|
||||
"github.com/btcsuite/btcd/chaincfg/chainhash"
|
||||
"github.com/btcsuite/btcd/wire"
|
||||
"github.com/lightninglabs/loop/loopdb"
|
||||
"github.com/lightninglabs/loop/loopdb/sqlc"
|
||||
"github.com/lightningnetwork/lnd/clock"
|
||||
"github.com/lightningnetwork/lnd/lntypes"
|
||||
)
|
||||
|
||||
type BaseDB interface {
|
||||
// ConfirmBatch confirms a batch by setting the state to confirmed.
|
||||
ConfirmBatch(ctx context.Context, id int32) error
|
||||
|
||||
// GetBatchSweeps fetches all the sweeps that are part a batch.
|
||||
GetBatchSweeps(ctx context.Context, batchID int32) (
|
||||
[]sqlc.GetBatchSweepsRow, error)
|
||||
|
||||
// GetSweepStatus returns true if the sweep has been completed.
|
||||
GetSweepStatus(ctx context.Context, swapHash []byte) (bool, error)
|
||||
|
||||
// GetSwapUpdates fetches all the updates for a swap.
|
||||
GetSwapUpdates(ctx context.Context, swapHash []byte) (
|
||||
[]sqlc.SwapUpdate, error)
|
||||
|
||||
// FetchUnconfirmedSweepBatches fetches all the batches from the
|
||||
// database that are not in a confirmed state.
|
||||
GetUnconfirmedBatches(ctx context.Context) ([]sqlc.SweepBatch, error)
|
||||
|
||||
// InsertBatch inserts a batch into the database, returning the id of
|
||||
// the inserted batch.
|
||||
InsertBatch(ctx context.Context, arg sqlc.InsertBatchParams) (
|
||||
int32, error)
|
||||
|
||||
// UpdateBatch updates a batch in the database.
|
||||
UpdateBatch(ctx context.Context, arg sqlc.UpdateBatchParams) error
|
||||
|
||||
// UpsertSweep inserts a sweep into the database, or updates an existing
|
||||
// sweep if it already exists.
|
||||
UpsertSweep(ctx context.Context, arg sqlc.UpsertSweepParams) error
|
||||
|
||||
// ExecTx allows for executing a function in the context of a database
|
||||
// transaction.
|
||||
ExecTx(ctx context.Context, txOptions loopdb.TxOptions,
|
||||
txBody func(*sqlc.Queries) error) error
|
||||
}
|
||||
|
||||
// SQLStore manages the reservations in the database.
|
||||
type SQLStore struct {
|
||||
baseDb BaseDB
|
||||
|
||||
network *chaincfg.Params
|
||||
clock clock.Clock
|
||||
}
|
||||
|
||||
// NewSQLStore creates a new SQLStore.
|
||||
func NewSQLStore(db BaseDB, network *chaincfg.Params) *SQLStore {
|
||||
return &SQLStore{
|
||||
baseDb: db,
|
||||
network: network,
|
||||
clock: clock.NewDefaultClock(),
|
||||
}
|
||||
}
|
||||
|
||||
// FetchUnconfirmedSweepBatches fetches all the batches from the database that
|
||||
// are not in a confirmed state.
|
||||
func (s *SQLStore) FetchUnconfirmedSweepBatches(ctx context.Context) ([]*dbBatch,
|
||||
error) {
|
||||
|
||||
var batches []*dbBatch
|
||||
|
||||
dbBatches, err := s.baseDb.GetUnconfirmedBatches(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, dbBatch := range dbBatches {
|
||||
batch := convertBatchRow(dbBatch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
batches = append(batches, batch)
|
||||
}
|
||||
|
||||
return batches, err
|
||||
}
|
||||
|
||||
// InsertSweepBatch inserts a batch into the database, returning the id of the
|
||||
// inserted batch.
|
||||
func (s *SQLStore) InsertSweepBatch(ctx context.Context, batch *dbBatch) (int32,
|
||||
error) {
|
||||
|
||||
return s.baseDb.InsertBatch(ctx, batchToInsertArgs(*batch))
|
||||
}
|
||||
|
||||
// UpdateSweepBatch updates a batch in the database.
|
||||
func (s *SQLStore) UpdateSweepBatch(ctx context.Context, batch *dbBatch) error {
|
||||
return s.baseDb.UpdateBatch(ctx, batchToUpdateArgs(*batch))
|
||||
}
|
||||
|
||||
// ConfirmBatch confirms a batch by setting the state to confirmed.
|
||||
func (s *SQLStore) ConfirmBatch(ctx context.Context, id int32) error {
|
||||
return s.baseDb.ConfirmBatch(ctx, id)
|
||||
}
|
||||
|
||||
// FetchBatchSweeps fetches all the sweeps that are part a batch.
|
||||
func (s *SQLStore) FetchBatchSweeps(ctx context.Context, id int32) (
|
||||
[]*dbSweep, error) {
|
||||
|
||||
readOpts := loopdb.NewSqlReadOpts()
|
||||
var sweeps []*dbSweep
|
||||
|
||||
err := s.baseDb.ExecTx(ctx, readOpts, func(tx *sqlc.Queries) error {
|
||||
dbSweeps, err := tx.GetBatchSweeps(ctx, id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, dbSweep := range dbSweeps {
|
||||
updates, err := s.baseDb.GetSwapUpdates(
|
||||
ctx, dbSweep.SwapHash,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sweep, err := s.convertSweepRow(dbSweep, updates)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sweeps = append(sweeps, &sweep)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sweeps, nil
|
||||
}
|
||||
|
||||
// UpsertSweep inserts a sweep into the database, or updates an existing sweep
|
||||
// if it already exists.
|
||||
func (s *SQLStore) UpsertSweep(ctx context.Context, sweep *dbSweep) error {
|
||||
return s.baseDb.UpsertSweep(ctx, sweepToUpsertArgs(*sweep))
|
||||
}
|
||||
|
||||
// GetSweepStatus returns true if the sweep has been completed.
|
||||
func (s *SQLStore) GetSweepStatus(ctx context.Context, swapHash lntypes.Hash) (
|
||||
bool, error) {
|
||||
|
||||
return s.baseDb.GetSweepStatus(ctx, swapHash[:])
|
||||
}
|
||||
|
||||
type dbBatch struct {
|
||||
// ID is the unique identifier of the batch.
|
||||
ID int32
|
||||
|
||||
// State is the current state of the batch.
|
||||
State string
|
||||
|
||||
// BatchTxid is the txid of the batch transaction.
|
||||
BatchTxid chainhash.Hash
|
||||
|
||||
// BatchPkScript is the pkscript of the batch transaction.
|
||||
BatchPkScript []byte
|
||||
|
||||
// LastRbfHeight is the height at which the last RBF attempt was made.
|
||||
LastRbfHeight int32
|
||||
|
||||
// LastRbfSatPerKw is the sat per kw of the last RBF attempt.
|
||||
LastRbfSatPerKw int32
|
||||
|
||||
// MaxTimeoutDistance is the maximum timeout distance of the batch.
|
||||
MaxTimeoutDistance int32
|
||||
}
|
||||
|
||||
type dbSweep struct {
|
||||
// ID is the unique identifier of the sweep.
|
||||
ID int32
|
||||
|
||||
// BatchID is the ID of the batch that the sweep belongs to.
|
||||
BatchID int32
|
||||
|
||||
// SwapHash is the hash of the swap that the sweep belongs to.
|
||||
SwapHash lntypes.Hash
|
||||
|
||||
// Outpoint is the outpoint of the sweep.
|
||||
Outpoint wire.OutPoint
|
||||
|
||||
// Amount is the amount of the sweep.
|
||||
Amount btcutil.Amount
|
||||
|
||||
// Completed indicates whether this sweep is completed.
|
||||
Completed bool
|
||||
|
||||
// LoopOut is the loop out that the sweep belongs to.
|
||||
LoopOut *loopdb.LoopOut
|
||||
}
|
||||
|
||||
// convertBatchRow converts a batch row from db to a sweepbatcher.Batch struct.
|
||||
func convertBatchRow(row sqlc.SweepBatch) *dbBatch {
|
||||
batch := dbBatch{
|
||||
ID: row.ID,
|
||||
}
|
||||
|
||||
if row.Confirmed {
|
||||
batch.State = batchOpen
|
||||
}
|
||||
|
||||
if row.BatchTxID.Valid {
|
||||
err := chainhash.Decode(&batch.BatchTxid, row.BatchTxID.String)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
batch.BatchPkScript = row.BatchPkScript
|
||||
|
||||
if row.LastRbfHeight.Valid {
|
||||
batch.LastRbfHeight = row.LastRbfHeight.Int32
|
||||
}
|
||||
|
||||
if row.LastRbfSatPerKw.Valid {
|
||||
batch.LastRbfSatPerKw = row.LastRbfSatPerKw.Int32
|
||||
}
|
||||
|
||||
batch.MaxTimeoutDistance = row.MaxTimeoutDistance
|
||||
|
||||
return &batch
|
||||
}
|
||||
|
||||
// BatchToUpsertArgs converts a Batch struct to the arguments needed to insert
|
||||
// it into the database.
|
||||
func batchToInsertArgs(batch dbBatch) sqlc.InsertBatchParams {
|
||||
args := sqlc.InsertBatchParams{
|
||||
Confirmed: false,
|
||||
BatchTxID: sql.NullString{
|
||||
Valid: true,
|
||||
String: batch.BatchTxid.String(),
|
||||
},
|
||||
BatchPkScript: batch.BatchPkScript,
|
||||
LastRbfHeight: sql.NullInt32{
|
||||
Valid: true,
|
||||
Int32: batch.LastRbfHeight,
|
||||
},
|
||||
LastRbfSatPerKw: sql.NullInt32{
|
||||
Valid: true,
|
||||
Int32: batch.LastRbfSatPerKw,
|
||||
},
|
||||
MaxTimeoutDistance: batch.MaxTimeoutDistance,
|
||||
}
|
||||
|
||||
if batch.State == batchConfirmed {
|
||||
args.Confirmed = true
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
// BatchToUpsertArgs converts a Batch struct to the arguments needed to insert
|
||||
// it into the database.
|
||||
func batchToUpdateArgs(batch dbBatch) sqlc.UpdateBatchParams {
|
||||
args := sqlc.UpdateBatchParams{
|
||||
ID: batch.ID,
|
||||
Confirmed: false,
|
||||
BatchTxID: sql.NullString{
|
||||
Valid: true,
|
||||
String: batch.BatchTxid.String(),
|
||||
},
|
||||
BatchPkScript: batch.BatchPkScript,
|
||||
LastRbfHeight: sql.NullInt32{
|
||||
Valid: true,
|
||||
Int32: batch.LastRbfHeight,
|
||||
},
|
||||
LastRbfSatPerKw: sql.NullInt32{
|
||||
Valid: true,
|
||||
Int32: batch.LastRbfSatPerKw,
|
||||
},
|
||||
}
|
||||
|
||||
if batch.State == batchConfirmed {
|
||||
args.Confirmed = true
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
// convertSweepRow converts a sweep row from db to a sweep struct.
|
||||
func (s *SQLStore) convertSweepRow(row sqlc.GetBatchSweepsRow,
|
||||
updates []sqlc.SwapUpdate) (dbSweep, error) {
|
||||
|
||||
sweep := dbSweep{
|
||||
ID: row.ID,
|
||||
BatchID: row.BatchID,
|
||||
Amount: btcutil.Amount(row.Amt),
|
||||
}
|
||||
|
||||
swapHash, err := lntypes.MakeHash(row.SwapHash)
|
||||
if err != nil {
|
||||
return sweep, err
|
||||
}
|
||||
|
||||
sweep.SwapHash = swapHash
|
||||
|
||||
hash, err := chainhash.NewHash(row.OutpointTxid)
|
||||
if err != nil {
|
||||
return sweep, err
|
||||
}
|
||||
|
||||
sweep.Outpoint = wire.OutPoint{
|
||||
Hash: *hash,
|
||||
Index: uint32(row.OutpointIndex),
|
||||
}
|
||||
|
||||
sweep.LoopOut, err = loopdb.ConvertLoopOutRow(
|
||||
s.network,
|
||||
sqlc.GetLoopOutSwapRow{
|
||||
ID: row.ID,
|
||||
SwapHash: row.SwapHash,
|
||||
Preimage: row.Preimage,
|
||||
InitiationTime: row.InitiationTime,
|
||||
AmountRequested: row.AmountRequested,
|
||||
CltvExpiry: row.CltvExpiry,
|
||||
MaxMinerFee: row.MaxMinerFee,
|
||||
MaxSwapFee: row.MaxSwapFee,
|
||||
InitiationHeight: row.InitiationHeight,
|
||||
ProtocolVersion: row.ProtocolVersion,
|
||||
Label: row.Label,
|
||||
DestAddress: row.DestAddress,
|
||||
SwapInvoice: row.SwapInvoice,
|
||||
MaxSwapRoutingFee: row.MaxSwapRoutingFee,
|
||||
SweepConfTarget: row.SweepConfTarget,
|
||||
HtlcConfirmations: row.HtlcConfirmations,
|
||||
OutgoingChanSet: row.OutgoingChanSet,
|
||||
PrepayInvoice: row.PrepayInvoice,
|
||||
MaxPrepayRoutingFee: row.MaxPrepayRoutingFee,
|
||||
PublicationDeadline: row.PublicationDeadline,
|
||||
SingleSweep: row.SingleSweep,
|
||||
SenderScriptPubkey: row.SenderScriptPubkey,
|
||||
ReceiverScriptPubkey: row.ReceiverScriptPubkey,
|
||||
SenderInternalPubkey: row.SenderInternalPubkey,
|
||||
ReceiverInternalPubkey: row.ReceiverInternalPubkey,
|
||||
ClientKeyFamily: row.ClientKeyFamily,
|
||||
ClientKeyIndex: row.ClientKeyIndex,
|
||||
}, updates,
|
||||
)
|
||||
|
||||
return sweep, err
|
||||
}
|
||||
|
||||
// sweepToUpsertArgs converts a Sweep struct to the arguments needed to insert.
|
||||
func sweepToUpsertArgs(sweep dbSweep) sqlc.UpsertSweepParams {
|
||||
return sqlc.UpsertSweepParams{
|
||||
SwapHash: sweep.SwapHash[:],
|
||||
BatchID: sweep.BatchID,
|
||||
OutpointTxid: sweep.Outpoint.Hash[:],
|
||||
OutpointIndex: int32(sweep.Outpoint.Index),
|
||||
Amt: int64(sweep.Amount),
|
||||
Completed: sweep.Completed,
|
||||
}
|
||||
}
|
@ -0,0 +1,125 @@
|
||||
package sweepbatcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sort"
|
||||
|
||||
"github.com/lightningnetwork/lnd/lntypes"
|
||||
)
|
||||
|
||||
// StoreMock implements a mock client swap store.
|
||||
type StoreMock struct {
|
||||
batches map[int32]dbBatch
|
||||
sweeps map[lntypes.Hash]dbSweep
|
||||
}
|
||||
|
||||
// NewStoreMock instantiates a new mock store.
|
||||
func NewStoreMock() *StoreMock {
|
||||
return &StoreMock{
|
||||
batches: make(map[int32]dbBatch),
|
||||
sweeps: make(map[lntypes.Hash]dbSweep),
|
||||
}
|
||||
}
|
||||
|
||||
// FetchUnconfirmedBatches fetches all the loop out sweep batches from the
|
||||
// database that are not in a confirmed state.
|
||||
func (s *StoreMock) FetchUnconfirmedSweepBatches(ctx context.Context) (
|
||||
[]*dbBatch, error) {
|
||||
|
||||
result := []*dbBatch{}
|
||||
for _, batch := range s.batches {
|
||||
batch := batch
|
||||
if batch.State != "confirmed" {
|
||||
result = append(result, &batch)
|
||||
}
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// InsertSweepBatch inserts a batch into the database, returning the id of the
|
||||
// inserted batch.
|
||||
func (s *StoreMock) InsertSweepBatch(ctx context.Context,
|
||||
batch *dbBatch) (int32, error) {
|
||||
|
||||
var id int32
|
||||
|
||||
if len(s.batches) == 0 {
|
||||
id = 0
|
||||
} else {
|
||||
id = int32(len(s.batches))
|
||||
}
|
||||
|
||||
s.batches[id] = *batch
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// UpdateSweepBatch updates a batch in the database.
|
||||
func (s *StoreMock) UpdateSweepBatch(ctx context.Context,
|
||||
batch *dbBatch) error {
|
||||
|
||||
s.batches[batch.ID] = *batch
|
||||
return nil
|
||||
}
|
||||
|
||||
// ConfirmBatch confirms a batch.
|
||||
func (s *StoreMock) ConfirmBatch(ctx context.Context, id int32) error {
|
||||
batch, ok := s.batches[id]
|
||||
if !ok {
|
||||
return errors.New("batch not found")
|
||||
}
|
||||
|
||||
batch.State = "confirmed"
|
||||
s.batches[batch.ID] = batch
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// FetchBatchSweeps fetches all the sweeps that belong to a batch.
|
||||
func (s *StoreMock) FetchBatchSweeps(ctx context.Context,
|
||||
id int32) ([]*dbSweep, error) {
|
||||
|
||||
result := []*dbSweep{}
|
||||
for _, sweep := range s.sweeps {
|
||||
sweep := sweep
|
||||
if sweep.BatchID == id {
|
||||
result = append(result, &sweep)
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(result, func(i, j int) bool {
|
||||
return result[i].ID < result[j].ID
|
||||
})
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// UpsertSweep inserts a sweep into the database, or updates an existing sweep.
|
||||
func (s *StoreMock) UpsertSweep(ctx context.Context, sweep *dbSweep) error {
|
||||
s.sweeps[sweep.SwapHash] = *sweep
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSweepStatus returns the status of a sweep.
|
||||
func (s *StoreMock) GetSweepStatus(ctx context.Context,
|
||||
swapHash lntypes.Hash) (bool, error) {
|
||||
|
||||
sweep, ok := s.sweeps[swapHash]
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return sweep.Completed, nil
|
||||
}
|
||||
|
||||
// Close closes the store.
|
||||
func (s *StoreMock) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AssertSweepStored asserts that a sweep is stored.
|
||||
func (s *StoreMock) AssertSweepStored(id lntypes.Hash) bool {
|
||||
_, ok := s.sweeps[id]
|
||||
return ok
|
||||
}
|
Loading…
Reference in New Issue