You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

542 lines
15 KiB

  1. package loop
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/btcsuite/btcd/btcec"
  14. "github.com/btcsuite/btcutil"
  15. "github.com/lightninglabs/loop/loopdb"
  16. "github.com/lightninglabs/loop/looprpc"
  17. "github.com/lightninglabs/loop/lsat"
  18. "github.com/lightningnetwork/lnd/lntypes"
  19. "github.com/lightningnetwork/lnd/routing/route"
  20. "github.com/lightningnetwork/lnd/tor"
  21. "google.golang.org/grpc"
  22. "google.golang.org/grpc/credentials"
  23. )
  24. var (
  25. // errServerSubscriptionComplete is returned when our subscription to
  26. // server updates exits because the server has no more updates to
  27. // provide us, because its part in the swap is complete.
  28. errServerSubscriptionComplete = errors.New("server finished serving " +
  29. "updates")
  30. // errSubscriptionFailed is returned when our subscription returns with
  31. // and EOF, indicating that the server restarted, we had an unexpected
  32. // network failure. Since we do not have restart-recovery, we note that
  33. // we will not resume our subscription once this error occurs.
  34. errSubscriptionFailed = errors.New("failed, no further updates will " +
  35. "be provided")
  36. )
  37. type swapServerClient interface {
  38. GetLoopOutTerms(ctx context.Context) (
  39. *LoopOutTerms, error)
  40. GetLoopOutQuote(ctx context.Context, amt btcutil.Amount, expiry int32,
  41. swapPublicationDeadline time.Time) (
  42. *LoopOutQuote, error)
  43. GetLoopInTerms(ctx context.Context) (
  44. *LoopInTerms, error)
  45. GetLoopInQuote(ctx context.Context, amt btcutil.Amount) (
  46. *LoopInQuote, error)
  47. NewLoopOutSwap(ctx context.Context,
  48. swapHash lntypes.Hash, amount btcutil.Amount, expiry int32,
  49. receiverKey [33]byte, swapPublicationDeadline time.Time,
  50. initiator string) (*newLoopOutResponse, error)
  51. PushLoopOutPreimage(ctx context.Context,
  52. preimage lntypes.Preimage) error
  53. NewLoopInSwap(ctx context.Context,
  54. swapHash lntypes.Hash, amount btcutil.Amount,
  55. senderKey [33]byte, swapInvoice, probeInvoice string,
  56. lastHop *route.Vertex, initiator string) (*newLoopInResponse,
  57. error)
  58. // SubscribeLoopOutUpdates subscribes to loop out server state.
  59. SubscribeLoopOutUpdates(ctx context.Context,
  60. hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
  61. // SubscribeLoopInUpdates subscribes to loop in server state.
  62. SubscribeLoopInUpdates(ctx context.Context,
  63. hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
  64. }
  65. type grpcSwapServerClient struct {
  66. server looprpc.SwapServerClient
  67. conn *grpc.ClientConn
  68. wg sync.WaitGroup
  69. }
  70. // stop sends the signal for the server's goroutines to shutdown and waits for
  71. // them to complete.
  72. func (s *grpcSwapServerClient) stop() {
  73. if err := s.conn.Close(); err != nil {
  74. log.Warnf("could not close connection: %v", err)
  75. }
  76. s.wg.Wait()
  77. }
  78. var _ swapServerClient = (*grpcSwapServerClient)(nil)
  79. func newSwapServerClient(cfg *ClientConfig, lsatStore lsat.Store) (
  80. *grpcSwapServerClient, error) {
  81. // Create the server connection with the interceptor that will handle
  82. // the LSAT protocol for us.
  83. clientInterceptor := lsat.NewInterceptor(
  84. cfg.Lnd, lsatStore, serverRPCTimeout, cfg.MaxLsatCost,
  85. cfg.MaxLsatFee,
  86. )
  87. serverConn, err := getSwapServerConn(
  88. cfg.ServerAddress, cfg.ProxyAddress, cfg.SwapServerNoTLS,
  89. cfg.TLSPathServer, clientInterceptor,
  90. )
  91. if err != nil {
  92. return nil, err
  93. }
  94. server := looprpc.NewSwapServerClient(serverConn)
  95. return &grpcSwapServerClient{
  96. conn: serverConn,
  97. server: server,
  98. }, nil
  99. }
  100. func (s *grpcSwapServerClient) GetLoopOutTerms(ctx context.Context) (
  101. *LoopOutTerms, error) {
  102. rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
  103. defer rpcCancel()
  104. terms, err := s.server.LoopOutTerms(rpcCtx,
  105. &looprpc.ServerLoopOutTermsRequest{
  106. ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
  107. },
  108. )
  109. if err != nil {
  110. return nil, err
  111. }
  112. return &LoopOutTerms{
  113. MinSwapAmount: btcutil.Amount(terms.MinSwapAmount),
  114. MaxSwapAmount: btcutil.Amount(terms.MaxSwapAmount),
  115. MinCltvDelta: terms.MinCltvDelta,
  116. MaxCltvDelta: terms.MaxCltvDelta,
  117. }, nil
  118. }
  119. func (s *grpcSwapServerClient) GetLoopOutQuote(ctx context.Context,
  120. amt btcutil.Amount, expiry int32, swapPublicationDeadline time.Time) (
  121. *LoopOutQuote, error) {
  122. rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
  123. defer rpcCancel()
  124. quoteResp, err := s.server.LoopOutQuote(rpcCtx,
  125. &looprpc.ServerLoopOutQuoteRequest{
  126. Amt: uint64(amt),
  127. SwapPublicationDeadline: swapPublicationDeadline.Unix(),
  128. ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
  129. Expiry: expiry,
  130. },
  131. )
  132. if err != nil {
  133. return nil, err
  134. }
  135. dest, err := hex.DecodeString(quoteResp.SwapPaymentDest)
  136. if err != nil {
  137. return nil, err
  138. }
  139. if len(dest) != 33 {
  140. return nil, errors.New("invalid payment dest")
  141. }
  142. var destArray [33]byte
  143. copy(destArray[:], dest)
  144. return &LoopOutQuote{
  145. PrepayAmount: btcutil.Amount(quoteResp.PrepayAmt),
  146. SwapFee: btcutil.Amount(quoteResp.SwapFee),
  147. SwapPaymentDest: destArray,
  148. }, nil
  149. }
  150. func (s *grpcSwapServerClient) GetLoopInTerms(ctx context.Context) (
  151. *LoopInTerms, error) {
  152. rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
  153. defer rpcCancel()
  154. terms, err := s.server.LoopInTerms(rpcCtx,
  155. &looprpc.ServerLoopInTermsRequest{
  156. ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
  157. },
  158. )
  159. if err != nil {
  160. return nil, err
  161. }
  162. return &LoopInTerms{
  163. MinSwapAmount: btcutil.Amount(terms.MinSwapAmount),
  164. MaxSwapAmount: btcutil.Amount(terms.MaxSwapAmount),
  165. }, nil
  166. }
  167. func (s *grpcSwapServerClient) GetLoopInQuote(ctx context.Context,
  168. amt btcutil.Amount) (*LoopInQuote, error) {
  169. rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
  170. defer rpcCancel()
  171. quoteResp, err := s.server.LoopInQuote(rpcCtx,
  172. &looprpc.ServerLoopInQuoteRequest{
  173. Amt: uint64(amt),
  174. ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
  175. },
  176. )
  177. if err != nil {
  178. return nil, err
  179. }
  180. return &LoopInQuote{
  181. SwapFee: btcutil.Amount(quoteResp.SwapFee),
  182. CltvDelta: quoteResp.CltvDelta,
  183. }, nil
  184. }
  185. func (s *grpcSwapServerClient) NewLoopOutSwap(ctx context.Context,
  186. swapHash lntypes.Hash, amount btcutil.Amount, expiry int32,
  187. receiverKey [33]byte, swapPublicationDeadline time.Time,
  188. initiator string) (*newLoopOutResponse, error) {
  189. rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
  190. defer rpcCancel()
  191. swapResp, err := s.server.NewLoopOutSwap(rpcCtx,
  192. &looprpc.ServerLoopOutRequest{
  193. SwapHash: swapHash[:],
  194. Amt: uint64(amount),
  195. ReceiverKey: receiverKey[:],
  196. SwapPublicationDeadline: swapPublicationDeadline.Unix(),
  197. ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
  198. Expiry: expiry,
  199. UserAgent: UserAgent(initiator),
  200. },
  201. )
  202. if err != nil {
  203. return nil, err
  204. }
  205. var senderKey [33]byte
  206. copy(senderKey[:], swapResp.SenderKey)
  207. // Validate sender key.
  208. _, err = btcec.ParsePubKey(senderKey[:], btcec.S256())
  209. if err != nil {
  210. return nil, fmt.Errorf("invalid sender key: %v", err)
  211. }
  212. return &newLoopOutResponse{
  213. swapInvoice: swapResp.SwapInvoice,
  214. prepayInvoice: swapResp.PrepayInvoice,
  215. senderKey: senderKey,
  216. serverMessage: swapResp.ServerMessage,
  217. }, nil
  218. }
  219. // PushLoopOutPreimage pushes a preimage to the server.
  220. func (s *grpcSwapServerClient) PushLoopOutPreimage(ctx context.Context,
  221. preimage lntypes.Preimage) error {
  222. rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
  223. defer rpcCancel()
  224. _, err := s.server.LoopOutPushPreimage(rpcCtx,
  225. &looprpc.ServerLoopOutPushPreimageRequest{
  226. ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
  227. Preimage: preimage[:],
  228. },
  229. )
  230. return err
  231. }
  232. func (s *grpcSwapServerClient) NewLoopInSwap(ctx context.Context,
  233. swapHash lntypes.Hash, amount btcutil.Amount, senderKey [33]byte,
  234. swapInvoice, probeInvoice string, lastHop *route.Vertex,
  235. initiator string) (*newLoopInResponse, error) {
  236. rpcCtx, rpcCancel := context.WithTimeout(ctx, globalCallTimeout)
  237. defer rpcCancel()
  238. req := &looprpc.ServerLoopInRequest{
  239. SwapHash: swapHash[:],
  240. Amt: uint64(amount),
  241. SenderKey: senderKey[:],
  242. SwapInvoice: swapInvoice,
  243. ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
  244. ProbeInvoice: probeInvoice,
  245. UserAgent: UserAgent(initiator),
  246. }
  247. if lastHop != nil {
  248. req.LastHop = lastHop[:]
  249. }
  250. swapResp, err := s.server.NewLoopInSwap(rpcCtx, req)
  251. if err != nil {
  252. return nil, err
  253. }
  254. var receiverKey [33]byte
  255. copy(receiverKey[:], swapResp.ReceiverKey)
  256. // Validate receiver key.
  257. _, err = btcec.ParsePubKey(receiverKey[:], btcec.S256())
  258. if err != nil {
  259. return nil, fmt.Errorf("invalid sender key: %v", err)
  260. }
  261. return &newLoopInResponse{
  262. receiverKey: receiverKey,
  263. expiry: swapResp.Expiry,
  264. serverMessage: swapResp.ServerMessage,
  265. }, nil
  266. }
  267. // ServerUpdate summarizes an update from the swap server.
  268. type ServerUpdate struct {
  269. // State is the state that the server has sent us.
  270. State looprpc.ServerSwapState
  271. // Timestamp is the time of the server state update.
  272. Timestamp time.Time
  273. }
  274. // SubscribeLoopInUpdates subscribes to loop in server state and pipes updates
  275. // into the channel provided.
  276. func (s *grpcSwapServerClient) SubscribeLoopInUpdates(ctx context.Context,
  277. hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {
  278. resp, err := s.server.SubscribeLoopInUpdates(
  279. ctx, &looprpc.SubscribeUpdatesRequest{
  280. ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
  281. SwapHash: hash[:],
  282. },
  283. )
  284. if err != nil {
  285. return nil, nil, err
  286. }
  287. receive := func() (*ServerUpdate, error) {
  288. response, err := resp.Recv()
  289. if err != nil {
  290. return nil, err
  291. }
  292. return &ServerUpdate{
  293. State: response.State,
  294. Timestamp: time.Unix(0, response.TimestampNs),
  295. }, nil
  296. }
  297. updateChan, errChan := s.makeServerUpdate(ctx, receive)
  298. return updateChan, errChan, nil
  299. }
  300. // SubscribeLoopOutUpdates subscribes to loop out server state and pipes updates
  301. // into the channel provided.
  302. func (s *grpcSwapServerClient) SubscribeLoopOutUpdates(ctx context.Context,
  303. hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {
  304. resp, err := s.server.SubscribeLoopOutUpdates(
  305. ctx, &looprpc.SubscribeUpdatesRequest{
  306. ProtocolVersion: loopdb.CurrentRPCProtocolVersion,
  307. SwapHash: hash[:],
  308. },
  309. )
  310. if err != nil {
  311. return nil, nil, err
  312. }
  313. receive := func() (*ServerUpdate, error) {
  314. response, err := resp.Recv()
  315. if err != nil {
  316. return nil, err
  317. }
  318. return &ServerUpdate{
  319. State: response.State,
  320. Timestamp: time.Unix(0, response.TimestampNs),
  321. }, nil
  322. }
  323. updateChan, errChan := s.makeServerUpdate(ctx, receive)
  324. return updateChan, errChan, nil
  325. }
  326. // makeServerUpdate takes a stream receive function and a channel that it
  327. // should pipe updates into. It sends events into the updates channel until
  328. // the client cancels, server client shuts down or the subscription is cancelled
  329. // server side.
  330. func (s *grpcSwapServerClient) makeServerUpdate(ctx context.Context,
  331. receive func() (*ServerUpdate, error)) (<-chan *ServerUpdate,
  332. <-chan error) {
  333. // We will return exactly one error from this function so we buffer
  334. // our error channel so that the function exit is not dependent on
  335. // the error being read.
  336. errChan := make(chan error, 1)
  337. updateChan := make(chan *ServerUpdate)
  338. // Create a goroutine that will pipe updates in to our updates channel.
  339. s.wg.Add(1)
  340. go func() {
  341. defer s.wg.Done()
  342. for {
  343. // Try to receive from our stream. If there are no items
  344. // to consume, this call will block. If our stream is
  345. // cancelled by the server we will receive an error.
  346. response, err := receive()
  347. switch err {
  348. // If we get a nil error, we proceed with to delivering
  349. // the update we have just received.
  350. case nil:
  351. // If we get an EOF error, the server is finished
  352. // sending us updates, so we return with a non-nil
  353. // a subscription complete error to inform the caller
  354. // that they will no longer receive updates.
  355. case io.EOF:
  356. errChan <- errServerSubscriptionComplete
  357. return
  358. // If we receive a non-nil error, we exit.
  359. default:
  360. // If we get a transport is closing error, we
  361. // send a server restarting error so that the
  362. // caller is informed that we will not get
  363. // any more updates from the server (since we
  364. // don't have retry logic yet).
  365. if isErrConClosing(err) {
  366. errChan <- errSubscriptionFailed
  367. } else {
  368. errChan <- err
  369. }
  370. return
  371. }
  372. select {
  373. // Try to send our update to the update channel.
  374. case updateChan <- response:
  375. // If the client cancels their context, we exit with
  376. // no error.
  377. case <-ctx.Done():
  378. errChan <- nil
  379. return
  380. }
  381. }
  382. }()
  383. return updateChan, errChan
  384. }
  385. // getSwapServerConn returns a connection to the swap server. A non-empty
  386. // proxyAddr indicates that a SOCKS proxy found at the address should be used to
  387. // establish the connection.
  388. func getSwapServerConn(address, proxyAddress string, insecure bool,
  389. tlsPath string, interceptor *lsat.Interceptor) (*grpc.ClientConn, error) {
  390. // Create a dial options array.
  391. opts := []grpc.DialOption{
  392. grpc.WithUnaryInterceptor(
  393. interceptor.UnaryInterceptor,
  394. ),
  395. grpc.WithStreamInterceptor(
  396. interceptor.StreamInterceptor,
  397. ),
  398. }
  399. // There are three options to connect to a swap server, either insecure,
  400. // using a self-signed certificate or with a certificate signed by a
  401. // public CA.
  402. switch {
  403. case insecure:
  404. opts = append(opts, grpc.WithInsecure())
  405. case tlsPath != "":
  406. // Load the specified TLS certificate and build
  407. // transport credentials
  408. creds, err := credentials.NewClientTLSFromFile(tlsPath, "")
  409. if err != nil {
  410. return nil, err
  411. }
  412. opts = append(opts, grpc.WithTransportCredentials(creds))
  413. default:
  414. creds := credentials.NewTLS(&tls.Config{})
  415. opts = append(opts, grpc.WithTransportCredentials(creds))
  416. }
  417. // If a SOCKS proxy address was specified, then we should dial through
  418. // it.
  419. if proxyAddress != "" {
  420. log.Infof("Proxying connection to %v over Tor SOCKS proxy %v",
  421. address, proxyAddress)
  422. torDialer := func(_ context.Context, addr string) (net.Conn, error) {
  423. return tor.Dial(
  424. addr, proxyAddress, false,
  425. tor.DefaultConnTimeout,
  426. )
  427. }
  428. opts = append(opts, grpc.WithContextDialer(torDialer))
  429. }
  430. conn, err := grpc.Dial(address, opts...)
  431. if err != nil {
  432. return nil, fmt.Errorf("unable to connect to RPC server: %v",
  433. err)
  434. }
  435. return conn, nil
  436. }
  437. // isErrConClosing identifies whether we have received a "transport is closing"
  438. // error from a grpc stream, indicating that the server has shutdown. We need
  439. // to string match this error because ErrConnClosing is part of an internal
  440. // grpc package, so cannot be used directly.
  441. func isErrConClosing(err error) bool {
  442. if err == nil {
  443. return false
  444. }
  445. return strings.Contains(err.Error(), "transport is closing")
  446. }
  447. type newLoopOutResponse struct {
  448. swapInvoice string
  449. prepayInvoice string
  450. senderKey [33]byte
  451. serverMessage string
  452. }
  453. type newLoopInResponse struct {
  454. receiverKey [33]byte
  455. expiry int32
  456. serverMessage string
  457. }