From 5b6f847eceea2838cc7e5eedcbfffb4bfef611ff Mon Sep 17 00:00:00 2001 From: sputn1ck Date: Wed, 13 Sep 2023 15:13:33 +0200 Subject: [PATCH] fsm: expand fsm This commit adds: - a default observer to the FSM - more info to the action entry and exit funcs - an optional initial wait time for the WaitForState function --- fsm/example_fsm.go | 4 +-- fsm/fsm.go | 62 ++++++++++++++++++++++++++++++++-------------- fsm/observer.go | 53 ++++++++++++++++++++++++++++++++++++++- 3 files changed, 97 insertions(+), 22 deletions(-) diff --git a/fsm/example_fsm.go b/fsm/example_fsm.go index 723f9dd..9ba2059 100644 --- a/fsm/example_fsm.go +++ b/fsm/example_fsm.go @@ -31,7 +31,7 @@ func NewExampleFSMContext(service ExampleService, service: service, store: store, } - exampleFSM.StateMachine = NewStateMachine(exampleFSM.GetStates()) + exampleFSM.StateMachine = NewStateMachine(exampleFSM.GetStates(), 10) return exampleFSM } @@ -55,7 +55,7 @@ var ( // GetStates returns the states for the example FSM. func (e *ExampleFSM) GetStates() States { return States{ - Default: State{ + EmptyState: State{ Transitions: Transitions{ OnRequestStuff: InitFSM, }, diff --git a/fsm/fsm.go b/fsm/fsm.go index 479f203..99fe339 100644 --- a/fsm/fsm.go +++ b/fsm/fsm.go @@ -17,8 +17,8 @@ var ( ) const ( - // Default represents the default state of the system. - Default StateType = "" + // EmptyState represents the default state of the system. + EmptyState StateType = "" // NoOp represents a no-op event. NoOp EventType = "NoOp" @@ -88,19 +88,19 @@ type StateMachine struct { // ActionEntryFunc is a function that is called before an action is // executed. - ActionEntryFunc func() + ActionEntryFunc func(Notification) // ActionExitFunc is a function that is called after an action is - // executed. - ActionExitFunc func() - - // mutex ensures that only 1 event is processed by the state machine at - // any given time. - mutex sync.Mutex + // executed, it is called with the EventType returned by the action. + ActionExitFunc func(NextEvent EventType) // LastActionError is an error set by the last action executed. LastActionError error + // DefaultObserver is the default observer that is notified when the + // state machine transitions between states. + DefaultObserver *CachedObserver + // previous represents the previous state. previous StateType @@ -114,13 +114,35 @@ type StateMachine struct { // observerMutex ensures that observers are only added or removed // safely. observerMutex sync.Mutex + + // mutex ensures that only 1 event is processed by the state machine at + // any given time. + mutex sync.Mutex } // NewStateMachine creates a new state machine. -func NewStateMachine(states States) *StateMachine { +func NewStateMachine(states States, observerSize int) *StateMachine { + return NewStateMachineWithState(states, EmptyState, observerSize) +} + +// NewStateMachineWithState creates a new state machine and sets the initial +// state. +func NewStateMachineWithState(states States, current StateType, + observerSize int) *StateMachine { + + observers := []Observer{} + var defaultObserver *CachedObserver + + if observerSize > 0 { + defaultObserver = NewCachedObserver(observerSize) + observers = append(observers, defaultObserver) + } + return &StateMachine{ - States: states, - observers: make([]Observer, 0), + States: states, + current: current, + DefaultObserver: defaultObserver, + observers: observers, } } @@ -189,18 +211,20 @@ func (s *StateMachine) SendEvent(event EventType, eventCtx EventContext) error { // Notify the state machine's observers. s.observerMutex.Lock() + notification := Notification{ + PreviousState: s.previous, + NextState: s.current, + Event: event, + } + for _, observer := range s.observers { - observer.Notify(Notification{ - PreviousState: s.previous, - NextState: s.current, - Event: event, - }) + observer.Notify(notification) } s.observerMutex.Unlock() // Execute the state machines ActionEntryFunc. if s.ActionEntryFunc != nil { - s.ActionEntryFunc() + s.ActionEntryFunc(notification) } // Execute the current state's entry function @@ -219,7 +243,7 @@ func (s *StateMachine) SendEvent(event EventType, eventCtx EventContext) error { // Execute the state machines ActionExitFunc. if s.ActionExitFunc != nil { - s.ActionExitFunc() + s.ActionExitFunc(nextEvent) } // If the next event is a no-op, we're done. diff --git a/fsm/observer.go b/fsm/observer.go index b9c7286..9e08cc8 100644 --- a/fsm/observer.go +++ b/fsm/observer.go @@ -46,9 +46,60 @@ func (c *CachedObserver) GetCachedNotifications() []Notification { return c.cachedNotifications.Get() } +// WaitForStateOption is an option that can be passed to the WaitForState +// function. +type WaitForStateOption interface { + apply(*fsmOptions) +} + +// fsmOptions is a struct that holds all options that can be passed to the +// WaitForState function. +type fsmOptions struct { + initialWait time.Duration +} + +// InitialWaitOption is an option that can be passed to the WaitForState +// function to wait for a given duration before checking the state. +type InitialWaitOption struct { + initialWait time.Duration +} + +// WithWaitForStateOption creates a new InitialWaitOption. +func WithWaitForStateOption(initialWait time.Duration) WaitForStateOption { + return &InitialWaitOption{ + initialWait, + } +} + +// apply implements the WaitForStateOption interface. +func (w *InitialWaitOption) apply(o *fsmOptions) { + o.initialWait = w.initialWait +} + // WaitForState waits for the state machine to reach the given state. +// If the optional initialWait parameter is set, the function will wait for +// the given duration before checking the state. This is useful if the +// function is called immediately after sending an event to the state machine +// and the state machine needs some time to process the event. func (s *CachedObserver) WaitForState(ctx context.Context, - timeout time.Duration, state StateType) error { + timeout time.Duration, state StateType, + opts ...InitialWaitOption) error { + + var options fsmOptions + + for _, opt := range opts { + opt.apply(&options) + } + + // Wait for the initial wait duration if set. + if options.initialWait > 0 { + select { + case <-time.After(options.initialWait): + + case <-ctx.Done(): + return ctx.Err() + } + } timeoutCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel()