Skip to content
Merged

opt #6747

Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 h1:El6M4kTTCOh6aBiKaUGG7oYTSPP8MxqL4YI3kZKwcP4=
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510/go.mod h1:pupxD2MaaD3pAXIBCelhxNneeOaAeabZDe5s4K6zSpQ=
github.com/google/subcommands v1.2.0 h1:vWQspBTo2nEqTUFita5/KeEWlUL8kQObDFbub/EN9oE=
github.com/google/subcommands v1.2.0/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down
24 changes: 19 additions & 5 deletions pkg/pipeline/CiHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ import (
"context"
"errors"
"fmt"
"regexp"
"slices"
"strconv"
"strings"
"time"

"github.com/devtron-labs/common-lib/utils"
"github.com/devtron-labs/common-lib/utils/workFlow"
cdWorkflowBean "github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig/bean/workflow/cdWorkflow"
Expand All @@ -30,11 +36,6 @@ import (
"github.com/devtron-labs/devtron/pkg/pipeline/constants"
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus"
"github.com/devtron-labs/devtron/pkg/workflow/status/workflowStatusLatest"
"regexp"
"slices"
"strconv"
"strings"
"time"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
bean2 "github.com/devtron-labs/devtron/api/bean"
Expand Down Expand Up @@ -601,6 +602,19 @@ func (impl *CiHandlerImpl) UpdateWorkflow(workflowStatus eventProcessorBean.CiCd
impl.Logger.Error("update wf failed for id " + strconv.Itoa(savedWorkflow.Id))
return savedWorkflow.Id, true, err
}

// Update latest status table for CI workflow
// Check if CiPipeline is loaded, if not pass 0 as appId to let the function fetch it
appId := 0
if savedWorkflow.CiPipeline != nil {
appId = savedWorkflow.CiPipeline.AppId
}
err = impl.workflowStatusUpdateService.UpdateCiWorkflowStatusLatest(savedWorkflow.CiPipelineId, appId, savedWorkflow.Id, savedWorkflow.TriggeredBy)
if err != nil {
impl.Logger.Errorw("error in updating ci workflow status latest", "err", err, "pipelineId", savedWorkflow.CiPipelineId, "workflowId", savedWorkflow.Id)
// Don't return error here as the main workflow update was successful
}

impl.sendCIFailEvent(savedWorkflow, status, message)
return savedWorkflow.Id, true, nil
}
Expand Down
56 changes: 43 additions & 13 deletions pkg/pipeline/CiService.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/devtron-labs/devtron/pkg/pipeline/types"
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus"
"github.com/devtron-labs/devtron/pkg/sql"
"github.com/devtron-labs/devtron/pkg/workflow/status/workflowStatusLatest"
util3 "github.com/devtron-labs/devtron/util"
util2 "github.com/devtron-labs/devtron/util/event"
"go.uber.org/zap"
Expand All @@ -39,28 +40,31 @@ type CiService interface {
}

type CiServiceImpl struct {
Logger *zap.SugaredLogger
workflowStageStatusService workflowStatus.WorkFlowStageStatusService
eventClient client.EventClient
eventFactory client.EventFactory
config *types.CiConfig
ciWorkflowRepository pipelineConfig.CiWorkflowRepository
transactionManager sql.TransactionWrapper
Logger *zap.SugaredLogger
workflowStageStatusService workflowStatus.WorkFlowStageStatusService
eventClient client.EventClient
eventFactory client.EventFactory
config *types.CiConfig
ciWorkflowRepository pipelineConfig.CiWorkflowRepository
transactionManager sql.TransactionWrapper
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
}

func NewCiServiceImpl(Logger *zap.SugaredLogger,
workflowStageStatusService workflowStatus.WorkFlowStageStatusService, eventClient client.EventClient,
eventFactory client.EventFactory,
ciWorkflowRepository pipelineConfig.CiWorkflowRepository,
transactionManager sql.TransactionWrapper,
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService,
) *CiServiceImpl {
cis := &CiServiceImpl{
Logger: Logger,
workflowStageStatusService: workflowStageStatusService,
eventClient: eventClient,
eventFactory: eventFactory,
ciWorkflowRepository: ciWorkflowRepository,
transactionManager: transactionManager,
Logger: Logger,
workflowStageStatusService: workflowStageStatusService,
eventClient: eventClient,
eventFactory: eventFactory,
ciWorkflowRepository: ciWorkflowRepository,
transactionManager: transactionManager,
workflowStatusUpdateService: workflowStatusUpdateService,
}
config, err := types.GetCiConfig()
if err != nil {
Expand Down Expand Up @@ -136,6 +140,19 @@ func (impl *CiServiceImpl) SaveCiWorkflowWithStage(wf *pipelineConfig.CiWorkflow
impl.Logger.Errorw("error in committing transaction", "workflowName", wf.Name, "error", err)
return err
}

// Update latest status table for CI workflow
// Check if CiPipeline is loaded, if not pass 0 as appId to let the function fetch it
appId := 0
if wf.CiPipeline != nil {
appId = wf.CiPipeline.AppId
}
err = impl.workflowStatusUpdateService.UpdateCiWorkflowStatusLatest(wf.CiPipelineId, appId, wf.Id, wf.TriggeredBy)
if err != nil {
impl.Logger.Errorw("error in updating ci workflow status latest", "err", err, "pipelineId", wf.CiPipelineId, "workflowId", wf.Id)
// Don't return error here as the main workflow update was successful
}

return nil

}
Expand Down Expand Up @@ -172,6 +189,19 @@ func (impl *CiServiceImpl) UpdateCiWorkflowWithStage(wf *pipelineConfig.CiWorkfl
impl.Logger.Errorw("error in committing transaction", "workflowName", wf.Name, "error", err)
return err
}

// Update latest status table for CI workflow
// Check if CiPipeline is loaded, if not pass 0 as appId to let the function fetch it
appId := 0
if wf.CiPipeline != nil {
appId = wf.CiPipeline.AppId
}
err = impl.workflowStatusUpdateService.UpdateCiWorkflowStatusLatest(wf.CiPipelineId, appId, wf.Id, wf.TriggeredBy)
if err != nil {
impl.Logger.Errorw("error in updating ci workflow status latest", "err", err, "pipelineId", wf.CiPipelineId, "workflowId", wf.Id)
// Don't return error here as the main workflow save was successful
}

return nil

}
69 changes: 59 additions & 10 deletions pkg/workflow/cd/CdWorkflowRunnerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/devtron-labs/devtron/pkg/sql"
"github.com/devtron-labs/devtron/pkg/workflow/cd/adapter"
"github.com/devtron-labs/devtron/pkg/workflow/cd/bean"
"github.com/devtron-labs/devtron/pkg/workflow/status/workflowStatusLatest"
"github.com/devtron-labs/devtron/util"
"github.com/go-pg/pg"
"go.uber.org/zap"
Expand All @@ -43,22 +44,25 @@ type CdWorkflowRunnerService interface {
}

type CdWorkflowRunnerServiceImpl struct {
logger *zap.SugaredLogger
cdWorkflowRepository pipelineConfig.CdWorkflowRepository
workflowStageService workflowStatus.WorkFlowStageStatusService
transactionManager sql.TransactionWrapper
config *types.CiConfig
logger *zap.SugaredLogger
cdWorkflowRepository pipelineConfig.CdWorkflowRepository
workflowStageService workflowStatus.WorkFlowStageStatusService
transactionManager sql.TransactionWrapper
config *types.CiConfig
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
}

func NewCdWorkflowRunnerServiceImpl(logger *zap.SugaredLogger,
cdWorkflowRepository pipelineConfig.CdWorkflowRepository,
workflowStageService workflowStatus.WorkFlowStageStatusService,
transactionManager sql.TransactionWrapper) *CdWorkflowRunnerServiceImpl {
transactionManager sql.TransactionWrapper,
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService) *CdWorkflowRunnerServiceImpl {
impl := &CdWorkflowRunnerServiceImpl{
logger: logger,
cdWorkflowRepository: cdWorkflowRepository,
workflowStageService: workflowStageService,
transactionManager: transactionManager,
logger: logger,
cdWorkflowRepository: cdWorkflowRepository,
workflowStageService: workflowStageService,
transactionManager: transactionManager,
workflowStatusUpdateService: workflowStatusUpdateService,
}
ciConfig, err := types.GetCiConfig()
if err != nil {
Expand All @@ -76,6 +80,21 @@ func (impl *CdWorkflowRunnerServiceImpl) UpdateWfr(dto *bean.CdWorkflowRunnerDto
impl.logger.Errorw("error in updating runner status in db", "runnerId", runnerDbObj.Id, "err", err)
return err
}

// Update latest status table for CD workflow
// Check if CdWorkflow and Pipeline are loaded, if not pass 0 as appId/environmentId to let the function fetch them
appId := 0
environmentId := 0
if runnerDbObj.CdWorkflow != nil && runnerDbObj.CdWorkflow.Pipeline != nil {
appId = runnerDbObj.CdWorkflow.Pipeline.AppId
environmentId = runnerDbObj.CdWorkflow.Pipeline.EnvironmentId
}
err = impl.workflowStatusUpdateService.UpdateCdWorkflowStatusLatest(runnerDbObj.CdWorkflow.PipelineId, appId, environmentId, runnerDbObj.Id, runnerDbObj.WorkflowType.String(), int32(updatedBy))
if err != nil {
impl.logger.Errorw("error in updating cd workflow status latest", "err", err, "pipelineId", runnerDbObj.CdWorkflow.PipelineId, "workflowRunnerId", runnerDbObj.Id)
// Don't return error here as the main workflow update was successful
}

return nil
}

Expand Down Expand Up @@ -122,6 +141,21 @@ func (impl *CdWorkflowRunnerServiceImpl) SaveCDWorkflowRunnerWithStage(wfr *pipe
impl.logger.Errorw("error in committing transaction", "workflowName", wfr.Name, "error", err)
return wfr, err
}

// Update latest status table for CD workflow
// Check if CdWorkflow and Pipeline are loaded, if not pass 0 as appId/environmentId to let the function fetch them
appId := 0
environmentId := 0
if wfr.CdWorkflow != nil && wfr.CdWorkflow.Pipeline != nil {
appId = wfr.CdWorkflow.Pipeline.AppId
environmentId = wfr.CdWorkflow.Pipeline.EnvironmentId
}
err = impl.workflowStatusUpdateService.UpdateCdWorkflowStatusLatest(wfr.CdWorkflow.PipelineId, appId, environmentId, wfr.Id, wfr.WorkflowType.String(), wfr.TriggeredBy)
if err != nil {
impl.logger.Errorw("error in updating cd workflow status latest", "err", err, "pipelineId", wfr.CdWorkflow.PipelineId, "workflowRunnerId", wfr.Id)
// Don't return error here as the main workflow save was successful
}

return wfr, nil
}

Expand Down Expand Up @@ -159,6 +193,21 @@ func (impl *CdWorkflowRunnerServiceImpl) UpdateCdWorkflowRunnerWithStage(wfr *pi
impl.logger.Errorw("error in committing transaction", "workflowName", wfr.Name, "error", err)
return err
}

// Update latest status table for CD workflow
// Check if CdWorkflow and Pipeline are loaded, if not pass 0 as appId/environmentId to let the function fetch them
appId := 0
environmentId := 0
if wfr.CdWorkflow != nil && wfr.CdWorkflow.Pipeline != nil {
appId = wfr.CdWorkflow.Pipeline.AppId
environmentId = wfr.CdWorkflow.Pipeline.EnvironmentId
}
err = impl.workflowStatusUpdateService.UpdateCdWorkflowStatusLatest(wfr.CdWorkflow.PipelineId, appId, environmentId, wfr.Id, wfr.WorkflowType.String(), wfr.TriggeredBy)
if err != nil {
impl.logger.Errorw("error in updating cd workflow status latest", "err", err, "pipelineId", wfr.CdWorkflow.PipelineId, "workflowRunnerId", wfr.Id)
// Don't return error here as the main workflow update was successful
}

return nil

}
Expand Down
40 changes: 35 additions & 5 deletions pkg/workflow/dag/WorkflowDagExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/devtron-labs/common-lib/async"
"github.com/devtron-labs/common-lib/utils"
Expand Down Expand Up @@ -71,10 +76,6 @@ import (
util2 "github.com/devtron-labs/devtron/util/event"
errors2 "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/rest"
"net/http"
"strings"
"sync"
"time"

"github.com/devtron-labs/devtron/api/bean"
"github.com/devtron-labs/devtron/internal/sql/models"
Expand All @@ -88,6 +89,7 @@ import (
repository4 "github.com/devtron-labs/devtron/pkg/pipeline/repository"
"github.com/devtron-labs/devtron/pkg/pipeline/types"
serverBean "github.com/devtron-labs/devtron/pkg/server/bean"
"github.com/devtron-labs/devtron/pkg/workflow/status/workflowStatusLatest"
util4 "github.com/devtron-labs/devtron/util"
"github.com/devtron-labs/devtron/util/rbac"
"github.com/go-pg/pg"
Expand Down Expand Up @@ -158,7 +160,8 @@ type WorkflowDagExecutorImpl struct {
workflowService executor.WorkflowService
ciHandlerService trigger.HandlerService
workflowTriggerAuditService auditService.WorkflowTriggerAuditService
fluxApplicationService fluxApplication.FluxApplicationService
fluxApplicationService fluxApplication.FluxApplicationService
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService
}

func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pipelineConfig.PipelineRepository,
Expand Down Expand Up @@ -195,6 +198,7 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi
ciHandlerService trigger.HandlerService,
workflowTriggerAuditService auditService.WorkflowTriggerAuditService,
fluxApplicationService fluxApplication.FluxApplicationService,
workflowStatusUpdateService workflowStatusLatest.WorkflowStatusUpdateService,
) *WorkflowDagExecutorImpl {
wde := &WorkflowDagExecutorImpl{logger: Logger,
pipelineRepository: pipelineRepository,
Expand Down Expand Up @@ -231,6 +235,7 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi
ciHandlerService: ciHandlerService,
workflowTriggerAuditService: workflowTriggerAuditService,
fluxApplicationService: fluxApplicationService,
workflowStatusUpdateService: workflowStatusUpdateService,
}
config, err := types.GetCdConfig()
if err != nil {
Expand Down Expand Up @@ -940,6 +945,18 @@ func (impl *WorkflowDagExecutorImpl) UpdateCiWorkflowForCiSuccess(request *bean2
return err
}

// Update latest status table for CI workflow
// Check if CiPipeline is loaded, if not pass 0 as appId to let the function fetch it
appId := 0
if savedWorkflow.CiPipeline != nil {
appId = savedWorkflow.CiPipeline.AppId
}
err = impl.workflowStatusUpdateService.UpdateCiWorkflowStatusLatest(savedWorkflow.CiPipelineId, appId, savedWorkflow.Id, savedWorkflow.TriggeredBy)
if err != nil {
impl.logger.Errorw("error in updating ci workflow status latest", "err", err, "pipelineId", savedWorkflow.CiPipelineId, "workflowId", savedWorkflow.Id)
// Don't return error here as the main workflow update was successful
}

return nil
}

Expand Down Expand Up @@ -1118,6 +1135,19 @@ func (impl *WorkflowDagExecutorImpl) HandleCiSuccessEvent(triggerContext trigger
i += batchSize
}
impl.logger.Debugw("Completed: auto trigger for children Stage/CD pipelines", "Time taken", time.Since(start).Seconds())

// Update latest status table for CI workflow
// Check if pipelineModal is not nil before accessing AppId
appId := 0
if pipelineModal != nil {
appId = pipelineModal.AppId
}
err = impl.workflowStatusUpdateService.UpdateCiWorkflowStatusLatest(ciPipelineId, appId, buildArtifact.Id, request.UserId)
if err != nil {
impl.logger.Errorw("error in updating ci workflow status latest", "err", err, "pipelineId", ciPipelineId, "workflowId", buildArtifact.Id)
// Don't return error here as the main workflow was successful
}

return buildArtifact.Id, err
}

Expand Down
Loading