multi: consume and log sever state updates

pull/226/head
carla 4 years ago
parent a6539b6adb
commit cd2b08aec6
No known key found for this signature in database
GPG Key ID: 4CA7FE54A6213C91

@ -162,7 +162,7 @@ func NewClient(dbDir string, cfg *ClientConfig) (*Client, func(), error) {
}
cleanup := func() {
swapServerClient.Close()
swapServerClient.stop()
}
return client, cleanup, nil

@ -5,6 +5,7 @@ import (
"crypto/rand"
"crypto/sha256"
"fmt"
"sync"
"time"
"github.com/btcsuite/btcutil"
@ -61,6 +62,8 @@ type loopInSwap struct {
htlcTxHash *chainhash.Hash
timeoutAddr btcutil.Address
wg sync.WaitGroup
}
// loopInInitResult contains information about a just-initiated loop in swap.
@ -293,9 +296,25 @@ func (s *loopInSwap) sendUpdate(ctx context.Context) error {
func (s *loopInSwap) execute(mainCtx context.Context,
cfg *executeConfig, height int32) error {
defer s.wg.Wait()
s.executeConfig = *cfg
s.height = height
// Create context for our state subscription which we will cancel once
// swap execution has completed, ensuring that we kill the subscribe
// goroutine.
subCtx, cancel := context.WithCancel(mainCtx)
defer cancel()
s.wg.Add(1)
go func() {
defer s.wg.Done()
subscribeAndLogUpdates(
subCtx, s.hash, s.log, s.server.SubscribeLoopInUpdates,
)
}()
// Announce swap by sending out an initial update.
err := s.sendUpdate(mainCtx)
if err != nil {

@ -6,6 +6,7 @@ import (
"crypto/sha256"
"errors"
"fmt"
"sync"
"time"
"github.com/btcsuite/btcd/chaincfg/chainhash"
@ -62,6 +63,8 @@ type loopOutSwap struct {
swapPaymentChan chan lndclient.PaymentResult
prePaymentChan chan lndclient.PaymentResult
wg sync.WaitGroup
}
// executeConfig contains extra configuration to execute the swap.
@ -254,9 +257,25 @@ func (s *loopOutSwap) sendUpdate(ctx context.Context) error {
func (s *loopOutSwap) execute(mainCtx context.Context,
cfg *executeConfig, height int32) error {
defer s.wg.Wait()
s.executeConfig = *cfg
s.height = height
// Create context for our state subscription which we will cancel once
// swap execution has completed, ensuring that we kill the subscribe
// goroutine.
subCtx, cancel := context.WithCancel(mainCtx)
defer cancel()
s.wg.Add(1)
go func() {
defer s.wg.Done()
subscribeAndLogUpdates(
subCtx, s.hash, s.log, s.server.SubscribeLoopOutUpdates,
)
}()
// Execute swap.
err := s.executeAndFinalize(mainCtx)

@ -71,6 +71,85 @@ func (ProtocolVersion) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_ad098daeda4239f7, []int{0}
}
// ServerSwapState is an enum which represents all the states a swap may have
// from the server's perspective.
type ServerSwapState int32
const (
// The server has created the swap.
ServerSwapState_INITIATED ServerSwapState = 0
// The server has published the loop out on chain htlc.
ServerSwapState_HTLC_PUBLISHED ServerSwapState = 1
// The swap completed successfully.
ServerSwapState_SUCCESS ServerSwapState = 2
//
//The swap failed for a reason that is unknown to the server, this is only
//set for older swaps.
ServerSwapState_FAILED_UNKNOWN ServerSwapState = 3
// No htlc was confirmed in time for the loop in swap to complete.
ServerSwapState_FAILED_NO_HTLC ServerSwapState = 4
// A loop in htlc confirmed on chain, but it did not have the correct value.
ServerSwapState_FAILED_INVALID_HTLC_AMOUNT ServerSwapState = 5
//
//We did not succeed in completing the loop in off chain payment before the
//timeout.
ServerSwapState_FAILED_OFF_CHAIN_TIMEOUT ServerSwapState = 6
// The on chain timeout was claimed.
ServerSwapState_FAILED_TIMEOUT ServerSwapState = 7
//
//The server could not publish the loop out on chain htlc before the deadline
//provided.
ServerSwapState_FAILED_SWAP_DEADLINE ServerSwapState = 8
// The server could not publish the loop out on chain htlc.
ServerSwapState_FAILED_HTLC_PUBLICATION ServerSwapState = 9
// The server has published the loop out on chain timeout tx.
ServerSwapState_TIMEOUT_PUBLISHED ServerSwapState = 10
// The swap has failed for unknown reasons, it will not be completed.
ServerSwapState_UNEXPECTED_FAILURE ServerSwapState = 11
// The swap htlc has confirmed on chain.
ServerSwapState_HTLC_CONFIRMED ServerSwapState = 12
)
var ServerSwapState_name = map[int32]string{
0: "INITIATED",
1: "HTLC_PUBLISHED",
2: "SUCCESS",
3: "FAILED_UNKNOWN",
4: "FAILED_NO_HTLC",
5: "FAILED_INVALID_HTLC_AMOUNT",
6: "FAILED_OFF_CHAIN_TIMEOUT",
7: "FAILED_TIMEOUT",
8: "FAILED_SWAP_DEADLINE",
9: "FAILED_HTLC_PUBLICATION",
10: "TIMEOUT_PUBLISHED",
11: "UNEXPECTED_FAILURE",
12: "HTLC_CONFIRMED",
}
var ServerSwapState_value = map[string]int32{
"INITIATED": 0,
"HTLC_PUBLISHED": 1,
"SUCCESS": 2,
"FAILED_UNKNOWN": 3,
"FAILED_NO_HTLC": 4,
"FAILED_INVALID_HTLC_AMOUNT": 5,
"FAILED_OFF_CHAIN_TIMEOUT": 6,
"FAILED_TIMEOUT": 7,
"FAILED_SWAP_DEADLINE": 8,
"FAILED_HTLC_PUBLICATION": 9,
"TIMEOUT_PUBLISHED": 10,
"UNEXPECTED_FAILURE": 11,
"HTLC_CONFIRMED": 12,
}
func (x ServerSwapState) String() string {
return proto.EnumName(ServerSwapState_name, int32(x))
}
func (ServerSwapState) EnumDescriptor() ([]byte, []int) {
return fileDescriptor_ad098daeda4239f7, []int{1}
}
type ServerLoopOutRequest struct {
ReceiverKey []byte `protobuf:"bytes,1,opt,name=receiver_key,json=receiverKey,proto3" json:"receiver_key,omitempty"`
SwapHash []byte `protobuf:"bytes,2,opt,name=swap_hash,json=swapHash,proto3" json:"swap_hash,omitempty"`
@ -884,8 +963,156 @@ func (m *ServerLoopOutPushPreimageResponse) XXX_DiscardUnknown() {
var xxx_messageInfo_ServerLoopOutPushPreimageResponse proto.InternalMessageInfo
type SubscribeUpdatesRequest struct {
// The protocol version that the client adheres to.
ProtocolVersion ProtocolVersion `protobuf:"varint,1,opt,name=protocol_version,json=protocolVersion,proto3,enum=looprpc.ProtocolVersion" json:"protocol_version,omitempty"`
// Swap hash is the hash of the swap to subscribe to updates for.
SwapHash []byte `protobuf:"bytes,2,opt,name=swap_hash,json=swapHash,proto3" json:"swap_hash,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SubscribeUpdatesRequest) Reset() { *m = SubscribeUpdatesRequest{} }
func (m *SubscribeUpdatesRequest) String() string { return proto.CompactTextString(m) }
func (*SubscribeUpdatesRequest) ProtoMessage() {}
func (*SubscribeUpdatesRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_ad098daeda4239f7, []int{14}
}
func (m *SubscribeUpdatesRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SubscribeUpdatesRequest.Unmarshal(m, b)
}
func (m *SubscribeUpdatesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SubscribeUpdatesRequest.Marshal(b, m, deterministic)
}
func (m *SubscribeUpdatesRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_SubscribeUpdatesRequest.Merge(m, src)
}
func (m *SubscribeUpdatesRequest) XXX_Size() int {
return xxx_messageInfo_SubscribeUpdatesRequest.Size(m)
}
func (m *SubscribeUpdatesRequest) XXX_DiscardUnknown() {
xxx_messageInfo_SubscribeUpdatesRequest.DiscardUnknown(m)
}
var xxx_messageInfo_SubscribeUpdatesRequest proto.InternalMessageInfo
func (m *SubscribeUpdatesRequest) GetProtocolVersion() ProtocolVersion {
if m != nil {
return m.ProtocolVersion
}
return ProtocolVersion_LEGACY
}
func (m *SubscribeUpdatesRequest) GetSwapHash() []byte {
if m != nil {
return m.SwapHash
}
return nil
}
type SubscribeLoopOutUpdatesResponse struct {
// The unix timestamp in nanoseconds when the swap was updated.
TimestampNs int64 `protobuf:"varint,1,opt,name=timestamp_ns,json=timestampNs,proto3" json:"timestamp_ns,omitempty"`
// The swap's current state.
State ServerSwapState `protobuf:"varint,2,opt,name=state,proto3,enum=looprpc.ServerSwapState" json:"state,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SubscribeLoopOutUpdatesResponse) Reset() { *m = SubscribeLoopOutUpdatesResponse{} }
func (m *SubscribeLoopOutUpdatesResponse) String() string { return proto.CompactTextString(m) }
func (*SubscribeLoopOutUpdatesResponse) ProtoMessage() {}
func (*SubscribeLoopOutUpdatesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_ad098daeda4239f7, []int{15}
}
func (m *SubscribeLoopOutUpdatesResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SubscribeLoopOutUpdatesResponse.Unmarshal(m, b)
}
func (m *SubscribeLoopOutUpdatesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SubscribeLoopOutUpdatesResponse.Marshal(b, m, deterministic)
}
func (m *SubscribeLoopOutUpdatesResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_SubscribeLoopOutUpdatesResponse.Merge(m, src)
}
func (m *SubscribeLoopOutUpdatesResponse) XXX_Size() int {
return xxx_messageInfo_SubscribeLoopOutUpdatesResponse.Size(m)
}
func (m *SubscribeLoopOutUpdatesResponse) XXX_DiscardUnknown() {
xxx_messageInfo_SubscribeLoopOutUpdatesResponse.DiscardUnknown(m)
}
var xxx_messageInfo_SubscribeLoopOutUpdatesResponse proto.InternalMessageInfo
func (m *SubscribeLoopOutUpdatesResponse) GetTimestampNs() int64 {
if m != nil {
return m.TimestampNs
}
return 0
}
func (m *SubscribeLoopOutUpdatesResponse) GetState() ServerSwapState {
if m != nil {
return m.State
}
return ServerSwapState_INITIATED
}
type SubscribeLoopInUpdatesResponse struct {
// The unix timestamp in nanoseconds when the swap was updated.
TimestampNs int64 `protobuf:"varint,1,opt,name=timestamp_ns,json=timestampNs,proto3" json:"timestamp_ns,omitempty"`
// The swap's current state.
State ServerSwapState `protobuf:"varint,2,opt,name=state,proto3,enum=looprpc.ServerSwapState" json:"state,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *SubscribeLoopInUpdatesResponse) Reset() { *m = SubscribeLoopInUpdatesResponse{} }
func (m *SubscribeLoopInUpdatesResponse) String() string { return proto.CompactTextString(m) }
func (*SubscribeLoopInUpdatesResponse) ProtoMessage() {}
func (*SubscribeLoopInUpdatesResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_ad098daeda4239f7, []int{16}
}
func (m *SubscribeLoopInUpdatesResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_SubscribeLoopInUpdatesResponse.Unmarshal(m, b)
}
func (m *SubscribeLoopInUpdatesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_SubscribeLoopInUpdatesResponse.Marshal(b, m, deterministic)
}
func (m *SubscribeLoopInUpdatesResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_SubscribeLoopInUpdatesResponse.Merge(m, src)
}
func (m *SubscribeLoopInUpdatesResponse) XXX_Size() int {
return xxx_messageInfo_SubscribeLoopInUpdatesResponse.Size(m)
}
func (m *SubscribeLoopInUpdatesResponse) XXX_DiscardUnknown() {
xxx_messageInfo_SubscribeLoopInUpdatesResponse.DiscardUnknown(m)
}
var xxx_messageInfo_SubscribeLoopInUpdatesResponse proto.InternalMessageInfo
func (m *SubscribeLoopInUpdatesResponse) GetTimestampNs() int64 {
if m != nil {
return m.TimestampNs
}
return 0
}
func (m *SubscribeLoopInUpdatesResponse) GetState() ServerSwapState {
if m != nil {
return m.State
}
return ServerSwapState_INITIATED
}
func init() {
proto.RegisterEnum("looprpc.ProtocolVersion", ProtocolVersion_name, ProtocolVersion_value)
proto.RegisterEnum("looprpc.ServerSwapState", ServerSwapState_name, ServerSwapState_value)
proto.RegisterType((*ServerLoopOutRequest)(nil), "looprpc.ServerLoopOutRequest")
proto.RegisterType((*ServerLoopOutResponse)(nil), "looprpc.ServerLoopOutResponse")
proto.RegisterType((*ServerLoopOutQuoteRequest)(nil), "looprpc.ServerLoopOutQuoteRequest")
@ -900,69 +1127,91 @@ func init() {
proto.RegisterType((*ServerLoopInTerms)(nil), "looprpc.ServerLoopInTerms")
proto.RegisterType((*ServerLoopOutPushPreimageRequest)(nil), "looprpc.ServerLoopOutPushPreimageRequest")
proto.RegisterType((*ServerLoopOutPushPreimageResponse)(nil), "looprpc.ServerLoopOutPushPreimageResponse")
proto.RegisterType((*SubscribeUpdatesRequest)(nil), "looprpc.SubscribeUpdatesRequest")
proto.RegisterType((*SubscribeLoopOutUpdatesResponse)(nil), "looprpc.SubscribeLoopOutUpdatesResponse")
proto.RegisterType((*SubscribeLoopInUpdatesResponse)(nil), "looprpc.SubscribeLoopInUpdatesResponse")
}
func init() { proto.RegisterFile("server.proto", fileDescriptor_ad098daeda4239f7) }
var fileDescriptor_ad098daeda4239f7 = []byte{
// 906 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x55, 0xdd, 0x72, 0xda, 0x56,
0x10, 0xae, 0x04, 0xc6, 0x66, 0x0d, 0x36, 0x39, 0x69, 0x52, 0x41, 0xe2, 0x0e, 0xa6, 0x53, 0x8f,
0xeb, 0x0b, 0x67, 0x26, 0xbd, 0xeb, 0x1d, 0x8d, 0x89, 0xcd, 0x14, 0x1b, 0x2a, 0xe3, 0x74, 0x7a,
0xa5, 0x9e, 0xc0, 0xd6, 0x68, 0x2a, 0xe9, 0x28, 0xd2, 0x01, 0x9b, 0xeb, 0x3e, 0x45, 0x5f, 0xa2,
0xaf, 0xd0, 0xa7, 0xe8, 0x75, 0xef, 0x3a, 0xd3, 0xb7, 0xe8, 0x9c, 0x1f, 0x19, 0x04, 0xc2, 0x36,
0x33, 0xbe, 0x43, 0xbb, 0x9f, 0x76, 0xf7, 0xfb, 0x56, 0xdf, 0x02, 0xa5, 0x18, 0xa3, 0x09, 0x46,
0xc7, 0x61, 0xc4, 0x38, 0x23, 0x9b, 0x1e, 0x63, 0x61, 0x14, 0x0e, 0x6a, 0xaf, 0xaf, 0x19, 0xbb,
0xf6, 0xf0, 0x0d, 0x0d, 0xdd, 0x37, 0x34, 0x08, 0x18, 0xa7, 0xdc, 0x65, 0x41, 0xac, 0x60, 0x8d,
0xff, 0x0c, 0xf8, 0xfc, 0x52, 0xbe, 0xd7, 0x61, 0x2c, 0xec, 0x8e, 0xb9, 0x8d, 0x9f, 0xc6, 0x18,
0x73, 0xb2, 0x0f, 0xa5, 0x08, 0x07, 0xe8, 0x4e, 0x30, 0x72, 0x7e, 0xc3, 0xa9, 0x65, 0xd4, 0x8d,
0xc3, 0x92, 0xbd, 0x9d, 0xc4, 0x7e, 0xc0, 0x29, 0x79, 0x05, 0xc5, 0xf8, 0x86, 0x86, 0xce, 0x88,
0xc6, 0x23, 0xcb, 0x94, 0xf9, 0x2d, 0x11, 0x38, 0xa3, 0xf1, 0x88, 0x54, 0x20, 0x47, 0x7d, 0x6e,
0xe5, 0xea, 0xc6, 0x61, 0xde, 0x16, 0x3f, 0xc9, 0x77, 0x50, 0x95, 0xf0, 0x70, 0xfc, 0xd1, 0x73,
0x07, 0x72, 0x0a, 0x67, 0x88, 0x74, 0xe8, 0xb9, 0x01, 0x5a, 0xf9, 0xba, 0x71, 0x98, 0xb3, 0xbf,
0x10, 0x80, 0xde, 0x2c, 0x7f, 0xa2, 0xd3, 0xe4, 0x1d, 0x54, 0xe4, 0xbc, 0x03, 0xe6, 0x39, 0x13,
0x8c, 0x62, 0x97, 0x05, 0xd6, 0x46, 0xdd, 0x38, 0xdc, 0x79, 0x6b, 0x1d, 0x6b, 0xa2, 0xc7, 0x3d,
0x0d, 0xf8, 0xa0, 0xf2, 0xf6, 0x6e, 0x98, 0x0e, 0x34, 0xfe, 0x32, 0xe0, 0xc5, 0x02, 0xd7, 0x38,
0x64, 0x41, 0x8c, 0x82, 0xac, 0x1c, 0xcd, 0x0d, 0x26, 0xcc, 0x1d, 0xa0, 0x24, 0x5b, 0xb4, 0xb7,
0x45, 0xac, 0xad, 0x42, 0xe4, 0x6b, 0xd8, 0x09, 0x23, 0x0c, 0xe9, 0xf4, 0x0e, 0x64, 0x4a, 0x50,
0x59, 0x45, 0x13, 0xd8, 0x1e, 0x40, 0x8c, 0xc1, 0x50, 0x8b, 0x96, 0x93, 0xa2, 0x14, 0x55, 0x44,
0x48, 0xf6, 0x12, 0x0a, 0x78, 0x1b, 0xba, 0xd1, 0x54, 0x12, 0xde, 0xb0, 0xf5, 0x93, 0xa8, 0xae,
0xb6, 0xe7, 0xf8, 0x18, 0xc7, 0xf4, 0x1a, 0x25, 0xbb, 0xa2, 0x5d, 0x56, 0xd1, 0x73, 0x15, 0x6c,
0xfc, 0x69, 0x40, 0x35, 0xc5, 0xe0, 0xc7, 0x31, 0xe3, 0x98, 0xac, 0x4c, 0x4b, 0x6e, 0x3c, 0x52,
0x72, 0x73, 0x7d, 0xc9, 0x73, 0xeb, 0x4a, 0xfe, 0x87, 0x09, 0x64, 0x79, 0x60, 0x72, 0x04, 0xcf,
0xd4, 0x5c, 0x74, 0xea, 0x63, 0xc0, 0x9d, 0x21, 0xc6, 0x5c, 0x8b, 0xbe, 0x2b, 0xe7, 0x51, 0xf1,
0x13, 0xc1, 0xaa, 0x0a, 0xf2, 0xa3, 0x72, 0x7e, 0xc5, 0x64, 0xe4, 0x4d, 0xf1, 0xfc, 0x1e, 0x91,
0x1c, 0x40, 0x39, 0x49, 0x39, 0x11, 0xe5, 0x28, 0xe7, 0xcb, 0x7d, 0x6f, 0x5a, 0x86, 0xda, 0xdd,
0x7b, 0x44, 0x9b, 0x72, 0xb9, 0x14, 0xbd, 0x3b, 0xa1, 0x4f, 0x5e, 0xea, 0x53, 0x54, 0x91, 0xa6,
0xcf, 0xc9, 0x11, 0xec, 0xfa, 0x6e, 0xe0, 0xc8, 0x52, 0xd4, 0x67, 0xe3, 0x80, 0x4b, 0xf5, 0xf3,
0xb2, 0x50, 0xd9, 0x77, 0x83, 0xcb, 0x1b, 0x1a, 0x36, 0x65, 0x42, 0x62, 0xe9, 0x6d, 0x0a, 0x5b,
0x98, 0xc3, 0xd2, 0xdb, 0x39, 0xec, 0x1e, 0xc0, 0xc0, 0xe3, 0x13, 0x67, 0x88, 0x1e, 0xa7, 0xd6,
0xa6, 0x5c, 0x78, 0x51, 0x44, 0x4e, 0x44, 0xa0, 0xf1, 0xcb, 0xc2, 0x2e, 0xfb, 0x18, 0xf9, 0x71,
0xb2, 0xcb, 0x2c, 0xf5, 0x8d, 0x75, 0xd5, 0x1f, 0x2e, 0x88, 0x2f, 0x3b, 0x90, 0x83, 0x65, 0xba,
0xea, 0x93, 0x59, 0xa0, 0x7a, 0xb0, 0x4c, 0xd5, 0xd4, 0xb8, 0x79, 0x9a, 0x8d, 0x7f, 0x0d, 0x78,
0x3e, 0x6b, 0xd3, 0x0e, 0x12, 0x0a, 0x69, 0x2b, 0x18, 0x8b, 0x56, 0x58, 0xf3, 0x7a, 0x2c, 0x5a,
0x34, 0xbf, 0x6c, 0xd1, 0x2a, 0x6c, 0x79, 0x34, 0xe6, 0xce, 0x88, 0x85, 0x72, 0x81, 0x25, 0x7b,
0x53, 0x3c, 0x9f, 0xb1, 0x30, 0x53, 0xce, 0xc2, 0xba, 0x72, 0xde, 0xce, 0x9f, 0x4a, 0xc1, 0x73,
0x76, 0x3d, 0x1e, 0x3a, 0x95, 0x33, 0xdf, 0x9b, 0x0f, 0xf8, 0x3e, 0x97, 0xe5, 0xfb, 0x4f, 0x60,
0xcd, 0x77, 0x7e, 0xc0, 0xf5, 0x59, 0x64, 0xcd, 0x75, 0xc9, 0xfe, 0x9d, 0x3a, 0x35, 0x77, 0x3d,
0x35, 0xe5, 0x79, 0x53, 0x1a, 0x0f, 0x98, 0xd2, 0xcc, 0x36, 0x65, 0x86, 0xeb, 0xf2, 0x6b, 0xb8,
0x6e, 0xe3, 0x71, 0xae, 0x2b, 0x2c, 0xba, 0xce, 0x49, 0x4b, 0xf9, 0xf4, 0xa6, 0x1b, 0xc0, 0xb3,
0xa5, 0x06, 0x4f, 0xee, 0xb9, 0xdf, 0x0d, 0xa8, 0xa7, 0xac, 0xdd, 0x1b, 0xc7, 0xa3, 0x5e, 0x84,
0xae, 0x4f, 0xaf, 0xf1, 0x29, 0xe9, 0x90, 0x1a, 0x6c, 0x85, 0xba, 0x6e, 0xe2, 0xd2, 0xe4, 0xb9,
0xf1, 0x15, 0xec, 0xdf, 0x33, 0x84, 0xfa, 0x54, 0x8e, 0x46, 0xb0, 0xbb, 0xd0, 0x84, 0x00, 0x14,
0x3a, 0xad, 0xd3, 0xe6, 0xbb, 0x9f, 0x2b, 0x9f, 0x11, 0x02, 0x3b, 0xe7, 0x57, 0x9d, 0x7e, 0xdb,
0xe9, 0x74, 0xbb, 0x3d, 0xa7, 0x7b, 0xd5, 0xaf, 0x18, 0xa4, 0x0a, 0x2f, 0x2e, 0x9a, 0xfd, 0xf6,
0x87, 0x96, 0x73, 0xd9, 0x3a, 0xfd, 0xa9, 0xdd, 0x57, 0xb9, 0xf6, 0x45, 0xc5, 0x24, 0x35, 0x78,
0xd9, 0xb3, 0x5b, 0xed, 0xf3, 0xe6, 0x69, 0xcb, 0xe9, 0x5d, 0x5d, 0x9e, 0xcd, 0x5e, 0xcb, 0xbd,
0xfd, 0x27, 0x0f, 0x20, 0x34, 0x52, 0x33, 0x91, 0x2e, 0x94, 0x52, 0x77, 0xaf, 0x71, 0x47, 0x7a,
0xe5, 0xd9, 0xad, 0xbd, 0xba, 0x07, 0x43, 0xba, 0xb0, 0x73, 0x81, 0x37, 0x3a, 0x24, 0x1a, 0x91,
0xbd, 0x6c, 0x78, 0x52, 0xed, 0xcb, 0x55, 0x69, 0xed, 0x22, 0x0f, 0x9e, 0x67, 0x28, 0x47, 0xbe,
0xc9, 0x7e, 0x2d, 0x63, 0xc5, 0xb5, 0xa3, 0xc7, 0x40, 0x75, 0xb7, 0x99, 0x1e, 0xea, 0x4f, 0x78,
0x85, 0x1e, 0xf3, 0xc7, 0x65, 0x95, 0x1e, 0xaa, 0x40, 0x07, 0xb6, 0xe7, 0xbf, 0xf1, 0xfd, 0x0c,
0x6c, 0xda, 0x60, 0xb5, 0xda, 0x6a, 0x08, 0xe9, 0x40, 0x59, 0xab, 0xdb, 0x96, 0x8e, 0x20, 0xaf,
0x33, 0xc1, 0x49, 0xa9, 0xbd, 0x15, 0x59, 0x4d, 0xb6, 0x9f, 0xcc, 0xa6, 0x46, 0xcd, 0x9e, 0x2d,
0x45, 0xb5, 0x71, 0x1f, 0x44, 0x55, 0xfd, 0x58, 0x90, 0xee, 0xf8, 0xf6, 0xff, 0x00, 0x00, 0x00,
0xff, 0xff, 0x52, 0x9a, 0x24, 0x73, 0x6b, 0x0b, 0x00, 0x00,
// 1203 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0xcd, 0x72, 0xe2, 0x46,
0x10, 0x8e, 0x04, 0x06, 0xd3, 0x80, 0xad, 0x9d, 0xdd, 0xf5, 0x62, 0x76, 0xed, 0x60, 0x52, 0x71,
0x1c, 0x1f, 0xbc, 0x5b, 0x9b, 0x5b, 0x6e, 0x5a, 0x10, 0x46, 0xb5, 0x20, 0x11, 0x21, 0xec, 0xe4,
0x34, 0x91, 0x61, 0x62, 0x54, 0x01, 0x49, 0x2b, 0x09, 0xff, 0x54, 0x8e, 0x79, 0x8a, 0xbc, 0x44,
0x72, 0xca, 0x39, 0x4f, 0x91, 0x57, 0x48, 0x55, 0xde, 0x22, 0xa5, 0xd1, 0x08, 0x24, 0x90, 0x7f,
0xa8, 0x72, 0x6e, 0x56, 0xf7, 0x37, 0xd3, 0xfd, 0x7d, 0x3d, 0x5f, 0x63, 0x28, 0x79, 0xc4, 0xbd,
0x22, 0xee, 0x89, 0xe3, 0xda, 0xbe, 0x8d, 0xf2, 0x13, 0xdb, 0x76, 0x5c, 0x67, 0x58, 0x7d, 0x73,
0x69, 0xdb, 0x97, 0x13, 0xf2, 0xd6, 0x70, 0xcc, 0xb7, 0x86, 0x65, 0xd9, 0xbe, 0xe1, 0x9b, 0xb6,
0xe5, 0x85, 0xb0, 0xfa, 0xbf, 0x1c, 0xbc, 0xe8, 0xd3, 0x73, 0x1d, 0xdb, 0x76, 0xd4, 0x99, 0xaf,
0x91, 0x4f, 0x33, 0xe2, 0xf9, 0xe8, 0x00, 0x4a, 0x2e, 0x19, 0x12, 0xf3, 0x8a, 0xb8, 0xf8, 0x67,
0x72, 0x5b, 0xe1, 0x6a, 0xdc, 0x51, 0x49, 0x2b, 0x46, 0xb1, 0x8f, 0xe4, 0x16, 0xbd, 0x86, 0x82,
0x77, 0x6d, 0x38, 0x78, 0x6c, 0x78, 0xe3, 0x0a, 0x4f, 0xf3, 0x9b, 0x41, 0xa0, 0x6d, 0x78, 0x63,
0x24, 0x40, 0xc6, 0x98, 0xfa, 0x95, 0x4c, 0x8d, 0x3b, 0xca, 0x6a, 0xc1, 0x9f, 0xe8, 0x5b, 0xd8,
0xa5, 0x70, 0x67, 0x76, 0x31, 0x31, 0x87, 0xb4, 0x0b, 0x3c, 0x22, 0xc6, 0x68, 0x62, 0x5a, 0xa4,
0x92, 0xad, 0x71, 0x47, 0x19, 0xed, 0x55, 0x00, 0xe8, 0x2d, 0xf2, 0x4d, 0x96, 0x46, 0x0d, 0x10,
0x68, 0xbf, 0x43, 0x7b, 0x82, 0xaf, 0x88, 0xeb, 0x99, 0xb6, 0x55, 0xd9, 0xa8, 0x71, 0x47, 0x5b,
0xef, 0x2b, 0x27, 0x8c, 0xe8, 0x49, 0x8f, 0x01, 0xce, 0xc2, 0xbc, 0xb6, 0xed, 0x24, 0x03, 0xf5,
0xbf, 0x38, 0x78, 0xb9, 0xc4, 0xd5, 0x73, 0x6c, 0xcb, 0x23, 0x01, 0x59, 0xda, 0x9a, 0x69, 0x5d,
0xd9, 0xe6, 0x90, 0x50, 0xb2, 0x05, 0xad, 0x18, 0xc4, 0xe4, 0x30, 0x84, 0xbe, 0x84, 0x2d, 0xc7,
0x25, 0x8e, 0x71, 0x3b, 0x07, 0xf1, 0x14, 0x54, 0x0e, 0xa3, 0x11, 0x6c, 0x0f, 0xc0, 0x23, 0xd6,
0x88, 0x89, 0x96, 0xa1, 0xa2, 0x14, 0xc2, 0x48, 0x20, 0xd9, 0x0e, 0xe4, 0xc8, 0x8d, 0x63, 0xba,
0xb7, 0x94, 0xf0, 0x86, 0xc6, 0xbe, 0x82, 0xdb, 0xc3, 0xe9, 0xe1, 0x29, 0xf1, 0x3c, 0xe3, 0x92,
0x50, 0x76, 0x05, 0xad, 0x1c, 0x46, 0xbb, 0x61, 0xb0, 0xfe, 0x3b, 0x07, 0xbb, 0x09, 0x06, 0xdf,
0xcd, 0x6c, 0x9f, 0x44, 0x23, 0x63, 0x92, 0x73, 0x8f, 0x94, 0x9c, 0x5f, 0x5f, 0xf2, 0xcc, 0xba,
0x92, 0xff, 0xc6, 0x03, 0x5a, 0x6d, 0x18, 0x1d, 0xc3, 0xb3, 0xb0, 0x2f, 0xe3, 0x76, 0x4a, 0x2c,
0x1f, 0x8f, 0x88, 0xe7, 0x33, 0xd1, 0xb7, 0x69, 0x3f, 0x61, 0xbc, 0x19, 0xb0, 0xda, 0x05, 0xfa,
0xa8, 0xf0, 0x4f, 0x24, 0x6a, 0x39, 0x1f, 0x7c, 0xb7, 0x08, 0x41, 0x87, 0x50, 0x8e, 0x52, 0xd8,
0x35, 0x7c, 0x42, 0xfb, 0xcb, 0x7c, 0xe0, 0x2b, 0x5c, 0x38, 0xbb, 0x16, 0x21, 0x9a, 0xe1, 0xd3,
0xa1, 0xb0, 0xd9, 0x05, 0xfa, 0x64, 0xa9, 0x3e, 0x85, 0x30, 0x22, 0x4e, 0x7d, 0x74, 0x0c, 0xdb,
0x53, 0xd3, 0xc2, 0xf4, 0x2a, 0x63, 0x6a, 0xcf, 0x2c, 0x9f, 0xaa, 0x9f, 0xa5, 0x17, 0x95, 0xa7,
0xa6, 0xd5, 0xbf, 0x36, 0x1c, 0x91, 0x26, 0x28, 0xd6, 0xb8, 0x49, 0x60, 0x73, 0x31, 0xac, 0x71,
0x13, 0xc3, 0xee, 0x01, 0x0c, 0x27, 0xfe, 0x15, 0x1e, 0x91, 0x89, 0x6f, 0x54, 0xf2, 0x74, 0xe0,
0x85, 0x20, 0xd2, 0x0c, 0x02, 0xf5, 0x1f, 0x97, 0x66, 0xa9, 0x13, 0x77, 0xea, 0x45, 0xb3, 0x4c,
0x53, 0x9f, 0x5b, 0x57, 0xfd, 0xd1, 0x92, 0xf8, 0xb4, 0x02, 0x3a, 0x5c, 0xa5, 0x1b, 0x3e, 0x99,
0x25, 0xaa, 0x87, 0xab, 0x54, 0x79, 0x86, 0x8b, 0xd3, 0xac, 0xff, 0xc3, 0xc1, 0xf3, 0x45, 0x19,
0xd9, 0x8a, 0x28, 0x24, 0xad, 0xc0, 0x2d, 0x5b, 0x61, 0xcd, 0xed, 0xb1, 0x6c, 0xd1, 0xec, 0xaa,
0x45, 0x77, 0x61, 0x73, 0x62, 0x78, 0x3e, 0x1e, 0xdb, 0x0e, 0x1d, 0x60, 0x49, 0xcb, 0x07, 0xdf,
0x6d, 0xdb, 0x49, 0x95, 0x33, 0xb7, 0xae, 0x9c, 0x37, 0xf1, 0x55, 0x19, 0xf0, 0x5c, 0x6c, 0x8f,
0x87, 0x56, 0xe5, 0xc2, 0xf7, 0xfc, 0x03, 0xbe, 0xcf, 0xa4, 0xf9, 0xfe, 0x13, 0x54, 0xe2, 0x95,
0x1f, 0x70, 0x7d, 0x1a, 0x59, 0x7e, 0x5d, 0xb2, 0x7f, 0x27, 0x56, 0xcd, 0xbc, 0x26, 0xa3, 0x1c,
0x37, 0x25, 0xf7, 0x80, 0x29, 0xf9, 0x74, 0x53, 0xa6, 0xb8, 0x2e, 0xbb, 0x86, 0xeb, 0x36, 0x1e,
0xe7, 0xba, 0xdc, 0xb2, 0xeb, 0x70, 0x52, 0xca, 0xa7, 0x37, 0xdd, 0x10, 0x9e, 0xad, 0x14, 0x78,
0x72, 0xcf, 0xfd, 0xca, 0x41, 0x2d, 0x61, 0xed, 0xde, 0xcc, 0x1b, 0xf7, 0x5c, 0x62, 0x4e, 0x8d,
0x4b, 0xf2, 0x94, 0x74, 0x50, 0x15, 0x36, 0x1d, 0x76, 0x6f, 0xe4, 0xd2, 0xe8, 0xbb, 0xfe, 0x05,
0x1c, 0xdc, 0xd3, 0x44, 0xf8, 0x54, 0xea, 0xbf, 0xc0, 0xab, 0xfe, 0xec, 0xc2, 0x1b, 0xba, 0xe6,
0x05, 0x19, 0x38, 0x23, 0xc3, 0x27, 0x4f, 0xaa, 0xf7, 0xbd, 0x7b, 0xa4, 0xee, 0xc3, 0xe7, 0xf3,
0xe2, 0xac, 0xc9, 0x79, 0x0f, 0x0b, 0xf7, 0xfa, 0xe6, 0x94, 0x78, 0xbe, 0x31, 0x75, 0xb0, 0xe5,
0xb1, 0xe7, 0x5c, 0x9c, 0xc7, 0x14, 0x0f, 0x9d, 0xc0, 0x86, 0xe7, 0x47, 0x4f, 0x39, 0xde, 0x5c,
0xc8, 0x3e, 0x98, 0x4b, 0x3f, 0xc8, 0x6b, 0x21, 0xac, 0xee, 0xc1, 0x7e, 0xa2, 0xaa, 0x6c, 0xfd,
0xff, 0x45, 0x8f, 0xc7, 0xb0, 0xbd, 0xa4, 0x15, 0x02, 0xc8, 0x75, 0xa4, 0x53, 0xb1, 0xf1, 0x83,
0xf0, 0x19, 0x42, 0xb0, 0xd5, 0x1d, 0x74, 0x74, 0x19, 0x77, 0x54, 0xb5, 0x87, 0xd5, 0x81, 0x2e,
0x70, 0x68, 0x17, 0x5e, 0x2a, 0xa2, 0x2e, 0x9f, 0x49, 0xb8, 0x2f, 0x9d, 0x9e, 0xcb, 0x7a, 0x98,
0x93, 0x15, 0x81, 0x47, 0x55, 0xd8, 0xe9, 0x69, 0x92, 0xdc, 0x15, 0x4f, 0x25, 0xdc, 0x1b, 0xf4,
0xdb, 0x8b, 0x63, 0x99, 0xe3, 0x3f, 0x79, 0xd8, 0x5e, 0x6a, 0x02, 0x95, 0xa1, 0x20, 0x2b, 0xb2,
0x2e, 0x8b, 0xba, 0xd4, 0x0c, 0xab, 0xb5, 0xf5, 0x4e, 0x03, 0xf7, 0x06, 0x1f, 0x3a, 0x72, 0xbf,
0x2d, 0x35, 0x05, 0x0e, 0x15, 0x21, 0xdf, 0x1f, 0x34, 0x1a, 0x52, 0xbf, 0x2f, 0xf0, 0x01, 0xa0,
0x25, 0xca, 0x1d, 0xa9, 0x89, 0x07, 0xca, 0x47, 0x45, 0x3d, 0x57, 0x84, 0x4c, 0x2c, 0xa6, 0xa8,
0x38, 0x38, 0x2e, 0x64, 0xd1, 0x3e, 0x54, 0x59, 0x4c, 0x56, 0xce, 0xc4, 0x8e, 0xdc, 0xa4, 0x09,
0x2c, 0x76, 0xd5, 0x81, 0xa2, 0x0b, 0x1b, 0xe8, 0x0d, 0x54, 0x58, 0x5e, 0x6d, 0xb5, 0x70, 0xa3,
0x2d, 0xca, 0x0a, 0xd6, 0xe5, 0xae, 0x14, 0x74, 0x9a, 0x8b, 0xdd, 0x18, 0xc5, 0xf2, 0xa8, 0x02,
0x2f, 0x58, 0xac, 0x7f, 0x2e, 0xf6, 0x70, 0x53, 0x12, 0x9b, 0x1d, 0x59, 0x91, 0x84, 0x4d, 0xf4,
0x1a, 0x5e, 0xb1, 0xcc, 0xa2, 0xf7, 0x86, 0xa8, 0xcb, 0xaa, 0x22, 0x14, 0xd0, 0x4b, 0x78, 0xc6,
0xee, 0x88, 0x91, 0x02, 0xb4, 0x03, 0x68, 0xa0, 0x48, 0xdf, 0xf7, 0xa4, 0x86, 0x2e, 0x35, 0x71,
0x70, 0x7c, 0xa0, 0x49, 0x42, 0x71, 0x2e, 0x40, 0x43, 0x55, 0x5a, 0xb2, 0xd6, 0x95, 0x9a, 0x42,
0xe9, 0xfd, 0x1f, 0x39, 0x00, 0xaa, 0x18, 0xd5, 0x0e, 0xa9, 0x50, 0x4a, 0xfc, 0x2e, 0xd7, 0x97,
0x26, 0x9c, 0xf2, 0x6f, 0x41, 0xf5, 0xf5, 0x3d, 0x18, 0xa4, 0xc2, 0x96, 0x42, 0xae, 0x59, 0x28,
0x28, 0x84, 0xf6, 0xd2, 0xe1, 0xd1, 0x6d, 0xfb, 0x77, 0xa5, 0xd9, 0x2b, 0x9d, 0xc0, 0xf3, 0x14,
0x67, 0xa3, 0xaf, 0xd3, 0x8f, 0xa5, 0xac, 0xa0, 0xea, 0xf1, 0x63, 0xa0, 0xac, 0xda, 0x42, 0x8f,
0xf0, 0x9f, 0xc4, 0x3b, 0xf4, 0x88, 0xff, 0xf8, 0xdd, 0xa5, 0x47, 0x78, 0x41, 0x07, 0x8a, 0xf1,
0x1d, 0x7c, 0x90, 0x82, 0x4d, 0xfe, 0x00, 0x54, 0xab, 0x77, 0x43, 0x50, 0x07, 0xca, 0x4c, 0x5d,
0x99, 0x6e, 0x6c, 0xf4, 0x26, 0x15, 0x1c, 0x5d, 0xb5, 0x77, 0x47, 0x96, 0x91, 0xd5, 0xa3, 0xde,
0xc2, 0x56, 0xd3, 0x7b, 0x4b, 0x50, 0xad, 0xdf, 0x07, 0x61, 0xb7, 0x5e, 0xc6, 0x76, 0x6d, 0x72,
0xdd, 0xa1, 0xda, 0xe2, 0x78, 0xfa, 0x36, 0xae, 0x1e, 0xad, 0x22, 0xd2, 0x57, 0xe6, 0x3b, 0x0e,
0x11, 0xd8, 0x49, 0xdf, 0x70, 0x8f, 0xa8, 0xf3, 0x55, 0x7a, 0x9d, 0x95, 0x25, 0xf9, 0x8e, 0xbb,
0xc8, 0xd1, 0x65, 0xff, 0xcd, 0x7f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x2c, 0xcd, 0x43, 0xb7, 0xdb,
0x0e, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
@ -984,6 +1233,8 @@ type SwapServerClient interface {
LoopInTerms(ctx context.Context, in *ServerLoopInTermsRequest, opts ...grpc.CallOption) (*ServerLoopInTerms, error)
NewLoopInSwap(ctx context.Context, in *ServerLoopInRequest, opts ...grpc.CallOption) (*ServerLoopInResponse, error)
LoopInQuote(ctx context.Context, in *ServerLoopInQuoteRequest, opts ...grpc.CallOption) (*ServerLoopInQuoteResponse, error)
SubscribeLoopOutUpdates(ctx context.Context, in *SubscribeUpdatesRequest, opts ...grpc.CallOption) (SwapServer_SubscribeLoopOutUpdatesClient, error)
SubscribeLoopInUpdates(ctx context.Context, in *SubscribeUpdatesRequest, opts ...grpc.CallOption) (SwapServer_SubscribeLoopInUpdatesClient, error)
}
type swapServerClient struct {
@ -1057,6 +1308,70 @@ func (c *swapServerClient) LoopInQuote(ctx context.Context, in *ServerLoopInQuot
return out, nil
}
func (c *swapServerClient) SubscribeLoopOutUpdates(ctx context.Context, in *SubscribeUpdatesRequest, opts ...grpc.CallOption) (SwapServer_SubscribeLoopOutUpdatesClient, error) {
stream, err := c.cc.NewStream(ctx, &_SwapServer_serviceDesc.Streams[0], "/looprpc.SwapServer/SubscribeLoopOutUpdates", opts...)
if err != nil {
return nil, err
}
x := &swapServerSubscribeLoopOutUpdatesClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type SwapServer_SubscribeLoopOutUpdatesClient interface {
Recv() (*SubscribeLoopOutUpdatesResponse, error)
grpc.ClientStream
}
type swapServerSubscribeLoopOutUpdatesClient struct {
grpc.ClientStream
}
func (x *swapServerSubscribeLoopOutUpdatesClient) Recv() (*SubscribeLoopOutUpdatesResponse, error) {
m := new(SubscribeLoopOutUpdatesResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *swapServerClient) SubscribeLoopInUpdates(ctx context.Context, in *SubscribeUpdatesRequest, opts ...grpc.CallOption) (SwapServer_SubscribeLoopInUpdatesClient, error) {
stream, err := c.cc.NewStream(ctx, &_SwapServer_serviceDesc.Streams[1], "/looprpc.SwapServer/SubscribeLoopInUpdates", opts...)
if err != nil {
return nil, err
}
x := &swapServerSubscribeLoopInUpdatesClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type SwapServer_SubscribeLoopInUpdatesClient interface {
Recv() (*SubscribeLoopInUpdatesResponse, error)
grpc.ClientStream
}
type swapServerSubscribeLoopInUpdatesClient struct {
grpc.ClientStream
}
func (x *swapServerSubscribeLoopInUpdatesClient) Recv() (*SubscribeLoopInUpdatesResponse, error) {
m := new(SubscribeLoopInUpdatesResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// SwapServerServer is the server API for SwapServer service.
type SwapServerServer interface {
LoopOutTerms(context.Context, *ServerLoopOutTermsRequest) (*ServerLoopOutTerms, error)
@ -1066,6 +1381,8 @@ type SwapServerServer interface {
LoopInTerms(context.Context, *ServerLoopInTermsRequest) (*ServerLoopInTerms, error)
NewLoopInSwap(context.Context, *ServerLoopInRequest) (*ServerLoopInResponse, error)
LoopInQuote(context.Context, *ServerLoopInQuoteRequest) (*ServerLoopInQuoteResponse, error)
SubscribeLoopOutUpdates(*SubscribeUpdatesRequest, SwapServer_SubscribeLoopOutUpdatesServer) error
SubscribeLoopInUpdates(*SubscribeUpdatesRequest, SwapServer_SubscribeLoopInUpdatesServer) error
}
// UnimplementedSwapServerServer can be embedded to have forward compatible implementations.
@ -1093,6 +1410,12 @@ func (*UnimplementedSwapServerServer) NewLoopInSwap(ctx context.Context, req *Se
func (*UnimplementedSwapServerServer) LoopInQuote(ctx context.Context, req *ServerLoopInQuoteRequest) (*ServerLoopInQuoteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method LoopInQuote not implemented")
}
func (*UnimplementedSwapServerServer) SubscribeLoopOutUpdates(req *SubscribeUpdatesRequest, srv SwapServer_SubscribeLoopOutUpdatesServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeLoopOutUpdates not implemented")
}
func (*UnimplementedSwapServerServer) SubscribeLoopInUpdates(req *SubscribeUpdatesRequest, srv SwapServer_SubscribeLoopInUpdatesServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeLoopInUpdates not implemented")
}
func RegisterSwapServerServer(s *grpc.Server, srv SwapServerServer) {
s.RegisterService(&_SwapServer_serviceDesc, srv)
@ -1224,6 +1547,48 @@ func _SwapServer_LoopInQuote_Handler(srv interface{}, ctx context.Context, dec f
return interceptor(ctx, in, info, handler)
}
func _SwapServer_SubscribeLoopOutUpdates_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SubscribeUpdatesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(SwapServerServer).SubscribeLoopOutUpdates(m, &swapServerSubscribeLoopOutUpdatesServer{stream})
}
type SwapServer_SubscribeLoopOutUpdatesServer interface {
Send(*SubscribeLoopOutUpdatesResponse) error
grpc.ServerStream
}
type swapServerSubscribeLoopOutUpdatesServer struct {
grpc.ServerStream
}
func (x *swapServerSubscribeLoopOutUpdatesServer) Send(m *SubscribeLoopOutUpdatesResponse) error {
return x.ServerStream.SendMsg(m)
}
func _SwapServer_SubscribeLoopInUpdates_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(SubscribeUpdatesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(SwapServerServer).SubscribeLoopInUpdates(m, &swapServerSubscribeLoopInUpdatesServer{stream})
}
type SwapServer_SubscribeLoopInUpdatesServer interface {
Send(*SubscribeLoopInUpdatesResponse) error
grpc.ServerStream
}
type swapServerSubscribeLoopInUpdatesServer struct {
grpc.ServerStream
}
func (x *swapServerSubscribeLoopInUpdatesServer) Send(m *SubscribeLoopInUpdatesResponse) error {
return x.ServerStream.SendMsg(m)
}
var _SwapServer_serviceDesc = grpc.ServiceDesc{
ServiceName: "looprpc.SwapServer",
HandlerType: (*SwapServerServer)(nil),
@ -1257,6 +1622,17 @@ var _SwapServer_serviceDesc = grpc.ServiceDesc{
Handler: _SwapServer_LoopInQuote_Handler,
},
},
Streams: []grpc.StreamDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "SubscribeLoopOutUpdates",
Handler: _SwapServer_SubscribeLoopOutUpdates_Handler,
ServerStreams: true,
},
{
StreamName: "SubscribeLoopInUpdates",
Handler: _SwapServer_SubscribeLoopInUpdates_Handler,
ServerStreams: true,
},
},
Metadata: "server.proto",
}

@ -20,6 +20,12 @@ service SwapServer {
rpc LoopInQuote (ServerLoopInQuoteRequest)
returns (ServerLoopInQuoteResponse);
rpc SubscribeLoopOutUpdates(SubscribeUpdatesRequest)
returns (stream SubscribeLoopOutUpdatesResponse);
rpc SubscribeLoopInUpdates(SubscribeUpdatesRequest)
returns (stream SubscribeLoopInUpdatesResponse);
}
/**
@ -177,3 +183,79 @@ message ServerLoopOutPushPreimageRequest {
message ServerLoopOutPushPreimageResponse {
}
message SubscribeUpdatesRequest {
// The protocol version that the client adheres to.
ProtocolVersion protocol_version = 1;
// Swap hash is the hash of the swap to subscribe to updates for.
bytes swap_hash = 2;
}
// ServerSwapState is an enum which represents all the states a swap may have
// from the server's perspective.
enum ServerSwapState {
// The server has created the swap.
INITIATED = 0;
// The server has published the loop out on chain htlc.
HTLC_PUBLISHED = 1;
// The swap completed successfully.
SUCCESS = 2;
/*
The swap failed for a reason that is unknown to the server, this is only
set for older swaps.
*/
FAILED_UNKNOWN = 3;
// No htlc was confirmed in time for the loop in swap to complete.
FAILED_NO_HTLC = 4;
// A loop in htlc confirmed on chain, but it did not have the correct value.
FAILED_INVALID_HTLC_AMOUNT = 5;
/*
We did not succeed in completing the loop in off chain payment before the
timeout.
*/
FAILED_OFF_CHAIN_TIMEOUT = 6;
// The on chain timeout was claimed.
FAILED_TIMEOUT = 7;
/*
The server could not publish the loop out on chain htlc before the deadline
provided.
*/
FAILED_SWAP_DEADLINE = 8;
// The server could not publish the loop out on chain htlc.
FAILED_HTLC_PUBLICATION = 9;
// The server has published the loop out on chain timeout tx.
TIMEOUT_PUBLISHED = 10;
// The swap has failed for unknown reasons, it will not be completed.
UNEXPECTED_FAILURE = 11;
// The swap htlc has confirmed on chain.
HTLC_CONFIRMED = 12;
}
message SubscribeLoopOutUpdatesResponse {
// The unix timestamp in nanoseconds when the swap was updated.
int64 timestamp_ns = 1;
// The swap's current state.
ServerSwapState state = 2;
}
message SubscribeLoopInUpdatesResponse {
// The unix timestamp in nanoseconds when the swap was updated.
int64 timestamp_ns = 1;
// The swap's current state.
ServerSwapState state = 2;
}

@ -182,3 +182,18 @@ func (s *serverMock) GetLoopInQuote(ctx context.Context, amt btcutil.Amount) (
CltvDelta: testChargeOnChainCltvDelta,
}, nil
}
// SubscribeLoopOutUpdates provides a mocked implementation of state
// subscriptions.
func (s *serverMock) SubscribeLoopOutUpdates(_ context.Context,
_ lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {
return nil, nil, nil
}
// SubscribeLoopInUpdates provides a mocked implementation of state subscriptions.
func (s *serverMock) SubscribeLoopInUpdates(_ context.Context,
_ lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {
return nil, nil, nil
}

@ -6,7 +6,10 @@ import (
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"time"
"github.com/btcsuite/btcd/btcec"
@ -24,6 +27,21 @@ import (
// supported by the loop client.
const protocolVersion = looprpc.ProtocolVersion_PREIMAGE_PUSH_LOOP_OUT
var (
// errServerSubscriptionComplete is returned when our subscription to
// server updates exits because the server has no more updates to
// provide us, because its part in the swap is complete.
errServerSubscriptionComplete = errors.New("server finished serving " +
"updates")
// errSubscriptionFailed is returned when our subscription returns with
// and EOF, indicating that the server restarted, we had an unexpected
// network failure. Since we do not have restart-recovery, we note that
// we will not resume our subscription once this error occurs.
errSubscriptionFailed = errors.New("failed, no further updates will " +
"be provided")
)
type swapServerClient interface {
GetLoopOutTerms(ctx context.Context) (
*LoopOutTerms, error)
@ -51,11 +69,31 @@ type swapServerClient interface {
swapHash lntypes.Hash, amount btcutil.Amount,
senderKey [33]byte, swapInvoice string, lastHop *route.Vertex) (
*newLoopInResponse, error)
// SubscribeLoopOutUpdates subscribes to loop out server state.
SubscribeLoopOutUpdates(ctx context.Context,
hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
// SubscribeLoopInUpdates subscribes to loop in server state.
SubscribeLoopInUpdates(ctx context.Context,
hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)
}
type grpcSwapServerClient struct {
server looprpc.SwapServerClient
conn *grpc.ClientConn
wg sync.WaitGroup
}
// stop sends the signal for the server's goroutines to shutdown and waits for
// them to complete.
func (s *grpcSwapServerClient) stop() {
if err := s.conn.Close(); err != nil {
log.Warnf("could not close connection: %v", err)
}
s.wg.Wait()
}
var _ swapServerClient = (*grpcSwapServerClient)(nil)
@ -275,8 +313,144 @@ func (s *grpcSwapServerClient) NewLoopInSwap(ctx context.Context,
}, nil
}
func (s *grpcSwapServerClient) Close() {
s.conn.Close()
// ServerUpdate summarizes an update from the swap server.
type ServerUpdate struct {
// State is the state that the server has sent us.
State looprpc.ServerSwapState
// Timestamp is the time of the server state update.
Timestamp time.Time
}
// SubscribeLoopInUpdates subscribes to loop in server state and pipes updates
// into the channel provided.
func (s *grpcSwapServerClient) SubscribeLoopInUpdates(ctx context.Context,
hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {
resp, err := s.server.SubscribeLoopInUpdates(
ctx, &looprpc.SubscribeUpdatesRequest{
ProtocolVersion: protocolVersion,
SwapHash: hash[:],
},
)
if err != nil {
return nil, nil, err
}
receive := func() (*ServerUpdate, error) {
response, err := resp.Recv()
if err != nil {
return nil, err
}
return &ServerUpdate{
State: response.State,
Timestamp: time.Unix(0, response.TimestampNs),
}, nil
}
updateChan, errChan := s.makeServerUpdate(ctx, receive)
return updateChan, errChan, nil
}
// SubscribeLoopOutUpdates subscribes to loop out server state and pipes updates
// into the channel provided.
func (s *grpcSwapServerClient) SubscribeLoopOutUpdates(ctx context.Context,
hash lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error) {
resp, err := s.server.SubscribeLoopOutUpdates(
ctx, &looprpc.SubscribeUpdatesRequest{
ProtocolVersion: protocolVersion,
SwapHash: hash[:],
},
)
if err != nil {
return nil, nil, err
}
receive := func() (*ServerUpdate, error) {
response, err := resp.Recv()
if err != nil {
return nil, err
}
return &ServerUpdate{
State: response.State,
Timestamp: time.Unix(0, response.TimestampNs),
}, nil
}
updateChan, errChan := s.makeServerUpdate(ctx, receive)
return updateChan, errChan, nil
}
// makeServerUpdate takes a stream receive function and a channel that it
// should pipe updates into. It sends events into the updates channel until
// the client cancels, server client shuts down or the subscription is cancelled
// server side.
func (s *grpcSwapServerClient) makeServerUpdate(ctx context.Context,
receive func() (*ServerUpdate, error)) (<-chan *ServerUpdate,
<-chan error) {
// We will return exactly one error from this function so we buffer
// our error channel so that the function exit is not dependent on
// the error being read.
errChan := make(chan error, 1)
updateChan := make(chan *ServerUpdate)
// Create a goroutine that will pipe updates in to our updates channel.
s.wg.Add(1)
go func() {
defer s.wg.Done()
for {
// Try to receive from our stream. If there are no items
// to consume, this call will block. If our stream is
// cancelled by the server we will receive an error.
response, err := receive()
switch err {
// If we get a nil error, we proceed with to delivering
// the update we have just received.
case nil:
// If we get an EOF error, the server is finished
// sending us updates, so we return with a non-nil
// a subscription complete error to inform the caller
// that they will no longer receive updates.
case io.EOF:
errChan <- errServerSubscriptionComplete
return
// If we receive a non-nil error, we exit.
default:
// If we get a transport is closing error, we
// send a server restarting error so that the
// caller is informed that we will not get
// any more updates from the server (since we
// don't have retry logic yet).
if isErrConClosing(err) {
errChan <- errSubscriptionFailed
} else {
errChan <- err
}
return
}
select {
// Try to send our update to the update channel.
case updateChan <- response:
// If the client cancels their context, we exit with
// no error.
case <-ctx.Done():
errChan <- nil
return
}
}
}()
return updateChan, errChan
}
// getSwapServerConn returns a connection to the swap server. A non-empty
@ -286,9 +460,14 @@ func getSwapServerConn(address, proxyAddress string, insecure bool,
tlsPath string, interceptor *lsat.Interceptor) (*grpc.ClientConn, error) {
// Create a dial options array.
opts := []grpc.DialOption{grpc.WithUnaryInterceptor(
interceptor.UnaryInterceptor,
)}
opts := []grpc.DialOption{
grpc.WithUnaryInterceptor(
interceptor.UnaryInterceptor,
),
grpc.WithStreamInterceptor(
interceptor.StreamInterceptor,
),
}
// There are three options to connect to a swap server, either insecure,
// using a self-signed certificate or with a certificate signed by a
@ -331,6 +510,18 @@ func getSwapServerConn(address, proxyAddress string, insecure bool,
return conn, nil
}
// isErrConClosing identifies whether we have received a "transport is closing"
// error from a grpc stream, indicating that the server has shutdown. We need
// to string match this error because ErrConnClosing is part of an internal
// grpc package, so cannot be used directly.
func isErrConClosing(err error) bool {
if err == nil {
return false
}
return strings.Contains(err.Error(), "transport is closing")
}
type newLoopOutResponse struct {
swapInvoice string
prepayInvoice string

@ -0,0 +1,52 @@
package loop
import (
"context"
"github.com/lightninglabs/loop/swap"
"github.com/lightningnetwork/lnd/lntypes"
)
// subscribeAndLogUpdates subscribes to updates for a swap and logs them. This
// function will block, so should run as a goroutine. Note that our subscription
// does not survive server restarts; we will simply not have update logs if the
// server restarts during swap execution.
func subscribeAndLogUpdates(ctx context.Context, hash lntypes.Hash,
log *swap.PrefixLog, subscribe func(context.Context,
lntypes.Hash) (<-chan *ServerUpdate, <-chan error, error)) {
subscribeChan, errChan, err := subscribe(ctx, hash)
if err != nil {
log.Errorf("could not get swap subscription: %v", err)
return
}
for {
select {
// Consume any updates and log them.
case update := <-subscribeChan:
log.Infof("Server update: %v received, "+
"timestamp: %v", update.State, update.Timestamp)
// If we get an error from the server, we check whether it is
// due to server exit, or restart, and log this information
// for the client. Otherwise, we just log non-nil errors.
case err := <-errChan:
switch err {
case errServerSubscriptionComplete:
log.Infof("swap subscription: %v", err)
case nil:
default:
log.Errorf("swap subscription error: %v", err)
}
return
// Exit if our context is cancelled.
case <-ctx.Done():
return
}
}
}
Loading…
Cancel
Save