loopdb: add loop in

This commit adds the required code to persist loop in swaps. It also
introduces the file loop.go to which shared code is moved.

Sharing of contract serialization/deserialization code has been
reverted. The prepay fields do not apply to loop in, but were part of
the shared contract struct. Without also adding a migration, it wouldn't
be possible to keep the shared code.

In general it is probably more flexible to keep the contract
serialization code separated between in and out swaps.
pull/34/head
Joost Jager 5 years ago
parent e81298ce3e
commit 6a0a9556a0
No known key found for this signature in database
GPG Key ID: A61B9D4C393C59C7

@ -188,6 +188,7 @@ func testResume(t *testing.T, expired, preimageRevealed, expectSuccess bool) {
SwapInvoice: swapPayReq, SwapInvoice: swapPayReq,
SweepConfTarget: 2, SweepConfTarget: 2,
MaxSwapRoutingFee: 70000, MaxSwapRoutingFee: 70000,
PrepayInvoice: prePayReq,
SwapContract: loopdb.SwapContract{ SwapContract: loopdb.SwapContract{
Preimage: preimage, Preimage: preimage,
AmountRequested: amt, AmountRequested: amt,
@ -195,16 +196,17 @@ func testResume(t *testing.T, expired, preimageRevealed, expectSuccess bool) {
ReceiverKey: receiverKey, ReceiverKey: receiverKey,
SenderKey: senderKey, SenderKey: senderKey,
MaxSwapFee: 60000, MaxSwapFee: 60000,
PrepayInvoice: prePayReq,
MaxMinerFee: 50000, MaxMinerFee: 50000,
}, },
}, },
Events: []*loopdb.LoopOutEvent{ Loop: loopdb.Loop{
{ Events: []*loopdb.LoopEvent{
State: state, {
State: state,
},
}, },
Hash: hash,
}, },
Hash: hash,
} }
if expired { if expired {

@ -20,6 +20,17 @@ type SwapStore interface {
// the various stages in its lifetime. // the various stages in its lifetime.
UpdateLoopOut(hash lntypes.Hash, time time.Time, state SwapState) error UpdateLoopOut(hash lntypes.Hash, time time.Time, state SwapState) error
// FetchLoopInSwaps returns all swaps currently in the store.
FetchLoopInSwaps() ([]*LoopIn, error)
// CreateLoopIn adds an initiated swap to the store.
CreateLoopIn(hash lntypes.Hash, swap *LoopInContract) error
// UpdateLoopIn stores a new event for a target loop in swap. This
// appends to the event log for a particular swap as it goes through
// the various stages in its lifetime.
UpdateLoopIn(hash lntypes.Hash, time time.Time, state SwapState) error
// Close closes the underlying database. // Close closes the underlying database.
Close() error Close() error
} }

@ -18,10 +18,6 @@ type SwapContract struct {
// AmountRequested is the total amount of the swap. // AmountRequested is the total amount of the swap.
AmountRequested btcutil.Amount AmountRequested btcutil.Amount
// PrepayInvoice is the invoice that the client should pay to the
// server that will be returned if the swap is complete.
PrepayInvoice string
// SenderKey is the key of the sender that will be used in the on-chain // SenderKey is the key of the sender that will be used in the on-chain
// HTLC. // HTLC.
SenderKey [33]byte SenderKey [33]byte
@ -33,10 +29,6 @@ type SwapContract struct {
// CltvExpiry is the total absolute CLTV expiry of the swap. // CltvExpiry is the total absolute CLTV expiry of the swap.
CltvExpiry int32 CltvExpiry int32
// MaxPrepayRoutingFee is the maximum off-chain fee in msat that may be
// paid for the prepayment to the server.
MaxPrepayRoutingFee btcutil.Amount
// MaxSwapFee is the maximum we are willing to pay the server for the // MaxSwapFee is the maximum we are willing to pay the server for the
// swap. // swap.
MaxSwapFee btcutil.Amount MaxSwapFee btcutil.Amount
@ -53,8 +45,14 @@ type SwapContract struct {
InitiationTime time.Time InitiationTime time.Time
} }
// LoopOutEvent contains the dynamic data of a swap. // Loop contains fields shared between LoopIn and LoopOut
type LoopOutEvent struct { type Loop struct {
Hash lntypes.Hash
Events []*LoopEvent
}
// LoopEvent contains the dynamic data of a swap.
type LoopEvent struct {
// State is the new state for this swap as a result of this event. // State is the new state for this swap as a result of this event.
State SwapState State SwapState
@ -63,7 +61,7 @@ type LoopOutEvent struct {
} }
// State returns the most recent state of this swap. // State returns the most recent state of this swap.
func (s *LoopOut) State() SwapState { func (s *Loop) State() SwapState {
lastUpdate := s.LastUpdate() lastUpdate := s.LastUpdate()
if lastUpdate == nil { if lastUpdate == nil {
return StateInitiated return StateInitiated
@ -73,7 +71,7 @@ func (s *LoopOut) State() SwapState {
} }
// LastUpdate returns the most recent update of this swap. // LastUpdate returns the most recent update of this swap.
func (s *LoopOut) LastUpdate() *LoopOutEvent { func (s *Loop) LastUpdate() *LoopEvent {
eventCount := len(s.Events) eventCount := len(s.Events)
if eventCount == 0 { if eventCount == 0 {
@ -84,7 +82,9 @@ func (s *LoopOut) LastUpdate() *LoopOutEvent {
return lastEvent return lastEvent
} }
func serializeLoopOutEvent(time time.Time, state SwapState) ( // serializeLoopEvent serializes a state update of a swap. This is used for both
// in and out swaps.
func serializeLoopEvent(time time.Time, state SwapState) (
[]byte, error) { []byte, error) {
var b bytes.Buffer var b bytes.Buffer
@ -100,8 +100,10 @@ func serializeLoopOutEvent(time time.Time, state SwapState) (
return b.Bytes(), nil return b.Bytes(), nil
} }
func deserializeLoopOutEvent(value []byte) (*LoopOutEvent, error) { // deserializeLoopEvent deserializes a state update of a swap. This is used for
update := &LoopOutEvent{} // both in and out swaps.
func deserializeLoopEvent(value []byte) (*LoopEvent, error) {
update := &LoopEvent{}
r := bytes.NewReader(value) r := bytes.NewReader(value)

@ -0,0 +1,168 @@
package loopdb
import (
"bytes"
"encoding/binary"
"fmt"
"time"
)
// LoopInContract contains the data that is serialized to persistent storage for
// pending loop in swaps.
type LoopInContract struct {
SwapContract
// SweepConfTarget specifies the targeted confirmation target for the
// client sweep tx.
HtlcConfTarget int32
// LoopInChannel is the channel to charge. If zero, any channel may
// be used.
LoopInChannel *uint64
}
// LoopIn is a combination of the contract and the updates.
type LoopIn struct {
Loop
Contract *LoopInContract
}
// LastUpdateTime returns the last update time of this swap.
func (s *LoopIn) LastUpdateTime() time.Time {
lastUpdate := s.LastUpdate()
if lastUpdate == nil {
return s.Contract.InitiationTime
}
return lastUpdate.Time
}
// serializeLoopInContract serialize the loop in contract into a byte slice.
func serializeLoopInContract(swap *LoopInContract) (
[]byte, error) {
var b bytes.Buffer
if err := binary.Write(&b, byteOrder, swap.InitiationTime.UnixNano()); err != nil {
return nil, err
}
if err := binary.Write(&b, byteOrder, swap.Preimage); err != nil {
return nil, err
}
if err := binary.Write(&b, byteOrder, swap.AmountRequested); err != nil {
return nil, err
}
n, err := b.Write(swap.SenderKey[:])
if err != nil {
return nil, err
}
if n != keyLength {
return nil, fmt.Errorf("sender key has invalid length")
}
n, err = b.Write(swap.ReceiverKey[:])
if err != nil {
return nil, err
}
if n != keyLength {
return nil, fmt.Errorf("receiver key has invalid length")
}
if err := binary.Write(&b, byteOrder, swap.CltvExpiry); err != nil {
return nil, err
}
if err := binary.Write(&b, byteOrder, swap.MaxMinerFee); err != nil {
return nil, err
}
if err := binary.Write(&b, byteOrder, swap.MaxSwapFee); err != nil {
return nil, err
}
if err := binary.Write(&b, byteOrder, swap.InitiationHeight); err != nil {
return nil, err
}
if err := binary.Write(&b, byteOrder, swap.HtlcConfTarget); err != nil {
return nil, err
}
var chargeChannel uint64
if swap.LoopInChannel != nil {
chargeChannel = *swap.LoopInChannel
}
if err := binary.Write(&b, byteOrder, chargeChannel); err != nil {
return nil, err
}
return b.Bytes(), nil
}
// deserializeLoopInContract deserializes the loop in contract from a byte slice.
func deserializeLoopInContract(value []byte) (*LoopInContract, error) {
r := bytes.NewReader(value)
contract := LoopInContract{}
var err error
var unixNano int64
if err := binary.Read(r, byteOrder, &unixNano); err != nil {
return nil, err
}
contract.InitiationTime = time.Unix(0, unixNano)
if err := binary.Read(r, byteOrder, &contract.Preimage); err != nil {
return nil, err
}
binary.Read(r, byteOrder, &contract.AmountRequested)
n, err := r.Read(contract.SenderKey[:])
if err != nil {
return nil, err
}
if n != keyLength {
return nil, fmt.Errorf("sender key has invalid length")
}
n, err = r.Read(contract.ReceiverKey[:])
if err != nil {
return nil, err
}
if n != keyLength {
return nil, fmt.Errorf("receiver key has invalid length")
}
if err := binary.Read(r, byteOrder, &contract.CltvExpiry); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &contract.MaxMinerFee); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &contract.MaxSwapFee); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &contract.InitiationHeight); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &contract.HtlcConfTarget); err != nil {
return nil, err
}
var loopInChannel uint64
if err := binary.Read(r, byteOrder, &loopInChannel); err != nil {
return nil, err
}
if loopInChannel != 0 {
contract.LoopInChannel = &loopInChannel
}
return &contract, nil
}

@ -4,13 +4,11 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"io"
"time" "time"
"github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/lightningnetwork/lnd/lntypes"
) )
// LoopOutContract contains the data that is serialized to persistent storage // LoopOutContract contains the data that is serialized to persistent storage
@ -39,20 +37,24 @@ type LoopOutContract struct {
// TargetChannel is the channel to loop out. If zero, any channel may // TargetChannel is the channel to loop out. If zero, any channel may
// be used. // be used.
UnchargeChannel *uint64 UnchargeChannel *uint64
// PrepayInvoice is the invoice that the client should pay to the
// server that will be returned if the swap is complete.
PrepayInvoice string
// MaxPrepayRoutingFee is the maximum off-chain fee in msat that may be
// paid for the prepayment to the server.
MaxPrepayRoutingFee btcutil.Amount
} }
// LoopOut is a combination of the contract and the updates. // LoopOut is a combination of the contract and the updates.
type LoopOut struct { type LoopOut struct {
// Hash is the hash that uniquely identifies this swap. Loop
Hash lntypes.Hash
// Contract is the active contract for this swap. It describes the // Contract is the active contract for this swap. It describes the
// precise details of the swap including the final fee, CLTV value, // precise details of the swap including the final fee, CLTV value,
// etc. // etc.
Contract *LoopOutContract Contract *LoopOutContract
// Events are each of the state transitions that this swap underwent.
Events []*LoopOutEvent
} }
// LastUpdateTime returns the last update time of this swap. // LastUpdateTime returns the last update time of this swap.
@ -70,104 +72,114 @@ func deserializeLoopOutContract(value []byte, chainParams *chaincfg.Params) (
r := bytes.NewReader(value) r := bytes.NewReader(value)
contract, err := deserializeContract(r) contract := LoopOutContract{}
if err != nil { var err error
var unixNano int64
if err := binary.Read(r, byteOrder, &unixNano); err != nil {
return nil, err return nil, err
} }
contract.InitiationTime = time.Unix(0, unixNano)
swap := LoopOutContract{ if err := binary.Read(r, byteOrder, &contract.Preimage); err != nil {
SwapContract: *contract, return nil, err
} }
addr, err := wire.ReadVarString(r, 0) binary.Read(r, byteOrder, &contract.AmountRequested)
contract.PrepayInvoice, err = wire.ReadVarString(r, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
swap.DestAddr, err = btcutil.DecodeAddress(addr, chainParams)
n, err := r.Read(contract.SenderKey[:])
if err != nil { if err != nil {
return nil, err return nil, err
} }
if n != keyLength {
return nil, fmt.Errorf("sender key has invalid length")
}
swap.SwapInvoice, err = wire.ReadVarString(r, 0) n, err = r.Read(contract.ReceiverKey[:])
if err != nil { if err != nil {
return nil, err return nil, err
} }
if n != keyLength {
return nil, fmt.Errorf("receiver key has invalid length")
}
if err := binary.Read(r, byteOrder, &swap.SweepConfTarget); err != nil { if err := binary.Read(r, byteOrder, &contract.CltvExpiry); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &contract.MaxMinerFee); err != nil {
return nil, err return nil, err
} }
if err := binary.Read(r, byteOrder, &swap.MaxSwapRoutingFee); err != nil { if err := binary.Read(r, byteOrder, &contract.MaxSwapFee); err != nil {
return nil, err return nil, err
} }
var unchargeChannel uint64 if err := binary.Read(r, byteOrder, &contract.MaxPrepayRoutingFee); err != nil {
if err := binary.Read(r, byteOrder, &unchargeChannel); err != nil {
return nil, err return nil, err
} }
if unchargeChannel != 0 { if err := binary.Read(r, byteOrder, &contract.InitiationHeight); err != nil {
swap.UnchargeChannel = &unchargeChannel return nil, err
} }
return &swap, nil addr, err := wire.ReadVarString(r, 0)
} if err != nil {
return nil, err
func serializeLoopOutContract(swap *LoopOutContract) ( }
[]byte, error) { contract.DestAddr, err = btcutil.DecodeAddress(addr, chainParams)
if err != nil {
var b bytes.Buffer
serializeContract(&swap.SwapContract, &b)
addr := swap.DestAddr.String()
if err := wire.WriteVarString(&b, 0, addr); err != nil {
return nil, err return nil, err
} }
if err := wire.WriteVarString(&b, 0, swap.SwapInvoice); err != nil { contract.SwapInvoice, err = wire.ReadVarString(r, 0)
if err != nil {
return nil, err return nil, err
} }
if err := binary.Write(&b, byteOrder, swap.SweepConfTarget); err != nil { if err := binary.Read(r, byteOrder, &contract.SweepConfTarget); err != nil {
return nil, err return nil, err
} }
if err := binary.Write(&b, byteOrder, swap.MaxSwapRoutingFee); err != nil { if err := binary.Read(r, byteOrder, &contract.MaxSwapRoutingFee); err != nil {
return nil, err return nil, err
} }
var unchargeChannel uint64 var unchargeChannel uint64
if swap.UnchargeChannel != nil { if err := binary.Read(r, byteOrder, &unchargeChannel); err != nil {
unchargeChannel = *swap.UnchargeChannel
}
if err := binary.Write(&b, byteOrder, unchargeChannel); err != nil {
return nil, err return nil, err
} }
if unchargeChannel != 0 {
contract.UnchargeChannel = &unchargeChannel
}
return b.Bytes(), nil return &contract, nil
} }
func deserializeContract(r io.Reader) (*SwapContract, error) { func serializeLoopOutContract(swap *LoopOutContract) (
swap := SwapContract{} []byte, error) {
var err error
var unixNano int64 var b bytes.Buffer
if err := binary.Read(r, byteOrder, &unixNano); err != nil {
if err := binary.Write(&b, byteOrder, swap.InitiationTime.UnixNano()); err != nil {
return nil, err return nil, err
} }
swap.InitiationTime = time.Unix(0, unixNano)
if err := binary.Read(r, byteOrder, &swap.Preimage); err != nil { if err := binary.Write(&b, byteOrder, swap.Preimage); err != nil {
return nil, err return nil, err
} }
binary.Read(r, byteOrder, &swap.AmountRequested) if err := binary.Write(&b, byteOrder, swap.AmountRequested); err != nil {
return nil, err
}
swap.PrepayInvoice, err = wire.ReadVarString(r, 0) if err := wire.WriteVarString(&b, 0, swap.PrepayInvoice); err != nil {
if err != nil {
return nil, err return nil, err
} }
n, err := r.Read(swap.SenderKey[:]) n, err := b.Write(swap.SenderKey[:])
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -175,7 +187,7 @@ func deserializeContract(r io.Reader) (*SwapContract, error) {
return nil, fmt.Errorf("sender key has invalid length") return nil, fmt.Errorf("sender key has invalid length")
} }
n, err = r.Read(swap.ReceiverKey[:]) n, err = b.Write(swap.ReceiverKey[:])
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -183,79 +195,50 @@ func deserializeContract(r io.Reader) (*SwapContract, error) {
return nil, fmt.Errorf("receiver key has invalid length") return nil, fmt.Errorf("receiver key has invalid length")
} }
if err := binary.Read(r, byteOrder, &swap.CltvExpiry); err != nil { if err := binary.Write(&b, byteOrder, swap.CltvExpiry); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &swap.MaxMinerFee); err != nil {
return nil, err return nil, err
} }
if err := binary.Read(r, byteOrder, &swap.MaxSwapFee); err != nil { if err := binary.Write(&b, byteOrder, swap.MaxMinerFee); err != nil {
return nil, err return nil, err
} }
if err := binary.Read(r, byteOrder, &swap.MaxPrepayRoutingFee); err != nil { if err := binary.Write(&b, byteOrder, swap.MaxSwapFee); err != nil {
return nil, err
}
if err := binary.Read(r, byteOrder, &swap.InitiationHeight); err != nil {
return nil, err return nil, err
} }
return &swap, nil if err := binary.Write(&b, byteOrder, swap.MaxPrepayRoutingFee); err != nil {
} return nil, err
func serializeContract(swap *SwapContract, b *bytes.Buffer) error {
if err := binary.Write(b, byteOrder, swap.InitiationTime.UnixNano()); err != nil {
return err
}
if err := binary.Write(b, byteOrder, swap.Preimage); err != nil {
return err
}
if err := binary.Write(b, byteOrder, swap.AmountRequested); err != nil {
return err
}
if err := wire.WriteVarString(b, 0, swap.PrepayInvoice); err != nil {
return err
} }
n, err := b.Write(swap.SenderKey[:]) if err := binary.Write(&b, byteOrder, swap.InitiationHeight); err != nil {
if err != nil { return nil, err
return err
}
if n != keyLength {
return fmt.Errorf("sender key has invalid length")
} }
n, err = b.Write(swap.ReceiverKey[:]) addr := swap.DestAddr.String()
if err != nil { if err := wire.WriteVarString(&b, 0, addr); err != nil {
return err return nil, err
}
if n != keyLength {
return fmt.Errorf("receiver key has invalid length")
} }
if err := binary.Write(b, byteOrder, swap.CltvExpiry); err != nil { if err := wire.WriteVarString(&b, 0, swap.SwapInvoice); err != nil {
return err return nil, err
} }
if err := binary.Write(b, byteOrder, swap.MaxMinerFee); err != nil { if err := binary.Write(&b, byteOrder, swap.SweepConfTarget); err != nil {
return err return nil, err
} }
if err := binary.Write(b, byteOrder, swap.MaxSwapFee); err != nil { if err := binary.Write(&b, byteOrder, swap.MaxSwapRoutingFee); err != nil {
return err return nil, err
} }
if err := binary.Write(b, byteOrder, swap.MaxPrepayRoutingFee); err != nil { var unchargeChannel uint64
return err if swap.UnchargeChannel != nil {
unchargeChannel = *swap.UnchargeChannel
} }
if err := binary.Write(&b, byteOrder, unchargeChannel); err != nil {
if err := binary.Write(b, byteOrder, swap.InitiationHeight); err != nil { return nil, err
return err
} }
return nil return b.Bytes(), nil
} }

@ -18,19 +18,27 @@ var (
// database. // database.
dbFileName = "loop.db" dbFileName = "loop.db"
// unchargeSwapsBucketKey is a bucket that contains all swaps that are // loopOutBucketKey is a bucket that contains all out swaps that are
// currently pending or completed. This bucket is keyed by the // currently pending or completed. This bucket is keyed by the swaphash,
// swaphash, and leads to a nested sub-bucket that houses information // and leads to a nested sub-bucket that houses information for that
// for that swap. // swap.
// //
// maps: swapHash -> swapBucket // maps: swapHash -> swapBucket
unchargeSwapsBucketKey = []byte("uncharge-swaps") loopOutBucketKey = []byte("uncharge-swaps")
// unchargeUpdatesBucketKey is a bucket that contains all updates // loopInBucketKey is a bucket that contains all in swaps that are
// pertaining to a swap. This is a sub-bucket of the swap bucket for a // currently pending or completed. This bucket is keyed by the swaphash,
// particular swap. This list only ever grows. // and leads to a nested sub-bucket that houses information for that
// swap.
// //
// path: unchargeUpdatesBucket -> swapBucket[hash] -> updateBucket // maps: swapHash -> swapBucket
loopInBucketKey = []byte("loop-in")
// updatesBucketKey is a bucket that contains all updates pertaining to
// a swap. This is a sub-bucket of the swap bucket for a particular
// swap. This list only ever grows.
//
// path: loopInBucket/loopOutBucket -> swapBucket[hash] -> updatesBucket
// //
// maps: updateNumber -> time || state // maps: updateNumber -> time || state
updatesBucketKey = []byte("updates") updatesBucketKey = []byte("updates")
@ -38,7 +46,7 @@ var (
// contractKey is the key that stores the serialized swap contract. It // contractKey is the key that stores the serialized swap contract. It
// is nested within the sub-bucket for each active swap. // is nested within the sub-bucket for each active swap.
// //
// path: unchargeUpdatesBucket -> swapBucket[hash] // path: loopInBucket/loopOutBucket -> swapBucket[hash] -> contractKey
// //
// value: time || rawSwapState // value: time || rawSwapState
contractKey = []byte("contract") contractKey = []byte("contract")
@ -92,11 +100,11 @@ func NewBoltSwapStore(dbPath string, chainParams *chaincfg.Params) (
// We'll create all the buckets we need if this is the first time we're // We'll create all the buckets we need if this is the first time we're
// starting up. If they already exist, then these calls will be noops. // starting up. If they already exist, then these calls will be noops.
err = bdb.Update(func(tx *bbolt.Tx) error { err = bdb.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(unchargeSwapsBucketKey) _, err := tx.CreateBucketIfNotExists(loopOutBucketKey)
if err != nil { if err != nil {
return err return err
} }
_, err = tx.CreateBucketIfNotExists(updatesBucketKey) _, err = tx.CreateBucketIfNotExists(loopInBucketKey)
if err != nil { if err != nil {
return err return err
} }
@ -123,15 +131,12 @@ func NewBoltSwapStore(dbPath string, chainParams *chaincfg.Params) (
}, nil }, nil
} }
// FetchLoopOutSwaps returns all swaps currently in the store. func (s *boltSwapStore) fetchSwaps(bucketKey []byte,
// callback func([]byte, Loop) error) error {
// NOTE: Part of the loopdb.SwapStore interface.
func (s *boltSwapStore) FetchLoopOutSwaps() ([]*LoopOut, error) {
var swaps []*LoopOut
err := s.db.View(func(tx *bbolt.Tx) error { return s.db.View(func(tx *bbolt.Tx) error {
// First, we'll grab our main loop out swap bucket key. // First, we'll grab our main loop in bucket key.
rootBucket := tx.Bucket(unchargeSwapsBucketKey) rootBucket := tx.Bucket(bucketKey)
if rootBucket == nil { if rootBucket == nil {
return errors.New("bucket does not exist") return errors.New("bucket does not exist")
} }
@ -159,12 +164,6 @@ func (s *boltSwapStore) FetchLoopOutSwaps() ([]*LoopOut, error) {
if contractBytes == nil { if contractBytes == nil {
return errors.New("contract not found") return errors.New("contract not found")
} }
contract, err := deserializeLoopOutContract(
contractBytes, s.chainParams,
)
if err != nil {
return err
}
// Once we have the raw swap, we'll also need to decode // Once we have the raw swap, we'll also need to decode
// each of the past updates to the swap itself. // each of the past updates to the swap itself.
@ -175,9 +174,9 @@ func (s *boltSwapStore) FetchLoopOutSwaps() ([]*LoopOut, error) {
// De serialize and collect each swap update into our // De serialize and collect each swap update into our
// slice of swap events. // slice of swap events.
var updates []*LoopOutEvent var updates []*LoopEvent
err = stateBucket.ForEach(func(k, v []byte) error { err := stateBucket.ForEach(func(k, v []byte) error {
event, err := deserializeLoopOutEvent(v) event, err := deserializeLoopEvent(v)
if err != nil { if err != nil {
return err return err
} }
@ -192,16 +191,39 @@ func (s *boltSwapStore) FetchLoopOutSwaps() ([]*LoopOut, error) {
var hash lntypes.Hash var hash lntypes.Hash
copy(hash[:], swapHash) copy(hash[:], swapHash)
swap := LoopOut{ loop := Loop{
Contract: contract, Hash: hash,
Hash: hash, Events: updates,
Events: updates,
} }
swaps = append(swaps, &swap) return callback(contractBytes, loop)
return nil
}) })
}) })
}
// FetchLoopOutSwaps returns all loop out swaps currently in the store.
//
// NOTE: Part of the loopdb.SwapStore interface.
func (s *boltSwapStore) FetchLoopOutSwaps() ([]*LoopOut, error) {
var swaps []*LoopOut
err := s.fetchSwaps(loopOutBucketKey,
func(contractBytes []byte, loop Loop) error {
contract, err := deserializeLoopOutContract(
contractBytes, s.chainParams,
)
if err != nil {
return err
}
swaps = append(swaps, &LoopOut{
Contract: contract,
Loop: loop,
})
return nil
},
)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -209,24 +231,47 @@ func (s *boltSwapStore) FetchLoopOutSwaps() ([]*LoopOut, error) {
return swaps, nil return swaps, nil
} }
// CreateLoopOut adds an initiated swap to the store. // FetchLoopInSwaps returns all loop in swaps currently in the store.
// //
// NOTE: Part of the loopdb.SwapStore interface. // NOTE: Part of the loopdb.SwapStore interface.
func (s *boltSwapStore) CreateLoopOut(hash lntypes.Hash, func (s *boltSwapStore) FetchLoopInSwaps() ([]*LoopIn, error) {
swap *LoopOutContract) error { var swaps []*LoopIn
// If the hash doesn't match the pre-image, then this is an invalid err := s.fetchSwaps(loopInBucketKey,
// swap so we'll bail out early. func(contractBytes []byte, loop Loop) error {
if hash != swap.Preimage.Hash() { contract, err := deserializeLoopInContract(
return errors.New("hash and preimage do not match") contractBytes,
)
if err != nil {
return err
}
swaps = append(swaps, &LoopIn{
Contract: contract,
Loop: loop,
})
return nil
},
)
if err != nil {
return nil, err
} }
return swaps, nil
}
// createLoop creates a swap in the store. It requires that the contract is
// already serialized to be able to use this function for both in and out swaps.
func (s *boltSwapStore) createLoop(bucketKey []byte, hash lntypes.Hash,
contractBytes []byte) error {
// Otherwise, we'll create a new swap within the database. // Otherwise, we'll create a new swap within the database.
return s.db.Update(func(tx *bbolt.Tx) error { return s.db.Update(func(tx *bbolt.Tx) error {
// First, we'll grab the root bucket that houses all of our // First, we'll grab the root bucket that houses all of our
// main swaps. // main swaps.
rootBucket, err := tx.CreateBucketIfNotExists( rootBucket, err := tx.CreateBucketIfNotExists(
unchargeSwapsBucketKey, bucketKey,
) )
if err != nil { if err != nil {
return err return err
@ -235,8 +280,7 @@ func (s *boltSwapStore) CreateLoopOut(hash lntypes.Hash,
// If the swap already exists, then we'll exit as we don't want // If the swap already exists, then we'll exit as we don't want
// to override a swap. // to override a swap.
if rootBucket.Get(hash[:]) != nil { if rootBucket.Get(hash[:]) != nil {
return fmt.Errorf("swap %v already exists", return fmt.Errorf("swap %v already exists", hash)
swap.Preimage)
} }
// From the root bucket, we'll make a new sub swap bucket using // From the root bucket, we'll make a new sub swap bucket using
@ -246,15 +290,11 @@ func (s *boltSwapStore) CreateLoopOut(hash lntypes.Hash,
return err return err
} }
// With out swap bucket created, we'll serialize and store the // With the swap bucket created, we'll store the swap itself.
// swap itself. err = swapBucket.Put(contractKey, contractBytes)
contract, err := serializeLoopOutContract(swap)
if err != nil { if err != nil {
return err return err
} }
if err := swapBucket.Put(contractKey, contract); err != nil {
return err
}
// Finally, we'll create an empty updates bucket for this swap // Finally, we'll create an empty updates bucket for this swap
// to track any future updates to the swap itself. // to track any future updates to the swap itself.
@ -263,18 +303,56 @@ func (s *boltSwapStore) CreateLoopOut(hash lntypes.Hash,
}) })
} }
// UpdateLoopOut stores a swap updateLoopOut. This appends to the event log for // CreateLoopOut adds an initiated swap to the store.
// a particular swap as it goes through the various stages in its lifetime.
// //
// NOTE: Part of the loopdb.SwapStore interface. // NOTE: Part of the loopdb.SwapStore interface.
func (s *boltSwapStore) UpdateLoopOut(hash lntypes.Hash, time time.Time, func (s *boltSwapStore) CreateLoopOut(hash lntypes.Hash,
state SwapState) error { swap *LoopOutContract) error {
// If the hash doesn't match the pre-image, then this is an invalid
// swap so we'll bail out early.
if hash != swap.Preimage.Hash() {
return errors.New("hash and preimage do not match")
}
contractBytes, err := serializeLoopOutContract(swap)
if err != nil {
return err
}
return s.createLoop(loopOutBucketKey, hash, contractBytes)
}
// CreateLoopIn adds an initiated swap to the store.
//
// NOTE: Part of the loopdb.SwapStore interface.
func (s *boltSwapStore) CreateLoopIn(hash lntypes.Hash,
swap *LoopInContract) error {
// If the hash doesn't match the pre-image, then this is an invalid
// swap so we'll bail out early.
if hash != swap.Preimage.Hash() {
return errors.New("hash and preimage do not match")
}
contractBytes, err := serializeLoopInContract(swap)
if err != nil {
return err
}
return s.createLoop(loopInBucketKey, hash, contractBytes)
}
// updateLoop saves a new swap state transition to the store. It takes in a
// bucket key so that this function can be used for both in and out swaps.
func (s *boltSwapStore) updateLoop(bucketKey []byte, hash lntypes.Hash,
time time.Time, state SwapState) error {
return s.db.Update(func(tx *bbolt.Tx) error { return s.db.Update(func(tx *bbolt.Tx) error {
// Starting from the root bucket, we'll traverse the bucket // Starting from the root bucket, we'll traverse the bucket
// hierarchy all the way down to the swap bucket, and the // hierarchy all the way down to the swap bucket, and the
// update sub-bucket within that. // update sub-bucket within that.
rootBucket := tx.Bucket(unchargeSwapsBucketKey) rootBucket := tx.Bucket(bucketKey)
if rootBucket == nil { if rootBucket == nil {
return errors.New("bucket does not exist") return errors.New("bucket does not exist")
} }
@ -295,7 +373,7 @@ func (s *boltSwapStore) UpdateLoopOut(hash lntypes.Hash, time time.Time,
} }
// With the ID obtained, we'll write out this new update value. // With the ID obtained, we'll write out this new update value.
updateValue, err := serializeLoopOutEvent(time, state) updateValue, err := serializeLoopEvent(time, state)
if err != nil { if err != nil {
return err return err
} }
@ -303,6 +381,26 @@ func (s *boltSwapStore) UpdateLoopOut(hash lntypes.Hash, time time.Time,
}) })
} }
// UpdateLoopOut stores a swap update. This appends to the event log for
// a particular swap as it goes through the various stages in its lifetime.
//
// NOTE: Part of the loopdb.SwapStore interface.
func (s *boltSwapStore) UpdateLoopOut(hash lntypes.Hash, time time.Time,
state SwapState) error {
return s.updateLoop(loopOutBucketKey, hash, time, state)
}
// UpdateLoopIn stores a swap update. This appends to the event log for
// a particular swap as it goes through the various stages in its lifetime.
//
// NOTE: Part of the loopdb.SwapStore interface.
func (s *boltSwapStore) UpdateLoopIn(hash lntypes.Hash, time time.Time,
state SwapState) error {
return s.updateLoop(loopInBucketKey, hash, time, state)
}
// Close closes the underlying database. // Close closes the underlying database.
// //
// NOTE: Part of the loopdb.SwapStore interface. // NOTE: Part of the loopdb.SwapStore interface.

@ -34,9 +34,9 @@ var (
testTime = time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC) testTime = time.Date(2018, time.January, 9, 14, 00, 00, 0, time.UTC)
) )
// TestBoltSwapStore tests all the basic functionality of the current bbolt // TestLoopOutStore tests all the basic functionality of the current bbolt
// swap store. // swap store.
func TestBoltSwapStore(t *testing.T) { func TestLoopOutStore(t *testing.T) {
tempDirName, err := ioutil.TempDir("", "clientstore") tempDirName, err := ioutil.TempDir("", "clientstore")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -65,25 +65,27 @@ func TestBoltSwapStore(t *testing.T) {
// database shortly. // database shortly.
pendingSwap := LoopOutContract{ pendingSwap := LoopOutContract{
SwapContract: SwapContract{ SwapContract: SwapContract{
AmountRequested: 100, AmountRequested: 100,
Preimage: testPreimage, Preimage: testPreimage,
CltvExpiry: 144, CltvExpiry: 144,
SenderKey: senderKey, SenderKey: senderKey,
PrepayInvoice: "prepayinvoice",
ReceiverKey: receiverKey, ReceiverKey: receiverKey,
MaxMinerFee: 10, MaxMinerFee: 10,
MaxSwapFee: 20, MaxSwapFee: 20,
MaxPrepayRoutingFee: 40,
InitiationHeight: 99, InitiationHeight: 99,
// Convert to/from unix to remove timezone, so that it // Convert to/from unix to remove timezone, so that it
// doesn't interfere with DeepEqual. // doesn't interfere with DeepEqual.
InitiationTime: time.Unix(0, initiationTime.UnixNano()), InitiationTime: time.Unix(0, initiationTime.UnixNano()),
}, },
DestAddr: destAddr, MaxPrepayRoutingFee: 40,
SwapInvoice: "swapinvoice", PrepayInvoice: "prepayinvoice",
MaxSwapRoutingFee: 30, DestAddr: destAddr,
SweepConfTarget: 2, SwapInvoice: "swapinvoice",
MaxSwapRoutingFee: 30,
SweepConfTarget: 2,
} }
// checkSwap is a test helper function that'll assert the state of a // checkSwap is a test helper function that'll assert the state of a
@ -157,3 +159,124 @@ func TestBoltSwapStore(t *testing.T) {
} }
checkSwap(StateFailInsufficientValue) checkSwap(StateFailInsufficientValue)
} }
// TestLoopInStore tests all the basic functionality of the current bbolt
// swap store.
func TestLoopInStore(t *testing.T) {
tempDirName, err := ioutil.TempDir("", "clientstore")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tempDirName)
store, err := NewBoltSwapStore(tempDirName, &chaincfg.MainNetParams)
if err != nil {
t.Fatal(err)
}
// First, verify that an empty database has no active swaps.
swaps, err := store.FetchLoopInSwaps()
if err != nil {
t.Fatal(err)
}
if len(swaps) != 0 {
t.Fatal("expected empty store")
}
hash := sha256.Sum256(testPreimage[:])
initiationTime := time.Date(2018, 11, 1, 0, 0, 0, 0, time.UTC)
// Next, we'll make a new pending swap that we'll insert into the
// database shortly.
loopInChannel := uint64(123)
pendingSwap := LoopInContract{
SwapContract: SwapContract{
AmountRequested: 100,
Preimage: testPreimage,
CltvExpiry: 144,
SenderKey: senderKey,
ReceiverKey: receiverKey,
MaxMinerFee: 10,
MaxSwapFee: 20,
InitiationHeight: 99,
// Convert to/from unix to remove timezone, so that it
// doesn't interfere with DeepEqual.
InitiationTime: time.Unix(0, initiationTime.UnixNano()),
},
HtlcConfTarget: 2,
LoopInChannel: &loopInChannel,
}
// checkSwap is a test helper function that'll assert the state of a
// swap.
checkSwap := func(expectedState SwapState) {
t.Helper()
swaps, err := store.FetchLoopInSwaps()
if err != nil {
t.Fatal(err)
}
if len(swaps) != 1 {
t.Fatal("expected pending swap in store")
}
swap := swaps[0].Contract
if !reflect.DeepEqual(swap, &pendingSwap) {
t.Fatal("invalid pending swap data")
}
if swaps[0].State() != expectedState {
t.Fatalf("expected state %v, but got %v",
expectedState, swaps[0].State(),
)
}
}
// If we create a new swap, then it should show up as being initialized
// right after.
if err := store.CreateLoopIn(hash, &pendingSwap); err != nil {
t.Fatal(err)
}
checkSwap(StateInitiated)
// Trying to make the same swap again should result in an error.
if err := store.CreateLoopIn(hash, &pendingSwap); err == nil {
t.Fatal("expected error on storing duplicate")
}
checkSwap(StateInitiated)
// Next, we'll update to the next state of the pre-image being
// revealed. The state should be reflected here again.
err = store.UpdateLoopIn(
hash, testTime, StatePreimageRevealed,
)
if err != nil {
t.Fatal(err)
}
checkSwap(StatePreimageRevealed)
// Next, we'll update to the final state to ensure that the state is
// properly updated.
err = store.UpdateLoopIn(
hash, testTime, StateFailInsufficientValue,
)
if err != nil {
t.Fatal(err)
}
checkSwap(StateFailInsufficientValue)
if err := store.Close(); err != nil {
t.Fatal(err)
}
// If we re-open the same store, then the state of the current swap
// should be the same.
store, err = NewBoltSwapStore(tempDirName, &chaincfg.MainNetParams)
if err != nil {
t.Fatal(err)
}
checkSwap(StateFailInsufficientValue)
}

@ -1,6 +1,8 @@
package loopdb package loopdb
// SwapState indicates the current state of a swap. // SwapState indicates the current state of a swap. This enumeration is the
// union of loop in and loop out states. A single type is used for both swap
// types to be able to reduce code duplication that would otherwise be required.
type SwapState uint8 type SwapState uint8
const ( const (
@ -22,23 +24,24 @@ const (
// server pulled the off-chain htlc. // server pulled the off-chain htlc.
StateSuccess = 2 StateSuccess = 2
// StateFailOffchainPayments indicates that it wasn't possible to find routes // StateFailOffchainPayments indicates that it wasn't possible to find
// for one or both of the off-chain payments to the server that // routes for one or both of the off-chain payments to the server that
// satisfied the payment restrictions (fee and timelock limits). // satisfied the payment restrictions (fee and timelock limits).
StateFailOffchainPayments = 3 StateFailOffchainPayments = 3
// StateFailTimeout indicates that the on-chain htlc wasn't confirmed before // StateFailTimeout indicates that the on-chain htlc wasn't confirmed
// its expiry or confirmed too late (MinPreimageRevealDelta violated). // before its expiry or confirmed too late (MinPreimageRevealDelta
// violated).
StateFailTimeout = 4 StateFailTimeout = 4
// StateFailSweepTimeout indicates that the on-chain htlc wasn't swept before // StateFailSweepTimeout indicates that the on-chain htlc wasn't swept
// the server revoked the htlc. The server didn't pull the off-chain // before the server revoked the htlc. The server didn't pull the
// htlc (even though it could have) and we timed out the off-chain htlc // off-chain htlc (even though it could have) and we timed out the
// ourselves. No funds lost. // off-chain htlc ourselves. No funds lost.
StateFailSweepTimeout = 5 StateFailSweepTimeout = 5
// StateFailInsufficientValue indicates that the published on-chain htlc had // StateFailInsufficientValue indicates that the published on-chain htlc
// a value lower than the requested amount. // had a value lower than the requested amount.
StateFailInsufficientValue = 6 StateFailInsufficientValue = 6
// StateFailTemporary indicates that the swap cannot progress because // StateFailTemporary indicates that the swap cannot progress because
@ -90,6 +93,9 @@ func (s SwapState) String() string {
case StatePreimageRevealed: case StatePreimageRevealed:
return "PreimageRevealed" return "PreimageRevealed"
case StateHtlcPublished:
return "HtlcPublished"
case StateSuccess: case StateSuccess:
return "Success" return "Success"

@ -84,23 +84,23 @@ func newLoopOutSwap(globalCtx context.Context, cfg *swapConfig,
initiationTime := time.Now() initiationTime := time.Now()
contract := loopdb.LoopOutContract{ contract := loopdb.LoopOutContract{
SwapInvoice: swapResp.swapInvoice, SwapInvoice: swapResp.swapInvoice,
DestAddr: request.DestAddr, DestAddr: request.DestAddr,
MaxSwapRoutingFee: request.MaxSwapRoutingFee, MaxSwapRoutingFee: request.MaxSwapRoutingFee,
SweepConfTarget: request.SweepConfTarget, SweepConfTarget: request.SweepConfTarget,
UnchargeChannel: request.LoopOutChannel, UnchargeChannel: request.LoopOutChannel,
PrepayInvoice: swapResp.prepayInvoice,
MaxPrepayRoutingFee: request.MaxPrepayRoutingFee,
SwapContract: loopdb.SwapContract{ SwapContract: loopdb.SwapContract{
InitiationHeight: currentHeight, InitiationHeight: currentHeight,
InitiationTime: initiationTime, InitiationTime: initiationTime,
PrepayInvoice: swapResp.prepayInvoice, ReceiverKey: receiverKey,
ReceiverKey: receiverKey, SenderKey: swapResp.senderKey,
SenderKey: swapResp.senderKey, Preimage: swapPreimage,
Preimage: swapPreimage, AmountRequested: request.Amount,
AmountRequested: request.Amount, CltvExpiry: swapResp.expiry,
CltvExpiry: swapResp.expiry, MaxMinerFee: request.MaxMinerFee,
MaxMinerFee: request.MaxMinerFee, MaxSwapFee: request.MaxSwapFee,
MaxSwapFee: request.MaxSwapFee,
MaxPrepayRoutingFee: request.MaxPrepayRoutingFee,
}, },
} }

@ -17,6 +17,11 @@ type storeMock struct {
loopOutStoreChan chan loopdb.LoopOutContract loopOutStoreChan chan loopdb.LoopOutContract
loopOutUpdateChan chan loopdb.SwapState loopOutUpdateChan chan loopdb.SwapState
loopInSwaps map[lntypes.Hash]*loopdb.LoopInContract
loopInUpdates map[lntypes.Hash][]loopdb.SwapState
loopInStoreChan chan loopdb.LoopInContract
loopInUpdateChan chan loopdb.SwapState
t *testing.T t *testing.T
} }
@ -33,7 +38,11 @@ func newStoreMock(t *testing.T) *storeMock {
loopOutSwaps: make(map[lntypes.Hash]*loopdb.LoopOutContract), loopOutSwaps: make(map[lntypes.Hash]*loopdb.LoopOutContract),
loopOutUpdates: make(map[lntypes.Hash][]loopdb.SwapState), loopOutUpdates: make(map[lntypes.Hash][]loopdb.SwapState),
t: t, loopInStoreChan: make(chan loopdb.LoopInContract, 1),
loopInUpdateChan: make(chan loopdb.SwapState, 1),
loopInSwaps: make(map[lntypes.Hash]*loopdb.LoopInContract),
loopInUpdates: make(map[lntypes.Hash][]loopdb.SwapState),
t: t,
} }
} }
@ -45,17 +54,19 @@ func (s *storeMock) FetchLoopOutSwaps() ([]*loopdb.LoopOut, error) {
for hash, contract := range s.loopOutSwaps { for hash, contract := range s.loopOutSwaps {
updates := s.loopOutUpdates[hash] updates := s.loopOutUpdates[hash]
events := make([]*loopdb.LoopOutEvent, len(updates)) events := make([]*loopdb.LoopEvent, len(updates))
for i, u := range updates { for i, u := range updates {
events[i] = &loopdb.LoopOutEvent{ events[i] = &loopdb.LoopEvent{
State: u, State: u,
} }
} }
swap := &loopdb.LoopOut{ swap := &loopdb.LoopOut{
Hash: hash, Loop: loopdb.Loop{
Hash: hash,
Events: events,
},
Contract: contract, Contract: contract,
Events: events,
} }
result = append(result, swap) result = append(result, swap)
} }
@ -81,6 +92,50 @@ func (s *storeMock) CreateLoopOut(hash lntypes.Hash,
return nil return nil
} }
// FetchLoopInSwaps returns all in swaps currently in the store.
func (s *storeMock) FetchLoopInSwaps() ([]*loopdb.LoopIn, error) {
result := []*loopdb.LoopIn{}
for hash, contract := range s.loopInSwaps {
updates := s.loopInUpdates[hash]
events := make([]*loopdb.LoopEvent, len(updates))
for i, u := range updates {
events[i] = &loopdb.LoopEvent{
State: u,
}
}
swap := &loopdb.LoopIn{
Loop: loopdb.Loop{
Hash: hash,
Events: events,
},
Contract: contract,
}
result = append(result, swap)
}
return result, nil
}
// CreateLoopIn adds an initiated loop in swap to the store.
//
// NOTE: Part of the loopdb.SwapStore interface.
func (s *storeMock) CreateLoopIn(hash lntypes.Hash,
swap *loopdb.LoopInContract) error {
_, ok := s.loopInSwaps[hash]
if ok {
return errors.New("swap already exists")
}
s.loopInSwaps[hash] = swap
s.loopInUpdates[hash] = []loopdb.SwapState{}
s.loopInStoreChan <- *swap
return nil
}
// UpdateLoopOut stores a new event for a target loop out swap. This appends to // UpdateLoopOut stores a new event for a target loop out swap. This appends to
// the event log for a particular swap as it goes through the various stages in // the event log for a particular swap as it goes through the various stages in
// its lifetime. // its lifetime.
@ -101,6 +156,26 @@ func (s *storeMock) UpdateLoopOut(hash lntypes.Hash, time time.Time,
return nil return nil
} }
// UpdateLoopIn stores a new event for a target loop in swap. This appends to
// the event log for a particular swap as it goes through the various stages in
// its lifetime.
//
// NOTE: Part of the loopdb.SwapStore interface.
func (s *storeMock) UpdateLoopIn(hash lntypes.Hash, time time.Time,
state loopdb.SwapState) error {
updates, ok := s.loopInUpdates[hash]
if !ok {
return errors.New("swap does not exists")
}
updates = append(updates, state)
s.loopOutUpdates[hash] = updates
s.loopOutUpdateChan <- state
return nil
}
func (s *storeMock) Close() error { func (s *storeMock) Close() error {
return nil return nil
} }
@ -130,6 +205,21 @@ func (s *storeMock) assertLoopOutStored() {
} }
} }
func (s *storeMock) assertLoopInStored() {
s.t.Helper()
<-s.loopInStoreChan
}
func (s *storeMock) assertLoopInState(expectedState loopdb.SwapState) {
s.t.Helper()
state := <-s.loopOutUpdateChan
if state != expectedState {
s.t.Fatalf("unexpected state")
}
}
func (s *storeMock) assertStorePreimageReveal() { func (s *storeMock) assertStorePreimageReveal() {
s.t.Helper() s.t.Helper()

Loading…
Cancel
Save