44	"context" 
55	"database/sql" 
66	_ "embed" 
7+ 	"encoding/json" 
78	"errors" 
89	"fmt" 
910	"strings" 
@@ -17,6 +18,7 @@ import (
1718	"github.com/cschleiden/go-workflows/workflow" 
1819	_ "github.com/go-sql-driver/mysql" 
1920	"github.com/google/uuid" 
21+ 	"go.opentelemetry.io/otel/trace" 
2022)
2123
2224//go:embed schema.sql 
@@ -58,7 +60,7 @@ type mysqlBackend struct {
5860}
5961
6062// CreateWorkflowInstance creates a new workflow instance 
61- func  (b  * mysqlBackend ) CreateWorkflowInstance (ctx  context.Context , m   history.WorkflowEvent ) error  {
63+ func  (b  * mysqlBackend ) CreateWorkflowInstance (ctx  context.Context , instance   * workflow. Instance ,  event   history.Event ) error  {
6264	tx , err  :=  b .db .BeginTx (ctx , & sql.TxOptions {
6365		Isolation : sql .LevelReadCommitted ,
6466	})
@@ -68,12 +70,12 @@ func (b *mysqlBackend) CreateWorkflowInstance(ctx context.Context, m history.Wor
6870	defer  tx .Rollback ()
6971
7072	// Create workflow instance 
71- 	if  err  :=  createInstance (ctx , tx , m . WorkflowInstance , false ); err  !=  nil  {
73+ 	if  err  :=  createInstance (ctx , tx , instance ,  event . Attributes .( * history. ExecutionStartedAttributes ). Metadata , false ); err  !=  nil  {
7274		return  err 
7375	}
7476
7577	// Initial history is empty, store only new events 
76- 	if  err  :=  insertPendingEvents (ctx , tx , m . WorkflowInstance . InstanceID , []history.Event {m . HistoryEvent }); err  !=  nil  {
78+ 	if  err  :=  insertPendingEvents (ctx , tx , instance . InstanceID , []history.Event {event }); err  !=  nil  {
7779		return  fmt .Errorf ("inserting new event: %w" , err )
7880	}
7981
@@ -88,6 +90,10 @@ func (b *mysqlBackend) Logger() log.Logger {
8890	return  b .options .Logger 
8991}
9092
93+ func  (b  * mysqlBackend ) Tracer () trace.Tracer  {
94+ 	return  b .options .TracerProvider .Tracer (backend .TracerName )
95+ }
96+ 
9197func  (b  * mysqlBackend ) CancelWorkflowInstance (ctx  context.Context , instance  * workflow.Instance , event  * history.Event ) error  {
9298	tx , err  :=  b .db .BeginTx (ctx , & sql.TxOptions {
9399		Isolation : sql .LevelReadCommitted ,
@@ -199,7 +205,7 @@ func (b *mysqlBackend) GetWorkflowInstanceState(ctx context.Context, instance *w
199205	return  backend .WorkflowStateActive , nil 
200206}
201207
202- func  createInstance (ctx  context.Context , tx  * sql.Tx , wfi  * workflow.Instance , ignoreDuplicate  bool ) error  {
208+ func  createInstance (ctx  context.Context , tx  * sql.Tx , wfi  * workflow.Instance , metadata   * workflow. Metadata ,  ignoreDuplicate  bool ) error  {
203209	var  parentInstanceID  * string 
204210	var  parentEventID  * int64 
205211	if  wfi .SubWorkflow () {
@@ -210,13 +216,19 @@ func createInstance(ctx context.Context, tx *sql.Tx, wfi *workflow.Instance, ign
210216		parentEventID  =  & n 
211217	}
212218
219+ 	metadataJson , err  :=  json .Marshal (metadata )
220+ 	if  err  !=  nil  {
221+ 		return  fmt .Errorf ("marshaling metadata: %w" , err )
222+ 	}
223+ 
213224	res , err  :=  tx .ExecContext (
214225		ctx ,
215- 		"INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_schedule_event_id) VALUES (?, ?, ?, ?)" ,
226+ 		"INSERT IGNORE INTO `instances` (instance_id, execution_id, parent_instance_id, parent_schedule_event_id, metadata ) VALUES (?,  ?, ?, ?, ?)" ,
216227		wfi .InstanceID ,
217228		wfi .ExecutionID ,
218229		parentInstanceID ,
219230		parentEventID ,
231+ 		string (metadataJson ),
220232	)
221233	if  err  !=  nil  {
222234		return  fmt .Errorf ("inserting workflow instance: %w" , err )
@@ -273,7 +285,7 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
273285	now  :=  time .Now ()
274286	row  :=  tx .QueryRowContext (
275287		ctx ,
276- 		`SELECT i.id, i.instance_id, i.execution_id, i.parent_instance_id, i.parent_schedule_event_id, i.sticky_until 
288+ 		`SELECT i.id, i.instance_id, i.execution_id, i.parent_instance_id, i.parent_schedule_event_id, i.metadata, i. sticky_until 
277289			FROM instances i 
278290			INNER JOIN pending_events pe ON i.instance_id = pe.instance_id 
279291			WHERE 
@@ -293,8 +305,9 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
293305	var  instanceID , executionID  string 
294306	var  parentInstanceID  * string 
295307	var  parentEventID  * int64 
308+ 	var  metadataJson  sql.NullString 
296309	var  stickyUntil  * time.Time 
297- 	if  err  :=  row .Scan (& id , & instanceID , & executionID , & parentInstanceID , & parentEventID , & stickyUntil ); err  !=  nil  {
310+ 	if  err  :=  row .Scan (& id , & instanceID , & executionID , & parentInstanceID , & parentEventID , & metadataJson ,  & stickyUntil ); err  !=  nil  {
298311		if  err  ==  sql .ErrNoRows  {
299312			return  nil , nil 
300313		}
@@ -329,9 +342,17 @@ func (b *mysqlBackend) GetWorkflowTask(ctx context.Context) (*task.Workflow, err
329342		wfi  =  core .NewWorkflowInstance (instanceID , executionID )
330343	}
331344
345+ 	var  metadata  * core.WorkflowMetadata 
346+ 	if  metadataJson .Valid  {
347+ 		if  err  :=  json .Unmarshal ([]byte (metadataJson .String ), & metadata ); err  !=  nil  {
348+ 			return  nil , fmt .Errorf ("parsing workflow metadata: %w" , err )
349+ 		}
350+ 	}
351+ 
332352	t  :=  & task.Workflow {
333353		ID :               wfi .InstanceID ,
334354		WorkflowInstance : wfi ,
355+ 		Metadata :         metadata ,
335356		NewEvents :        []history.Event {},
336357	}
337358
@@ -489,20 +510,18 @@ func (b *mysqlBackend) CompleteWorkflowTask(
489510	}
490511
491512	// Insert new workflow events 
492- 	groupedEvents  :=  make (map [* workflow.Instance ][]history.Event )
493- 	for  _ , m  :=  range  workflowEvents  {
494- 		if  _ , ok  :=  groupedEvents [m .WorkflowInstance ]; ! ok  {
495- 			groupedEvents [m .WorkflowInstance ] =  []history.Event {}
496- 		}
497- 
498- 		groupedEvents [m .WorkflowInstance ] =  append (groupedEvents [m .WorkflowInstance ], m .HistoryEvent )
499- 	}
513+ 	groupedEvents  :=  history .EventsByWorkflowInstance (workflowEvents )
500514
501515	for  targetInstance , events  :=  range  groupedEvents  {
502- 		if  targetInstance .InstanceID  !=  instance .InstanceID  {
503- 			// Create new instance 
504- 			if  err  :=  createInstance (ctx , tx , targetInstance , true ); err  !=  nil  {
505- 				return  err 
516+ 		for  _ , event  :=  range  events  {
517+ 			if  event .Type  ==  history .EventType_WorkflowExecutionStarted  {
518+ 				a  :=  event .Attributes .(* history.ExecutionStartedAttributes )
519+ 				// Create new instance 
520+ 				if  err  :=  createInstance (ctx , tx , targetInstance , a .Metadata , true ); err  !=  nil  {
521+ 					return  err 
522+ 				}
523+ 
524+ 				break 
506525			}
507526		}
508527
@@ -561,9 +580,11 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
561580	now  :=  time .Now ()
562581	res  :=  tx .QueryRowContext (
563582		ctx ,
564- 		`SELECT id, activity_id, instance_id, execution_id, event_type, timestamp, schedule_event_id, attributes, visible_at 
583+ 		`SELECT activities.id, activity_id, activities.instance_id, activities.execution_id, 
584+ 			instances.metadata, event_type, timestamp, schedule_event_id, attributes, visible_at 
565585			FROM activities 
566- 			WHERE locked_until IS NULL OR locked_until < ? 
586+ 				INNER JOIN instances ON activities.instance_id = instances.instance_id 
587+ 			WHERE activities.locked_until IS NULL OR activities.locked_until < ? 
567588			LIMIT 1 
568589			FOR UPDATE SKIP LOCKED` ,
569590		now ,
@@ -572,16 +593,24 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
572593	var  id  int64 
573594	var  instanceID , executionID  string 
574595	var  attributes  []byte 
596+ 	var  metadataJson  sql.NullString 
575597	event  :=  history.Event {}
576598
577- 	if  err  :=  res .Scan (& id , & event .ID , & instanceID , & executionID , & event .Type , & event .Timestamp , & event .ScheduleEventID , & attributes , & event .VisibleAt ); err  !=  nil  {
599+ 	if  err  :=  res .Scan (
600+ 		& id , & event .ID , & instanceID , & executionID , & metadataJson , & event .Type ,
601+ 		& event .Timestamp , & event .ScheduleEventID , & attributes , & event .VisibleAt ); err  !=  nil  {
578602		if  err  ==  sql .ErrNoRows  {
579603			return  nil , nil 
580604		}
581605
582606		return  nil , fmt .Errorf ("finding activity task to lock: %w" , err )
583607	}
584608
609+ 	var  metadata  * workflow.Metadata 
610+ 	if  err  :=  json .Unmarshal ([]byte (metadataJson .String ), & metadata ); err  !=  nil  {
611+ 		return  nil , fmt .Errorf ("unmarshaling metadata: %w" , err )
612+ 	}
613+ 
585614	a , err  :=  history .DeserializeAttributes (event .Type , attributes )
586615	if  err  !=  nil  {
587616		return  nil , fmt .Errorf ("deserializing attributes: %w" , err )
@@ -602,6 +631,7 @@ func (b *mysqlBackend) GetActivityTask(ctx context.Context) (*task.Activity, err
602631	t  :=  & task.Activity {
603632		ID :               event .ID ,
604633		WorkflowInstance : core .NewWorkflowInstance (instanceID , executionID ),
634+ 		Metadata :         metadata ,
605635		Event :            event ,
606636	}
607637
0 commit comments