1212import java .util .concurrent .ThreadPoolExecutor ;
1313import java .util .concurrent .TimeUnit ;
1414import java .util .concurrent .atomic .AtomicLong ;
15+ import java .util .function .Supplier ;
1516
1617import org .hibernate .infra .sync .jira .JiraConfig ;
1718import org .hibernate .infra .sync .jira .ProcessingConfig ;
2324import org .hibernate .infra .sync .jira .service .jira .model .hook .JiraWebHookObject ;
2425import org .hibernate .infra .sync .jira .service .jira .model .hook .JiraWebhookEventType ;
2526import org .hibernate .infra .sync .jira .service .jira .model .rest .JiraComment ;
27+ import org .hibernate .infra .sync .jira .service .jira .model .rest .JiraComments ;
2628import org .hibernate .infra .sync .jira .service .jira .model .rest .JiraIssue ;
2729import org .hibernate .infra .sync .jira .service .jira .model .rest .JiraIssueLink ;
2830import org .hibernate .infra .sync .jira .service .jira .model .rest .JiraIssues ;
@@ -46,15 +48,18 @@ public class JiraService {
4648
4749 private final ReportingConfig reportingConfig ;
4850 private final ExecutorService executor ;
51+ private final Supplier <Integer > workQueueSize ;
4952 private final Map <String , HandlerProjectContext > contextPerProject ;
5053 private final JiraConfig jiraConfig ;
5154 private final Scheduler scheduler ;
5255
5356 @ Inject
5457 public JiraService (ProcessingConfig processingConfig , JiraConfig jiraConfig , ReportingConfig reportingConfig ,
5558 Scheduler scheduler ) {
59+ LinkedBlockingDeque <Runnable > workQueue = new LinkedBlockingDeque <>(processingConfig .queueSize ());
60+ workQueueSize = workQueue ::size ;
5661 executor = new ThreadPoolExecutor (processingConfig .threads (), processingConfig .threads (), 0L ,
57- TimeUnit .MILLISECONDS , new LinkedBlockingDeque <>( processingConfig . queueSize ()) );
62+ TimeUnit .MILLISECONDS , workQueue );
5863
5964 Map <String , HandlerProjectContext > contextMap = new HashMap <>();
6065 for (var entry : jiraConfig .projectGroup ().entrySet ()) {
@@ -107,7 +112,33 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
107112 if (issueToSync .isEmpty ()) {
108113 scheduler .unscheduleJob (identity );
109114 } else {
110- triggerSyncEvent (issueToSync .get ());
115+ triggerSyncEvent (issueToSync .get (), context );
116+ largestSyncedJiraIssueKeyNumber .set (JiraIssue .keyToLong (issueToSync .get ().key ));
117+ }
118+ }).schedule ();
119+ rc .end ();
120+ });
121+ mi .router ().get ("/sync/issues/init/:project" ).consumes (MediaType .APPLICATION_JSON ).blockingHandler (rc -> {
122+ String project = rc .pathParam ("project" );
123+
124+ HandlerProjectContext context = contextPerProject .get (project );
125+
126+ if (context == null ) {
127+ throw new IllegalArgumentException ("Unknown project '%s'" .formatted (project ));
128+ }
129+
130+ AtomicLong largestSyncedJiraIssueKeyNumber = new AtomicLong (context .getLargestSyncedJiraIssueKeyNumber ());
131+
132+ String identity = "Init Sync for project %s" .formatted (project );
133+ scheduler .newJob (identity ).setConcurrentExecution (Scheduled .ConcurrentExecution .SKIP )
134+ // every 10 seconds:
135+ .setCron ("0/10 * * * * ?" ).setTask (executionContext -> {
136+ Optional <JiraIssue > issueToSync = context
137+ .getNextIssueToSync (largestSyncedJiraIssueKeyNumber .get ());
138+ if (issueToSync .isEmpty ()) {
139+ scheduler .unscheduleJob (identity );
140+ } else {
141+ triggerSyncEvent (issueToSync .get (), context );
111142 largestSyncedJiraIssueKeyNumber .set (JiraIssue .keyToLong (issueToSync .get ().key ));
112143 }
113144 }).schedule ();
@@ -127,7 +158,7 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
127158
128159 executor .submit (() -> {
129160 for (String issueKey : issueKeys ) {
130- triggerSyncEvent (context .sourceJiraClient ().getIssue (issueKey ));
161+ triggerSyncEvent (context .sourceJiraClient ().getIssue (issueKey ), context );
131162 }
132163 });
133164 rc .end ();
@@ -158,8 +189,9 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
158189 throw new IllegalArgumentException ("Unknown project '%s'" .formatted (project ));
159190 }
160191
192+ // TODO: needs an issue key
161193 // can trigger directly as we do not make any REST requests here:
162- triggerCommentSyncEvents (project , comments );
194+ triggerCommentSyncEvents (project , null , comments );
163195 rc .end ();
164196 });
165197 }
@@ -243,25 +275,34 @@ private void syncByQuery(String query, HandlerProjectContext context) {
243275 int max = 100 ;
244276 do {
245277 issues = context .sourceJiraClient ().find (query , start , max );
246- issues .issues .forEach (this :: triggerSyncEvent );
278+ issues .issues .forEach (jiraIssue -> triggerSyncEvent ( jiraIssue , context ) );
247279
248280 start += max ;
249281 } while (!issues .issues .isEmpty ());
250282 }
251283
252- private void triggerSyncEvent (JiraIssue jiraIssue ) {
284+ private void triggerSyncEvent (JiraIssue jiraIssue , HandlerProjectContext context ) {
285+ Log .infof ("Adding sync events for a jira issue: %s; Already queued events: %s" , jiraIssue .key ,
286+ workQueueSize .get ());
287+ JiraWebHookIssue issue = new JiraWebHookIssue ();
288+ issue .id = jiraIssue .id ;
289+ issue .key = jiraIssue .key ;
290+
253291 JiraWebHookEvent event = new JiraWebHookEvent ();
254292 event .webhookEvent = JiraWebhookEventType .ISSUE_UPDATED .getName ();
255- event .issue = new JiraWebHookIssue ();
256- event .issue .id = jiraIssue .id ;
257- event .issue .key = jiraIssue .key ;
293+ event .issue = issue ;
258294
259295 String projectKey = Objects .toString (jiraIssue .fields .project .properties ().get ("key" ));
260296 acknowledge (projectKey , event );
261297
262298 // now sync comments:
263299 if (jiraIssue .fields .comment != null && jiraIssue .fields .comment .comments != null ) {
264- triggerCommentSyncEvents (projectKey , jiraIssue .fields .comment .comments );
300+ triggerCommentSyncEvents (projectKey , issue , jiraIssue .fields .comment .comments );
301+ } else {
302+ // comments not always come in the jira request... so if we didn't get any, just
303+ // in case we will query for them:
304+ JiraComments comments = context .sourceJiraClient ().getComments (jiraIssue .id , 0 , 500 );
305+ triggerCommentSyncEvents (projectKey , issue , comments .comments );
265306 }
266307
267308 // and links:
@@ -277,11 +318,12 @@ private void triggerSyncEvent(JiraIssue jiraIssue) {
277318 }
278319 }
279320
280- private void triggerCommentSyncEvents (String projectKey , List <JiraComment > comments ) {
321+ private void triggerCommentSyncEvents (String projectKey , JiraWebHookIssue issue , List <JiraComment > comments ) {
281322 for (JiraComment comment : comments ) {
282323 var event = new JiraWebHookEvent ();
283324 event .comment = new JiraWebHookObject ();
284325 event .comment .id = Long .parseLong (comment .id );
326+ event .issue = issue ;
285327 event .webhookEvent = JiraWebhookEventType .COMMENT_UPDATED .getName ();
286328 acknowledge (projectKey , event );
287329 }
0 commit comments