3636 ErrLockTimeout = errors .New ("timeout: can't acquire database lock" )
3737)
3838
39+ // PostStepCallbackOffset is a sentinel offset added to a normal SQL migration
40+ // version. During the post-step callback for version V, the database version is
41+ // persisted as (V + PostStepCallbackOffset). This enables the migrate package
42+ // to detect and re-run a post-step callback which errored after the
43+ // corresponding SQL migration was applied successfully.
44+ // Note that only the post-step callback is re-run, the SQL migration is not
45+ // re-applied.
46+ //
47+ // If the persisted version is >= PostStepCallbackOffset and not `dirty`, then
48+ // the post-step callback for (version - PostStepCallbackOffset) will be re-run
49+ // on the next migration run.
50+ // If the persisted version is `dirty`, manual intervention is required, as it's
51+ // not possible by the migration framework to determine whether post-step
52+ // callback actually was executed successfully or not during the last execution
53+ // attempt.
54+ //
55+ // NOTE:
56+ // Changing this value is a breaking change for any database that currently
57+ // has a post-step phase recorded. Do not change it unless you also provide
58+ // a safe transition strategy.
59+ // Also note that no SQL migration can use a version >= PostStepCallbackOffset.
60+ // Such versions are reserved for the post-step callback phase, and any SQL
61+ // migrations with such versions will cause an error.
62+ const PostStepCallbackOffset = 1000000000
63+
3964// ErrShortLimit is an error returned when not enough migrations
4065// can be returned by a source for a given limit.
4166type ErrShortLimit struct {
@@ -296,6 +321,23 @@ func (m *Migrate) Migrate(version uint) error {
296321 return m .unlockErr (ErrDirty {curVersion })
297322 }
298323
324+ // If the current version is a clean post-step callback version, then
325+ // we need to rerun the post-step callback for the previous version
326+ // before we can continue with any SQL migration(s).
327+ if IsPostStepCallbackVersion (curVersion ) {
328+ sqlMigVersion := SQLMigrationVersion (curVersion )
329+
330+ err := m .executePostStepCallbackForSQLMig (sqlMigVersion )
331+ if err != nil {
332+ return m .unlockErr (err )
333+ }
334+
335+ curVersion , dirty , err = m .databaseDrv .Version ()
336+ if err != nil {
337+ return m .unlockErr (err )
338+ }
339+ }
340+
299341 ret := make (chan interface {}, m .PrefetchMigrations )
300342 go m .read (curVersion , int (version ), ret )
301343
@@ -322,6 +364,23 @@ func (m *Migrate) Steps(n int) error {
322364 return m .unlockErr (ErrDirty {curVersion })
323365 }
324366
367+ // If the current version is a clean post-step callback version, then
368+ // we need to rerun the post-step callback for the previous version
369+ // before we can continue with any SQL migration(s).
370+ if IsPostStepCallbackVersion (curVersion ) {
371+ sqlMigVersion := SQLMigrationVersion (curVersion )
372+
373+ err := m .executePostStepCallbackForSQLMig (sqlMigVersion )
374+ if err != nil {
375+ return m .unlockErr (err )
376+ }
377+
378+ curVersion , dirty , err = m .databaseDrv .Version ()
379+ if err != nil {
380+ return m .unlockErr (err )
381+ }
382+ }
383+
325384 ret := make (chan interface {}, m .PrefetchMigrations )
326385
327386 if n > 0 {
@@ -349,6 +408,23 @@ func (m *Migrate) Up() error {
349408 return m .unlockErr (ErrDirty {curVersion })
350409 }
351410
411+ // If the current version is a clean post-step callback version, then
412+ // we need to rerun the post-step callback for the previous version
413+ // before we can continue with any SQL migration(s).
414+ if IsPostStepCallbackVersion (curVersion ) {
415+ sqlMigVersion := SQLMigrationVersion (curVersion )
416+
417+ err := m .executePostStepCallbackForSQLMig (sqlMigVersion )
418+ if err != nil {
419+ return m .unlockErr (err )
420+ }
421+
422+ curVersion , dirty , err = m .databaseDrv .Version ()
423+ if err != nil {
424+ return m .unlockErr (err )
425+ }
426+ }
427+
352428 ret := make (chan interface {}, m .PrefetchMigrations )
353429
354430 go m .readUp (curVersion , - 1 , ret )
@@ -371,6 +447,23 @@ func (m *Migrate) Down() error {
371447 return m .unlockErr (ErrDirty {curVersion })
372448 }
373449
450+ // If the current version is a clean post-step callback version, then
451+ // we need to rerun the post-step callback for the previous version
452+ // before we can continue with any SQL migration(s).
453+ if IsPostStepCallbackVersion (curVersion ) {
454+ sqlMigVersion := SQLMigrationVersion (curVersion )
455+
456+ err := m .executePostStepCallbackForSQLMig (sqlMigVersion )
457+ if err != nil {
458+ return m .unlockErr (err )
459+ }
460+
461+ curVersion , dirty , err = m .databaseDrv .Version ()
462+ if err != nil {
463+ return m .unlockErr (err )
464+ }
465+ }
466+
374467 ret := make (chan interface {}, m .PrefetchMigrations )
375468 go m .readDown (curVersion , - 1 , ret )
376469 return m .unlockErr (m .runMigrations (ret ))
@@ -409,6 +502,23 @@ func (m *Migrate) Run(migration ...*Migration) error {
409502 return m .unlockErr (ErrDirty {curVersion })
410503 }
411504
505+ // If the current version is a clean post step callback version, then
506+ // we need to rerun the post step callback for the previous version
507+ // before we can continue with any SQL migration(s).
508+ if IsPostStepCallbackVersion (curVersion ) {
509+ sqlMigVersion := SQLMigrationVersion (curVersion )
510+
511+ err := m .executePostStepCallbackForSQLMig (sqlMigVersion )
512+ if err != nil {
513+ return m .unlockErr (err )
514+ }
515+
516+ curVersion , dirty , err = m .databaseDrv .Version ()
517+ if err != nil {
518+ return m .unlockErr (err )
519+ }
520+ }
521+
412522 ret := make (chan interface {}, m .PrefetchMigrations )
413523
414524 go func () {
@@ -787,6 +897,30 @@ func (m *Migrate) readDown(from int, limit int, ret chan<- interface{}) {
787897 }
788898}
789899
900+ // readSingle reads a single migration for the given version, and sends it
901+ // over the passed channel.
902+ func (m * Migrate ) readSingle (ver uint , ret chan <- interface {}) {
903+ defer close (ret )
904+
905+ if err := m .versionExists (ver ); err != nil {
906+ ret <- err
907+ return
908+ }
909+
910+ migr , err := m .newMigration (ver , int (ver ))
911+ if err != nil {
912+ ret <- err
913+ return
914+ }
915+
916+ ret <- migr
917+ go func () {
918+ if err := migr .Buffer (); err != nil {
919+ m .logErr (err )
920+ }
921+ }()
922+ }
923+
790924// runMigrations reads *Migration and error from a channel. Any other type
791925// sent on this channel will result in a panic. Each migration is then
792926// proxied to the database driver and run against the database.
@@ -807,6 +941,12 @@ func (m *Migrate) runMigrations(ret <-chan interface{}) error {
807941 case * Migration :
808942 migr := r
809943
944+ if migr .Version >= PostStepCallbackOffset {
945+ return fmt .Errorf ("migration version %v is " +
946+ "invalid, must be < %v" , migr .Version ,
947+ PostStepCallbackOffset )
948+ }
949+
810950 // set version with dirty state
811951 if err := m .databaseDrv .SetVersion (migr .TargetVersion , true ); err != nil {
812952 return err
@@ -818,23 +958,9 @@ func (m *Migrate) runMigrations(ret <-chan interface{}) error {
818958 return err
819959 }
820960
821- // If there is a post execution function for
822- // this migration, run it now.
823- cb , ok := m .opts .postStepCallbacks [migr .Version ]
824- if ok {
825- m .logVerbosePrintf ("Running post step " +
826- "callback for %v\n " , migr .LogString ())
827-
828- err := cb (migr , m .databaseDrv )
829- if err != nil {
830- return fmt .Errorf ("failed to " +
831- "execute post " +
832- "step callback: %w" ,
833- err )
834- }
835-
836- m .logVerbosePrintf ("Post step callback " +
837- "finished for %v\n " , migr .LogString ())
961+ err := m .executePostStepCallback (migr )
962+ if err != nil {
963+ return err
838964 }
839965 }
840966
@@ -863,6 +989,109 @@ func (m *Migrate) runMigrations(ret <-chan interface{}) error {
863989 return nil
864990}
865991
992+ // executePostStepCallback checks if a post-step callback exists for the passed
993+ // migration and proceeds to execute if one exists.
994+ func (m * Migrate ) executePostStepCallback (migr * Migration ) error {
995+ cb , ok := m .opts .postStepCallbacks [migr .Version ]
996+ if ok {
997+ m .logVerbosePrintf ("Running post step callback for %v\n " ,
998+ migr .LogString ())
999+
1000+ postStepVersion := int (migr .Version ) + PostStepCallbackOffset
1001+
1002+ // Persist that we are in the post-step phase for this version.
1003+ if err := m .databaseDrv .SetVersion (postStepVersion , true ); err != nil {
1004+ return err
1005+ }
1006+
1007+ err := cb (migr , m .databaseDrv )
1008+ if err != nil {
1009+ // Mark the database version as the postStepVersion but
1010+ // in a clean state, to indicate that the post-step
1011+ // callback errored. We will therefore re-run the
1012+ // post-step callback on the next migration run.
1013+ if setErr := m .databaseDrv .SetVersion (postStepVersion , false ); setErr != nil {
1014+ // Note that if we error here, the database
1015+ // version will remain in a dirty state. As we
1016+ // cannot know if the post-step callback was
1017+ // executed or not in that scenario, manual
1018+ // intervention is required.
1019+ return fmt .Errorf ("WARNING, failed to set " +
1020+ "migration version after post " +
1021+ "migration step errored. Manual " +
1022+ "intervention needed! Post migration " +
1023+ "error: %w, version setting error : %w" ,
1024+ err , setErr )
1025+ }
1026+
1027+ return fmt .Errorf ("failed to execute post step " +
1028+ "callback: %w" , err )
1029+ }
1030+
1031+ m .logVerbosePrintf ("Post step callback finished for %v\n " ,
1032+ migr .LogString ())
1033+ }
1034+
1035+ return nil
1036+ }
1037+
1038+ // executePostStepCallbackForSQLMig executes only the post-step callback for the
1039+ // passed SQL migration version.
1040+ // The function can be used to re-execute the post-step callback for a SQL
1041+ // migration version where the SQL migration was successfully applied, but where
1042+ // the post-step callback failed.
1043+ func (m * Migrate ) executePostStepCallbackForSQLMig (sqlMigVersion int ) error {
1044+ var (
1045+ r interface {}
1046+ migRet = make (chan interface {}, m .PrefetchMigrations )
1047+ err error
1048+ )
1049+
1050+ // Fetch the migration for the specified SQL migration version.
1051+ go m .readSingle (uint (sqlMigVersion ), migRet )
1052+
1053+ select {
1054+ case r = <- migRet :
1055+ case <- time .After (30 * time .Second ):
1056+ return fmt .Errorf ("timeout waiting for single migration " +
1057+ "version %v" , sqlMigVersion )
1058+ }
1059+
1060+ if m .stop () {
1061+ return nil
1062+ }
1063+
1064+ switch r := r .(type ) {
1065+ case * Migration :
1066+ // If the migration was found, execute the post step callback.
1067+ migr := r
1068+
1069+ err = m .executePostStepCallback (migr )
1070+ if err != nil {
1071+ return err
1072+ }
1073+
1074+ m .logVerbosePrintf ("successfully re-executed post step " +
1075+ "callback for SQL migration version: %v\n " ,
1076+ sqlMigVersion )
1077+
1078+ // set clean state
1079+ if err = m .databaseDrv .SetVersion (migr .TargetVersion , false ); err != nil {
1080+ return err
1081+ }
1082+
1083+ return nil
1084+
1085+ case error :
1086+ return fmt .Errorf ("reading SQL migration at version " +
1087+ "%v failed: %w" , sqlMigVersion , r )
1088+
1089+ default :
1090+ return fmt .Errorf ("unknown type: %T when reading " +
1091+ "single migration" , r )
1092+ }
1093+ }
1094+
8661095// versionExists checks the source if either the up or down migration for
8671096// the specified migration version exists.
8681097func (m * Migrate ) versionExists (version uint ) (result error ) {
0 commit comments