88 "os"
99 "os/exec"
1010 "path/filepath"
11+ "strconv"
1112 "strings"
1213
1314 "github.com/beego/beego/v2/core/logs"
@@ -91,13 +92,13 @@ func (r *Runner) GetDockerImageName(sourceType, version string) string {
9192}
9293
9394// ExecuteDockerCommand executes a Docker command with the given parameters
94- func (r * Runner ) ExecuteDockerCommand (ctx context.Context , flag string , command Command , sourceType , version , configPath string , additionalArgs ... string ) ([]byte , error ) {
95+ func (r * Runner ) ExecuteDockerCommand (ctx context.Context , containerName , flag string , command Command , sourceType , version , configPath string , additionalArgs ... string ) ([]byte , error ) {
9596 outputDir := filepath .Dir (configPath )
9697 if err := utils .CreateDirectory (outputDir , DefaultDirPermissions ); err != nil {
9798 return nil , err
9899 }
99100
100- dockerArgs := r .buildDockerArgs (ctx , flag , command , sourceType , version , configPath , outputDir , additionalArgs ... )
101+ dockerArgs := r .buildDockerArgs (ctx , containerName , flag , command , sourceType , version , configPath , outputDir , additionalArgs ... )
101102 if len (dockerArgs ) == 0 {
102103 return nil , fmt .Errorf ("failed to build docker args" )
103104 }
@@ -120,7 +121,7 @@ func (r *Runner) ExecuteDockerCommand(ctx context.Context, flag string, command
120121}
121122
122123// buildDockerArgs constructs Docker command arguments
123- func (r * Runner ) buildDockerArgs (ctx context.Context , flag string , command Command , sourceType , version , configPath , outputDir string , additionalArgs ... string ) []string {
124+ func (r * Runner ) buildDockerArgs (ctx context.Context , containerName , flag string , command Command , sourceType , version , configPath , outputDir string , additionalArgs ... string ) []string {
124125 hostOutputDir := r .getHostOutputDir (outputDir )
125126
126127 repositoryBase , err := web .AppConfig .String ("CONTAINER_REGISTRY_BASE" )
@@ -145,7 +146,7 @@ func (r *Runner) buildDockerArgs(ctx context.Context, flag string, command Comma
145146 }
146147
147148 // base docker args
148- dockerArgs := []string {"run" , "--rm" }
149+ dockerArgs := []string {"run" , "--name" , containerName }
149150
150151 if hostOutputDir != "" {
151152 dockerArgs = append (dockerArgs , "-v" , fmt .Sprintf ("%s:/mnt/config" , hostOutputDir ))
@@ -182,9 +183,9 @@ func (r *Runner) getHostOutputDir(outputDir string) string {
182183 return outputDir
183184}
184185
185- func (r * Runner ) FetchSpec (ctx context.Context , destinationType , sourceType , version , _ string ) (models.SpecOutput , error ) {
186+ func (r * Runner ) FetchSpec (ctx context.Context , destinationType , sourceType , version , workflowID string ) (models.SpecOutput , error ) {
186187 flag := utils .Ternary (destinationType != "" , "destination-type" , "" ).(string )
187- dockerArgs := r .buildDockerArgs (ctx , flag , Spec , sourceType , version , "" , "" , destinationType )
188+ dockerArgs := r .buildDockerArgs (ctx , workflowID , flag , Spec , sourceType , version , "" , "" , destinationType )
188189
189190 cmd := exec .CommandContext (ctx , "docker" , dockerArgs ... )
190191 logs .Info ("Running Docker command: docker %s\n " , strings .Join (dockerArgs , " " ))
@@ -216,7 +217,7 @@ func (r *Runner) TestConnection(ctx context.Context, flag, sourceType, version,
216217 }
217218
218219 configPath := filepath .Join (workDir , "config.json" )
219- output , err := r .ExecuteDockerCommand (ctx , flag , Check , sourceType , version , configPath )
220+ output , err := r .ExecuteDockerCommand (ctx , workflowID , flag , Check , sourceType , version , configPath )
220221 if err != nil {
221222 return nil , err
222223 }
@@ -270,7 +271,7 @@ func (r *Runner) GetCatalog(ctx context.Context, sourceType, version, config, wo
270271 if jobName != "" && semver .Compare (version , "v0.2.0" ) >= 0 {
271272 catalogsArgs = append (catalogsArgs , "--destination-database-prefix" , jobName )
272273 }
273- _ , err = r .ExecuteDockerCommand (ctx , "config" , Discover , sourceType , version , configPath , catalogsArgs ... )
274+ _ , err = r .ExecuteDockerCommand (ctx , workflowID , "config" , Discover , sourceType , version , configPath , catalogsArgs ... )
274275 if err != nil {
275276 return nil , err
276277 }
@@ -281,56 +282,216 @@ func (r *Runner) GetCatalog(ctx context.Context, sourceType, version, config, wo
281282
282283// RunSync runs the sync command to transfer data from source to destination
283284func (r * Runner ) RunSync (ctx context.Context , jobID int , workflowID string ) (map [string ]interface {}, error ) {
284- // Generate unique directory name
285- workDir , err := r .setupWorkDirectory (fmt .Sprintf ("%x" , sha256 .Sum256 ([]byte (workflowID ))))
285+ // Deterministic container name to enable adoption across retries
286+ containerName := WorkflowHash (workflowID )
287+
288+ // Setup work dir and configs
289+ workDir , err := r .setupWorkDirectory (containerName )
286290 if err != nil {
291+ logs .Error ("workflowID %s: failed to setup work directory: %s" , workflowID , err )
287292 return nil , err
288293 }
289- logs .Info ("working directory path %s\n " , workDir )
290- // Get current job state
291- jobORM := database .NewJobORM ()
292- job , err := jobORM .GetByID (jobID , false )
294+
295+ // Marker to indicate we have launched once; prevents relaunch after retries
296+ launchedMarker := filepath .Join (workDir , "logs" )
297+
298+ // Inspect container state
299+ state := getContainerState (ctx , containerName , workflowID )
300+
301+ // 1) If container is running, adopt and wait for completion
302+ if state .Exists && state .Running {
303+ logs .Info ("workflowID %s: adopting running container %s" , workflowID , containerName )
304+ if err := waitContainer (ctx , containerName , workflowID ); err != nil {
305+ logs .Error ("workflowID %s: container wait failed: %s" , workflowID , err )
306+ return nil , err
307+ }
308+ state = getContainerState (ctx , containerName , workflowID )
309+ }
310+
311+ // 2) If container exists and exited, treat as finished: cleanup and return status
312+ if state .Exists && ! state .Running && state .ExitCode != nil {
313+ logs .Info ("workflowID %s: container %s exited with code %d" , workflowID , containerName , * state .ExitCode )
314+ if * state .ExitCode == 0 {
315+ return map [string ]interface {}{"status" : "completed" }, nil
316+ }
317+ // Return typed error so policy can decide retry vs. fail-fast
318+ return nil , fmt .Errorf ("workflowID %s: container %s exit %d" , workflowID , containerName , * state .ExitCode )
319+ }
320+
321+ // 3) First launch path: only if we never launched and nothing is running
322+ if _ , err := os .Stat (launchedMarker ); os .IsNotExist (err ) {
323+ logs .Info ("workflowID %s: first launch path, preparing configs" , workflowID )
324+ jobORM := database .NewJobORM ()
325+ job , err := jobORM .GetByID (jobID , false )
326+ if err != nil {
327+ logs .Error ("workflowID %s: failed to fetch job %d: %s" , workflowID , jobID , err )
328+ return nil , err
329+ }
330+ configs := []FileConfig {
331+ {Name : "config.json" , Data : job .SourceID .Config },
332+ {Name : "streams.json" , Data : job .StreamsConfig },
333+ {Name : "writer.json" , Data : job .DestID .Config },
334+ {Name : "state.json" , Data : job .State },
335+ {Name : "user_id.txt" , Data : r .anonymousID },
336+ }
337+ if err := r .writeConfigFiles (workDir , configs ); err != nil {
338+ logs .Error ("workflowID %s: failed to write config files: %s" , workflowID , err )
339+ return nil , err
340+ }
341+
342+ configPath := filepath .Join (workDir , "config.json" )
343+ logs .Info ("workflowID %s: executing docker container %s" , workflowID , containerName )
344+
345+ if _ , err = r .ExecuteDockerCommand (
346+ ctx ,
347+ containerName ,
348+ "config" ,
349+ Sync ,
350+ job .SourceID .Type ,
351+ job .SourceID .Version ,
352+ configPath ,
353+ "--catalog" , "/mnt/config/streams.json" ,
354+ "--destination" , "/mnt/config/writer.json" ,
355+ "--state" , "/mnt/config/state.json" ,
356+ ); err != nil {
357+ logs .Error ("workflowID %s: docker execution failed: %s" , workflowID , err )
358+ return nil , err
359+ }
360+
361+ logs .Info ("workflowID %s: container %s completed successfully" , workflowID , containerName )
362+ return map [string ]interface {}{"status" : "completed" }, nil
363+ }
364+ // Skip if container is not running, was already launched (logs exist), and no new run is needed.
365+ logs .Info ("workflowID %s: container %s already handled, skipping launch" , workflowID , containerName )
366+ return map [string ]interface {}{"status" : "skipped" }, nil
367+ }
368+
// ContainerState is a snapshot of a Docker container's lifecycle state.
type ContainerState struct {
	// Exists reports whether the container was found and its state could be read.
	Exists bool
	// Running mirrors Docker's .State.Running flag.
	Running bool
	// ExitCode is the container's exit code; nil when no exit code is available
	// (for example while the container is still running).
	ExitCode *int
}
374+
375+ func getContainerState (ctx context.Context , name , workflowID string ) ContainerState {
376+ // docker inspect returns fields if exists
377+ cmd := exec .CommandContext (ctx , "docker" , "inspect" , "-f" , "{{.State.Status}} {{.State.Running}} {{.State.ExitCode}}" , name )
378+ out , err := cmd .CombinedOutput ()
293379 if err != nil {
294- return nil , err
380+ // treat not found as non-existent
381+ logs .Warn ("workflowID %s: docker inspect failed for %s: %s, output: %s" , workflowID , name , err , string (out ))
382+ return ContainerState {Exists : false }
383+ }
384+ // Split Docker inspect output into fields: status, running flag, and exit code
385+ // Example: "exited false 137" → parts[0]="exited", parts[1]="false", parts[2]="137"
386+ parts := strings .Fields (strings .TrimSpace (string (out )))
387+ if len (parts ) < 3 {
388+ return ContainerState {Exists : false }
389+ }
390+ // Docker .State.Status can be "created", "running", "paused", "restarting", "removing", "exited", or "dead"; we only handle running vs exited/dead.
391+ status := parts [0 ]
392+ running := parts [1 ] == "true"
393+ var ec * int
394+ if ! running && (status == "exited" || status == "dead" ) {
395+ if code , convErr := strconv .Atoi (parts [2 ]); convErr == nil {
396+ ec = & code
397+ }
295398 }
399+ return ContainerState {Exists : true , Running : running , ExitCode : ec }
400+ }
296401
297- // Prepare all configuration files
298- configs := []FileConfig {
299- {Name : "config.json" , Data : job .SourceID .Config },
300- {Name : "streams.json" , Data : job .StreamsConfig },
301- {Name : "writer.json" , Data : job .DestID .Config },
302- {Name : "state.json" , Data : job .State },
303- {Name : "user_id.txt" , Data : r .anonymousID },
402+ func waitContainer (ctx context.Context , name , workflowID string ) error {
403+ // docker wait prints exit code; validate non-zero as error
404+ cmd := exec .CommandContext (ctx , "docker" , "wait" , name )
405+ out , err := cmd .CombinedOutput ()
406+ if err != nil {
407+ logs .Error ("workflowID %s: docker wait failed for %s: %s, output: %s" , workflowID , name , err , string (out ))
408+ return fmt .Errorf ("docker wait failed: %s" , err )
409+ }
410+ strOut := strings .TrimSpace (string (out ))
411+ code , convErr := strconv .Atoi (strOut )
412+ if convErr != nil {
413+ logs .Error ("workflowID %s: failed to parse exit code from docker wait output %q: %s" , workflowID , strOut , convErr )
414+ return fmt .Errorf ("failed to parse exit code: %s" , convErr )
304415 }
305416
306- if err := r .writeConfigFiles (workDir , configs ); err != nil {
307- return nil , err
417+ if code != 0 {
418+ return fmt .Errorf ("workflowID %s: container %s exited with code %d" , workflowID , name , code )
419+ }
420+ return nil
421+ }
422+
423+ // StopContainer stops a container by name, falling back to kill if needed.
424+ func StopContainer (ctx context.Context , workflowID string ) error {
425+ containerName := WorkflowHash (workflowID )
426+ logs .Info ("workflowID %s: stop request received for container %s" , workflowID , containerName )
427+
428+ if strings .TrimSpace (containerName ) == "" {
429+ logs .Warn ("workflowID %s: empty container name" , workflowID )
430+ return fmt .Errorf ("empty container name" )
431+ }
432+
433+ stopCmd := exec .CommandContext (ctx , "docker" , "stop" , "-t" , "5" , containerName )
434+ if out , err := stopCmd .CombinedOutput (); err != nil {
435+ logs .Warn ("workflowID %s: docker stop failed for %s: %s, output: %s" , workflowID , containerName , err , string (out ))
436+ killCmd := exec .CommandContext (ctx , "docker" , "kill" , containerName )
437+ if kout , kerr := killCmd .CombinedOutput (); kerr != nil {
438+ logs .Error ("workflowID %s: docker kill failed for %s: %s, output: %s" , workflowID , containerName , kerr , string (kout ))
439+ return fmt .Errorf ("docker kill failed: %s" , kerr )
440+ }
441+ }
442+
443+ rmCmd := exec .CommandContext (ctx , "docker" , "rm" , "-f" , containerName )
444+ if rmOut , rmErr := rmCmd .CombinedOutput (); rmErr != nil {
445+ logs .Warn ("workflowID %s: docker rm failed for %s: %s, output: %s" , workflowID , containerName , rmErr , string (rmOut ))
446+ } else {
447+ logs .Info ("workflowID %s: container %s removed successfully" , workflowID , containerName )
448+ }
449+
450+ return nil
451+ }
452+
453+ // PersistJobStateFromFile reads the state JSON file and updates the job state
454+ func (r * Runner ) PersistJobStateFromFile (jobID int , workflowID string ) error {
455+ hashWorkflowID := WorkflowHash (workflowID )
456+ workDir , err := r .setupWorkDirectory (hashWorkflowID )
457+ if err != nil {
458+ logs .Error ("workflowID %s: failed to setup work directory: %s" , workflowID , err )
459+ return err
308460 }
309461
310- configPath := filepath .Join (workDir , "config.json" )
311462 statePath := filepath .Join (workDir , "state.json" )
463+ state , err := utils .ParseJSONFile (statePath )
464+ if err != nil {
465+ logs .Error ("workflowID %s: failed to parse state file %s: %s" , workflowID , statePath , err )
466+ return err
467+ }
312468
313- // Execute sync command
314- _ , err = r .ExecuteDockerCommand (ctx , "config" , Sync , job .SourceID .Type , job .SourceID .Version , configPath ,
315- "--catalog" , "/mnt/config/streams.json" ,
316- "--destination" , "/mnt/config/writer.json" ,
317- "--state" , "/mnt/config/state.json" )
469+ jobORM := database .NewJobORM ()
470+ job , err := jobORM .GetByID (jobID , false )
318471 if err != nil {
319- return nil , err
472+ logs .Error ("workflowID %s: failed to fetch job %d: %s" , workflowID , jobID , err )
473+ return err
320474 }
321- // Parse state file
322- result , err := utils . ParseJSONFile ( statePath )
475+
476+ stateJSON , err := json . Marshal ( state )
323477 if err != nil {
324- return nil , err
478+ logs .Error ("workflowID %s: failed to marshal state: %s" , workflowID , err )
479+ return err
325480 }
326481
327- // Update job state if we have valid result
328- if stateJSON , err := json .Marshal (result ); err == nil {
329- job .State = string (stateJSON )
330- job .Active = true
331- if err := jobORM .Update (job ); err != nil {
332- return nil , err
333- }
482+ job .State = string (stateJSON )
483+ job .Active = true
484+
485+ if err := jobORM .Update (job ); err != nil {
486+ logs .Error ("workflowID %s: failed to update job %d: %s" , workflowID , jobID , err )
487+ return err
334488 }
335- return result , nil
489+
490+ logs .Info ("workflowID %s: job state persisted successfully for jobID %d" , workflowID , jobID )
491+ return nil
492+ }
493+
// WorkflowHash returns a deterministic hash string for a given workflowID.
//
// The result is the SHA-256 digest of workflowID rendered as 64 lowercase
// hexadecimal characters, so equal workflow IDs always map to the same string.
func WorkflowHash(workflowID string) string {
	return fmt.Sprintf("%x", sha256.Sum256([]byte(workflowID)))
}
0 commit comments