Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 81 additions & 19 deletions cmd/ci-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"io"
"io/fs"
"math/rand"
"net"
"net/http"
"os"
"os/exec"
"path"
Expand Down Expand Up @@ -956,17 +958,60 @@ func (o *options) Report(errs ...error) {
}
}

func (o *options) Run() []error {
// startHTTPServer starts srv in a background goroutine when the environment
// variable named by api.CIOperatorHTTPServerIPEnvVarName is set.
//
// The server listens on that IP and on api.CIOperatorHTTPServerPort. When the
// variable is empty nothing is started and nil is returned. A failure to bind
// the listener is returned to the caller. If Serve later returns any error
// other than http.ErrServerClosed, the error is logged and cancel is invoked.
func (o *options) startHTTPServer(cancel func(), srv *http.Server) error {
	srvIP := os.Getenv(api.CIOperatorHTTPServerIPEnvVarName)
	if srvIP == "" {
		return nil
	}

	// JoinHostPort wraps IPv6 literals in brackets ("[fd00::1]:8080"); plain
	// string concatenation would produce an address net.Listen cannot parse.
	ipAndPort := net.JoinHostPort(srvIP, strconv.Itoa(api.CIOperatorHTTPServerPort))
	ln, err := net.Listen("tcp", ipAndPort)
	if err != nil {
		return fmt.Errorf("listen tcp on %s: %w", ipAndPort, err)
	}

	go func() {
		logrus.Infof("Running the http server at %s", ipAndPort)
		// Serve blocks until the server is shut down or closed; only an
		// unexpected termination triggers cancel.
		if err := srv.Serve(ln); !errors.Is(err, http.ErrServerClosed) {
			logrus.WithError(err).Error("HTTP server has been aborted unexpectedly. Terminating.")
			cancel()
		}
	}()

	return nil
}

func (o *options) Run() (errs []error) {
start := time.Now()
var srv *http.Server

defer func() {
logrus.Infof("Ran for %s", time.Since(start).Truncate(time.Second))
o.metricsAgent.Stop()
if srv != nil {
if err := srv.Close(); err != nil {
errs = append(errs, fmt.Errorf("close http server: %w", err))
}
}
}()

ctx, cancel := context.WithCancel(context.Background())
handler := func(s os.Signal) {
logrus.Infof("error: Process interrupted with signal %s, cancelling execution...", s)
cancel()
}

srvMux := http.NewServeMux()
srv = &http.Server{
Handler: srvMux,
BaseContext: func(net.Listener) context.Context { return ctx },
}

if err := o.startHTTPServer(cancel, srv); err != nil {
errs = append(errs, fmt.Errorf("run http server: %w", err))
return
}

var leaseClient *lease.Client
if o.isLeaseClientAvailable() {
leaseClient = &o.leaseClient
Expand All @@ -976,69 +1021,81 @@ func (o *options) Run() []error {

streams, err := integratedStreams(o.configSpec, o.resolverClient, o.clusterConfig)
if err != nil {
return []error{results.ForReason("config_resolver").WithError(err).Errorf("failed to generate integrated streams: %v", err)}
errs = append(errs, results.ForReason("config_resolver").WithError(err).Errorf("failed to generate integrated streams: %v", err))
return
}

client, err := coreclientset.NewForConfig(o.clusterConfig)
if err != nil {
return []error{fmt.Errorf("could not get core client for cluster config: %w", err)}
errs = append(errs, fmt.Errorf("could not get core client for cluster config: %w", err))
return
}

nodeArchitectures, err := resolveNodeArchitectures(ctx, client.Nodes())
if err != nil {
return []error{fmt.Errorf("could not resolve the node architectures: %w", err)}
errs = append(errs, fmt.Errorf("could not resolve the node architectures: %w", err))
return
}

cfg := o.ToGraphConfig()
cfg.LeaseClient = leaseClient
cfg.NodeArchitectures = nodeArchitectures
cfg.IntegratedStreams = streams
cfg.InjectedTest = o.injectTest != ""
cfg.HTTPServerMux = srvMux
// load the graph from the configuration
buildSteps, promotionSteps, err := defaults.FromConfig(ctx, cfg)
if err != nil {
return []error{results.ForReason("defaulting_config").WithError(err).Errorf("failed to generate steps from config: %v", err)}
errs = append(errs, results.ForReason("defaulting_config").WithError(err).Errorf("failed to generate steps from config: %v", err))
return
}

// Before we create the namespace, we need to ensure all inputs to the graph
// have been resolved. We must run this step before we resolve the partial
// graph or otherwise two jobs with different targets would create different
// artifact caches.
if err := o.resolveInputs(buildSteps); err != nil {
return []error{results.ForReason("resolving_inputs").WithError(err).Errorf("could not resolve inputs: %v", err)}
errs = append(errs, results.ForReason("resolving_inputs").WithError(err).Errorf("could not resolve inputs: %v", err))
return
}

if err := o.writeMetadataJSON(); err != nil {
return []error{fmt.Errorf("unable to write metadata.json for build: %w", err)}
errs = append(errs, fmt.Errorf("unable to write metadata.json for build: %w", err))
return
}
// convert the full graph into the subset we must run
nodes, err := api.BuildPartialGraph(buildSteps, o.targets.values)
if err != nil {
return []error{results.ForReason("building_graph").WithError(err).Errorf("could not build execution graph: %v", err)}
errs = append(errs, results.ForReason("building_graph").WithError(err).Errorf("could not build execution graph: %v", err))
return
}

// Resolve which of the steps should enable multi arch based on the graph build steps.
api.ResolveMultiArch(nodes)

stepList, errs := nodes.TopologicalSort()
if errs != nil {
return append([]error{results.ForReason("building_graph").ForError(errors.New("could not sort nodes"))}, errs...)
stepList, sortErrs := nodes.TopologicalSort()
if len(sortErrs) > 0 {
errs = append(errs, append([]error{results.ForReason("building_graph").ForError(errors.New("could not sort nodes"))}, sortErrs...)...)
return
}
logrus.Infof("Running %s", strings.Join(nodeNames(stepList), ", "))
if o.printGraph {
if err := printDigraph(os.Stdout, stepList); err != nil {
return []error{fmt.Errorf("could not print graph: %w", err)}
errs = append(errs, fmt.Errorf("could not print graph: %w", err))
return
}
return nil
return
}
graph, errs := calculateGraph(stepList)
if errs != nil {
return errs
graph, calculateGraphErrs := calculateGraph(stepList)
if len(calculateGraphErrs) > 0 {
errs = append(errs, calculateGraphErrs...)
return
}
defer func() {
serializedGraph, err := json.Marshal(graph)
if err != nil {
logrus.WithError(err).Error("Failed to marshal graph")
errs = append(errs, fmt.Errorf("failed to marshal graph: %w", err))
return
}

Expand All @@ -1047,13 +1104,14 @@ func (o *options) Run() []error {
// initialize the namespace if necessary and create any resources that must
// exist prior to execution
if err := o.initializeNamespace(); err != nil {
return []error{results.ForReason("initializing_namespace").WithError(err).Errorf("could not initialize namespace: %v", err)}
errs = append(errs, results.ForReason("initializing_namespace").WithError(err).Errorf("could not initialize namespace: %v", err))
return
}
o.metricsAgent.Record(metrics.NewInsightsEvent(metrics.InsightNamespaceCreated, metrics.Context{"namespace": o.namespace}))
info := o.getResolverInfo(o.jobSpec)
o.metricsAgent.RecordConfigurationInsight(o.targets.values, o.promote, info.Org, info.Repo, info.Branch, info.Variant, o.baseNamespace, o.consoleHost, o.nodeName, o.clusterProfiles)

return interrupt.New(handler, o.saveNamespaceArtifacts).Run(func() []error {
if runErrs := interrupt.New(handler, o.saveNamespaceArtifacts).Run(func() []error {
if leaseClient != nil {
if err := o.initializeLeaseClient(); err != nil {
return []error{fmt.Errorf("failed to create the lease client: %w", err)}
Expand Down Expand Up @@ -1114,7 +1172,11 @@ func (o *options) Run() []error {
o.metricsAgent.Record(metrics.NewInsightsEvent(metrics.InsightExecutionCompleted, metrics.Context{"duration_seconds": time.Since(start).Seconds(), "success": true}))

return nil
})
}); len(runErrs) > 0 {
errs = append(errs, runErrs...)
}

return
}

// determineSkippedImages determines which images can be skipped when
Expand Down
3 changes: 2 additions & 1 deletion pkg/api/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ const (
NestedPodmanSCC = "nested-podman"
NestedPodmanClusterRole = "nested-podman-creater"

CIOperatorHTTPServerPort = 8080
CIOperatorHTTPServerIPEnvVarName = "HTTP_SERVER_IP"
CIOperatorHTTPServerPort = 8080
)

var (
Expand Down
3 changes: 3 additions & 0 deletions pkg/defaults/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package defaults

import (
"net/http"
"time"

coreapi "k8s.io/api/core/v1"
Expand Down Expand Up @@ -49,6 +50,8 @@ type Config struct {
MetricsAgent *metrics.MetricsAgent
SkippedImages sets.Set[string]
params *api.DeferredParameters

HTTPServerMux *http.ServeMux
}

type Clients struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/prowgen/podspec.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
kerrors "k8s.io/apimachinery/pkg/util/errors"

"github.com/openshift/ci-tools/pkg/api"
cioperatorapi "github.com/openshift/ci-tools/pkg/api"
"github.com/openshift/ci-tools/pkg/steps/utils"
)
Expand Down Expand Up @@ -475,7 +476,7 @@ var (
}

smallHTTPServerEnv = corev1.EnvVar{
Name: "HTTP_SERVER_IP",
Name: api.CIOperatorHTTPServerIPEnvVarName,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "status.podIP",
Expand Down