loop: extract code from client package into new loop primary package

pull/9/head
Olaoluwa Osuntokun 5 years ago
parent f552bc06b1
commit 908d82acdb
No known key found for this signature in database
GPG Key ID: CE58F7F8E20FD9A2

@ -1,4 +1,4 @@
package client package loop
import ( import (
"context" "context"
@ -46,8 +46,8 @@ var (
republishDelay = 10 * time.Second republishDelay = 10 * time.Second
) )
// Client performs the client side part of swaps. This interface exists to // Client performs the client side part of swaps. This interface exists to be
// be able to implement a stub. // able to implement a stub.
type Client struct { type Client struct {
started uint32 // To be used atomically. started uint32 // To be used atomically.
errChan chan error errChan chan error
@ -118,9 +118,9 @@ func (s *Client) GetUnchargeSwaps() ([]*PersistentUncharge, error) {
} }
// Run is a blocking call that executes all swaps. Any pending swaps are // Run is a blocking call that executes all swaps. Any pending swaps are
// restored from persistent storage and resumed. Subsequent updates // restored from persistent storage and resumed. Subsequent updates will be
// will be sent through the passed in statusChan. The function can be // sent through the passed in statusChan. The function can be terminated by
// terminated by cancelling the context. // cancelling the context.
func (s *Client) Run(ctx context.Context, func (s *Client) Run(ctx context.Context,
statusChan chan<- SwapInfo) error { statusChan chan<- SwapInfo) error {
@ -213,11 +213,10 @@ func (s *Client) resumeSwaps(ctx context.Context,
} }
} }
// Uncharge initiates a uncharge swap. It blocks until the swap is // Uncharge initiates a uncharge swap. It blocks until the swap is initiation
// initiation with the swap server is completed (typically this takes // with the swap server is completed (typically this takes only a short amount
// only a short amount of time). From there on further status // of time). From there on further status information can be acquired through
// information can be acquired through the status channel returned from // the status channel returned from the Run call.
// the Run call.
// //
// When the call returns, the swap has been persisted and will be // When the call returns, the swap has been persisted and will be
// resumed automatically after restarts. // resumed automatically after restarts.
@ -258,8 +257,8 @@ func (s *Client) Uncharge(globalCtx context.Context,
} }
// UnchargeQuote takes a Uncharge amount and returns a break down of estimated // UnchargeQuote takes a Uncharge amount and returns a break down of estimated
// costs for the client. Both the swap server and the on-chain fee estimator are // costs for the client. Both the swap server and the on-chain fee estimator
// queried to get to build the quote response. // are queried to get to build the quote response.
func (s *Client) UnchargeQuote(ctx context.Context, func (s *Client) UnchargeQuote(ctx context.Context,
request *UnchargeQuoteRequest) (*UnchargeQuote, error) { request *UnchargeQuoteRequest) (*UnchargeQuote, error) {

@ -1,24 +0,0 @@
package client
import (
"github.com/btcsuite/btclog"
"os"
)
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var (
backendLog = btclog.NewBackend(logWriter{})
logger = backendLog.Logger("CLIENT")
servicesLogger = backendLog.Logger("SERVICES")
)
// logWriter implements an io.Writer that outputs to both standard output and
// the write-end pipe of an initialized log rotator.
type logWriter struct{}
func (logWriter) Write(p []byte) (n int, err error) {
os.Stdout.Write(p)
return len(p), nil
}

@ -1,20 +0,0 @@
package client
import (
"errors"
"github.com/btcsuite/btcd/wire"
)
// getTxInputByOutpoint returns a tx input based on a given input outpoint.
func getTxInputByOutpoint(tx *wire.MsgTx, input *wire.OutPoint) (
*wire.TxIn, error) {
for _, in := range tx.TxIn {
if in.PreviousOutPoint == *input {
return in, nil
}
}
return nil, errors.New("input not found")
}

@ -1,4 +1,4 @@
package client package loop
import ( import (
"bytes" "bytes"

@ -1,4 +1,4 @@
package client package loop
import ( import (
"time" "time"

@ -1,4 +1,4 @@
package client package loop
import ( import (
"context" "context"
@ -21,6 +21,8 @@ type executorConfig struct {
} }
// executor is responsible for executing swaps. // executor is responsible for executing swaps.
//
// TODO(roasbeef): rename to SubSwapper
type executor struct { type executor struct {
wg sync.WaitGroup wg sync.WaitGroup
newSwaps chan genericSwap newSwaps chan genericSwap
@ -91,6 +93,7 @@ func (s *executor) run(mainCtx context.Context,
nextSwapID := 0 nextSwapID := 0
for { for {
select { select {
case newSwap := <-s.newSwaps: case newSwap := <-s.newSwaps:
queue := queue.NewConcurrentQueue(10) queue := queue.NewConcurrentQueue(10)
queue.Start() queue.Start()
@ -115,6 +118,7 @@ func (s *executor) run(mainCtx context.Context,
}() }()
nextSwapID++ nextSwapID++
case doneID := <-swapDoneChan: case doneID := <-swapDoneChan:
queue, ok := blockEpochQueues[doneID] queue, ok := blockEpochQueues[doneID]
if !ok { if !ok {

@ -1,4 +1,4 @@
package client package loop
import ( import (
"time" "time"
@ -29,9 +29,9 @@ type UnchargeRequest struct {
MaxPrepayRoutingFee btcutil.Amount MaxPrepayRoutingFee btcutil.Amount
// MaxSwapFee is the maximum we are willing to pay the server for the // MaxSwapFee is the maximum we are willing to pay the server for the
// swap. This value is not disclosed in the swap initiation call, but if // swap. This value is not disclosed in the swap initiation call, but
// the server asks for a higher fee, we abort the swap. Typically this // if the server asks for a higher fee, we abort the swap. Typically
// value is taken from the response of the UnchargeQuote call. It // this value is taken from the response of the UnchargeQuote call. It
// includes the prepay amount. // includes the prepay amount.
MaxSwapFee btcutil.Amount MaxSwapFee btcutil.Amount
@ -66,28 +66,6 @@ type UnchargeRequest struct {
UnchargeChannel *uint64 UnchargeChannel *uint64
} }
// UnchargeContract contains the data that is serialized to persistent storage for
// pending swaps.
type UnchargeContract struct {
SwapContract
DestAddr btcutil.Address
SwapInvoice string
// MaxSwapRoutingFee is the maximum off-chain fee in msat that may be
// paid for the swap payment to the server.
MaxSwapRoutingFee btcutil.Amount
// SweepConfTarget specifies the targeted confirmation target for the
// client sweep tx.
SweepConfTarget int32
// UnchargeChannel is the channel to uncharge. If zero, any channel may
// be used.
UnchargeChannel *uint64
}
// UnchargeSwapInfo contains status information for a uncharge swap. // UnchargeSwapInfo contains status information for a uncharge swap.
type UnchargeSwapInfo struct { type UnchargeSwapInfo struct {
UnchargeContract UnchargeContract
@ -152,7 +130,8 @@ type UnchargeTerms struct {
// SwapFeeRate is the variable fee in parts per million. // SwapFeeRate is the variable fee in parts per million.
SwapFeeRate int64 SwapFeeRate int64
// PrepayAmt is the fixed part of the swap fee that needs to be prepaid. // PrepayAmt is the fixed part of the swap fee that needs to be
// prepaid.
PrepayAmt btcutil.Amount PrepayAmt btcutil.Amount
// MinSwapAmount is the minimum amount that the server requires for a // MinSwapAmount is the minimum amount that the server requires for a
@ -172,42 +151,10 @@ type UnchargeTerms struct {
SwapPaymentDest [33]byte SwapPaymentDest [33]byte
} }
// SwapContract contains the base data that is serialized to persistent storage
// for pending swaps.
type SwapContract struct {
Preimage lntypes.Preimage
AmountRequested btcutil.Amount
PrepayInvoice string
SenderKey [33]byte
ReceiverKey [33]byte
CltvExpiry int32
// MaxPrepayRoutingFee is the maximum off-chain fee in msat that may be
// paid for the prepayment to the server.
MaxPrepayRoutingFee btcutil.Amount
// MaxSwapFee is the maximum we are willing to pay the server for the
// swap.
MaxSwapFee btcutil.Amount
// MaxMinerFee is the maximum in on-chain fees that we are willing to
// spend.
MaxMinerFee btcutil.Amount
// InitiationHeight is the block height at which the swap was initiated.
InitiationHeight int32
// InitiationTime is the time at which the swap was initiated.
InitiationTime time.Time
}
// SwapInfoKit contains common swap info fields. // SwapInfoKit contains common swap info fields.
type SwapInfoKit struct { type SwapInfoKit struct {
// Hash is the sha256 hash of the preimage that unlocks the htlcs. It is // Hash is the sha256 hash of the preimage that unlocks the htlcs. It
// used to uniquely identify this swap. // is used to uniquely identify this swap.
Hash lntypes.Hash Hash lntypes.Hash
// LastUpdateTime is the time of the last update of this swap. // LastUpdateTime is the time of the last update of this swap.

@ -7,7 +7,7 @@ import (
"github.com/btcsuite/btcd/chaincfg/chainhash" "github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/loop/utils" "github.com/lightninglabs/loop/swap"
"github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/lnrpc/chainrpc" "github.com/lightningnetwork/lnd/lnrpc/chainrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -77,7 +77,7 @@ func (s *chainNotifierClient) RegisterSpendNtfn(ctx context.Context,
if err != nil { if err != nil {
return err return err
} }
tx, err := utils.DecodeTx(d.RawSpendingTx) tx, err := swap.DecodeTx(d.RawSpendingTx)
if err != nil { if err != nil {
return err return err
} }
@ -165,7 +165,7 @@ func (s *chainNotifierClient) RegisterConfirmationsNtfn(ctx context.Context,
// Script confirmed // Script confirmed
case *chainrpc.ConfEvent_Conf: case *chainrpc.ConfEvent_Conf:
tx, err := utils.DecodeTx(c.Conf.RawTx) tx, err := swap.DecodeTx(c.Conf.RawTx)
if err != nil { if err != nil {
errChan <- err errChan <- err
return return

@ -4,13 +4,13 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/lightninglabs/loop/utils"
"io/ioutil" "io/ioutil"
"path/filepath" "path/filepath"
"time" "time"
"github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/lightninglabs/loop/swap"
"github.com/lightningnetwork/lnd/lncfg" "github.com/lightningnetwork/lnd/lncfg"
"github.com/lightningnetwork/lnd/macaroons" "github.com/lightningnetwork/lnd/macaroons"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -53,7 +53,7 @@ func NewLndServices(lndAddress string, application string,
logger.Infof("Connected to lnd") logger.Infof("Connected to lnd")
chainParams, err := utils.ChainParamsFromNetwork(network) chainParams, err := swap.ChainParamsFromNetwork(network)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -4,7 +4,7 @@ import (
"context" "context"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/lightninglabs/loop/utils" "github.com/lightninglabs/loop/swap"
"github.com/lightningnetwork/lnd/input" "github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnrpc/signrpc" "github.com/lightningnetwork/lnd/lnrpc/signrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
@ -29,7 +29,7 @@ func newSignerClient(conn *grpc.ClientConn) *signerClient {
func (s *signerClient) SignOutputRaw(ctx context.Context, tx *wire.MsgTx, func (s *signerClient) SignOutputRaw(ctx context.Context, tx *wire.MsgTx,
signDescriptors []*input.SignDescriptor) ([][]byte, error) { signDescriptors []*input.SignDescriptor) ([][]byte, error) {
txRaw, err := utils.EncodeTx(tx) txRaw, err := swap.EncodeTx(tx)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -6,7 +6,7 @@ import (
"github.com/btcsuite/btcd/btcec" "github.com/btcsuite/btcd/btcec"
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/lightninglabs/loop/utils" "github.com/lightninglabs/loop/swap"
"github.com/lightningnetwork/lnd/keychain" "github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnrpc/signrpc" "github.com/lightningnetwork/lnd/lnrpc/signrpc"
"github.com/lightningnetwork/lnd/lnrpc/walletrpc" "github.com/lightningnetwork/lnd/lnrpc/walletrpc"
@ -117,7 +117,7 @@ func (m *walletKitClient) NextAddr(ctx context.Context) (
func (m *walletKitClient) PublishTransaction(ctx context.Context, func (m *walletKitClient) PublishTransaction(ctx context.Context,
tx *wire.MsgTx) error { tx *wire.MsgTx) error {
txHex, err := utils.EncodeTx(tx) txHex, err := swap.EncodeTx(tx)
if err != nil { if err != nil {
return err return err
} }
@ -155,7 +155,7 @@ func (m *walletKitClient) SendOutputs(ctx context.Context,
return nil, err return nil, err
} }
tx, err := utils.DecodeTx(resp.RawTx) tx, err := swap.DecodeTx(resp.RawTx)
if err != nil { if err != nil {
return nil, err return nil, err
} }

@ -0,0 +1,70 @@
package loop
import (
"fmt"
"os"
"github.com/btcsuite/btclog"
"github.com/lightningnetwork/lnd/lntypes"
)
// log is a logger that is initialized with no output filters. This
// means the package will not perform any logging by default until the caller
// requests it.
var (
backendLog = btclog.NewBackend(logWriter{})
logger = backendLog.Logger("CLIENT")
servicesLogger = backendLog.Logger("SERVICES")
)
// logWriter implements an io.Writer that outputs to both standard output and
// the write-end pipe of an initialized log rotator.
type logWriter struct{}
func (logWriter) Write(p []byte) (n int, err error) {
os.Stdout.Write(p)
return len(p), nil
}
// SwapLog logs with a short swap hash prefix.
type SwapLog struct {
// Logger is the underlying based logger.
Logger btclog.Logger
// Hash is the hash the identifies the target swap.
Hash lntypes.Hash
}
// Infof formats message according to format specifier and writes to
// log with LevelInfo.
func (s *SwapLog) Infof(format string, params ...interface{}) {
s.Logger.Infof(
fmt.Sprintf("%v %s", ShortHash(&s.Hash), format),
params...,
)
}
// Warnf formats message according to format specifier and writes to
// to log with LevelError.
func (s *SwapLog) Warnf(format string, params ...interface{}) {
s.Logger.Warnf(
fmt.Sprintf("%v %s", ShortHash(&s.Hash), format),
params...,
)
}
// Errorf formats message according to format specifier and writes to
// to log with LevelError.
func (s *SwapLog) Errorf(format string, params ...interface{}) {
s.Logger.Errorf(
fmt.Sprintf("%v %s", ShortHash(&s.Hash), format),
params...,
)
}
// ShortHash returns a shortened version of the hash suitable for use in
// logging.
func ShortHash(hash *lntypes.Hash) string {
return hash.String()[:6]
}

@ -1,4 +1,4 @@
package client package loop
import ( import (
"context" "context"

@ -1,4 +1,4 @@
package client package loop
import ( import (
"errors" "errors"

@ -1,4 +1,4 @@
package client package loop
import ( import (
"context" "context"

@ -1,4 +1,4 @@
package client package loop
import ( import (
"context" "context"

@ -9,7 +9,7 @@ import (
"github.com/btcsuite/btcd/chaincfg" "github.com/btcsuite/btcd/chaincfg"
"github.com/btcsuite/btcutil" "github.com/btcsuite/btcutil"
"github.com/lightninglabs/loop/lndclient" "github.com/lightninglabs/loop/lndclient"
"github.com/lightninglabs/loop/utils" "github.com/lightninglabs/loop/swap"
"github.com/lightningnetwork/lnd/lnrpc/invoicesrpc" "github.com/lightningnetwork/lnd/lnrpc/invoicesrpc"
"github.com/lightningnetwork/lnd/lntypes" "github.com/lightningnetwork/lnd/lntypes"
"github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/lnwire"
@ -34,7 +34,7 @@ func (h *mockLightningClient) PayInvoice(ctx context.Context, invoice string,
go func() { go func() {
defer h.wg.Done() defer h.wg.Done()
amt, err := utils.GetInvoiceAmt(&chaincfg.TestNet3Params, invoice) amt, err := swap.GetInvoiceAmt(&chaincfg.TestNet3Params, invoice)
if err != nil { if err != nil {
select { select {
case done <- lndclient.PaymentResult{ case done <- lndclient.PaymentResult{

@ -1,4 +1,4 @@
package client package loop
import ( import (
"context" "context"
@ -139,8 +139,9 @@ func (ctx *testContext) finish() {
ctx.assertIsDone() ctx.assertIsDone()
} }
// notifyHeight notifies swap client of the arrival of a new block and waits for // notifyHeight notifies swap client of the arrival of a new block and
// the notification to be processed by selecting on a dedicated test channel. // waits for the notification to be processed by selecting on a dedicated
// test channel.
func (ctx *testContext) notifyHeight(height int32) { func (ctx *testContext) notifyHeight(height int32) {
ctx.T.Helper() ctx.T.Helper()
@ -205,7 +206,9 @@ func (ctx *testContext) assertStatus(expectedState SwapState) {
} }
} }
func (ctx *testContext) publishHtlc(script []byte, amt btcutil.Amount) wire.OutPoint { func (ctx *testContext) publishHtlc(script []byte,
amt btcutil.Amount) wire.OutPoint {
// Create the htlc tx. // Create the htlc tx.
htlcTx := wire.MsgTx{} htlcTx := wire.MsgTx{}
htlcTx.AddTxIn(&wire.TxIn{ htlcTx.AddTxIn(&wire.TxIn{

@ -1,4 +1,4 @@
package client package loop
import ( import (
"context" "context"
@ -305,10 +305,9 @@ func (s *unchargeSwap) executeSwap(globalCtx context.Context) error {
return err return err
} }
// Inspect witness stack to see if it is a success transaction. We don't // Inspect witness stack to see if it is a success transaction. We
// just try to match with the hash of our sweep tx, because it may be // don't just try to match with the hash of our sweep tx, because it
// swept by a different (fee) sweep tx from a previous run. // may be swept by a different (fee) sweep tx from a previous run.
htlcInput, err := getTxInputByOutpoint( htlcInput, err := getTxInputByOutpoint(
spendDetails.SpendingTx, htlcOutpoint, spendDetails.SpendingTx, htlcOutpoint,
) )

@ -0,0 +1 @@
package loop

@ -1,4 +1,4 @@
package client package loop
import ( import (
"context" "context"
Loading…
Cancel
Save