Skip to content

Commit 51a6cef

Browse files
committed
feat: leader election
1 parent 3762e45 commit 51a6cef

File tree

4 files changed

+36
-18
lines changed

4 files changed

+36
-18
lines changed

cmd/operator.go

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ import (
1818
"github.com/flanksource/canary-checker/pkg/controllers"
1919
"github.com/flanksource/canary-checker/pkg/labels"
2020
"github.com/flanksource/commons/logger"
21+
"github.com/flanksource/duty/job"
2122
dutyKubernetes "github.com/flanksource/duty/kubernetes"
23+
"github.com/flanksource/duty/leader"
2224
"github.com/flanksource/duty/shutdown"
2325
"github.com/spf13/cobra"
2426
"go.opentelemetry.io/otel"
@@ -37,10 +39,10 @@ var (
3739
Use: "operator",
3840
Short: "Start the kubernetes operator",
3941
Run: func(cmd *cobra.Command, args []string) {
40-
if err := run(cmd, args); err != nil {
42+
if err := run(); err != nil {
4143
shutdown.ShutdownAndExit(1, err.Error())
4244
} else {
43-
shutdown.ShutdownAndExit(0, err.Error())
45+
shutdown.ShutdownAndExit(0, "")
4446
}
4547
},
4648
}
@@ -56,15 +58,7 @@ func init() {
5658
// +kubebuilder:scaffold:scheme
5759
}
5860

59-
func run(cmd *cobra.Command, args []string) error {
60-
logger := logger.GetLogger("operator")
61-
logger.SetLogLevel(k8sLogLevel)
62-
63-
scheme := runtime.NewScheme()
64-
65-
_ = clientgoscheme.AddToScheme(scheme)
66-
_ = canaryv1.AddToScheme(scheme)
67-
61+
func run() error {
6862
ctx, err := InitContext()
6963
if err != nil {
7064
return err
@@ -78,12 +72,23 @@ func run(cmd *cobra.Command, args []string) error {
7872
return errors.New("operator requires a kubernetes connection")
7973
}
8074

81-
ctx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))
75+
ctx.WithTracer(otel.GetTracerProvider().Tracer(app))
8276

8377
apicontext.DefaultContext = ctx.WithNamespace(runner.WatchNamespace)
8478

8579
cache.PostgresCache = cache.NewPostgresCache(apicontext.DefaultContext)
8680

81+
if enableLeaderElection {
82+
job.DisableCronStartOnSchedule()
83+
84+
go func() {
85+
err := leader.Register(ctx, app, runner.WatchNamespace, nil, nil, nil)
86+
if err != nil {
87+
shutdown.ShutdownAndExit(1, fmt.Sprintf("leader election failed: %v", err))
88+
}
89+
}()
90+
}
91+
8792
if runner.OperatorExecutor {
8893
logger.Infof("Starting executors")
8994

@@ -94,6 +99,16 @@ func run(cmd *cobra.Command, args []string) error {
9499
}
95100

96101
go serve()
102+
return startControllers()
103+
}
104+
105+
func startControllers() error {
106+
scheme := runtime.NewScheme()
107+
_ = clientgoscheme.AddToScheme(scheme)
108+
_ = canaryv1.AddToScheme(scheme)
109+
110+
logger := logger.GetLogger("operator")
111+
logger.SetLogLevel(k8sLogLevel)
97112

98113
ctrl.SetLogger(logr.FromSlogHandler(logger.Handler()))
99114
setupLog := ctrl.Log.WithName("setup")
@@ -102,6 +117,7 @@ func run(cmd *cobra.Command, args []string) error {
102117
Scheme: scheme,
103118
LeaderElection: enableLeaderElection,
104119
LeaderElectionNamespace: runner.WatchNamespace,
120+
LeaderElectionID: "fa62cd4d.flanksource.com",
105121
Metrics: ctrlMetrics.Options{
106122
BindAddress: ":0",
107123
},

cmd/root.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,26 @@ import (
2424
"go.opentelemetry.io/otel"
2525
)
2626

27+
const app = "canary-checker"
28+
2729
func InitContext() (context.Context, error) {
28-
ctx, closer, err := duty.Start("canary-checker", duty.SkipChangelogMigration, duty.SkipMigrationByDefaultMode)
30+
ctx, closer, err := duty.Start(app, duty.SkipChangelogMigration, duty.SkipMigrationByDefaultMode)
2931
if err != nil {
30-
return ctx, fmt.Errorf("Failed to initialize db: %v", err.Error())
32+
return ctx, fmt.Errorf("failed to initialize db: %v", err.Error())
3133
}
3234
shutdown.AddHook(closer)
3335

3436
if err := properties.LoadFile(propertiesFile); err != nil {
3537
return ctx, fmt.Errorf("failed to load properties: %v", err)
3638
}
3739

38-
ctx.WithTracer(otel.GetTracerProvider().Tracer("canary-checker"))
40+
ctx.WithTracer(otel.GetTracerProvider().Tracer(app))
3941

4042
return ctx, nil
4143
}
4244

4345
var Root = &cobra.Command{
44-
Use: "canary-checker",
46+
Use: app,
4547
PersistentPreRun: func(cmd *cobra.Command, args []string) {
4648
logger.UseSlog()
4749
shutdown.WaitForSignal()

cmd/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ var Run = &cobra.Command{
3333
log.Fatalln("Must specify at least one canary")
3434
}
3535

36-
ctx, closer, err := duty.Start("canary-checker", duty.ClientOnly, duty.SkipMigrationByDefaultMode)
36+
ctx, closer, err := duty.Start(app, duty.ClientOnly, duty.SkipMigrationByDefaultMode)
3737
if err != nil {
3838
logger.Fatalf("Failed to initialize db: %v", err.Error())
3939
}

cmd/topology.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ var RunTopology = &cobra.Command{
7474

7575
defer shutdown.Shutdown()
7676
var err error
77-
apicontext.DefaultContext, _, err = duty.Start("canary-checker", duty.ClientOnly, duty.SkipMigrationByDefaultMode)
77+
apicontext.DefaultContext, _, err = duty.Start(app, duty.ClientOnly, duty.SkipMigrationByDefaultMode)
7878
if err != nil {
7979
logger.Errorf(err.Error())
8080
return

0 commit comments

Comments
 (0)