Skip to content

Commit 03c6f25

Browse files
committed
"rate-limit" the number of events we process per timeframe
1 parent b0ff51f commit 03c6f25

File tree

8 files changed

+105
-15
lines changed

8 files changed

+105
-15
lines changed

src/main/java/org/hibernate/infra/sync/jira/JiraConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,15 @@ interface JiraProjectGroup {
3434

3535
Scheduled scheduled();
3636

37+
EventProcessing processing();
38+
}
39+
40+
interface EventProcessing {
41+
@WithDefault("5")
42+
int eventsPerTimeframe();
43+
44+
@WithDefault("1")
45+
int timeframeInSeconds();
3746
}
3847

3948
interface Scheduled {

src/main/java/org/hibernate/infra/sync/jira/ProcessingConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@
55

66
@ConfigMapping(prefix = "processing.events")
77
public interface ProcessingConfig {
8-
@WithDefault("1000")
8+
@WithDefault("10000")
99
int queueSize();
1010

11-
@WithDefault("20")
11+
@WithDefault("5")
1212
int threads();
1313
}

src/main/java/org/hibernate/infra/sync/jira/service/jira/HandlerProjectContext.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
import io.quarkus.logging.Log;
1616

17-
public final class HandlerProjectContext {
17+
public final class HandlerProjectContext implements AutoCloseable {
1818

1919
// JIRA REST API creates upto 50 issues at a time:
2020
// https://developer.atlassian.com/cloud/jira/platform/rest/v2/api-group-issues/#api-rest-api-2-issue-bulk-post
@@ -26,18 +26,18 @@ public final class HandlerProjectContext {
2626
private final String projectGroupName;
2727
private final JiraRestClient sourceJiraClient;
2828
private final JiraRestClient destinationJiraClient;
29-
private final JiraConfig.JiraProjectGroup projectGroup;
29+
private final HandlerProjectGroupContext projectGroupContext;
3030
private final JiraConfig.JiraProject project;
3131
private final AtomicLong currentIssueKeyNumber;
3232
private final JiraIssueBulk bulk;
3333

3434
public HandlerProjectContext(String projectName, String projectGroupName, JiraRestClient sourceJiraClient,
35-
JiraRestClient destinationJiraClient, JiraConfig.JiraProjectGroup projectGroup) {
35+
JiraRestClient destinationJiraClient, HandlerProjectGroupContext projectGroupContext) {
3636
this.projectName = projectName;
3737
this.projectGroupName = projectGroupName;
3838
this.sourceJiraClient = sourceJiraClient;
3939
this.destinationJiraClient = destinationJiraClient;
40-
this.projectGroup = projectGroup;
40+
this.projectGroupContext = projectGroupContext;
4141
this.project = projectGroup().projects().get(projectName());
4242
this.currentIssueKeyNumber = new AtomicLong(getCurrentLatestJiraIssueKeyNumber());
4343
this.bulk = new JiraIssueBulk(createIssuePlaceholder(), ISSUES_PER_REQUEST);
@@ -64,7 +64,7 @@ public JiraRestClient destinationJiraClient() {
6464
}
6565

6666
public JiraConfig.JiraProjectGroup projectGroup() {
67-
return projectGroup;
67+
return projectGroupContext.projectGroup();
6868
}
6969

7070
public AtomicLong currentIssueKeyNumber() {
@@ -129,6 +129,10 @@ public void createNextPlaceholderBatch(Long upToKeyNumber) {
129129
}
130130
}
131131

132+
public void startProcessingEvent() throws InterruptedException {
133+
projectGroupContext.startProcessingEvent();
134+
}
135+
132136
private boolean requiredIssueKeyNumberShouldBeAvailable(Long key) {
133137
return currentIssueKeyNumber.get() >= key;
134138
}
@@ -156,8 +160,12 @@ private JiraIssue createIssuePlaceholder() {
156160
public String toString() {
157161
return "HandlerProjectContext[" + "projectName=" + projectName + ", " + "projectGroupName=" + projectGroupName
158162
+ ", " + "sourceJiraClient=" + sourceJiraClient + ", " + "destinationJiraClient="
159-
+ destinationJiraClient + ", " + "projectGroup=" + projectGroup + ", " + "currentIssueKeyNumber="
160-
+ currentIssueKeyNumber + ']';
163+
+ destinationJiraClient + ", " + "projectGroup=" + projectGroupContext.projectGroup() + ", "
164+
+ "currentIssueKeyNumber=" + currentIssueKeyNumber + ']';
161165
}
162166

167+
@Override
168+
public void close() {
169+
projectGroupContext.close();
170+
}
163171
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package org.hibernate.infra.sync.jira.service.jira;
2+
3+
import java.util.concurrent.Executors;
4+
import java.util.concurrent.ScheduledExecutorService;
5+
import java.util.concurrent.Semaphore;
6+
import java.util.concurrent.TimeUnit;
7+
8+
import org.hibernate.infra.sync.jira.JiraConfig;
9+
10+
public final class HandlerProjectGroupContext implements AutoCloseable {
11+
12+
private final ScheduledExecutorService rateLimiterExecutor = Executors.newScheduledThreadPool(1);
13+
private final Semaphore rateLimiter;
14+
private final JiraConfig.JiraProjectGroup projectGroup;
15+
16+
public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) {
17+
this.projectGroup = projectGroup;
18+
19+
this.rateLimiter = new Semaphore(projectGroup.processing().eventsPerTimeframe());
20+
rateLimiterExecutor.scheduleAtFixedRate(() -> {
21+
rateLimiter.drainPermits();
22+
rateLimiter.release(5);
23+
}, projectGroup.processing().timeframeInSeconds(), projectGroup.processing().timeframeInSeconds(),
24+
TimeUnit.SECONDS);
25+
}
26+
27+
public void startProcessingEvent() throws InterruptedException {
28+
rateLimiter.acquire(1);
29+
}
30+
31+
public JiraConfig.JiraProjectGroup projectGroup() {
32+
return projectGroup;
33+
}
34+
35+
@Override
36+
public void close() {
37+
// when requesting to close the context we aren't expecting to process any other
38+
// events hence there's no point in continuing "releasing" more "permits":
39+
if (!rateLimiterExecutor.isShutdown()) {
40+
rateLimiterExecutor.shutdownNow();
41+
}
42+
}
43+
}

src/main/java/org/hibernate/infra/sync/jira/service/jira/JiraService.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import io.quarkus.scheduler.Scheduler;
3535
import io.quarkus.vertx.http.ManagementInterface;
3636
import io.vertx.core.json.JsonObject;
37+
import jakarta.annotation.PreDestroy;
3738
import jakarta.enterprise.context.ApplicationScoped;
3839
import jakarta.enterprise.event.Observes;
3940
import jakarta.inject.Inject;
@@ -59,10 +60,10 @@ public JiraService(ProcessingConfig processingConfig, JiraConfig jiraConfig, Rep
5960
for (var entry : jiraConfig.projectGroup().entrySet()) {
6061
JiraRestClient source = JiraRestClientBuilder.of(entry.getValue().source());
6162
JiraRestClient destination = JiraRestClientBuilder.of(entry.getValue().destination());
62-
63+
HandlerProjectGroupContext groupContext = new HandlerProjectGroupContext(entry.getValue());
6364
for (var project : entry.getValue().projects().entrySet()) {
64-
contextMap.put(project.getKey(), new HandlerProjectContext(project.getKey(), entry.getKey(), source,
65-
destination, entry.getValue()));
65+
contextMap.put(project.getKey(),
66+
new HandlerProjectContext(project.getKey(), entry.getKey(), source, destination, groupContext));
6667
}
6768
}
6869

@@ -217,6 +218,25 @@ public void syncLastUpdated(String projectGroup) {
217218
}
218219
}
219220

221+
@PreDestroy
222+
public void finishProcessingAndShutdown() {
223+
try {
224+
executor.shutdown();
225+
if (!executor.awaitTermination(2, TimeUnit.MINUTES)) {
226+
Log.infof("Not all events were processed before the shutdown");
227+
}
228+
for (HandlerProjectContext context : contextPerProject.values()) {
229+
try {
230+
context.close();
231+
} catch (Exception e) {
232+
Log.errorf(e, "Error closing context %s: %s", context, e.getMessage());
233+
}
234+
}
235+
} catch (InterruptedException e) {
236+
Thread.currentThread().interrupt();
237+
}
238+
}
239+
220240
private void syncByQuery(String query, HandlerProjectContext context) {
221241
JiraIssues issues = null;
222242
int start = 0;

src/main/java/org/hibernate/infra/sync/jira/service/jira/client/JiraRestClientBuilder.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ public static JiraRestClient of(JiraConfig.Instance jira) {
2424
Map<String, String> headers = jira.loginKind().headers(jiraUser.email(), jiraUser.token());
2525

2626
QuarkusRestClientBuilder builder = QuarkusRestClientBuilder.newBuilder().baseUri(jira.apiUri())
27-
// specifying a timeout of 0 represents infinity
28-
.connectTimeout(0, TimeUnit.HOURS).readTimeout(0, TimeUnit.HOURS)
27+
.connectTimeout(5, TimeUnit.MINUTES).readTimeout(10, TimeUnit.MINUTES)
2928
.clientHeadersFactory((incomingHeaders, clientOutgoingHeaders) -> {
3029
for (var entry : headers.entrySet()) {
3130
clientOutgoingHeaders.add(entry.getKey(), entry.getValue());

src/main/java/org/hibernate/infra/sync/jira/service/jira/handler/JiraEventHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,13 @@ private static Map<String, String> createMapping(List<JiraSimpleObject> source,
148148
@Override
149149
public final void run() {
150150
try {
151+
context.startProcessingEvent();
151152
doRun();
152153
} catch (RuntimeException e) {
153154
failureCollector.critical("Failed to handled the event", e);
155+
} catch (InterruptedException e) {
156+
failureCollector.critical("Interrupted while waiting in the queue", e);
157+
Thread.currentThread().interrupt();
154158
} finally {
155159
failureCollector.close();
156160
}

src/test/java/org/hibernate/infra/sync/jira/handler/IssueTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.hibernate.infra.sync.jira.JiraConfig;
88
import org.hibernate.infra.sync.jira.mock.SampleJiraRestClient;
99
import org.hibernate.infra.sync.jira.service.jira.HandlerProjectContext;
10+
import org.hibernate.infra.sync.jira.service.jira.HandlerProjectGroupContext;
1011
import org.hibernate.infra.sync.jira.service.jira.handler.JiraCommentDeleteEventHandler;
1112
import org.hibernate.infra.sync.jira.service.jira.handler.JiraCommentUpsertEventHandler;
1213
import org.hibernate.infra.sync.jira.service.jira.handler.JiraIssueDeleteEventHandler;
@@ -15,6 +16,7 @@
1516
import org.hibernate.infra.sync.jira.service.jira.handler.JiraIssueUpsertEventHandler;
1617
import org.hibernate.infra.sync.jira.service.reporting.ReportingConfig;
1718

19+
import org.junit.jupiter.api.AfterEach;
1820
import org.junit.jupiter.api.BeforeEach;
1921
import org.junit.jupiter.api.Test;
2022

@@ -44,7 +46,12 @@ class IssueTest {
4446
@BeforeEach
4547
void setUp() {
4648
context = new HandlerProjectContext("JIRATEST1", PROJECT_GROUP_NAME, source, destination,
47-
jiraConfig.projectGroup().get(PROJECT_GROUP_NAME));
49+
new HandlerProjectGroupContext(jiraConfig.projectGroup().get(PROJECT_GROUP_NAME)));
50+
}
51+
52+
@AfterEach
53+
void tearDown() {
54+
context.close();
4855
}
4956

5057
@Test

0 commit comments

Comments
 (0)