Skip to content

Commit d1576db

Browse files
committed
Move processing queue to group context
since each group can have its own processing limits, and if we block the executor with pending tasks for one group, other group won't be able to get processed, while actually it could
1 parent 83de4ec commit d1576db

File tree

6 files changed

+83
-68
lines changed

6 files changed

+83
-68
lines changed

src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java

Lines changed: 27 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,24 @@ interface JiraProjectGroup {
9494
}
9595

9696
interface EventProcessing {
97+
98+
/**
99+
* Define how many events can be acknowledged and put on the pending queue
100+
* before acknowledging an event results in blocking the response and waiting
101+
* for the queue to free some space.
102+
*/
103+
@WithDefault("10000")
104+
int queueSize();
105+
106+
/**
107+
* Define the number of threads to use when processing queued events.
108+
* <p>
109+
* Note, having a lot of processing threads might not bring much benefit as
110+
* processing may also be limited by {@link JiraConfig.EventProcessing}
111+
*/
112+
@WithDefault("2")
113+
int threads();
114+
97115
/**
98116
* Defines how many events can be processed within the
99117
* {@link #timeframeInSeconds() timeframe}
@@ -127,20 +145,20 @@ interface Scheduled {
127145

128146
interface JiraProject {
129147
/**
130-
* Downstream project id (not a project key!).
131-
* Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
148+
* Downstream project id (not a project key!). Use
149+
* {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
132150
*/
133151
String projectId();
134152

135153
/**
136-
* Downstream project key.
137-
* Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
154+
* Downstream project key. Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get
155+
* the info.
138156
*/
139157
String projectKey();
140158

141159
/**
142-
* Upstream project key.
143-
* Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
160+
* Upstream project key. Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get
161+
* the info.
144162
*/
145163
String originalProjectKey();
146164

@@ -154,8 +172,9 @@ interface WebHookSecurity {
154172
/**
155173
* Whether to enable signature verification.
156174
* <p>
157-
* Jira web hooks can send a {@code x-hub-signature} header with a signature of a request body.
158-
* This signature can be then verified using the secret used to configure the web hook.
175+
* Jira web hooks can send a {@code x-hub-signature} header with a signature of
176+
* a request body. This signature can be then verified using the secret used to
177+
* configure the web hook.
159178
*/
160179
@WithDefault("false")
161180
boolean enabled();

src/main/java/org/hibernate/infra/replicate/jira/ProcessingConfig.java

Lines changed: 0 additions & 23 deletions
This file was deleted.

src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,4 +183,12 @@ public String toString() {
183183
public void close() {
184184
projectGroupContext.close();
185185
}
186+
187+
public int pendingEventsInCurrentContext() {
188+
return projectGroupContext.pendingEventsInCurrentContext();
189+
}
190+
191+
public void submitTask(Runnable runnable) {
192+
projectGroupContext.submitTask(runnable);
193+
}
186194
}
Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,42 @@
11
package org.hibernate.infra.replicate.jira.service.jira;
22

3+
import java.util.concurrent.ExecutorService;
34
import java.util.concurrent.Executors;
5+
import java.util.concurrent.LinkedBlockingDeque;
46
import java.util.concurrent.ScheduledExecutorService;
57
import java.util.concurrent.Semaphore;
8+
import java.util.concurrent.ThreadPoolExecutor;
69
import java.util.concurrent.TimeUnit;
10+
import java.util.function.Supplier;
711

812
import org.hibernate.infra.replicate.jira.JiraConfig;
913

14+
import io.quarkus.logging.Log;
15+
1016
public final class HandlerProjectGroupContext implements AutoCloseable {
1117

18+
private final ExecutorService eventHandlingExecutor;
19+
private final Supplier<Integer> workQueueSize;
1220
private final ScheduledExecutorService rateLimiterExecutor = Executors.newScheduledThreadPool(1);
1321
private final Semaphore rateLimiter;
1422
private final JiraConfig.JiraProjectGroup projectGroup;
1523

1624
public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) {
1725
this.projectGroup = projectGroup;
1826

19-
final int permits = projectGroup.processing().eventsPerTimeframe();
27+
JiraConfig.EventProcessing processing = projectGroup.processing();
28+
29+
final int permits = processing.eventsPerTimeframe();
2030
this.rateLimiter = new Semaphore(permits);
2131
rateLimiterExecutor.scheduleAtFixedRate(() -> {
2232
rateLimiter.drainPermits();
2333
rateLimiter.release(permits);
24-
}, projectGroup.processing().timeframeInSeconds(), projectGroup.processing().timeframeInSeconds(),
25-
TimeUnit.SECONDS);
34+
}, processing.timeframeInSeconds(), processing.timeframeInSeconds(), TimeUnit.SECONDS);
35+
36+
LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(processing.queueSize());
37+
workQueueSize = workQueue::size;
38+
eventHandlingExecutor = new ThreadPoolExecutor(processing.threads(), processing.threads(), 0L,
39+
TimeUnit.MILLISECONDS, workQueue);
2640
}
2741

