242 changes: 200 additions & 42 deletions backend/plugins/github_graphql/tasks/job_collector.go
@@ -34,13 +34,101 @@ import (

const RAW_GRAPHQL_JOBS_TABLE = "github_graphql_jobs"

// (removed) type GraphqlQueryCheckRunWrapper struct { ... }, replaced by the separate Batch and Single wrappers below
// Collection mode configuration
const (
JOB_COLLECTION_MODE_BATCHING = "BATCHING"
JOB_COLLECTION_MODE_PAGINATING = "PAGINATING"
)

// Default collection mode; override with GITHUB_GRAPHQL_JOB_COLLECTION_MODE.
// BATCHING: query multiple runs at once, no pagination (may miss jobs when a run has more than the batching page size, 20 by default)
// PAGINATING: query one run at a time with full pagination (complete data, more API calls)
const DEFAULT_JOB_COLLECTION_MODE = JOB_COLLECTION_MODE_BATCHING

// Mode-specific configuration
const (
DEFAULT_BATCHING_INPUT_STEP = 10 // Number of runs per request in BATCHING mode (must be > 1)
DEFAULT_BATCHING_PAGE_SIZE = 20 // Jobs per run in BATCHING mode (no pagination)
PAGINATING_INPUT_STEP = 1 // Number of runs per request in PAGINATING mode (always 1)
DEFAULT_PAGINATING_PAGE_SIZE = 50 // Jobs per page in PAGINATING mode (with pagination)
)
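
To put the trade-off in numbers: with the defaults above, collecting 1,000 workflow runs takes roughly 1,000 / 10 = 100 GraphQL requests in BATCHING mode, but any run with more than 20 jobs silently loses the overflow; PAGINATING mode issues at least 1,000 requests (one per run), plus one more per extra page of 50 jobs, in exchange for complete data.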

// JobCollectionConfig holds the configuration for job collection
type JobCollectionConfig struct {
Mode string
PageSize int
InputStep int
BatchingInputStep int
BatchingPageSize int
PaginatingPageSize int
}

// getJobCollectionConfig reads configuration from environment variables with fallback to defaults
func getJobCollectionConfig(taskCtx plugin.SubTaskContext) *JobCollectionConfig {
cfg := taskCtx.TaskContext().GetConfigReader()

config := &JobCollectionConfig{
Mode: DEFAULT_JOB_COLLECTION_MODE,
BatchingInputStep: DEFAULT_BATCHING_INPUT_STEP,
BatchingPageSize: DEFAULT_BATCHING_PAGE_SIZE,
PaginatingPageSize: DEFAULT_PAGINATING_PAGE_SIZE,
}

// Read collection mode from environment
if mode := taskCtx.TaskContext().GetConfig("GITHUB_GRAPHQL_JOB_COLLECTION_MODE"); mode != "" {
if mode == JOB_COLLECTION_MODE_BATCHING || mode == JOB_COLLECTION_MODE_PAGINATING {
config.Mode = mode
}
}

// Read batching input step (must be > 1)
if cfg.IsSet("GITHUB_GRAPHQL_JOB_BATCHING_INPUT_STEP") {
if step := cfg.GetInt("GITHUB_GRAPHQL_JOB_BATCHING_INPUT_STEP"); step > 1 {
config.BatchingInputStep = step
}
}

// Read page sizes
if cfg.IsSet("GITHUB_GRAPHQL_JOB_BATCHING_PAGE_SIZE") {
if size := cfg.GetInt("GITHUB_GRAPHQL_JOB_BATCHING_PAGE_SIZE"); size > 0 {
config.BatchingPageSize = size
}
}

if cfg.IsSet("GITHUB_GRAPHQL_JOB_PAGINATING_PAGE_SIZE") {
if size := cfg.GetInt("GITHUB_GRAPHQL_JOB_PAGINATING_PAGE_SIZE"); size > 0 {
config.PaginatingPageSize = size
}
}

// Set derived values based on mode
if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
config.PageSize = config.PaginatingPageSize
config.InputStep = PAGINATING_INPUT_STEP // Always 1 for paginating
} else {
config.PageSize = config.BatchingPageSize
config.InputStep = config.BatchingInputStep // User-configurable for batching
}

return config
}
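
As a quick reference, here is a sketch of the corresponding environment settings, assuming they are supplied the usual DevLake way (e.g. via .env). The variable names come from the code above; the values are examples only, not recommendations:

# Hypothetical .env sketch
GITHUB_GRAPHQL_JOB_COLLECTION_MODE=PAGINATING   # BATCHING (default) or PAGINATING
GITHUB_GRAPHQL_JOB_BATCHING_INPUT_STEP=10       # runs per request in BATCHING mode, must be > 1
GITHUB_GRAPHQL_JOB_BATCHING_PAGE_SIZE=20        # jobs per run in BATCHING mode (no pagination)
GITHUB_GRAPHQL_JOB_PAGINATING_PAGE_SIZE=50      # jobs per page in PAGINATING mode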

// Batch mode: query multiple runs at once (array of nodes)
type GraphqlQueryCheckRunWrapperBatch struct {
RateLimit struct {
Cost int
}
Node []GraphqlQueryCheckSuite `graphql:"node(id: $id)" graphql-extend:"true"`
}

// Paginating mode: query single run (single node)
type GraphqlQueryCheckRunWrapperSingle struct {
RateLimit struct {
Cost int
}
Node GraphqlQueryCheckSuite `graphql:"node(id: $id)"`
}
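
To make the two wrappers concrete, here is roughly the query shape each one should translate to. This is a sketch only: the node0/node1 aliasing for graphql-extend is an assumption about DevLake's GraphQL helper, and the node ids and checkRun fields shown are placeholders.

# BATCHING: several check suites in one request, no cursors
{
  rateLimit { cost }
  node0: node(id: "CS_aaa") { ... on CheckSuite { checkRuns(first: 20) { nodes { name status } } } }
  node1: node(id: "CS_bbb") { ... on CheckSuite { checkRuns(first: 20) { nodes { name status } } } }
}

# PAGINATING: one check suite per request, cursor-driven
{
  rateLimit { cost }
  node(id: "CS_aaa") {
    ... on CheckSuite {
      checkRuns(first: 50, after: "someCursor") {
        pageInfo { hasNextPage endCursor }
        nodes { name status }
      }
    }
  }
}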

type GraphqlQueryCheckSuite struct {
Id string
Typename string `graphql:"__typename"`
@@ -111,47 +199,81 @@ var CollectJobsMeta = plugin.SubTaskMeta{

var _ plugin.SubTaskEntryPoint = CollectJobs

// Removed: the old getPageInfo, which reported a next page if any node in the batch had one.
func getPageInfo(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
	queryWrapper := query.(*GraphqlQueryCheckRunWrapper)
	hasNextPage := false
	endCursor := ""
	for _, node := range queryWrapper.Node {
		if node.CheckSuite.CheckRuns.PageInfo.HasNextPage {
			hasNextPage = true
			endCursor = node.CheckSuite.CheckRuns.PageInfo.EndCursor
			break
		}
	}
	return &helper.GraphqlQueryPageInfo{
		EndCursor:   endCursor,
		HasNextPage: hasNextPage,
	}, nil
}

// Added: createGetPageInfoFunc returns the appropriate page info function based on collection mode
func createGetPageInfoFunc(mode string) func(interface{}, *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
	if mode == JOB_COLLECTION_MODE_PAGINATING {
		// PAGINATING mode: supports full pagination
		return func(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
			queryWrapper := query.(*GraphqlQueryCheckRunWrapperSingle)
			return &helper.GraphqlQueryPageInfo{
				EndCursor:   queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.EndCursor,
				HasNextPage: queryWrapper.Node.CheckSuite.CheckRuns.PageInfo.HasNextPage,
			}, nil
		}
	}

	// BATCHING mode: no pagination support
	return func(query interface{}, args *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error) {
		return &helper.GraphqlQueryPageInfo{
			EndCursor:   "",
			HasNextPage: false,
		}, nil
	}
}

// Removed: the old buildQuery, which always batched multiple runs into one request.
func buildQuery(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
	query := &GraphqlQueryCheckRunWrapper{}
	if reqData == nil {
		return query, map[string]interface{}{}, nil
	}
	workflowRuns := reqData.Input.([]interface{})
	checkSuiteIds := []map[string]interface{}{}
	for _, iWorkflowRuns := range workflowRuns {
		workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
		checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
			`id`: graphql.ID(workflowRun.CheckSuiteNodeID),
		})
	}
	variables := map[string]interface{}{
		"node":       checkSuiteIds,
		"pageSize":   graphql.Int(reqData.Pager.Size),
		"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
	}
	return query, variables, nil
}

// Added: createBuildQueryFunc returns the appropriate build query function based on collection mode
func createBuildQueryFunc(mode string) func(*helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
	if mode == JOB_COLLECTION_MODE_PAGINATING {
		// PAGINATING mode: single run per request
		return func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
			if reqData == nil {
				return &GraphqlQueryCheckRunWrapperSingle{}, map[string]interface{}{}, nil
			}

			workflowRun := reqData.Input.(*SimpleWorkflowRun)
			query := &GraphqlQueryCheckRunWrapperSingle{}
			variables := map[string]interface{}{
				"id":         graphql.ID(workflowRun.CheckSuiteNodeID),
				"pageSize":   graphql.Int(reqData.Pager.Size),
				"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
			}
			return query, variables, nil
		}
	}

	// BATCHING mode: multiple runs per request
	return func(reqData *helper.GraphqlRequestData) (interface{}, map[string]interface{}, error) {
		if reqData == nil {
			return &GraphqlQueryCheckRunWrapperBatch{}, map[string]interface{}{}, nil
		}

		workflowRuns := reqData.Input.([]interface{})
		query := &GraphqlQueryCheckRunWrapperBatch{}
		checkSuiteIds := []map[string]interface{}{}
		for _, iWorkflowRuns := range workflowRuns {
			workflowRun := iWorkflowRuns.(*SimpleWorkflowRun)
			checkSuiteIds = append(checkSuiteIds, map[string]interface{}{
				`id`: graphql.ID(workflowRun.CheckSuiteNodeID),
			})
		}
		variables := map[string]interface{}{
			"node":       checkSuiteIds,
			"pageSize":   graphql.Int(reqData.Pager.Size),
			"skipCursor": (*graphql.String)(reqData.Pager.SkipCursor),
		}
		return query, variables, nil
	}
}

func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
db := taskCtx.GetDal()
data := taskCtx.GetData().(*githubTasks.GithubTaskData)
logger := taskCtx.GetLogger()

// Get configuration from environment variables or defaults
config := getJobCollectionConfig(taskCtx)
logger.Info("GitHub Job Collector - Mode: %s, InputStep: %d, PageSize: %d",
config.Mode, config.InputStep, config.PageSize)

apiCollector, err := helper.NewStatefulApiCollector(helper.RawDataSubTaskArgs{
Ctx: taskCtx,
@@ -175,28 +297,40 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
clauses = append(clauses, dal.Where("github_updated_at > ?", *apiCollector.GetSince()))
}

cursor, err := db.Cursor(clauses...)
if err != nil {
return err
}
defer cursor.Close()

iterator, err := helper.NewDalCursorIterator(db, cursor, reflect.TypeOf(SimpleWorkflowRun{}))
if err != nil {
return err
}

// Create closures that capture the runtime mode configuration
buildQueryFunc := createBuildQueryFunc(config.Mode)
var getPageInfoFunc func(interface{}, *helper.GraphqlCollectorArgs) (*helper.GraphqlQueryPageInfo, error)

if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
getPageInfoFunc = createGetPageInfoFunc(config.Mode) // Enable pagination
} else {
getPageInfoFunc = nil // Disable pagination for BATCHING mode
}

err = apiCollector.InitGraphQLCollector(helper.GraphqlCollectorArgs{
Input: iterator,
InputStep: config.InputStep, // was hard-coded to 10
GraphqlClient: data.GraphqlClient,
BuildQuery: buildQueryFunc, // was buildQuery
GetPageInfo: getPageInfoFunc, // was getPageInfo; nil disables pagination in BATCHING mode
ResponseParser: func(queryWrapper any) (messages []json.RawMessage, err errors.Error) {
// (removed) the old parser cast queryWrapper to GraphqlQueryCheckRunWrapper and looped over query.Node unconditionally
if config.Mode == JOB_COLLECTION_MODE_PAGINATING {
// Single node processing
query := queryWrapper.(*GraphqlQueryCheckRunWrapperSingle)
node := query.Node
runId := node.CheckSuite.WorkflowRun.DatabaseId

for _, checkRun := range node.CheckSuite.CheckRuns.Nodes {
dbCheckRun := &DbCheckRun{
RunId: runId,
@@ -215,11 +349,35 @@ func CollectJobs(taskCtx plugin.SubTaskContext) errors.Error {
}
messages = append(messages, errors.Must1(json.Marshal(dbCheckRun)))
}
} else {
// Batch processing (multiple nodes)
query := queryWrapper.(*GraphqlQueryCheckRunWrapperBatch)
for _, node := range query.Node {
runId := node.CheckSuite.WorkflowRun.DatabaseId
for _, checkRun := range node.CheckSuite.CheckRuns.Nodes {
dbCheckRun := &DbCheckRun{
RunId: runId,
GraphqlQueryCheckRun: &checkRun,
}
// A checkRun without a startedAt time is a run that was never started (skipped); GitHub
// returns a zero time (Go's zero value) for startedAt, so normalize both timestamps to nil here.
dbCheckRun.StartedAt = utils.NilIfZeroTime(dbCheckRun.StartedAt)
dbCheckRun.CompletedAt = utils.NilIfZeroTime(dbCheckRun.CompletedAt)
updatedAt := dbCheckRun.StartedAt
if dbCheckRun.CompletedAt != nil {
updatedAt = dbCheckRun.CompletedAt
}
// Guard updatedAt against nil (both timestamps zeroed out) before dereferencing it.
if updatedAt != nil && apiCollector.GetSince() != nil && !apiCollector.GetSince().Before(*updatedAt) {
return messages, helper.ErrFinishCollect
}
messages = append(messages, errors.Must1(json.Marshal(dbCheckRun)))
}
}
}
return
},
IgnoreQueryErrors: true,
PageSize: config.PageSize, // was hard-coded to 20
})
if err != nil {
return err
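
A closing aside on the zero-time handling in the ResponseParser above: here is a minimal, self-contained sketch of what a helper like utils.NilIfZeroTime plausibly does. The signature and behavior are an assumption for illustration; the real helper lives in DevLake's utils package.

package main

import (
	"fmt"
	"time"
)

// nilIfZeroTime returns nil when t points at Go's zero time value,
// which is what the GitHub API yields for never-started (skipped) runs.
func nilIfZeroTime(t *time.Time) *time.Time {
	if t == nil || t.IsZero() {
		return nil
	}
	return t
}

func main() {
	zero := time.Time{}
	started := time.Date(2024, 1, 2, 3, 4, 5, 0, time.UTC)
	fmt.Println(nilIfZeroTime(&zero))    // <nil>: skipped run, timestamp discarded
	fmt.Println(nilIfZeroTime(&started)) // 2024-01-02 03:04:05 +0000 UTC
}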