Skip to content

Commit 224ca18

Browse files
committed
Add handling of downstream automation events
1 parent 9a1ece3 commit 224ca18

File tree

15 files changed

+388
-38
lines changed

15 files changed

+388
-38
lines changed

src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ interface JiraProject {
192192
* Allows enabling signature verification.
193193
*/
194194
WebHookSecurity security();
195+
196+
/**
197+
* Allows enabling signature verification of downstream events.
198+
*/
199+
WebHookSecurity downstreamSecurity();
195200
}
196201

197202
interface WebHookSecurity {
@@ -205,11 +210,20 @@ interface WebHookSecurity {
205210
@WithDefault("false")
206211
boolean enabled();
207212

213+
@WithDefault("SIGNATURE")
214+
Type type();
215+
208216
/**
209-
* The secret used to sing the web hook request body.
217+
* Verification secret, e.g. the secret used to sing the web hook request body.
218+
* Can also be just some token that we will compare. Depends on the security
219+
* type.
210220
*/
211221
@WithDefault("not-a-secret")
212222
String secret();
223+
224+
enum Type {
225+
SIGNATURE, TOKEN
226+
}
213227
}
214228

215229
interface Instance {
@@ -374,6 +388,9 @@ interface UserValueMapping extends ValueMapping {
374388
*/
375389
@WithDefault("not-a-user")
376390
Set<String> ignoredUpstreamUsers();
391+
392+
@WithDefault("not-a-user")
393+
Set<String> ignoredDownstreamUsers();
377394
}
378395

379396
/**

src/main/java/org/hibernate/infra/replicate/jira/resource/JiraWebHookListenerResource.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.hibernate.infra.replicate.jira.resource;
22

33
import org.hibernate.infra.replicate.jira.service.jira.JiraService;
4+
import org.hibernate.infra.replicate.jira.service.jira.model.action.JiraActionEvent;
45
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookEvent;
56
import org.hibernate.infra.replicate.jira.service.validation.ConfiguredProject;
67

@@ -26,8 +27,18 @@ public class JiraWebHookListenerResource {
2627
@Consumes(MediaType.APPLICATION_JSON)
2728
public String somethingHappenedUpstream(@RestPath @NotNull @ConfiguredProject String project,
2829
@QueryParam("triggeredByUser") String triggeredByUser, JiraWebHookEvent event) {
29-
Log.infof("Received a notification about %s project: %.200s...", project, event);
30+
Log.tracef("Received a notification about %s project: %.200s...", project, event);
3031
jiraService.acknowledge(project, event, triggeredByUser);
3132
return "ack";
3233
}
34+
35+
@POST
36+
@Path("/mirror/{project}")
37+
@Consumes(MediaType.APPLICATION_JSON)
38+
public String somethingHappenedDownstream(@RestPath @NotNull @ConfiguredProject(upstream = false) String project,
39+
JiraActionEvent data) {
40+
Log.tracef("Received a downstream notification about %s project: %s...", project, data);
41+
jiraService.downstreamAcknowledge(project, data);
42+
return "ack";
43+
}
3344
}

src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectContext.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ public void startProcessingEvent() throws InterruptedException {
181181
projectGroupContext.startProcessingEvent();
182182
}
183183

184+
public void startProcessingDownstreamEvent() throws InterruptedException {
185+
projectGroupContext.startProcessingDownstreamEvent();
186+
}
187+
184188
public JiraUser notMappedAssignee() {
185189
return notMappedAssignee;
186190
}
@@ -229,6 +233,14 @@ public void submitTask(Runnable runnable) {
229233
projectGroupContext.submitTask(runnable);
230234
}
231235

236+
public int pendingDownstreamEventsInCurrentContext() {
237+
return projectGroupContext.pendingDownstreamEventsInCurrentContext();
238+
}
239+
240+
public void submitDownstreamTask(Runnable runnable) {
241+
projectGroupContext.submitDownstreamTask(runnable);
242+
}
243+
232244
public Optional<HandlerProjectContext> contextForProjectInSameGroup(String project) {
233245
if (!projectGroup().projects().containsKey(project)) {
234246
// different project group, don't bother
@@ -351,4 +363,12 @@ private static boolean versionNeedsUpdate(JiraVersion upstreamVersion, JiraVersi
351363
public boolean isUserIgnored(String triggeredByUser) {
352364
return projectGroupContext.projectGroup().users().ignoredUpstreamUsers().contains(triggeredByUser);
353365
}
366+
367+
public boolean isDownstreamUserIgnored(String triggeredByUser) {
368+
return projectGroupContext.projectGroup().users().ignoredDownstreamUsers().contains(triggeredByUser);
369+
}
370+
371+
public String upstreamUser(String mappedValue) {
372+
return projectGroupContext.upstreamUser(mappedValue);
373+
}
354374
}

src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectGroupContext.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package org.hibernate.infra.replicate.jira.service.jira;
22

3+
import java.util.Collections;
4+
import java.util.HashMap;
5+
import java.util.Map;
36
import java.util.concurrent.ExecutorService;
47
import java.util.concurrent.Executors;
58
import java.util.concurrent.LinkedBlockingDeque;
@@ -17,9 +20,14 @@ public final class HandlerProjectGroupContext implements AutoCloseable {
1720

1821
private final ExecutorService eventHandlingExecutor;
1922
private final Supplier<Integer> workQueueSize;
23+
24+
private final ExecutorService downstreamEventHandlingExecutor;
25+
private final Supplier<Integer> downstreamWorkQueueSize;
2026
private final ScheduledExecutorService rateLimiterExecutor = Executors.newScheduledThreadPool(1);
2127
private final Semaphore rateLimiter;
28+
private final Semaphore downstreamRateLimiter;
2229
private final JiraConfig.JiraProjectGroup projectGroup;
30+
private final Map<String, String> invertedUsers;
2331

2432
public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) {
2533
this.projectGroup = projectGroup;
@@ -28,21 +36,39 @@ public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) {
2836

2937
final int permits = processing.eventsPerTimeframe();
3038
this.rateLimiter = new Semaphore(permits);
39+
this.downstreamRateLimiter = new Semaphore(permits);
3140
rateLimiterExecutor.scheduleAtFixedRate(() -> {
3241
rateLimiter.drainPermits();
3342
rateLimiter.release(permits);
43+
downstreamRateLimiter.drainPermits();
44+
downstreamRateLimiter.release(permits);
3445
}, processing.timeframeInSeconds(), processing.timeframeInSeconds(), TimeUnit.SECONDS);
3546

3647
LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(processing.queueSize());
3748
workQueueSize = workQueue::size;
3849
eventHandlingExecutor = new ThreadPoolExecutor(processing.threads(), processing.threads(), 0L,
3950
TimeUnit.MILLISECONDS, workQueue);
51+
52+
LinkedBlockingDeque<Runnable> downstreamWorkQueue = new LinkedBlockingDeque<>(processing.queueSize());
53+
downstreamWorkQueueSize = downstreamWorkQueue::size;
54+
downstreamEventHandlingExecutor = new ThreadPoolExecutor(processing.threads(), processing.threads(), 0L,
55+
TimeUnit.MILLISECONDS, downstreamWorkQueue);
56+
57+
Map<String, String> invertedUsers = new HashMap<>();
58+
for (var entry : projectGroup.users().mapping().entrySet()) {
59+
invertedUsers.put(entry.getValue(), entry.getKey());
60+
}
61+
this.invertedUsers = Collections.unmodifiableMap(invertedUsers);
4062
}
4163

4264
public void startProcessingEvent() throws InterruptedException {
4365
rateLimiter.acquire(1);
4466
}
4567

68+
public void startProcessingDownstreamEvent() throws InterruptedException {
69+
downstreamRateLimiter.acquire(1);
70+
}
71+
4672
public JiraConfig.JiraProjectGroup projectGroup() {
4773
return projectGroup;
4874
}
@@ -51,26 +77,43 @@ public int pendingEventsInCurrentContext() {
5177
return workQueueSize.get();
5278
}
5379

80+
public int pendingDownstreamEventsInCurrentContext() {
81+
return downstreamWorkQueueSize.get();
82+
}
83+
5484
public void submitTask(Runnable task) {
5585
eventHandlingExecutor.submit(task);
5686
}
5787

88+
public void submitDownstreamTask(Runnable task) {
89+
downstreamEventHandlingExecutor.submit(task);
90+
}
91+
5892
@Override
5993
public void close() {
6094
// when requesting to close the context we aren't expecting to process any other
6195
// events hence there's no point in continuing "releasing" more "permits":
6296
if (!rateLimiterExecutor.isShutdown()) {
6397
rateLimiterExecutor.shutdownNow();
6498
}
65-
if (!eventHandlingExecutor.isShutdown()) {
99+
closeEventExecutor(eventHandlingExecutor);
100+
closeEventExecutor(downstreamEventHandlingExecutor);
101+
}
102+
103+
private static void closeEventExecutor(ExecutorService executor) {
104+
if (!executor.isShutdown()) {
66105
try {
67-
eventHandlingExecutor.shutdown();
68-
if (!eventHandlingExecutor.awaitTermination(2, TimeUnit.MINUTES)) {
106+
executor.shutdown();
107+
if (!executor.awaitTermination(2, TimeUnit.MINUTES)) {
69108
Log.warnf("Not all events were processed before the shutdown");
70109
}
71110
} catch (InterruptedException e) {
72111
Thread.currentThread().interrupt();
73112
}
74113
}
75114
}
115+
116+
public String upstreamUser(String mappedValue) {
117+
return invertedUsers.get(mappedValue);
118+
}
76119
}

src/main/java/org/hibernate/infra/replicate/jira/service/jira/JiraService.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.hibernate.infra.replicate.jira.service.jira.handler.JiraIssueDeleteEventHandler;
1919
import org.hibernate.infra.replicate.jira.service.jira.handler.JiraIssueSimpleUpsertEventHandler;
2020
import org.hibernate.infra.replicate.jira.service.jira.handler.JiraIssueTransitionOnlyEventHandler;
21+
import org.hibernate.infra.replicate.jira.service.jira.model.action.JiraActionEvent;
2122
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookEvent;
2223
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookIssue;
2324
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookIssueLink;
@@ -322,6 +323,29 @@ public void acknowledge(String project, JiraWebHookEvent event, String triggered
322323
}, () -> Log.infof("Event type %s is not supported and cannot be handled.", event.webhookEvent));
323324
}
324325

326+
public void downstreamAcknowledge(String project, JiraActionEvent event) {
327+
event.eventType().ifPresentOrElse(eventType -> {
328+
var context = contextPerProject.get(project);
329+
if (context == null) {
330+
FailureCollector failureCollector = FailureCollector.collector(reportingConfig);
331+
failureCollector.critical("Unable to determine handler context for project %s. Was it not configured ?"
332+
.formatted(project));
333+
failureCollector.close();
334+
throw new ConstraintViolationException("Project " + project + " is not configured.", Set.of());
335+
}
336+
337+
if (context.isDownstreamUserIgnored(event.triggeredByUser)) {
338+
Log.infof("Event was triggered by %s user that is in the ignore list.", event.triggeredByUser);
339+
return;
340+
}
341+
342+
for (Runnable handler : eventType.handlers(reportingConfig, event, context)) {
343+
context.submitDownstreamTask(handler);
344+
}
345+
}, () -> Log.infof("Event type %s is not supported and cannot be handled.", event.event));
346+
347+
}
348+
325349
public void syncLastUpdated(String projectGroup) {
326350
try (FailureCollector failureCollector = FailureCollector.collector(reportingConfig)) {
327351
Log.infof("Starting scheduled sync of issues for the project group %s", projectGroup);

src/main/java/org/hibernate/infra/replicate/jira/service/jira/handler/JiraEventHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ public final void run() {
186186
context.startProcessingEvent();
187187
doRun();
188188
} catch (RuntimeException e) {
189-
failureCollector.critical("Failed to handled the event: %s".formatted(this), e);
189+
failureCollector.critical("Failed to handle the event: %s".formatted(this), e);
190190
} catch (InterruptedException e) {
191191
failureCollector.critical("Interrupted while waiting in the queue", e);
192192
Thread.currentThread().interrupt();
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package org.hibernate.infra.replicate.jira.service.jira.handler.action;
2+
3+
import org.hibernate.infra.replicate.jira.service.jira.HandlerProjectContext;
4+
import org.hibernate.infra.replicate.jira.service.jira.model.action.JiraActionEvent;
5+
import org.hibernate.infra.replicate.jira.service.jira.model.rest.JiraIssue;
6+
import org.hibernate.infra.replicate.jira.service.reporting.FailureCollector;
7+
import org.hibernate.infra.replicate.jira.service.reporting.ReportingConfig;
8+
9+
import io.quarkus.logging.Log;
10+
11+
public abstract class JiraActionEventHandler implements Runnable {
12+
13+
protected final JiraActionEvent event;
14+
protected final FailureCollector failureCollector;
15+
protected final HandlerProjectContext context;
16+
17+
protected JiraActionEventHandler(ReportingConfig reportingConfig, HandlerProjectContext context,
18+
JiraActionEvent event) {
19+
this.event = event;
20+
this.failureCollector = FailureCollector.collector(reportingConfig);
21+
this.context = context;
22+
}
23+
24+
@Override
25+
public final void run() {
26+
try {
27+
context.startProcessingDownstreamEvent();
28+
doRun();
29+
} catch (RuntimeException e) {
30+
failureCollector.critical("Failed to handle the event: %s".formatted(this), e);
31+
} catch (InterruptedException e) {
32+
failureCollector.critical("Interrupted while waiting in the queue", e);
33+
Thread.currentThread().interrupt();
34+
} finally {
35+
failureCollector.close();
36+
Log.infof("Finished processing %s. Pending events in %s to process: %s", this.toString(),
37+
context.projectGroupName(), context.pendingDownstreamEventsInCurrentContext());
38+
}
39+
}
40+
41+
protected String toSourceKey(String key) {
42+
return "%s-%d".formatted(context.project().originalProjectKey(), JiraIssue.keyToLong(key));
43+
}
44+
45+
protected abstract void doRun();
46+
47+
public abstract String toString();
48+
49+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package org.hibernate.infra.replicate.jira.service.jira.handler.action;
2+
3+
import org.hibernate.infra.replicate.jira.service.jira.HandlerProjectContext;
4+
import org.hibernate.infra.replicate.jira.service.jira.model.action.JiraActionEvent;
5+
import org.hibernate.infra.replicate.jira.service.jira.model.rest.JiraFields;
6+
import org.hibernate.infra.replicate.jira.service.jira.model.rest.JiraIssue;
7+
import org.hibernate.infra.replicate.jira.service.jira.model.rest.JiraUser;
8+
import org.hibernate.infra.replicate.jira.service.reporting.ReportingConfig;
9+
10+
public class JiraAssigneeActionEventHandler extends JiraActionEventHandler {
11+
12+
public JiraAssigneeActionEventHandler(ReportingConfig reportingConfig, HandlerProjectContext context,
13+
JiraActionEvent event) {
14+
super(reportingConfig, context, event);
15+
}
16+
17+
@Override
18+
protected void doRun() {
19+
JiraIssue issue = context.destinationJiraClient().getIssue(event.key);
20+
21+
JiraIssue updated = new JiraIssue();
22+
updated.fields = JiraFields.empty();
23+
if (issue.fields.assignee != null) {
24+
String accountId = context.upstreamUser(
25+
issue.fields.assignee.mappedIdentifier(context.projectGroup().users().mappedPropertyName()));
26+
27+
if (accountId != null) {
28+
updated.fields.assignee = new JiraUser(accountId);
29+
30+
}
31+
} else {
32+
updated.fields.assignee = new JiraUser("-1");
33+
}
34+
context.sourceJiraClient().update(toSourceKey(event.key), updated);
35+
}
36+
37+
@Override
38+
public String toString() {
39+
return "JiraAssigneeActionEventHandler[" + "event=" + event + ", project=" + context.projectName() + ']';
40+
}
41+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.hibernate.infra.replicate.jira.service.jira.model.action;
2+
3+
import java.util.Optional;
4+
5+
import org.hibernate.infra.replicate.jira.service.jira.model.JiraBaseObject;
6+
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraActionEventType;
7+
8+
public class JiraActionEvent extends JiraBaseObject {
9+
public String id;
10+
public String key;
11+
public String event;
12+
public String assignee;
13+
public String status;
14+
15+
public String triggeredByUser;
16+
17+
public Optional<JiraActionEventType> eventType() {
18+
return JiraActionEventType.of(event);
19+
}
20+
21+
@Override
22+
public String toString() {
23+
return "JiraActionEvent{" + "id='" + id + '\'' + ", key='" + key + '\'' + ", event='" + event + '\''
24+
+ ", assignee='" + assignee + '\'' + ", status='" + status + '\'' + '}';
25+
}
26+
}

0 commit comments

Comments
 (0)