2842
public void startProcessingEvent() throws InterruptedException {
@@ -33,12 +47,30 @@ public JiraConfig.JiraProjectGroup projectGroup() {
3347
return projectGroup;
3448
}
3549

50+
public int pendingEventsInCurrentContext() {
51+
return workQueueSize.get();
52+
}
53+
54+
public void submitTask(Runnable task) {
55+
eventHandlingExecutor.submit(task);
56+
}
57+
3658
@Override
3759
public void close() {
3860
// when requesting to close the context we aren't expecting to process any other
3961
// events hence there's no point in continuing "releasing" more "permits":
4062
if (!rateLimiterExecutor.isShutdown()) {
4163
rateLimiterExecutor.shutdownNow();
4264
}
65+
if (!eventHandlingExecutor.isShutdown()) {
66+
try {
67+
eventHandlingExecutor.shutdown();
68+
if (!eventHandlingExecutor.awaitTermination(2, TimeUnit.MINUTES)) {
69+
Log.warnf("Not all events were processed before the shutdown");
70+
}
71+
} catch (InterruptedException e) {
72+
Thread.currentThread().interrupt();
73+
}
74+
}
4375
}
4476
}

src/main/java/org/hibernate/infra/replicate/jira/service/jira/JiraService.java

Lines changed: 11 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,9 @@
77
import java.util.Objects;
88
import java.util.Optional;
99
import java.util.Set;
10-
import java.util.concurrent.ExecutorService;
11-
import java.util.concurrent.LinkedBlockingDeque;
12-
import java.util.concurrent.ThreadPoolExecutor;
13-
import java.util.concurrent.TimeUnit;
1410
import java.util.concurrent.atomic.AtomicLong;
15-
import java.util.function.Supplier;
1611

