Skip to content
Merged
118 changes: 102 additions & 16 deletions pkg/pipeline/CiHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
eventProcessorBean "github.com/devtron-labs/devtron/pkg/eventProcessor/bean"
"github.com/devtron-labs/devtron/pkg/pipeline/adapter"
"github.com/devtron-labs/devtron/pkg/pipeline/constants"
util2 "github.com/devtron-labs/devtron/pkg/pipeline/util"
"github.com/devtron-labs/devtron/pkg/pipeline/workflowStatus"
"github.com/devtron-labs/devtron/pkg/workflow/workflowStatusLatest"
"regexp"
Expand Down Expand Up @@ -660,9 +661,10 @@ func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewV1(appId int) ([]*pipeline
return []*pipelineConfig.CiWorkflowStatus{}, nil
}

latestStatusEntries, err := impl.workflowStatusLatestService.GetCiWorkflowStatusLatestByPipelineIds(allPipelineIds)
// Prepare pipeline status lookup data (handles linked CI pipelines)
pipelines, pipelineIdForStatus, statusLookupPipelineIds, latestStatusEntries, err := impl.preparePipelineStatusLookup(allPipelineIds)
if err != nil {
impl.Logger.Errorw("error in checking latest status table, falling back to old method", "appId", appId, "err", err)
impl.Logger.Errorw("error in preparing pipeline status lookup, falling back to old method", "appId", appId, "err", err)
return impl.ciWorkflowRepository.FIndCiWorkflowStatusesByAppId(appId)
}

Expand All @@ -674,19 +676,21 @@ func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewV1(appId int) ([]*pipeline
impl.Logger.Errorw("error in fetching ci workflow status from latest ci workflow entries ", "latestStatusEntries", latestStatusEntries, "err", err)
return nil, err
} else {
allStatuses = append(allStatuses, statusesFromLatestTable...)
mappedStatuses := impl.mapStatusesToLinkedPipelines(statusesFromLatestTable, pipelines, pipelineIdForStatus)
allStatuses = append(allStatuses, mappedStatuses...)
}
}

pipelinesNotInLatestTable := impl.getPipelineIdsNotInLatestTable(allPipelineIds, latestStatusEntries)
pipelinesNotInLatestTable := impl.getPipelineIdsNotInLatestTable(statusLookupPipelineIds, latestStatusEntries)

if len(pipelinesNotInLatestTable) > 0 {
statusesFromOldQuery, err := impl.fetchCiStatusUsingFallbackMethod(pipelinesNotInLatestTable)
if err != nil {
impl.Logger.Errorw("error in fetching using fallback method by pipelineIds", "pipelineIds", pipelinesNotInLatestTable, "err", err)
return nil, err
} else {
allStatuses = append(allStatuses, statusesFromOldQuery...)
mappedStatuses := impl.mapStatusesToLinkedPipelines(statusesFromOldQuery, pipelines, pipelineIdForStatus)
allStatuses = append(allStatuses, mappedStatuses...)
}
}

Expand Down Expand Up @@ -781,6 +785,38 @@ func (impl *CiHandlerImpl) fetchLastTriggeredWorkflowsHybrid(pipelineIds []int)
return allWorkflows, nil
}

