Goroutine unit manager
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

manager.go 3.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package gum
  2. import (
  3. "log"
  4. "os"
  5. "os/signal"
  6. "reflect"
  7. "strconv"
  8. "strings"
  9. )
  10. var idGen = IdGenerator()
  11. type WorkUnit interface {
  12. Run(UnitManager)
  13. Shutdown()
  14. }
  15. type UnitManager interface {
  16. ShouldStop() <-chan bool
  17. Done()
  18. Panic(err error)
  19. }
  20. type WorkUnitManager struct {
  21. stop chan bool
  22. workerQuit chan bool
  23. unit WorkUnit
  24. panic chan error
  25. isPaniced bool
  26. }
  27. func (w *WorkUnitManager) ShouldStop() <-chan bool {
  28. return w.stop
  29. }
  30. func (w *WorkUnitManager) Done() {
  31. w.workerQuit <- true
  32. }
  33. func (w *WorkUnitManager) Panic(err error) {
  34. w.panic <- err
  35. w.isPaniced = true
  36. w.workerQuit <- true
  37. close(w.stop)
  38. }
  39. type Manager struct {
  40. signalIn chan os.Signal
  41. shutdownSigs []os.Signal
  42. workers map[string]*WorkUnitManager
  43. Quit chan bool
  44. panic chan error // Used for panicing goroutines
  45. }
  46. func (m *Manager) Run() {
  47. log.Println("Starting manager ...")
  48. for unitName, w := range m.workers {
  49. log.Printf("Starting <%s>\n", unitName)
  50. go w.unit.Run(w)
  51. }
  52. for {
  53. select {
  54. case sig := <-m.signalIn:
  55. if !in(m.shutdownSigs, sig) {
  56. break
  57. }
  58. log.Println("shutting event received ... ")
  59. // send shutdown event to all worker units
  60. for name, w := range m.workers {
  61. log.Printf("shutting down <%s>\n", name)
  62. w.stop <- true
  63. }
  64. // Wait for all units to quit
  65. for name, w := range m.workers {
  66. <-w.workerQuit
  67. log.Printf("<%s> down", name)
  68. }
  69. // All workers have shutdown
  70. log.Println("All workers have shutdown, shutting down manager ...")
  71. m.Quit <- true
  72. case p := <-m.panic:
  73. for name, w := range m.workers {
  74. if w.isPaniced {
  75. log.Printf("Panicing for <%s>: %s", name, p)
  76. }
  77. }
  78. for name, w := range m.workers {
  79. log.Printf("shuting down <%s>\n", name)
  80. if !w.isPaniced {
  81. w.stop <- true
  82. }
  83. }
  84. // Wait for all units to quit
  85. for name, w := range m.workers {
  86. <-w.workerQuit
  87. log.Printf("<%s> down", name)
  88. }
  89. // All workers have shutdown
  90. log.Println("All workers have shutdown, shutting down manager ...")
  91. m.Quit <- true
  92. }
  93. }
  94. }
  95. func (m *Manager) ShutdownOn(sig os.Signal) {
  96. signal.Notify(m.signalIn, sig)
  97. m.shutdownSigs = append(m.shutdownSigs, sig)
  98. }
  99. type IDGenerator func(string) int
  100. func IdGenerator() IDGenerator {
  101. ids := make(map[string]int)
  102. return func(unit string) int {
  103. ret := ids[unit]
  104. ids[unit]++
  105. return ret
  106. }
  107. }
  108. func (m *Manager) AddUnit(unit WorkUnit) {
  109. workUnitManager := &WorkUnitManager{
  110. workerQuit: make(chan bool, 1),
  111. stop: make(chan bool, 1),
  112. unit: unit,
  113. panic: m.panic,
  114. }
  115. unitType := reflect.TypeOf(unit)
  116. unitName := strings.Split(unitType.String(), ".")[1]
  117. unitId := idGen(unitName)
  118. unitName += strconv.Itoa(unitId)
  119. log.Println("Adding unit ", unitName)
  120. m.workers[unitName] = workUnitManager
  121. }
  122. func NewManager() *Manager {
  123. return &Manager{
  124. signalIn: make(chan os.Signal, 1),
  125. Quit: make(chan bool, 1),
  126. workers: make(map[string]*WorkUnitManager),
  127. panic: make(chan error, 1),
  128. }
  129. }
  130. // Test if signal is in array
  131. func in(arr []os.Signal, sig os.Signal) bool {
  132. for _, s := range arr {
  133. if s == sig {
  134. return true
  135. }
  136. }
  137. return false
  138. }