Skip to content

Commit 79ad476

Browse files
clnollpedroslopez
andcommitted
chore: use dataplane group names in MetricsReporter (#16500)
Co-authored-by: Pedro S. Lopez <[email protected]>
1 parent 937cae7 commit 79ad476

File tree

4 files changed

+50
-39
lines changed

4 files changed

+50
-39
lines changed

airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/Emitter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ final class NumPendingJobs extends Emitter {
2323

2424
public NumPendingJobs(final MetricClient client, final MetricRepository db) {
2525
super(client, () -> {
26-
db.numberOfPendingJobsByGeography().forEach((geography, count) -> client.gauge(
26+
db.numberOfPendingJobsByDataplaneGroupName().forEach((dataplaneGroupName, count) -> client.gauge(
2727
OssMetricsRegistry.NUM_PENDING_JOBS,
2828
count,
29-
new MetricAttribute(MetricTags.GEOGRAPHY, geography != null ? geography : Emitter.UNKNOWN)));
29+
new MetricAttribute(MetricTags.GEOGRAPHY, dataplaneGroupName != null ? dataplaneGroupName : Emitter.UNKNOWN)));
3030

3131
return null;
3232
});
@@ -86,10 +86,10 @@ final class OldestPendingJob extends Emitter {
8686

8787
OldestPendingJob(final MetricClient client, final MetricRepository db) {
8888
super(client, () -> {
89-
db.oldestPendingJobAgeSecsByGeography().forEach((geographyType, count) -> client.gauge(
89+
db.oldestPendingJobAgeSecsByDataplaneGroupName().forEach((dataplaneGroupName, count) -> client.gauge(
9090
OssMetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS,
9191
count,
92-
new MetricAttribute(MetricTags.GEOGRAPHY, geographyType != null ? geographyType : Emitter.UNKNOWN)));
92+
new MetricAttribute(MetricTags.GEOGRAPHY, dataplaneGroupName != null ? dataplaneGroupName : Emitter.UNKNOWN)));
9393
return null;
9494
});
9595
}

airbyte-metrics/reporter/src/main/java/io/airbyte/metrics/reporter/MetricRepository.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.EnumMap;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.UUID;
2425
import org.jooq.DSLContext;
2526
import org.jooq.Field;
2627
import org.jooq.Record;
@@ -37,29 +38,37 @@ class MetricRepository {
3738
// Another option we didn't use here is to build this into SQL query - it will lead SQL much less
3839
// readable while not decreasing any complexity.
3940
private static final List<String> REGISTERED_ATTEMPT_QUEUE = List.of("SYNC", "AWS_PARIS_SYNC", "null");
40-
private static final List<String> REGISTERED_GEOGRAPHY = List.of("US", "AUTO", "EU");
41+
private static final UUID DEFAULT_ORGANIZATION_ID = UUID.fromString("00000000-0000-0000-0000-000000000000");
4142

4243
MetricRepository(final DSLContext ctx) {
4344
this.ctx = ctx;
4445
}
4546

46-
Map<String, Integer> numberOfPendingJobsByGeography() {
47-
final String geographyResultAlias = "geography";
47+
List<String> getDataplaneGroupNames() {
48+
return ctx.select(DATAPLANE_GROUP.NAME)
49+
.from(DATAPLANE_GROUP)
50+
.where(DATAPLANE_GROUP.TOMBSTONE.eq(false))
51+
.and(DATAPLANE_GROUP.ORGANIZATION_ID.eq(DEFAULT_ORGANIZATION_ID))
52+
.fetchInto(String.class);
53+
}
54+
55+
Map<String, Integer> numberOfPendingJobsByDataplaneGroupName() {
56+
final String dataplaneGroupNameResultAlias = "data_plane_group_name";
4857
final String countResultAlias = "result";
49-
final var result = ctx.select(DATAPLANE_GROUP.NAME.cast(String.class).as(geographyResultAlias), count(asterisk()).as(countResultAlias))
58+
final var result = ctx.select(DATAPLANE_GROUP.NAME.cast(String.class).as(dataplaneGroupNameResultAlias), count(asterisk()).as(countResultAlias))
5059
.from(JOBS)
5160
.join(CONNECTION)
5261
.on(CONNECTION.ID.cast(VARCHAR(255)).eq(JOBS.SCOPE))
5362
.join(DATAPLANE_GROUP)
5463
.on(CONNECTION.DATAPLANE_GROUP_ID.eq(DATAPLANE_GROUP.ID))
5564
.where(JOBS.STATUS.eq(JobStatus.pending))
5665
.groupBy(DATAPLANE_GROUP.NAME);
57-
final Field<String> geographyResultField = DSL.field(name(geographyResultAlias), String.class);
66+
final Field<String> dataplaneGroupNameResultField = DSL.field(name(dataplaneGroupNameResultAlias), String.class);
5867
final Field<Integer> countResultField = DSL.field(name(countResultAlias), Integer.class);
59-
final Map<String, Integer> queriedMap = result.fetchMap(geographyResultField, countResultField);
60-
for (final String potentialGeography : REGISTERED_GEOGRAPHY) {
61-
if (!queriedMap.containsKey(potentialGeography)) {
62-
queriedMap.put(potentialGeography, 0);
68+
final Map<String, Integer> queriedMap = result.fetchMap(dataplaneGroupNameResultField, countResultField);
69+
for (final String potentialDataplaneGroup : getDataplaneGroupNames()) {
70+
if (!queriedMap.containsKey(potentialDataplaneGroup)) {
71+
queriedMap.put(potentialDataplaneGroup, 0);
6372
}
6473
}
6574
return queriedMap;
@@ -98,11 +107,11 @@ int numberOfOrphanRunningJobs() {
98107
.fetchOne(0, int.class);
99108
}
100109

101-
Map<String, Double> oldestPendingJobAgeSecsByGeography() {
110+
Map<String, Double> oldestPendingJobAgeSecsByDataplaneGroupName() {
102111
final var query =
103112
"""
104113
SELECT
105-
cast(dataplane_group.name as varchar) AS geography,
114+
cast(dataplane_group.name as varchar) AS dataplane_group_name,
106115
MAX(EXTRACT(EPOCH FROM (current_timestamp - jobs.created_at)))::float AS run_duration_seconds
107116
FROM jobs
108117
JOIN connection
@@ -113,12 +122,12 @@ Map<String, Double> oldestPendingJobAgeSecsByGeography() {
113122
GROUP BY dataplane_group.name;
114123
""";
115124
final var result = ctx.fetch(query);
116-
final Field<String> geographyResultField = DSL.field(name("geography"), String.class);
125+
final Field<String> dataplaneGroupNameResultField = DSL.field(name("dataplane_group_name"), String.class);
117126
final Field<Double> runDurationSecondsField = DSL.field(name("run_duration_seconds"), Double.class);
118-
final Map<String, Double> queriedMap = result.intoMap(geographyResultField, runDurationSecondsField);
119-
for (final String potentialGeography : REGISTERED_GEOGRAPHY) {
120-
if (!queriedMap.containsKey(potentialGeography)) {
121-
queriedMap.put(potentialGeography, 0.0);
127+
final Map<String, Double> queriedMap = result.intoMap(dataplaneGroupNameResultField, runDurationSecondsField);
128+
for (final String potentialDataplaneGroup : getDataplaneGroupNames()) {
129+
if (!queriedMap.containsKey(potentialDataplaneGroup)) {
130+
queriedMap.put(potentialDataplaneGroup, 0.0);
122131
}
123132
}
124133
return queriedMap;

airbyte-metrics/reporter/src/test/java/io/airbyte/metrics/reporter/EmitterTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,13 @@ void setUp() {
4444
@Test
4545
void TestNumPendingJobs() {
4646
final var value = Map.of(AUTO_REGION, 101, EU_REGION, 20);
47-
when(repo.numberOfPendingJobsByGeography()).thenReturn(value);
47+
when(repo.numberOfPendingJobsByDataplaneGroupName()).thenReturn(value);
4848

4949
final var emitter = new NumPendingJobs(client, repo);
5050
emitter.emit();
5151

5252
assertEquals(Duration.ofSeconds(15), emitter.getDuration());
53-
verify(repo).numberOfPendingJobsByGeography();
53+
verify(repo).numberOfPendingJobsByDataplaneGroupName();
5454
verify(client).gauge(OssMetricsRegistry.NUM_PENDING_JOBS, 101,
5555
new MetricAttribute(MetricTags.GEOGRAPHY, AUTO_REGION));
5656
verify(client).gauge(OssMetricsRegistry.NUM_PENDING_JOBS, 20,
@@ -109,13 +109,13 @@ void TestOldestRunningJob() {
109109
@Test
110110
void TestOldestPendingJob() {
111111
final var value = Map.of(AUTO_REGION, 101.0, EU_REGION, 20.0);
112-
when(repo.oldestPendingJobAgeSecsByGeography()).thenReturn(value);
112+
when(repo.oldestPendingJobAgeSecsByDataplaneGroupName()).thenReturn(value);
113113

114114
final var emitter = new OldestPendingJob(client, repo);
115115
emitter.emit();
116116

117117
assertEquals(Duration.ofSeconds(15), emitter.getDuration());
118-
verify(repo).oldestPendingJobAgeSecsByGeography();
118+
verify(repo).oldestPendingJobAgeSecsByDataplaneGroupName();
119119
verify(client).gauge(OssMetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, 101,
120120
new MetricAttribute(MetricTags.GEOGRAPHY, AUTO_REGION));
121121
verify(client).gauge(OssMetricsRegistry.OLDEST_PENDING_JOB_AGE_SECS, 20,

airbyte-metrics/reporter/src/test/java/io/airbyte/metrics/reporter/MetricRepositoryTest.java

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import static org.junit.jupiter.api.Assertions.assertTrue;
1616

1717
import io.airbyte.db.instance.configs.jooq.generated.enums.ActorType;
18-
import io.airbyte.db.instance.configs.jooq.generated.enums.GeographyType;
1918
import io.airbyte.db.instance.configs.jooq.generated.enums.NamespaceDefinitionType;
2019
import io.airbyte.db.instance.configs.jooq.generated.enums.StatusType;
2120
import io.airbyte.db.instance.jobs.jooq.generated.enums.AttemptStatus;
@@ -77,7 +76,7 @@ void shouldReturnReleaseStages() {
7776
final var inactiveConnectionId = UUID.randomUUID();
7877
final var euDataplaneGroupId = UUID.randomUUID();
7978
ctx.insertInto(DATAPLANE_GROUP, DATAPLANE_GROUP.ID, DATAPLANE_GROUP.ORGANIZATION_ID, DATAPLANE_GROUP.NAME)
80-
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, GeographyType.EU.name()).execute();
79+
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, EU_REGION).execute();
8180

8281
ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.STATUS, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID,
8382
CONNECTION.DESTINATION_ID, CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL, CONNECTION.DATAPLANE_GROUP_ID)
@@ -124,7 +123,8 @@ void pendingJobsShouldReturnCorrectCount() throws SQLException {
124123
final var dstId = UUID.randomUUID();
125124
final var euDataplaneGroupId = UUID.randomUUID();
126125
ctx.insertInto(DATAPLANE_GROUP, DATAPLANE_GROUP.ID, DATAPLANE_GROUP.ORGANIZATION_ID, DATAPLANE_GROUP.NAME)
127-
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, GeographyType.EU.name()).execute();
126+
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, EU_REGION)
127+
.values(UUID.randomUUID(), DEFAULT_ORGANIZATION_ID, AUTO_REGION).execute();
128128

129129
ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
130130
CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL, CONNECTION.STATUS, CONNECTION.DATAPLANE_GROUP_ID)
@@ -139,7 +139,7 @@ void pendingJobsShouldReturnCorrectCount() throws SQLException {
139139
.values(4L, connectionUuid.toString(), JobStatus.running)
140140
.execute();
141141

142-
final var res = db.numberOfPendingJobsByGeography();
142+
final var res = db.numberOfPendingJobsByDataplaneGroupName();
143143
assertEquals(2, res.get(EU_REGION));
144144
assertEquals(0, res.get(AUTO_REGION));
145145
}
@@ -151,7 +151,8 @@ void pendingJobsShouldReturnZero() throws SQLException {
151151
final var dstId = UUID.randomUUID();
152152
final var euDataplaneGroupId = UUID.randomUUID();
153153
ctx.insertInto(DATAPLANE_GROUP, DATAPLANE_GROUP.ID, DATAPLANE_GROUP.ORGANIZATION_ID, DATAPLANE_GROUP.NAME)
154-
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, GeographyType.EU.name()).execute();
154+
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, EU_REGION)
155+
.values(UUID.randomUUID(), DEFAULT_ORGANIZATION_ID, AUTO_REGION).execute();
155156

156157
ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
157158
CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL, CONNECTION.STATUS, CONNECTION.DATAPLANE_GROUP_ID)
@@ -165,7 +166,7 @@ void pendingJobsShouldReturnZero() throws SQLException {
165166
.values(2L, connectionUuid.toString(), JobStatus.failed)
166167
.execute();
167168

168-
final var result = db.numberOfPendingJobsByGeography();
169+
final var result = db.numberOfPendingJobsByDataplaneGroupName();
169170
assertEquals(result.get(AUTO_REGION), 0);
170171
assertEquals(result.get(EU_REGION), 0);
171172
}
@@ -184,7 +185,7 @@ void shouldReturnOnlyPendingSeconds() throws SQLException {
184185
final var dstId = UUID.randomUUID();
185186
final var euDataplaneGroupId = UUID.randomUUID();
186187
ctx.insertInto(DATAPLANE_GROUP, DATAPLANE_GROUP.ID, DATAPLANE_GROUP.ORGANIZATION_ID, DATAPLANE_GROUP.NAME)
187-
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, GeographyType.EU.name()).execute();
188+
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, EU_REGION).execute();
188189

189190
ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
190191
CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL, CONNECTION.STATUS, CONNECTION.DATAPLANE_GROUP_ID)
@@ -204,7 +205,7 @@ void shouldReturnOnlyPendingSeconds() throws SQLException {
204205
.values(4L, connectionUuid.toString(), JobStatus.failed)
205206
.execute();
206207

207-
final Double result = db.oldestPendingJobAgeSecsByGeography().get(EU_REGION);
208+
final Double result = db.oldestPendingJobAgeSecsByDataplaneGroupName().get(EU_REGION);
208209
// expected age is 1000 seconds, but allow for +/- 1 second to account for timing/rounding errors
209210
assertTrue(999 < result && result < 1001);
210211
}
@@ -216,7 +217,8 @@ void shouldReturnNothingIfNotApplicable() {
216217
final var dstId = UUID.randomUUID();
217218
final var euDataplaneGroupId = UUID.randomUUID();
218219
ctx.insertInto(DATAPLANE_GROUP, DATAPLANE_GROUP.ID, DATAPLANE_GROUP.ORGANIZATION_ID, DATAPLANE_GROUP.NAME)
219-
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, GeographyType.EU.name()).execute();
220+
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, EU_REGION)
221+
.values(UUID.randomUUID(), DEFAULT_ORGANIZATION_ID, AUTO_REGION).execute();
220222

221223
ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
222224
CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL, CONNECTION.STATUS, CONNECTION.DATAPLANE_GROUP_ID)
@@ -229,7 +231,7 @@ void shouldReturnNothingIfNotApplicable() {
229231
.values(2L, connectionUuid.toString(), JobStatus.running)
230232
.values(3L, connectionUuid.toString(), JobStatus.failed).execute();
231233

232-
final var result = db.oldestPendingJobAgeSecsByGeography();
234+
final var result = db.oldestPendingJobAgeSecsByDataplaneGroupName();
233235
assertEquals(result.get(EU_REGION), 0.0);
234236
assertEquals(result.get(AUTO_REGION), 0.0);
235237
}
@@ -303,7 +305,7 @@ void shouldReturnNumConnectionsBasic() {
303305

304306
final var euDataplaneGroupId = UUID.randomUUID();
305307
ctx.insertInto(DATAPLANE_GROUP, DATAPLANE_GROUP.ID, DATAPLANE_GROUP.ORGANIZATION_ID, DATAPLANE_GROUP.NAME)
306-
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, GeographyType.EU.name()).execute();
308+
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, EU_REGION).execute();
307309

308310
ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
309311
CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL, CONNECTION.STATUS, CONNECTION.DATAPLANE_GROUP_ID)
@@ -337,7 +339,7 @@ void shouldIgnoreNonRunningConnections() {
337339

338340
final var euDataplaneGroupId = UUID.randomUUID();
339341
ctx.insertInto(DATAPLANE_GROUP, DATAPLANE_GROUP.ID, DATAPLANE_GROUP.ORGANIZATION_ID, DATAPLANE_GROUP.NAME)
340-
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, GeographyType.EU.name()).execute();
342+
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, EU_REGION).execute();
341343

342344
ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
343345
CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL, CONNECTION.STATUS, CONNECTION.DATAPLANE_GROUP_ID)
@@ -375,7 +377,7 @@ void shouldIgnoreDeletedWorkspaces() {
375377

376378
final var euDataplaneGroupId = UUID.randomUUID();
377379
ctx.insertInto(DATAPLANE_GROUP, DATAPLANE_GROUP.ID, DATAPLANE_GROUP.ORGANIZATION_ID, DATAPLANE_GROUP.NAME)
378-
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, GeographyType.EU.name()).execute();
380+
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, EU_REGION).execute();
379381

380382
ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
381383
CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.MANUAL, CONNECTION.STATUS, CONNECTION.DATAPLANE_GROUP_ID)
@@ -500,7 +502,7 @@ void shouldCountInJobsWithMissingRun() throws SQLException {
500502
final var syncConfigType = JobConfigType.sync;
501503
final var euDataplaneGroupId = UUID.randomUUID();
502504
ctx.insertInto(DATAPLANE_GROUP, DATAPLANE_GROUP.ID, DATAPLANE_GROUP.ORGANIZATION_ID, DATAPLANE_GROUP.NAME)
503-
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, GeographyType.EU.name()).execute();
505+
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, EU_REGION).execute();
504506

505507
ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
506508
CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.SCHEDULE, CONNECTION.MANUAL, CONNECTION.STATUS, CONNECTION.CREATED_AT,
@@ -535,7 +537,7 @@ void shouldNotCountNormalJobsInAbnormalMetric() {
535537
final var syncConfigType = JobConfigType.sync;
536538
final var euDataplaneGroupId = UUID.randomUUID();
537539
ctx.insertInto(DATAPLANE_GROUP, DATAPLANE_GROUP.ID, DATAPLANE_GROUP.ORGANIZATION_ID, DATAPLANE_GROUP.NAME)
538-
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, GeographyType.EU.name()).execute();
540+
.values(euDataplaneGroupId, DEFAULT_ORGANIZATION_ID, EU_REGION).execute();
539541

540542
ctx.insertInto(CONNECTION, CONNECTION.ID, CONNECTION.NAMESPACE_DEFINITION, CONNECTION.SOURCE_ID, CONNECTION.DESTINATION_ID,
541543
CONNECTION.NAME, CONNECTION.CATALOG, CONNECTION.SCHEDULE, CONNECTION.MANUAL, CONNECTION.STATUS, CONNECTION.CREATED_AT,

0 commit comments

Comments
 (0)