// preparePipelineStatusLookup prepares pipeline mapping for linked CI pipelines and returns status lookup data
func (impl *CiHandlerImpl) preparePipelineStatusLookup(pipelineIds []int) (pipelines []*pipelineConfig.CiPipeline, pipelineIdForStatus map[int]int, statusLookupPipelineIds []int, latestStatusEntries []*pipelineConfig.CiWorkflowStatusLatest, err error) {
pipelines, err = impl.ciPipelineRepository.FindByIdsIn(pipelineIds)
if err != nil {
impl.Logger.Errorw("error in getting ci pipelines by ids", "pipelineIds", pipelineIds, "err", err)
return nil, nil, nil, nil, err
}

pipelineIdForStatus = make(map[int]int, len(pipelines)) // linkedPipelineId -> parentPipelineId (or self if not linked)
statusLookupPipelineIds = make([]int, 0, len(pipelines))

for _, pipeline := range pipelines {
if pipeline.ParentCiPipeline > 0 {
// linked CI pipeline - use parent pipeline ID for status lookup
pipelineIdForStatus[pipeline.Id] = pipeline.ParentCiPipeline
statusLookupPipelineIds = append(statusLookupPipelineIds, pipeline.ParentCiPipeline)
} else {
// regular CI pipeline - use its own ID
pipelineIdForStatus[pipeline.Id] = pipeline.Id
statusLookupPipelineIds = append(statusLookupPipelineIds, pipeline.Id)
}
}
statusLookupPipelineIds = util2.RemoveDuplicateInts(statusLookupPipelineIds)
latestStatusEntries, err = impl.workflowStatusLatestService.GetCiWorkflowStatusLatestByPipelineIds(statusLookupPipelineIds)
if err != nil {
impl.Logger.Errorw("error in checking latest status table", "statusLookupPipelineIds", statusLookupPipelineIds, "err", err)
return nil, nil, nil, nil, err
}

return pipelines, pipelineIdForStatus, statusLookupPipelineIds, latestStatusEntries, nil
}

