From cd0f0f2054d387bc5e971b7801570c5403589ad0 Mon Sep 17 00:00:00 2001
From: FlomoN
Date: Fri, 17 Oct 2025 15:28:01 +0200
Subject: [PATCH 1/2] fix: split pagination and batching for github graphql job_collector task

---
 .../github_graphql/tasks/job_collector.go | 135 +++++++++++++++---
 1 file changed, 113 insertions(+), 22 deletions(-)

diff --git a/backend/plugins/github_graphql/tasks/job_collector.go b/backend/plugins/github_graphql/tasks/job_collector.go
index c67d6a87968..91c724dc64d 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -34,13 +34,41 @@ import (
 const RAW_GRAPHQL_JOBS_TABLE = "github_graphql_jobs"
 
-type GraphqlQueryCheckRunWrapper struct {
+// Collection mode configuration
+const (
+	JOB_COLLECTION_MODE_BATCHING   = "BATCHING"
+	JOB_COLLECTION_MODE_PAGINATING = "PAGINATING"
+)
+
+// Set the collection mode here
+// BATCHING: Query multiple runs at once, no pagination (may miss jobs if >20 per run)
+// PAGINATING: Query one run at a time with full pagination (complete data, more API calls)
+const JOB_COLLECTION_MODE = JOB_COLLECTION_MODE_PAGINATING
+
+// Mode-specific configuration
+const (
+	BATCHING_INPUT_STEP   = 10 // Number of runs per request in BATCHING mode
+	BATCHING_PAGE_SIZE    = 20 // Jobs per run in BATCHING mode (no pagination)
+	PAGINATING_INPUT_STEP = 1  // Number of runs per request in PAGINATING mode
+	PAGINATING_PAGE_SIZE  = 50 // Jobs per page in PAGINATING mode (with pagination)
+)
+
+// Batch mode: query multiple runs at once (array of nodes)
+type GraphqlQueryCheckRunWrapperBatch struct {
 	RateLimit struct {
 		Cost int
 	}
 	Node []GraphqlQueryCheckSuite `graphql:"node(id: $id)" graphql-extend:"true"`
 }
 
+// Paginating mode: query single run (single node)
+type GraphqlQueryCheckRunWrapperSingle struct {
+	RateLimit struct {
+		Cost int
+	}
+	Node GraphqlQueryCheckSuite `graphql:"node(id: $id)"`
+}
+
 type GraphqlQueryCheckSuite struct {
 	Id       string
 	Typename string `graphql:"__typename"`
@@ -112,28 +140,47 @@ var CollectJobsMeta = plugin.SubTaskMeta{
 var _ plugin.SubTaskEntryPoint = CollectJobs
 
 func getPageInfo(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
-	queryWrapper := query.(*GraphqlQueryCheckRunWrapper)
-	hasNextPage := false
-	endCursor := ""
-	for _, node := range queryWrapper.Node {
-		if node.CheckSuite.CheckRuns.PageInfo.HasNextPage {
-			hasNextPage = true
-			endCursor = node.CheckSuite.CheckRuns.PageInfo.EndCursor
-			break
-		}
+	// Only PAGINATING mode supports pagination
+	if JOB_COLLECTION_MODE == JOB_COLLECTION_MODE_PAGINATING {
+		queryWrapper := query.(*GraphqlQueryCheckRunWrapperSingle)
+		return &helper.GraphqlQueryPageInfo{
+			EndCursor:   queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.EndCursor,
+			HasNextPage: queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.HasNextPage,
+		}, nil
 	}
+
+	// BATCHING mode: no pagination support
+	// Always return false for HasNextPage to collect only first page of jobs
 	return &helper.GraphqlQueryPageInfo{
-		EndCursor:   endCursor,
-		HasNextPage: hasNextPage,
+		EndCursor:   "",
+		HasNextPage: false,
 	}, nil
 }
 
 func buildQuery(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
-	query := &GraphqlQueryCheckRunWrapper{}
 	if reqData == nil {
-		return query, map[string]interface{}{}, nil
+		// Return appropriate empty query based on mode
+		if JOB_COLLECTION_MODE == JOB_COLLECTION_MODE_PAGINATING {
+			return &GraphqlQueryCheckRunWrapperSingle{}, map[string]interface{}{}, nil
+		}
+		return &GraphqlQueryCheckRunWrapperBatch{}, map[string]interface{}{}, nil
 	}
+
+	if JOB_COLLECTION_MODE == JOB_COLLECTION_MODE_PAGINATING {
+		// Single run mode
+		workflowRun := reqData.Input.(*SimpleWorkflowRun)
+		query := &GraphqlQueryCheckRunWrapperSingle{}
+		variables := map[string]interface{}{
+			"id":         graphql.ID(workflowRun.CheckSuiteNodeID),
+			"pageSize":   graphql.Int(reqData.Pager.Size),
+			"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
+		}
+		return query, variables, nil
+	}
+
+	// Batch mode (default)
 	workflowRuns := reqData.Input.([]interface{})
+	query := &GraphqlQueryCheckRunWrapperBatch{}
 	checkSuiteIds := []map[string]interface{}{}
 	for _, iWorkflowRuns := range workflowRuns {
 		workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
@@ -152,6 +199,10 @@ func buildQuery(reqData *helper.GraphqlRequestData) (interface{}, map[string]int
 func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
 	db := taskCtx.GetDal()
 	data := taskCtx.GetData().(*githubTasks.GithubTaskData)
+	logger := taskCtx.GetLogger()
+
+	// Log the collection mode
+	logger.Info("GitHub Job Collector Mode: %s", JOB_COLLECTION_MODE)
 
 	apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
 		Ctx: taskCtx,
@@ -175,28 +226,44 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
 		clauses = append(clauses, dal.Where("github_updated_at > ?", *apiCollector.GetSince()))
 	}
 
-	cursor, err := db.Cursor(
-		clauses...,
-	)
+	cursor, err := db.Cursor(clauses...)
 	if err != nil {
 		return err
 	}
 	defer cursor.Close()
+
 	iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(SimpleWorkflowRun{}))
 	if err != nil {
 		return err
 	}
 
+	// Set configuration based on mode
+	var inputStep, pageSize int
+	var getPageInfoFunc func(interface{}, *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error)
+
+	if JOB_COLLECTION_MODE == JOB_COLLECTION_MODE_PAGINATING {
+		inputStep = PAGINATING_INPUT_STEP
+		pageSize = PAGINATING_PAGE_SIZE
+		getPageInfoFunc = getPageInfo // Enable pagination
+	} else {
+		inputStep = BATCHING_INPUT_STEP
+		pageSize = BATCHING_PAGE_SIZE
+		getPageInfoFunc = nil // Disable pagination
+	}
+
 	err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
 		Input:         iterator,
-		InputStep:     10,
+		InputStep:     inputStep,
 		GraphqlClient: data.GraphqlClient,
 		BuildQuery:    buildQuery,
-		GetPageInfo:   getPageInfo,
+		GetPageInfo:   getPageInfoFunc, // nil for BATCHING, function for PAGINATING
 		ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) {
-			query := queryWrapper.(*GraphqlQueryCheckRunWrapper)
-			for _, node := range query.Node {
+			if JOB_COLLECTION_MODE == JOB_COLLECTION_MODE_PAGINATING {
+				// Single node processing
+				query := queryWrapper.(*GraphqlQueryCheckRunWrapperSingle)
+				node := query.Node
 				runId := node.CheckSuite.WorkflowRun.DatabaseId
+
 				for _, checkRun := range node.CheckSuite.CheckRuns.Nodes {
 					dbCheckRun := &DbCheckRun{
 						RunId:                runId,
@@ -215,11 +282,35 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
 					}
 					messages = append(messages, errors.Must1(json.Marshal(dbCheckRun)))
 				}
+			} else {
+				// Batch processing (multiple nodes)
+				query := queryWrapper.(*GraphqlQueryCheckRunWrapperBatch)
+				for _, node := range query.Node {
+					runId := node.CheckSuite.WorkflowRun.DatabaseId
+					for _, checkRun := range node.CheckSuite.CheckRuns.Nodes {
+						dbCheckRun := &DbCheckRun{
+							RunId:                runId,
+							GraphqlQueryCheckRun: &checkRun,
+						}
+						// A checkRun without a startedAt time is a run that was never started (skipped); GitHub returns
+						// a ZeroTime (due to the Go implementation) for startedAt, so we need to check for that here.
+						dbCheckRun.StartedAt = utils.NilIfZeroTime(dbCheckRun.StartedAt)
+						dbCheckRun.CompletedAt = utils.NilIfZeroTime(dbCheckRun.CompletedAt)
+						updatedAt := dbCheckRun.StartedAt
+						if dbCheckRun.CompletedAt != nil {
+							updatedAt = dbCheckRun.CompletedAt
+						}
+						if apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(*updatedAt) {
+							return messages, helper.ErrFinishCollect
+						}
+						messages = append(messages, errors.Must1(json.Marshal(dbCheckRun)))
+					}
+				}
 			}
 			return
 		},
 		IgnoreQueryErrors: true,
-		PageSize:          20,
+		PageSize:          pageSize,
 	})
 	if err != nil {
 		return err

From ae48792dd0f4940b041025943f4b1064514b3740 Mon Sep 17 00:00:00 2001
From: FlomoN
Date: Fri, 17 Oct 2025 20:43:44 +0200
Subject: [PATCH 2/2] feat: make github graphql job collection mode, page size, and batch size configurable via environment variables

---
 .../github_graphql/tasks/job_collector.go | 189 ++++++++++++------
 1 file changed, 128 insertions(+), 61 deletions(-)

diff --git a/backend/plugins/github_graphql/tasks/job_collector.go b/backend/plugins/github_graphql/tasks/job_collector.go
index 91c724dc64d..f0deb7b41bf 100644
--- a/backend/plugins/github_graphql/tasks/job_collector.go
+++ b/backend/plugins/github_graphql/tasks/job_collector.go
@@ -43,16 +43,76 @@ const (
 // Set the collection mode here
 // BATCHING: Query multiple runs at once, no pagination (may miss jobs if >20 per run)
 // PAGINATING: Query one run at a time with full pagination (complete data, more API calls)
-const JOB_COLLECTION_MODE = JOB_COLLECTION_MODE_PAGINATING
+const DEFAULT_JOB_COLLECTION_MODE = JOB_COLLECTION_MODE_BATCHING
 
 // Mode-specific configuration
 const (
-	BATCHING_INPUT_STEP   = 10 // Number of runs per request in BATCHING mode
-	BATCHING_PAGE_SIZE    = 20 // Jobs per run in BATCHING mode (no pagination)
-	PAGINATING_INPUT_STEP = 1  // Number of runs per request in PAGINATING mode
-	PAGINATING_PAGE_SIZE  = 50 // Jobs per page in PAGINATING mode (with pagination)
+	DEFAULT_BATCHING_INPUT_STEP  = 10 // Number of runs per request in BATCHING mode (must be > 1)
+	DEFAULT_BATCHING_PAGE_SIZE   = 20 // Jobs per run in BATCHING mode (no pagination)
+	PAGINATING_INPUT_STEP        = 1  // Number of runs per request in PAGINATING mode (always 1)
+	DEFAULT_PAGINATING_PAGE_SIZE = 50 // Jobs per page in PAGINATING mode (with pagination)
 )
 
+// JobCollectionConfig holds the configuration for job collection
+type JobCollectionConfig struct {
+	Mode               string
+	PageSize           int
+	InputStep          int
+	BatchingInputStep  int
+	BatchingPageSize   int
+	PaginatingPageSize int
+}
+
+// getJobCollectionConfig reads configuration from environment variables with fallback to defaults
+func getJobCollectionConfig(taskCtx plugin.SubTaskContext) *JobCollectionConfig {
+	cfg := taskCtx.TaskContext().GetConfigReader()
+
+	config := &JobCollectionConfig{
+		Mode:               DEFAULT_JOB_COLLECTION_MODE,
+		BatchingInputStep:  DEFAULT_BATCHING_INPUT_STEP,
+		BatchingPageSize:   DEFAULT_BATCHING_PAGE_SIZE,
+		PaginatingPageSize: DEFAULT_PAGINATING_PAGE_SIZE,
+	}
+
+	// Read collection mode from environment
+	if mode := taskCtx.TaskContext().GetConfig("GITHUB_GRAPHQL_JOB_COLLECTION_MODE"); mode != "" {
+		if mode == JOB_COLLECTION_MODE_BATCHING || mode == JOB_COLLECTION_MODE_PAGINATING {
+			config.Mode = mode
+		}
+	}
+
+	// Read batching input step (must be > 1)
+	if cfg.IsSet("GITHUB_GRAPHQL_JOB_BATCHING_INPUT_STEP") {
+		if step := cfg.GetInt("GITHUB_GRAPHQL_JOB_BATCHING_INPUT_STEP"); step > 1 {
+			config.BatchingInputStep = step
+		}
+	}
+
+	// Read page sizes
+	if cfg.IsSet("GITHUB_GRAPHQL_JOB_BATCHING_PAGE_SIZE") {
+		if size := cfg.GetInt("GITHUB_GRAPHQL_JOB_BATCHING_PAGE_SIZE"); size > 0 {
+			config.BatchingPageSize = size
+		}
+	}
+
+	if cfg.IsSet("GITHUB_GRAPHQL_JOB_PAGINATING_PAGE_SIZE") {
+		if size := cfg.GetInt("GITHUB_GRAPHQL_JOB_PAGINATING_PAGE_SIZE"); size > 0 {
+			config.PaginatingPageSize = size
+		}
+	}
+
+	// Set derived values based on mode
+	if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
+		config.PageSize = config.PaginatingPageSize
+		config.InputStep = PAGINATING_INPUT_STEP // Always 1 for paginating
+	} else {
+		config.PageSize = config.BatchingPageSize
+		config.InputStep = config.BatchingInputStep // User-configurable for batching
+	}
+
+	return config
+}
+
 // Batch mode: query multiple runs at once (array of nodes)
 type GraphqlQueryCheckRunWrapperBatch struct {
 	RateLimit struct {
 		Cost int
@@ -139,61 +199,70 @@ var CollectJobsMeta = plugin.SubTaskMeta{
 var _ plugin.SubTaskEntryPoint = CollectJobs
 
-func getPageInfo(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
-	// Only PAGINATING mode supports pagination
-	if JOB_COLLECTION_MODE == JOB_COLLECTION_MODE_PAGINATING {
-		queryWrapper := query.(*GraphqlQueryCheckRunWrapperSingle)
-		return &helper.GraphqlQueryPageInfo{
-			EndCursor:   queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.EndCursor,
-			HasNextPage: queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.HasNextPage,
-		}, nil
+// createGetPageInfoFunc returns the appropriate page info function based on collection mode
+func createGetPageInfoFunc(mode string) func(interface{}, *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+	if mode == JOB_COLLECTION_MODE_PAGINATING {
+		// PAGINATING mode: supports full pagination
+		return func(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+			queryWrapper := query.(*GraphqlQueryCheckRunWrapperSingle)
+			return &helper.GraphqlQueryPageInfo{
+				EndCursor:   queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.EndCursor,
+				HasNextPage: queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.HasNextPage,
+			}, nil
+		}
 	}
 
 	// BATCHING mode: no pagination support
-	// Always return false for HasNextPage to collect only first page of jobs
-	return &helper.GraphqlQueryPageInfo{
-		EndCursor:   "",
-		HasNextPage: false,
-	}, nil
+	return func(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
+		return &helper.GraphqlQueryPageInfo{
+			EndCursor:   "",
+			HasNextPage: false,
+		}, nil
+	}
 }
 
-func buildQuery(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
-	if reqData == nil {
-		// Return appropriate empty query based on mode
-		if JOB_COLLECTION_MODE == JOB_COLLECTION_MODE_PAGINATING {
-			return &GraphqlQueryCheckRunWrapperSingle{}, map[string]interface{}{}, nil
+// createBuildQueryFunc returns the appropriate build query function based on collection mode
+func createBuildQueryFunc(mode string) func(*helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+	if mode == JOB_COLLECTION_MODE_PAGINATING {
+		// PAGINATING mode: single run per request
+		return func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+			if reqData == nil {
+				return &GraphqlQueryCheckRunWrapperSingle{}, map[string]interface{}{}, nil
+			}
+
+			workflowRun := reqData.Input.(*SimpleWorkflowRun)
+			query := &GraphqlQueryCheckRunWrapperSingle{}
+			variables := map[string]interface{}{
+				"id":         graphql.ID(workflowRun.CheckSuiteNodeID),
+				"pageSize":   graphql.Int(reqData.Pager.Size),
+				"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
+			}
+			return query, variables, nil
 		}
-		return &GraphqlQueryCheckRunWrapperBatch{}, map[string]interface{}{}, nil
 	}
 
-	if JOB_COLLECTION_MODE == JOB_COLLECTION_MODE_PAGINATING {
-		// Single run mode
-		workflowRun := reqData.Input.(*SimpleWorkflowRun)
-		query := &GraphqlQueryCheckRunWrapperSingle{}
+	// BATCHING mode: multiple runs per request
+	return func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
+		if reqData == nil {
+			return &GraphqlQueryCheckRunWrapperBatch{}, map[string]interface{}{}, nil
+		}
+
+		workflowRuns := reqData.Input.([]interface{})
+		query := &GraphqlQueryCheckRunWrapperBatch{}
+		checkSuiteIds := []map[string]interface{}{}
+		for _, iWorkflowRuns := range workflowRuns {
+			workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
+			checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
+				`id`: graphql.ID(workflowRun.CheckSuiteNodeID),
+			})
+		}
 		variables := map[string]interface{}{
-			"id":         graphql.ID(workflowRun.CheckSuiteNodeID),
+			"node":       checkSuiteIds,
 			"pageSize":   graphql.Int(reqData.Pager.Size),
 			"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
 		}
 		return query, variables, nil
 	}
-
-	// Batch mode (default)
-	workflowRuns := reqData.Input.([]interface{})
-	query := &GraphqlQueryCheckRunWrapperBatch{}
-	checkSuiteIds := []map[string]interface{}{}
-	for _, iWorkflowRuns := range workflowRuns {
-		workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
-		checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
-			`id`: graphql.ID(workflowRun.CheckSuiteNodeID),
-		})
-	}
-	variables := map[string]interface{}{
-		"node":       checkSuiteIds,
-		"pageSize":   graphql.Int(reqData.Pager.Size),
-		"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
-	}
-	return query, variables, nil
 }
 
 func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
 	db := taskCtx.GetDal()
@@ -201,8 +270,10 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
 	data := taskCtx.GetData().(*githubTasks.GithubTaskData)
 	logger := taskCtx.GetLogger()
 
-	// Log the collection mode
-	logger.Info("GitHub Job Collector Mode: %s", JOB_COLLECTION_MODE)
+	// Get configuration from environment variables or defaults
+	config := getJobCollectionConfig(taskCtx)
+	logger.Info("GitHub Job Collector - Mode: %s, InputStep: %d, PageSize: %d",
+		config.Mode, config.InputStep, config.PageSize)
 
 	apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
 		Ctx: taskCtx,
@@ -237,28 +308,24 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
 		return err
 	}
 
-	// Set configuration based on mode
-	var inputStep, pageSize int
+	// Create closures that capture the runtime mode configuration
+	buildQueryFunc := createBuildQueryFunc(config.Mode)
 	var getPageInfoFunc func(interface{}, *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error)
 
-	if JOB_COLLECTION_MODE == JOB_COLLECTION_MODE_PAGINATING {
-		inputStep = PAGINATING_INPUT_STEP
-		pageSize = PAGINATING_PAGE_SIZE
-		getPageInfoFunc = getPageInfo // Enable pagination
+	if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
+		getPageInfoFunc = createGetPageInfoFunc(config.Mode) // Enable pagination
 	} else {
-		inputStep = BATCHING_INPUT_STEP
-		pageSize = BATCHING_PAGE_SIZE
-		getPageInfoFunc = nil // Disable pagination
+		getPageInfoFunc = nil // Disable pagination for BATCHING mode
 	}
 
 	err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
 		Input:         iterator,
-		InputStep:     inputStep,
+		InputStep:     config.InputStep,
 		GraphqlClient: data.GraphqlClient,
-		BuildQuery:    buildQuery,
+		BuildQuery:    buildQueryFunc,
 		GetPageInfo:   getPageInfoFunc, // nil for BATCHING, function for PAGINATING
 		ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) {
-			if JOB_COLLECTION_MODE == JOB_COLLECTION_MODE_PAGINATING {
+			if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
 				// Single node processing
 				query := queryWrapper.(*GraphqlQueryCheckRunWrapperSingle)
 				node := query.Node
@@ -310,7 +377,7 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
 			return
 		},
 		IgnoreQueryErrors: true,
-		PageSize:          pageSize,
+		PageSize:          config.PageSize,
 	})
 	if err != nil {
 		return err