1712
import org.hibernate.infra.replicate.jira.JiraConfig;
18-
import org.hibernate.infra.replicate.jira.ProcessingConfig;
1913
import org.hibernate.infra.replicate.jira.service.jira.client.JiraRestClient;
2014
import org.hibernate.infra.replicate.jira.service.jira.client.JiraRestClientBuilder;
2115
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookEvent;
@@ -47,19 +41,12 @@
4741
public class JiraService {
4842

4943
private final ReportingConfig reportingConfig;
50-
private final ExecutorService executor;
51-
private final Supplier<Integer> workQueueSize;
5244
private final Map<String, HandlerProjectContext> contextPerProject;
5345
private final JiraConfig jiraConfig;
5446
private final Scheduler scheduler;
5547

5648
@Inject
57-
public JiraService(ProcessingConfig processingConfig, JiraConfig jiraConfig, ReportingConfig reportingConfig,
58-
Scheduler scheduler) {
59-
LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(processingConfig.queueSize());
60-
workQueueSize = workQueue::size;
61-
executor = new ThreadPoolExecutor(processingConfig.threads(), processingConfig.threads(), 0L,
62-
TimeUnit.MILLISECONDS, workQueue);
49+
public JiraService(JiraConfig jiraConfig, ReportingConfig reportingConfig, Scheduler scheduler) {
6350

6451
Map<String, HandlerProjectContext> contextMap = new HashMap<>();
6552
for (var entry : jiraConfig.projectGroup().entrySet()) {
@@ -158,7 +145,7 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
158145
throw new IllegalArgumentException("Unknown project '%s'".formatted(project));
159146
}
160147

161-
executor.submit(() -> {
148+
context.submitTask(() -> {
162149
for (String issueKey : issueKeys) {
163150
triggerSyncEvent(context.sourceJiraClient().getIssue(issueKey), context);
164151
}
@@ -176,7 +163,7 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
176163
throw new IllegalArgumentException("Unknown project '%s'".formatted(project));
177164
}
178165

179-
executor.submit(() -> syncByQuery(query, context));
166+
context.submitTask(() -> syncByQuery(query, context));
180167
rc.end();
181168
});
182169
mi.router().post("/sync/comments/list").consumes(MediaType.APPLICATION_JSON).blockingHandler(rc -> {
@@ -223,7 +210,7 @@ public void acknowledge(String project, JiraWebHookEvent event) {
223210
}
224211

225212
for (Runnable handler : eventType.handlers(reportingConfig, event, context)) {
226-
executor.submit(handler);
213+
context.submitTask(handler);
227214
}
228215
}, () -> Log.infof("Event type %s is not supported and cannot be handled.", event.webhookEvent));
229216
}
@@ -254,20 +241,12 @@ public void syncLastUpdated(String projectGroup) {
254241

255242
@PreDestroy
256243
public void finishProcessingAndShutdown() {
257-
try {
258-
executor.shutdown();
259-
if (!executor.awaitTermination(2, TimeUnit.MINUTES)) {
260-
Log.infof("Not all events were processed before the shutdown");
244+
for (HandlerProjectContext context : contextPerProject.values()) {
245+
try {
246+
context.close();
247+
} catch (Exception e) {
248+
Log.errorf(e, "Error closing context %s: %s", context, e.getMessage());
261249
}
262-
for (HandlerProjectContext context : contextPerProject.values()) {
263-
try {
264-
context.close();
265-
} catch (Exception e) {
266-
Log.errorf(e, "Error closing context %s: %s", context, e.getMessage());
267-
}
268-
}
269-
} catch (InterruptedException e) {
270-
Thread.currentThread().interrupt();
271250
}
272251
}
273252

@@ -285,7 +264,8 @@ private void syncByQuery(String query, HandlerProjectContext context) {
285264

286265
private void triggerSyncEvent(JiraIssue jiraIssue, HandlerProjectContext context) {
287266
Log.infof("Adding sync events for a jira issue: %s; Already queued events: %s", jiraIssue.key,
288-
workQueueSize.get());
267+
context.pendingEventsInCurrentContext());
268+
289269
JiraWebHookIssue issue = new JiraWebHookIssue();
290270
issue.id = jiraIssue.id;
291271
issue.key = jiraIssue.key;

src/main/resources/application.properties

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ jira.project-group."hibernate".destination.login-kind=BEARER_TOKEN
1515
jira.project-group."hibernate".destination.api-user.email=${JIRA_API_USER_REDHAT}
1616
jira.project-group."hibernate".destination.api-user.token=${JIRA_API_TOKEN_REDHAT}
1717
jira.project-group."hibernate".can-set-reporter=true
18+
# Processing queue configuration:
19+
jira.project-group."hibernate".processing.queue-size=${PROCESSING_QUEUE_SIZE:10000}
1820
#
1921
# Management endpoints:
2022
quarkus.management.enabled=true
@@ -29,9 +31,6 @@ quarkus.security.users.embedded.enabled=true
2931
quarkus.security.users.embedded.plain-text=true
3032
quarkus.security.users.embedded.users."management-user"=${MANAGEMENT_USER_PASSWORD}
3133
#
32-
# Processing queue configuration:
33-
processing.events.queue-size=${PROCESSING_QUEUE_SIZE:1000}
34-
#
3534
# Scheduler:
3635
# >> By default, the scheduler is not started unless a @Scheduled business method is found.
3736
# >> You may need to force the start of the scheduler for "pure" programmatic scheduling via quarkus.scheduler.start-mode=forced

0 commit comments

Comments
 (0)