// getPipelineIdsNotInLatestTable finds pipeline IDs that are NOT in the latest status table
func (impl *CiHandlerImpl) getPipelineIdsNotInLatestTable(allPipelineIds []int, latestStatusEntries []*pipelineConfig.CiWorkflowStatusLatest) []int {
var pipelinesInLatestTable []int
Expand All @@ -801,6 +837,34 @@ func (impl *CiHandlerImpl) getPipelineIdsNotInLatestTable(allPipelineIds []int,
return missingPipelineIds
}

// mapStatusesToLinkedPipelines maps parent pipeline statuses back to linked pipelines
func (impl *CiHandlerImpl) mapStatusesToLinkedPipelines(
statuses []*pipelineConfig.CiWorkflowStatus,
pipelines []*pipelineConfig.CiPipeline,
pipelineIdForStatus map[int]int,
) []*pipelineConfig.CiWorkflowStatus {
statusMap := make(map[int]*pipelineConfig.CiWorkflowStatus)
for _, status := range statuses {
statusMap[status.CiPipelineId] = status
}

var result []*pipelineConfig.CiWorkflowStatus
for _, pipeline := range pipelines {
parentPipelineId := pipelineIdForStatus[pipeline.Id]
if parentStatus, exists := statusMap[parentPipelineId]; exists {
linkedStatus := &pipelineConfig.CiWorkflowStatus{
CiPipelineId: pipeline.Id,
CiPipelineName: pipeline.Name,
CiStatus: parentStatus.CiStatus,
StorageConfigured: parentStatus.StorageConfigured,
CiWorkflowId: parentStatus.CiWorkflowId,
}
result = append(result, linkedStatus)
}
}
return result
}

func (impl *CiHandlerImpl) FetchCiStatusForTriggerView(appId int) ([]*pipelineConfig.CiWorkflowStatus, error) {
var ciWorkflowStatuses []*pipelineConfig.CiWorkflowStatus

Expand Down Expand Up @@ -997,6 +1061,10 @@ func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewForEnvironment(request res
appObjectArr = append(appObjectArr, object)
}
appResults, _ := request.CheckAuthBatch(token, appObjectArr, []string{})

linkedPipelineDetails := make(map[int]*pipelineConfig.CiPipeline) // linkedPipelineId -> pipeline object
parentToLinkedMap := make(map[int][]int) // parentPipelineId -> []linkedPipelineId

for _, ciPipeline := range ciPipelines {
appObject := objects[ciPipeline.Id] // here only app permission have to check
if !appResults[appObject] {
Expand All @@ -1005,7 +1073,15 @@ func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewForEnvironment(request res
}
ciPipelineId := impl.getPipelineIdForTriggerView(ciPipeline)
ciPipelineIds = append(ciPipelineIds, ciPipelineId)

// Store mapping for linked CI pipelines
if ciPipeline.ParentCiPipeline > 0 {
linkedPipelineDetails[ciPipeline.Id] = ciPipeline
// Add to slice of linked pipelines for this parent
parentToLinkedMap[ciPipelineId] = append(parentToLinkedMap[ciPipelineId], ciPipeline.Id)
}
}

if len(ciPipelineIds) == 0 {
return ciWorkflowStatuses, nil
}
Expand All @@ -1015,24 +1091,34 @@ func (impl *CiHandlerImpl) FetchCiStatusForTriggerViewForEnvironment(request res
return ciWorkflowStatuses, err
}

notTriggeredWorkflows := make(map[int]bool)
// create workflow map for quick lookup
workflowMap := make(map[int]*pipelineConfig.CiWorkflow)
for _, workflow := range latestCiWorkflows {
workflowMap[workflow.CiPipelineId] = workflow
}

triggeredWorkflows := make(map[int]bool)

for _, ciWorkflow := range latestCiWorkflows {
ciWorkflowStatus := &pipelineConfig.CiWorkflowStatus{}
ciWorkflowStatus.CiPipelineId = ciWorkflow.CiPipelineId
ciWorkflowStatus.CiPipelineName = ciWorkflow.CiPipeline.Name
ciWorkflowStatus.CiStatus = ciWorkflow.Status
ciWorkflowStatus.StorageConfigured = ciWorkflow.BlobStorageEnabled
ciWorkflowStatus.CiWorkflowId = ciWorkflow.Id
ciWorkflowStatuses = append(ciWorkflowStatuses, ciWorkflowStatus)
notTriggeredWorkflows[ciWorkflowStatus.CiPipelineId] = true
// check if this workflow belongs to a parent pipeline that has linked CIs
if linkedPipelineIds, isParentOfLinked := parentToLinkedMap[ciWorkflow.CiPipelineId]; isParentOfLinked {
// create workflow status for each linked pipeline
for _, linkedPipelineId := range linkedPipelineIds {
ciWorkflowStatus := adapter.GetCiWorkflowStatusForLinkedCiPipeline(linkedPipelineId, linkedPipelineDetails[linkedPipelineId].Name, ciWorkflow)
ciWorkflowStatuses = append(ciWorkflowStatuses, ciWorkflowStatus)
}
} else {
ciWorkflowStatus := adapter.GetCiWorkflowStatusFromCiWorkflow(ciWorkflow)
ciWorkflowStatuses = append(ciWorkflowStatuses, ciWorkflowStatus)
}
triggeredWorkflows[ciWorkflow.CiPipelineId] = true
}

for _, ciPipelineId := range ciPipelineIds {
if _, ok := notTriggeredWorkflows[ciPipelineId]; !ok {
if _, ok := triggeredWorkflows[ciPipelineId]; !ok {
ciWorkflowStatus := &pipelineConfig.CiWorkflowStatus{}
ciWorkflowStatus.CiPipelineId = ciPipelineId
ciWorkflowStatus.CiStatus = "Not Triggered"
ciWorkflowStatus.CiStatus = pipelineConfigBean.NotTriggered
ciWorkflowStatuses = append(ciWorkflowStatuses, ciWorkflowStatus)
}
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/pipeline/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,3 +416,13 @@ func GetCiWorkflowStatusFromCiWorkflow(ciWorkflow *pipelineConfig.CiWorkflow) *p
CiWorkflowId: ciWorkflow.Id,
}
}

func GetCiWorkflowStatusForLinkedCiPipeline(linkedCiPipelineId int, linkedCiPipelineName string, ciWorkflow *pipelineConfig.CiWorkflow) *pipelineConfig.CiWorkflowStatus {
return &pipelineConfig.CiWorkflowStatus{
CiPipelineId: linkedCiPipelineId,
CiPipelineName: linkedCiPipelineName,
CiStatus: ciWorkflow.Status,
StorageConfigured: ciWorkflow.BlobStorageEnabled,
CiWorkflowId: ciWorkflow.Id,
}
}
13 changes: 13 additions & 0 deletions pkg/pipeline/util/CiCdUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,16 @@ func GetWorkflowCacheConfigWithBackwardCompatibility(WorkflowCacheConfig common.
}
}
}

// RemoveDuplicateInts helper function to remove duplicate integers from slice
func RemoveDuplicateInts(slice []int) []int {
keys := make(map[int]bool)
var result []int
for _, item := range slice {
if !keys[item] {
keys[item] = true
result = append(result, item)
}
}
return result
}
Loading