@@ -44,18 +44,31 @@ func (rb *redisBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, er
4444
4545 for _ , eventR := range result .([]interface {}) {
4646 eventStr := eventR .(string )
47- var event futureEvent
48- if err := json .Unmarshal ([]byte (eventStr ), & event ); err != nil {
47+ var futureEvent futureEvent
48+ if err := json .Unmarshal ([]byte (eventStr ), & futureEvent ); err != nil {
4949 return nil , errors .Wrap (err , "could not unmarshal event" )
5050 }
5151
52- msgID , err := addEventToStream (ctx , rb .rdb , pendingEventsKey (event .Instance .InstanceID ), event .Event )
52+ instanceState , err := readInstance (ctx , rb .rdb , futureEvent .Instance .InstanceID )
53+ if err != nil {
54+ if err == backend .ErrInstanceNotFound {
55+ rb .options .Logger .Debug ("Ignoring future event for non-existing instance" , "instance_id" , futureEvent .Instance .InstanceID , "event_id" , futureEvent .Event .ID )
56+ } else {
57+ return nil , errors .Wrap (err , "could not read instance" )
58+ }
59+ }
60+
61+ if instanceState .State != backend .WorkflowStateActive {
62+ rb .options .Logger .Debug ("Ignoring future event for already completed instance" , "instance_id" , futureEvent .Instance .InstanceID , "event_id" , futureEvent .Event .ID )
63+ }
64+
65+ msgID , err := addEventToStream (ctx , rb .rdb , pendingEventsKey (futureEvent .Instance .InstanceID ), futureEvent .Event )
5366 if err != nil {
5467 return nil , errors .Wrap (err , "could not add future event to stream" )
5568 }
5669
5770 // Instance now has at least one pending event, try to queue task
58- if _ , err := rb .workflowQueue .Enqueue (ctx , event .Instance .InstanceID , & workflowTaskData {
71+ if _ , err := rb .workflowQueue .Enqueue (ctx , futureEvent .Instance .InstanceID , & workflowTaskData {
5972 LastPendingEventMessageID : * msgID ,
6073 }); err != nil {
6174 if err != taskqueue .ErrTaskAlreadyInQueue {
0 commit comments