refac pubsub part 1

pull/17/head
Edouard Paris 5 years ago
parent f3900d9105
commit 95c25c3bcc

@ -10,7 +10,6 @@ import (
"github.com/edouardparis/lntop/app"
"github.com/edouardparis/lntop/config"
"github.com/edouardparis/lntop/events"
"github.com/edouardparis/lntop/logging"
"github.com/edouardparis/lntop/pubsub"
"github.com/edouardparis/lntop/ui"
@ -60,19 +59,15 @@ func run(c *cli.Context) error {
ctx := context.Background()
events := make(chan *events.Event)
ps := pubsub.New(app.Logger, app.Network)
go ps.Run(ctx)
go func() {
err := ui.Run(ctx, app, events)
if err != nil {
app.Logger.Debug("ui", logging.String("error", err.Error()))
}
ps.Stop()
}()
err = ui.Run(ctx, app, ps)
if err != nil {
app.Logger.Debug("ui", logging.String("error", err.Error()))
}
ps.Run(ctx, events)
return nil
return ps.Stop()
}
func pubsubRun(c *cli.Context) error {
@ -86,9 +81,8 @@ func pubsubRun(c *cli.Context) error {
return err
}
events := make(chan *events.Event)
ps := pubsub.New(app.Logger, app.Network)
ps.Run(context.Background(), events)
ps.Run(context.Background())
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt)

@ -1,23 +1,24 @@
package events
type Event string
const (
BlockReceived = "block.received"
ChannelActive = "channel.active"
ChannelBalanceUpdated = "channel.balance.updated"
ChannelInactive = "channel.inactive"
ChannelPending = "channel.pending"
InvoiceCreated = "invoice.created"
InvoiceSettled = "invoice.settled"
PeerUpdated = "peer.updated"
TransactionCreated = "transaction.created"
WalletBalanceUpdated = "wallet.balance.updated"
BlockReceived Event = "block.received"
ChannelActive Event = "channel.active"
ChannelBalanceUpdated Event = "channel.balance.updated"
ChannelInactive Event = "channel.inactive"
ChannelPending Event = "channel.pending"
InvoiceCreated Event = "invoice.created"
InvoiceSettled Event = "invoice.settled"
PeerUpdated Event = "peer.updated"
TransactionCreated Event = "transaction.created"
WalletBalanceUpdated Event = "wallet.balance.updated"
)
type Event struct {
Type string
ID string
}
type Publisher string
func New(kind string) *Event {
return &Event{Type: kind}
}
const (
Channels Publisher = "channels"
Invoices Publisher = "invoices"
Transactions Publisher = "transactions"
)

@ -12,6 +12,7 @@ import (
type PubSub struct {
stop chan bool
sub chan events.Event
logger logging.Logger
network network.Network
wg *sync.WaitGroup
@ -23,10 +24,11 @@ func New(logger logging.Logger, network network.Network) *PubSub {
network: network,
wg: &sync.WaitGroup{},
stop: make(chan bool),
sub: make(chan events.Event),
}
}
func (p *PubSub) invoices(ctx context.Context, sub chan *events.Event) {
func (p *PubSub) invoices(ctx context.Context) {
p.wg.Add(3)
invoices := make(chan *models.Invoice)
ctx, cancel := context.WithCancel(ctx)
@ -35,9 +37,9 @@ func (p *PubSub) invoices(ctx context.Context, sub chan *events.Event) {
for invoice := range invoices {
p.logger.Debug("receive invoice", logging.Object("invoice", invoice))
if invoice.Settled {
sub <- events.New(events.InvoiceSettled)
p.sub <- events.InvoiceSettled
} else {
sub <- events.New(events.InvoiceCreated)
p.sub <- events.InvoiceCreated
}
}
p.wg.Done()
@ -59,7 +61,7 @@ func (p *PubSub) invoices(ctx context.Context, sub chan *events.Event) {
}()
}
func (p *PubSub) transactions(ctx context.Context, sub chan *events.Event) {
func (p *PubSub) transactions(ctx context.Context) {
p.wg.Add(3)
transactions := make(chan *models.Transaction)
ctx, cancel := context.WithCancel(ctx)
@ -67,7 +69,7 @@ func (p *PubSub) transactions(ctx context.Context, sub chan *events.Event) {
go func() {
for tx := range transactions {
p.logger.Debug("receive transaction", logging.String("tx_hash", tx.TxHash))
sub <- events.New(events.TransactionCreated)
p.sub <- events.TransactionCreated
}
p.wg.Done()
}()
@ -88,18 +90,27 @@ func (p *PubSub) transactions(ctx context.Context, sub chan *events.Event) {
}()
}
func (p *PubSub) Stop() {
func (p *PubSub) Subscribe(pub events.Publisher) {}
func (p *PubSub) Unsubscribe(pub events.Publisher) {}
func (p *PubSub) Events() chan events.Event {
return p.sub
}
func (p *PubSub) Stop() error {
p.stop <- true
close(p.stop)
close(p.sub)
p.logger.Debug("Received signal, gracefully stopping")
return nil
}
func (p *PubSub) Run(ctx context.Context, sub chan *events.Event) {
func (p *PubSub) Run(ctx context.Context) {
p.logger.Debug("Starting...")
p.invoices(ctx, sub)
p.transactions(ctx, sub)
p.ticker(ctx, sub,
p.invoices(ctx)
p.transactions(ctx)
p.ticker(ctx, p.sub,
withTickerInfo(),
withTickerChannelsBalance(),
// no need for ticker Wallet balance, transactions subscriber is enough

@ -10,9 +10,9 @@ import (
"github.com/edouardparis/lntop/network/models"
)
type tickerFunc func(context.Context, logging.Logger, network.Network, chan *events.Event)
type tickerFunc func(context.Context, logging.Logger, network.Network, chan events.Event)
func (p *PubSub) ticker(ctx context.Context, sub chan *events.Event, fn ...tickerFunc) {
func (p *PubSub) ticker(ctx context.Context, sub chan events.Event, fn ...tickerFunc) {
p.wg.Add(1)
ticker := time.NewTicker(3 * time.Second)
go func() {
@ -34,30 +34,30 @@ func (p *PubSub) ticker(ctx context.Context, sub chan *events.Event, fn ...ticke
// withTickerInfo checks if general information did not changed changed in the ticker interval.
func withTickerInfo() tickerFunc {
var old *models.Info
return func(ctx context.Context, logger logging.Logger, net network.Network, sub chan *events.Event) {
return func(ctx context.Context, logger logging.Logger, net network.Network, sub chan events.Event) {
info, err := net.Info(ctx)
if err != nil {
logger.Error("network info returned an error", logging.Error(err))
}
if old != nil {
if old.BlockHeight != info.BlockHeight {
sub <- events.New(events.BlockReceived)
sub <- events.BlockReceived
}
if old.NumPeers != info.NumPeers {
sub <- events.New(events.PeerUpdated)
sub <- events.PeerUpdated
}
if old.NumPendingChannels < info.NumPendingChannels {
sub <- events.New(events.ChannelPending)
sub <- events.ChannelPending
}
if old.NumActiveChannels < info.NumActiveChannels {
sub <- events.New(events.ChannelActive)
sub <- events.ChannelActive
}
if old.NumInactiveChannels < info.NumInactiveChannels {
sub <- events.New(events.ChannelInactive)
sub <- events.ChannelInactive
}
}
old = info
@ -68,7 +68,7 @@ func withTickerInfo() tickerFunc {
// changed in the ticker interval.
func withTickerChannelsBalance() tickerFunc {
var old *models.ChannelsBalance
return func(ctx context.Context, logger logging.Logger, net network.Network, sub chan *events.Event) {
return func(ctx context.Context, logger logging.Logger, net network.Network, sub chan events.Event) {
channelsBalance, err := net.GetChannelsBalance(ctx)
if err != nil {
logger.Error("network channels balance returned an error", logging.Error(err))
@ -76,7 +76,7 @@ func withTickerChannelsBalance() tickerFunc {
if old != nil {
if old.Balance != channelsBalance.Balance ||
old.PendingOpenBalance != channelsBalance.PendingOpenBalance {
sub <- events.New(events.ChannelBalanceUpdated)
sub <- events.ChannelBalanceUpdated
}
}
old = channelsBalance
@ -87,7 +87,7 @@ func withTickerChannelsBalance() tickerFunc {
// changed in the ticker interval.
func withTickerWalletBalance() tickerFunc {
var old *models.WalletBalance
return func(ctx context.Context, logger logging.Logger, net network.Network, sub chan *events.Event) {
return func(ctx context.Context, logger logging.Logger, net network.Network, sub chan events.Event) {
walletBalance, err := net.GetWalletBalance(ctx)
if err != nil {
logger.Error("network wallet balance returned an error", logging.Error(err))
@ -96,7 +96,7 @@ func withTickerWalletBalance() tickerFunc {
if old.TotalBalance != walletBalance.TotalBalance ||
old.ConfirmedBalance != walletBalance.ConfirmedBalance ||
old.UnconfirmedBalance != walletBalance.UnconfirmedBalance {
sub <- events.New(events.WalletBalanceUpdated)
sub <- events.WalletBalanceUpdated
}
}
old = walletBalance

@ -80,7 +80,7 @@ func (c *controller) SetModels(ctx context.Context) error {
return c.models.RefreshChannels(ctx)
}
func (c *controller) Listen(ctx context.Context, g *gocui.Gui, sub chan *events.Event) {
func (c *controller) Listen(ctx context.Context, g *gocui.Gui, sub chan events.Event) {
c.logger.Debug("Listening...")
refresh := func(fn ...func(context.Context) error) {
for i := range fn {
@ -93,8 +93,8 @@ func (c *controller) Listen(ctx context.Context, g *gocui.Gui, sub chan *events.
}
for event := range sub {
c.logger.Debug("event received", logging.String("type", event.Type))
switch event.Type {
c.logger.Debug("new event received", logging.String("event", string(event)))
switch event {
case events.TransactionCreated:
refresh(
c.models.RefreshInfo,

@ -10,7 +10,14 @@ import (
"github.com/edouardparis/lntop/events"
)
func Run(ctx context.Context, app *app.App, sub chan *events.Event) error {
type Pubsub interface {
Subscribe(events.Publisher)
Unsubscribe(events.Publisher)
Events() chan events.Event
Stop() error
}
func Run(ctx context.Context, app *app.App, ps Pubsub) error {
g, err := gocui.NewGui(gocui.OutputNormal)
if err != nil {
return err
@ -31,10 +38,12 @@ func Run(ctx context.Context, app *app.App, sub chan *events.Event) error {
return err
}
go ctrl.Listen(ctx, g, sub)
go ctrl.Listen(ctx, g, ps.Events())
err = g.MainLoop()
close(sub)
if err != nil {
return errors.WithStack(err)
}
return errors.WithStack(err)
return errors.WithStack(ps.Stop())
}

Loading…
Cancel
Save