- package loop
-
- import (
- "context"
- "crypto/tls"
- "encoding/hex"
- "errors"
- "fmt"
- "io"
- "net"
- "strings"
- "sync"
- "time"
-
- "github.com/btcsuite/btcd/btcec"
- "github.com/btcsuite/btcutil"
- "github.com/lightninglabs/loop/loopdb"
- "github.com/lightninglabs/loop/looprpc"
- "github.com/lightninglabs/loop/lsat"
- "github.com/lightningnetwork/lnd/lntypes"
- "github.com/lightningnetwork/lnd/routing/route"
- "github.com/lightningnetwork/lnd/tor"
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials"
- )
-
- var (
- // errServerSubscriptionComplete is returned when our subscription to
- // server updates exits because the server has no more updates to
- // provide us, because its part in the swap is complete.
- errServerSubscriptionComplete = errors.New("server finished serving " +
- "updates")
-
- // errSubscriptionFailed is returned when our subscription returns with
- // and EOF, indicating that the server restarted, we had an unexpected
- // network failure. Since we do not have restart-recovery, we note that
- // we will not resume our subscription once this error occurs.
- errSubscriptionFailed = errors.New("failed, no further updates will " +
- "be provided")
- )
-
- type swapServerClient interface {
- GetLoopOutTerms(ctx context.Context) (
- *LoopOutTerms, error)
-
- GetLoopOutQuote(ctx context.Context, amt btcutil.Amount, expiry int32,
- swapPublicationDeadline time.Time) (
- *LoopOutQuote, error)
-
- GetLoopInTerms(ctx context.Context) (
- *LoopInTerms, error)
-
- GetLoopInQuote(ctx context.Context, amt btcutil.Amount) (
- *LoopInQuote, error)
-
- NewLoopOutSwap(ctx context.Context,
- swapHash lntypes.Hash, amount btcutil.Amount, expiry int32,
- receiverKey [33]byte, swapPublicationDeadline time.Time,
- initiator string) (*newLoopOutResponse, error)
-
- PushLoopOutPreimage(ctx context.Context,
- preimage lntypes.Preimage) error
-
- NewLoopInSwap(ctx context.Context,
- swapHash lntypes.Hash, amount btcutil.Amount,
- senderKey [33]byte, swapInvoice, probeInvoice string,
- lastHop *route.Vertex, initiator string) (*newLoopInResponse,
- error)
-
- // SubscribeLoopOutUpdates subscribes to loop out server state.
- SubscribeLoopOutUpdates(ctx context.Context,
- hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
-
- // SubscribeLoopInUpdates subscribes to loop in server state.
- SubscribeLoopInUpdates(ctx context.Context,
- hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
- }
-
- type grpcSwapServerClient struct {
- server looprpc.SwapServerClient
- conn *grpc.ClientConn
-
- wg sync.WaitGroup
- }
-
- // stop sends the signal for the server's goroutines to shutdown and waits for
- // them to complete.
- func (s *grpcSwapServerClient) stop() {
- if err := s.conn.Close(); err != nil {
- log.Warnf("could not close connection: %v", err)
- }
-
- s.wg.Wait()
- }
-
- var _ swapServerClient = (*grpcSwapServerClient)(nil)
-
- func newSwapServerClient(cfg *ClientConfig, lsatStore lsat.Store) (
- *grpcSwapServerClient, error) {
-
- // Create the server connection with the interceptor that will handle
- // the LSAT protocol for us.
- clientInterceptor := lsat.NewInterceptor(
- cfg.Lnd, lsatStore, serverRPCTimeout, cfg.MaxLsatCost,
- cfg.MaxLsatFee,
- )
- serverConn, err := getSwapServerConn(
- cfg.ServerAddress, cfg.ProxyAddress, cfg.SwapServerNoTLS,
- cfg.TLSPathServer, clientInterceptor,
- )
- if err != nil {
- return nil, err
- }
-
- server := looprpc.NewSwapServerClient(serverConn)
-
- return &grpcSwapServerClient{
- conn: serverConn,
- server: server,
- }, nil
- }
-
- func (s *grpcSwapServerClient) GetLoopOutTerms(ctx context.Context) (
- *LoopOutTerms, error) {
-
- rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
- defer rpcCancel()
- terms, err := s.server.LoopOutTerms(rpcCtx,
- &looprpc.ServerLoopOutTermsRequest{
- ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
- },
- )
- if err != nil {
- return nil, err
- }
-
- return &LoopOutTerms{
- MinSwapAmount: btcutil.Amount(terms.MinSwapAmount),
- MaxSwapAmount: btcutil.Amount(terms.MaxSwapAmount),
- MinCltvDelta: terms.MinCltvDelta,
- MaxCltvDelta: terms.MaxCltvDelta,
- }, nil
- }
-
- func (s *grpcSwapServerClient) GetLoopOutQuote(ctx context.Context,
- amt btcutil.Amount, expiry int32, swapPublicationDeadline time.Time) (
- *LoopOutQuote, error) {
-
- rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
- defer rpcCancel()
- quoteResp, err := s.server.LoopOutQuote(rpcCtx,
- &looprpc.ServerLoopOutQuoteRequest{
- Amt: uint64(amt),
- SwapPublicationDeadline: swapPublicationDeadline.Unix(),
- ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
- Expiry: expiry,
- },
- )
- if err != nil {
- return nil, err
- }
-
- dest, err := hex.DecodeString(quoteResp.SwapPaymentDest)
- if err != nil {
- return nil, err
- }
- if len(dest) != 33 {
- return nil, errors.New("invalid payment dest")
- }
- var destArray [33]byte
- copy(destArray[:], dest)
-
- return &LoopOutQuote{
- PrepayAmount: btcutil.Amount(quoteResp.PrepayAmt),
- SwapFee: btcutil.Amount(quoteResp.SwapFee),
- SwapPaymentDest: destArray,
- }, nil
- }
-
- func (s *grpcSwapServerClient) GetLoopInTerms(ctx context.Context) (
- *LoopInTerms, error) {
-
- rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
- defer rpcCancel()
- terms, err := s.server.LoopInTerms(rpcCtx,
- &looprpc.ServerLoopInTermsRequest{
- ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
- },
- )
- if err != nil {
- return nil, err
- }
-
- return &LoopInTerms{
- MinSwapAmount: btcutil.Amount(terms.MinSwapAmount),
- MaxSwapAmount: btcutil.Amount(terms.MaxSwapAmount),
- }, nil
- }
-
- func (s *grpcSwapServerClient) GetLoopInQuote(ctx context.Context,
- amt btcutil.Amount) (*LoopInQuote, error) {
-
- rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
- defer rpcCancel()
- quoteResp, err := s.server.LoopInQuote(rpcCtx,
- &looprpc.ServerLoopInQuoteRequest{
- Amt: uint64(amt),
- ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
- },
- )
- if err != nil {
- return nil, err
- }
-
- return &LoopInQuote{
- SwapFee: btcutil.Amount(quoteResp.SwapFee),
- CltvDelta: quoteResp.CltvDelta,
- }, nil
- }
-
- func (s *grpcSwapServerClient) NewLoopOutSwap(ctx context.Context,
- swapHash lntypes.Hash, amount btcutil.Amount, expiry int32,
- receiverKey [33]byte, swapPublicationDeadline time.Time,
- initiator string) (*newLoopOutResponse, error) {
-
- rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
- defer rpcCancel()
- swapResp, err := s.server.NewLoopOutSwap(rpcCtx,
- &looprpc.ServerLoopOutRequest{
- SwapHash: swapHash[:],
- Amt: uint64(amount),
- ReceiverKey: receiverKey[:],
- SwapPublicationDeadline: swapPublicationDeadline.Unix(),
- ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
- Expiry: expiry,
- UserAgent: UserAgent(initiator),
- },
- )
- if err != nil {
- return nil, err
- }
-
- var senderKey [33]byte
- copy(senderKey[:], swapResp.SenderKey)
-
- // Validate sender key.
- _, err = btcec.ParsePubKey(senderKey[:], btcec.S256())
- if err != nil {
- return nil, fmt.Errorf("invalid sender key: %v", err)
- }
-
- return &newLoopOutResponse{
- swapInvoice: swapResp.SwapInvoice,
- prepayInvoice: swapResp.PrepayInvoice,
- senderKey: senderKey,
- serverMessage: swapResp.ServerMessage,
- }, nil
- }
-
- // PushLoopOutPreimage pushes a preimage to the server.
- func (s *grpcSwapServerClient) PushLoopOutPreimage(ctx context.Context,
- preimage lntypes.Preimage) error {
-
- rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
- defer rpcCancel()
-
- _, err := s.server.LoopOutPushPreimage(rpcCtx,
- &looprpc.ServerLoopOutPushPreimageRequest{
- ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
- Preimage: preimage[:],
- },
- )
-
- return err
- }
-
- func (s *grpcSwapServerClient) NewLoopInSwap(ctx context.Context,
- swapHash lntypes.Hash, amount btcutil.Amount, senderKey [33]byte,
- swapInvoice, probeInvoice string, lastHop *route.Vertex,
- initiator string) (*newLoopInResponse, error) {
-
- rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
- defer rpcCancel()
-
- req := &looprpc.ServerLoopInRequest{
- SwapHash: swapHash[:],
- Amt: uint64(amount),
- SenderKey: senderKey[:],
- SwapInvoice: swapInvoice,
- ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
- ProbeInvoice: probeInvoice,
- UserAgent: UserAgent(initiator),
- }
- if lastHop != nil {
- req.LastHop = lastHop[:]
- }
-
- swapResp, err := s.server.NewLoopInSwap(rpcCtx, req)
- if err != nil {
- return nil, err
- }
-
- var receiverKey [33]byte
- copy(receiverKey[:], swapResp.ReceiverKey)
-
- // Validate receiver key.
- _, err = btcec.ParsePubKey(receiverKey[:], btcec.S256())
- if err != nil {
- return nil, fmt.Errorf("invalid sender key: %v", err)
- }
-
- return &newLoopInResponse{
- receiverKey: receiverKey,
- expiry: swapResp.Expiry,
- serverMessage: swapResp.ServerMessage,
- }, nil
- }
-
- // ServerUpdate summarizes an update from the swap server.
- type ServerUpdate struct {
- // State is the state that the server has sent us.
- State looprpc.ServerSwapState
-
- // Timestamp is the time of the server state update.
- Timestamp time.Time
- }
-
- // SubscribeLoopInUpdates subscribes to loop in server state and pipes updates
- // into the channel provided.
- func (s *grpcSwapServerClient) SubscribeLoopInUpdates(ctx context.Context,
- hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {
-
- resp, err := s.server.SubscribeLoopInUpdates(
- ctx, &looprpc.SubscribeUpdatesRequest{
- ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
- SwapHash: hash[:],
- },
- )
- if err != nil {
- return nil, nil, err
- }
-
- receive := func() (*ServerUpdate, error) {
- response, err := resp.Recv()
- if err != nil {
- return nil, err
- }
-
- return &ServerUpdate{
- State: response.State,
- Timestamp: time.Unix(0, response.TimestampNs),
- }, nil
- }
-
- updateChan, errChan := s.makeServerUpdate(ctx, receive)
- return updateChan, errChan, nil
- }
-
- // SubscribeLoopOutUpdates subscribes to loop out server state and pipes updates
- // into the channel provided.
- func (s *grpcSwapServerClient) SubscribeLoopOutUpdates(ctx context.Context,
- hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {
-
- resp, err := s.server.SubscribeLoopOutUpdates(
- ctx, &looprpc.SubscribeUpdatesRequest{
- ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
- SwapHash: hash[:],
- },
- )
- if err != nil {
- return nil, nil, err
- }
-
- receive := func() (*ServerUpdate, error) {
- response, err := resp.Recv()
- if err != nil {
- return nil, err
- }
-
- return &ServerUpdate{
- State: response.State,
- Timestamp: time.Unix(0, response.TimestampNs),
- }, nil
- }
-
- updateChan, errChan := s.makeServerUpdate(ctx, receive)
- return updateChan, errChan, nil
- }
-
- // makeServerUpdate takes a stream receive function and a channel that it
- // should pipe updates into. It sends events into the updates channel until
- // the client cancels, server client shuts down or the subscription is cancelled
- // server side.
- func (s *grpcSwapServerClient) makeServerUpdate(ctx context.Context,
- receive func() (*ServerUpdate, error)) (<-chan *ServerUpdate,
- <-chan error) {
-
- // We will return exactly one error from this function so we buffer
- // our error channel so that the function exit is not dependent on
- // the error being read.
- errChan := make(chan error, 1)
- updateChan := make(chan *ServerUpdate)
-
- // Create a goroutine that will pipe updates in to our updates channel.
- s.wg.Add(1)
- go func() {
- defer s.wg.Done()
-
- for {
- // Try to receive from our stream. If there are no items
- // to consume, this call will block. If our stream is
- // cancelled by the server we will receive an error.
- response, err := receive()
- switch err {
- // If we get a nil error, we proceed with to delivering
- // the update we have just received.
- case nil:
-
- // If we get an EOF error, the server is finished
- // sending us updates, so we return with a non-nil
- // a subscription complete error to inform the caller
- // that they will no longer receive updates.
- case io.EOF:
- errChan <- errServerSubscriptionComplete
- return
-
- // If we receive a non-nil error, we exit.
- default:
- // If we get a transport is closing error, we
- // send a server restarting error so that the
- // caller is informed that we will not get
- // any more updates from the server (since we
- // don't have retry logic yet).
- if isErrConClosing(err) {
- errChan <- errSubscriptionFailed
- } else {
- errChan <- err
- }
-
- return
- }
-
- select {
- // Try to send our update to the update channel.
- case updateChan <- response:
-
- // If the client cancels their context, we exit with
- // no error.
- case <-ctx.Done():
- errChan <- nil
- return
- }
- }
- }()
-
- return updateChan, errChan
- }
-
- // getSwapServerConn returns a connection to the swap server. A non-empty
- // proxyAddr indicates that a SOCKS proxy found at the address should be used to
- // establish the connection.
- func getSwapServerConn(address, proxyAddress string, insecure bool,
- tlsPath string, interceptor *lsat.Interceptor) (*grpc.ClientConn, error) {
-
- // Create a dial options array.
- opts := []grpc.DialOption{
- grpc.WithUnaryInterceptor(
- interceptor.UnaryInterceptor,
- ),
- grpc.WithStreamInterceptor(
- interceptor.StreamInterceptor,
- ),
- }
-
- // There are three options to connect to a swap server, either insecure,
- // using a self-signed certificate or with a certificate signed by a
- // public CA.
- switch {
- case insecure:
- opts = append(opts, grpc.WithInsecure())
-
- case tlsPath != "":
- // Load the specified TLS certificate and build
- // transport credentials
- creds, err := credentials.NewClientTLSFromFile(tlsPath, "")
- if err != nil {
- return nil, err
- }
- opts = append(opts, grpc.WithTransportCredentials(creds))
-
- default:
- creds := credentials.NewTLS(&tls.Config{})
- opts = append(opts, grpc.WithTransportCredentials(creds))
- }
-
- // If a SOCKS proxy address was specified, then we should dial through
- // it.
- if proxyAddress != "" {
- log.Infof("Proxying connection to %v over Tor SOCKS proxy %v",
- address, proxyAddress)
- torDialer := func(_ context.Context, addr string) (net.Conn, error) {
- return tor.Dial(
- addr, proxyAddress, false,
- tor.DefaultConnTimeout,
- )
- }
- opts = append(opts, grpc.WithContextDialer(torDialer))
- }
-
- conn, err := grpc.Dial(address, opts...)
- if err != nil {
- return nil, fmt.Errorf("unable to connect to RPC server: %v",
- err)
- }
-
- return conn, nil
- }
-
- // isErrConClosing identifies whether we have received a "transport is closing"
- // error from a grpc stream, indicating that the server has shutdown. We need
- // to string match this error because ErrConnClosing is part of an internal
- // grpc package, so cannot be used directly.
- func isErrConClosing(err error) bool {
- if err == nil {
- return false
- }
-
- return strings.Contains(err.Error(), "transport is closing")
- }
-
- type newLoopOutResponse struct {
- swapInvoice string
- prepayInvoice string
- senderKey [33]byte
- serverMessage string
- }
-
- type newLoopInResponse struct {
- receiverKey [33]byte
- expiry int32
- serverMessage string
- }
|