Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
bf31b97
added route in gc and initial controller
meethereum Sep 13, 2025
d0c8769
added initial status reporting logic
meethereum Sep 29, 2025
3a194ca
refactor : minor fixes and todos
meethereum Sep 29, 2025
b013bd1
fix : bug in extracting satellite name and default interval for state…
meethereum Sep 29, 2025
c62c251
lint fix
meethereum Sep 29, 2025
ac5e332
fix leaky goroutines in reporting process
meethereum Sep 29, 2025
e9c3bf4
use existing start function
meethereum Sep 29, 2025
4e15f4c
cleanup : reporting_process.go
meethereum Oct 7, 2025
25d073e
eliminate channels and hold state in a single variable
meethereum Oct 8, 2025
a0eb13c
fix : go.mod
meethereum Oct 8, 2025
d9e61b2
Merge branch 'container-registry:main' into status-reporting
meethereum Oct 8, 2025
37e6419
chore : give 2xx status codes and timeout for heartbeat request to gc
meethereum Oct 8, 2025
763fdc2
Merge branch 'status-reporting' of https://github.com/meethereum/harb…
meethereum Oct 8, 2025
fa1f204
lint
meethereum Oct 8, 2025
4105d3c
fix : update dagger version in dagger.json
meethereum Oct 8, 2025
4892862
chore : update config structuring
meethereum Oct 16, 2025
fb8eefd
fix : ground control go.mod and go.sum
meethereum Oct 20, 2025
1801f2c
fix: dagger build pipeline
meethereum Oct 20, 2025
da1effd
fix : dagger release pipeline
meethereum Oct 23, 2025
9d8be02
added docs
meethereum Oct 23, 2025
71cd308
fix : heartbeat pretty print
meethereum Oct 27, 2025
b7a233f
chore : consistent error handling
meethereum Oct 27, 2025
f94b272
chore : hot reload fix
meethereum Oct 27, 2025
d127f52
better variable naming
meethereum Oct 27, 2025
e5436be
fix : logging message in hot reload
meethereum Oct 27, 2025
c1ad01b
fix : conditional startup of scheduler
meethereum Oct 27, 2025
91fdf62
revert breaking change
meethereum Oct 27, 2025
d4f9134
change default heartbeat interval
meethereum Oct 28, 2025
ab13639
doc updates
meethereum Oct 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ground-control/internal/server/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,6 @@ func (s *Server) RegisterRoutes() http.Handler {
r.HandleFunc("/satellites/{satellite}", s.GetSatelliteByName).Methods("GET") // Get specific satellite
r.HandleFunc("/satellites/{satellite}", s.DeleteSatelliteByName).Methods("DELETE") // Delete specific satellite
// r.HandleFunc("/satellites/{satellite}/images", s.GetImagesForSatellite).Methods("GET") // Get satellite images

r.HandleFunc("/satellites/status",s.statusReportHandler).Methods("POST")
return r
}
25 changes: 25 additions & 0 deletions ground-control/internal/server/satellite_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"os"
"strconv"
"time"

"github.com/container-registry/harbor-satellite/ground-control/internal/database"
"github.com/container-registry/harbor-satellite/ground-control/internal/utils"
Expand All @@ -25,6 +26,18 @@ type RegisterSatelliteParams struct {
ConfigName string `json:"config_name"`
}

type SatelliteStatusParams struct {
Name string `json:"name"` // Satellite identifier
Activity string `json:"activity"` // Current activity satellite is doing
StateReportInterval string `json:"state_report_interval"` // Interval between status reports
LatestStateDigest string `json:"latest_state_digest"` // Digest of latest state artifact
LatestConfigDigest string `json:"latest_config_digest"` // Digest of latest config artifact
MemoryUsedBytes uint64 `json:"memory_used_bytes"` // Memory currently used by satellite
StorageUsedBytes uint64 `json:"storage_used_bytes"` // Storage currently used by satellite
CPUPercent float64 `json:"cpu_percent"` // CPU usage percentage
RequestCreatedTime time.Time `json:"request_created_time"` // Timestamp of request creation
}

type RegisterSatelliteResponse struct {
Token string `json:"token"`
}
Expand Down Expand Up @@ -340,6 +353,18 @@ func (s *Server) listSatelliteHandler(w http.ResponseWriter, r *http.Request) {
WriteJSONResponse(w, http.StatusOK, result)
}

func (s *Server) statusReportHandler(w http.ResponseWriter, r *http.Request) {
var req SatelliteStatusParams
if err := DecodeRequestBody(r, &req); err != nil {
log.Println(err)
HandleAppError(w, err)
return
}
// todo : process the heartbeat. eg:- save latest state in db
fmt.Printf("satellite reported status : %v", req)

}

func (s *Server) GetSatelliteByName(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
satellite := vars["satellite"]
Expand Down
25 changes: 24 additions & 1 deletion internal/satellite/satellite.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package satellite

import (
"context"

"github.com/container-registry/harbor-satellite/internal/logger"
"github.com/container-registry/harbor-satellite/internal/scheduler"
"github.com/container-registry/harbor-satellite/internal/state"
Expand All @@ -24,16 +23,19 @@ func NewSatellite(cm *config.ConfigManager) *Satellite {
func (s *Satellite) Run(ctx context.Context) error {
log := logger.FromContext(ctx)
log.Info().Msg("Starting Satellite")
var heartbeatPayload scheduler.UpstreamInfo

fetchAndReplicateStateProcess := state.NewFetchAndReplicateStateProcess(s.cm)
ztrProcess := state.NewZtrProcess(s.cm)
statusReportingProcess := state.NewStatusReportingProcess(s.cm)

// Create schedulers instead of using ScheduleFunc
if !s.cm.IsZTRDone() {
ztrScheduler, err := scheduler.NewSchedulerWithInterval(
s.cm.GetRegistrationInterval(),
ztrProcess,
log,
&heartbeatPayload,
)
if err != nil {
log.Error().Err(err).Msg("Failed to create ZTR scheduler")
Expand All @@ -48,14 +50,35 @@ func (s *Satellite) Run(ctx context.Context) error {
s.cm.GetStateReplicationInterval(),
fetchAndReplicateStateProcess,
log,
&heartbeatPayload,
)

if err != nil {
log.Error().Err(err).Msg("Failed to create state replication scheduler")
return err
}
s.schedulers = append(s.schedulers, stateScheduler)

// Create state reporting
statusReportingScheduler, err := scheduler.NewSchedulerWithInterval(
s.cm.GetStateReportingInterval(),
statusReportingProcess,
log,
&heartbeatPayload,
)

if err != nil {
log.Error().Err(err).Msg("Failed to create status reporting scheduler")
return err
}
s.schedulers = append(s.schedulers, statusReportingScheduler)

go stateScheduler.Run(ctx)

if !(s.cm.IsHeartbeatDisabled()) {
go statusReportingScheduler.Run(ctx)
}

return ctx.Err()
}

Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type Process interface {
Name() string

// Execute runs the process
Execute(ctx context.Context) error
Execute(ctx context.Context, upstreamPayload *UpstreamInfo) error

// IsRunning returns true if the process is running
IsRunning() bool
Expand Down
47 changes: 31 additions & 16 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,45 @@ import (
"github.com/rs/zerolog"
)

type UpstreamInfo struct {
LatestStateDigest string
LatestConfigDigest string
CurrentActivity string
StateURL string
}

// Scheduler manages the execution of processes with configurable intervals
type Scheduler struct {
name string
ticker *time.Ticker
process Process
log *zerolog.Logger
interval time.Duration
mu sync.Mutex
name string
ticker *time.Ticker
process Process
log *zerolog.Logger
interval time.Duration
mu sync.Mutex
upstreamPayload *UpstreamInfo
}

const (
ActivityStateSynced = "state synced successfully"
ActivityEncounteredError = "encountered error"
ActivityReconcilingState = "reconciling state"
)

// NewSchedulerWithInterval creates a new scheduler with a parsed interval string
func NewSchedulerWithInterval(intervalExpr string, process Process, log *zerolog.Logger) (*Scheduler, error) {
duration, err := parseEveryExpr(intervalExpr)
func NewSchedulerWithInterval(intervalExpr string, process Process, log *zerolog.Logger, upstreamPayload *UpstreamInfo) (*Scheduler, error) {
duration, err := ParseEveryExpr(intervalExpr)
if err != nil {
return nil, fmt.Errorf("failed to parse interval: %w", err)
}

ticker := time.NewTicker(duration)
scheduler := &Scheduler{
name: process.Name(),
ticker: ticker,
process: process,
log: log,
interval: duration,
name: process.Name(),
ticker: ticker,
process: process,
log: log,
interval: duration,
upstreamPayload: upstreamPayload,
}

return scheduler, nil
Expand Down Expand Up @@ -83,7 +98,7 @@ func (s *Scheduler) ResetInterval(newInterval time.Duration) {

// ResetIntervalFromExpr changes the ticker interval using an expression string
func (s *Scheduler) ResetIntervalFromExpr(intervalExpr string) error {
duration, err := parseEveryExpr(intervalExpr)
duration, err := ParseEveryExpr(intervalExpr)
if err != nil {
return fmt.Errorf("failed to parse interval: %w", err)
}
Expand Down Expand Up @@ -119,7 +134,7 @@ func (s *Scheduler) launchProcess(ctx context.Context) {
Msg("Scheduler triggering task execution")

go func() {
if err := s.process.Execute(ctx); err != nil {
if err := s.process.Execute(ctx, s.upstreamPayload); err != nil {
s.log.Warn().
Str("Process", s.process.Name()).
Err(err).
Expand All @@ -133,7 +148,7 @@ func (s *Scheduler) launchProcess(ctx context.Context) {
}
}

func parseEveryExpr(expr string) (time.Duration, error) {
func ParseEveryExpr(expr string) (time.Duration, error) {
const prefix = "@every "
if expr == "" {
return 0, fmt.Errorf("empty expression provided")
Expand Down
4 changes: 3 additions & 1 deletion internal/state/registration_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"

"github.com/container-registry/harbor-satellite/internal/logger"
"github.com/container-registry/harbor-satellite/internal/scheduler"
"github.com/container-registry/harbor-satellite/pkg/config"
"github.com/rs/zerolog"
)
Expand Down Expand Up @@ -38,7 +39,7 @@ func NewZtrProcess(cm *config.ConfigManager) *ZtrProcess {
}
}

func (z *ZtrProcess) Execute(ctx context.Context) error {
func (z *ZtrProcess) Execute(ctx context.Context, upstreamPayload *scheduler.UpstreamInfo) error {
z.start()
defer z.stop()

Expand Down Expand Up @@ -72,6 +73,7 @@ func (z *ZtrProcess) Execute(ctx context.Context) error {

// Close the z.Done channel on successful ZTR alone.
close(z.Done)

return nil
}

Expand Down
112 changes: 112 additions & 0 deletions internal/state/report.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package state

import (
"context"
"fmt"
"math"
"net/url"
"strings"
"time"

"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/disk"
"github.com/shirou/gopsutil/v4/mem"
)

type StatusReportParams struct {
Name string `json:"name"` // Satellite identifier
Activity string `json:"activity"` // Current activity satellite is doing
StateReportInterval string `json:"state_report_interval"` // Interval between status reports
LatestStateDigest string `json:"latest_state_digest"` // Digest of latest state artifact
LatestConfigDigest string `json:"latest_config_digest"` // Digest of latest config artifact
MemoryUsedBytes uint64 `json:"memory_used_bytes"` // Memory currently used by satellite
StorageUsedBytes uint64 `json:"storage_used_bytes"` // Storage currently used by satellite
CPUPercent float64 `json:"cpu_percent"` // CPU usage percentage
RequestCreatedTime time.Time `json:"request_created_time"` // Timestamp of request creation
}

func collectStatusReportParams(ctx context.Context, duration time.Duration, req *StatusReportParams) error {
cpuPercent, err := getAvgCpuUsage(ctx, 1*time.Second, duration)
if err != nil {
return err
}

memUsed, err := getMemoryUsedBytes(ctx)
if err != nil {
return err
}

storUsed, err := getStorageUsedBytes(ctx, "/")
if err != nil {
return err
}

req.CPUPercent = cpuPercent
req.MemoryUsedBytes = memUsed
req.StorageUsedBytes = storUsed
req.RequestCreatedTime = time.Now()

return nil
}

func getAvgCpuUsage(ctx context.Context, sampleInterval time.Duration, totalDuration time.Duration) (float64, error) {
var sum float64
var count int

ticker := time.NewTicker(sampleInterval)
defer ticker.Stop()

timeout := time.After(totalDuration)

for {
select {
case <-ctx.Done():
return 0, ctx.Err()
case <-timeout:
if count == 0 {
return 0, fmt.Errorf("no samples collected")
}
avg := sum / float64(count)
return math.Round(avg*100) / 100, nil
case <-ticker.C:
percent, err := cpu.PercentWithContext(ctx, 0, false)
if err != nil {
continue
}
if len(percent) > 0 {
sum += percent[0]
count++
}
}
}
}

func getStorageUsedBytes(ctx context.Context, path string) (uint64, error) {
usageStat, err := disk.UsageWithContext(ctx, path)
if err != nil {
return 0, fmt.Errorf("failed to get storage used: %w", err)
}
return usageStat.Used, nil
}

func getMemoryUsedBytes(ctx context.Context) (uint64, error) {
vmStat, err := mem.VirtualMemoryWithContext(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get memory used: %w", err)
}
return vmStat.Used, nil
}

func extractSatelliteNameFromURL(stateURL string) (string, error) {
u, err := url.Parse(stateURL)
if err != nil {
return "", fmt.Errorf("invalid state URL %q: %w", stateURL, err)
}

parts := strings.FieldsFunc(u.Path, func(r rune) bool { return r == '/' })
if len(parts) < 4 {
return "", fmt.Errorf("state URL %q does not have enough path segments to extract satellite name", stateURL)
}

return parts[2], nil
}
Loading
Loading