Skip to content

Commit 1e0738d

Browse files
jasonstackdjatnieks
authored andcommitted
CNDB-12774: change CompactionObserver#onCompleted to include failure exception (#1573)
https://jenkins-stargazer.aws.dsinternal.org/view/cc-builds/job/ds-cassandra-publish-j11/1293/
1 parent 26cdf48 commit 1e0738d

11 files changed

+41
-39
lines changed

src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public void onInProgress(CompactionProgress progress)
119119
}
120120

121121
@Override
122-
public void onCompleted(TimeUUID id, boolean isSuccess)
122+
public void onCompleted(TimeUUID id, Throwable err)
123123
{
124124
backgroundCompactions.onCompleted(this, id);
125125
}

src/java/org/apache/cassandra/db/compaction/AbstractCompactionTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,9 @@ public Throwable rejected(Throwable t)
160160

161161
protected Throwable cleanup(Throwable err)
162162
{
163-
final boolean isSuccess = err == null;
163+
final Throwable originalError = err;
164164
for (CompactionObserver compObserver : compObservers)
165-
err = Throwables.perform(err, () -> compObserver.onCompleted(transaction.opId(), isSuccess));
165+
err = Throwables.perform(err, () -> compObserver.onCompleted(transaction.opId(), originalError));
166166

167167
return Throwables.perform(err, () -> transaction.close());
168168
}

src/java/org/apache/cassandra/db/compaction/CompactionObserver.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.cassandra.utils.TimeUUID;
2222

23+
import javax.annotation.Nullable;
2324

2425
/**
2526
* An observer of a compaction operation. It is notified when a compaction operation is started.
@@ -36,7 +37,7 @@ public interface CompactionObserver
3637
public void onInProgress(CompactionProgress progress) { }
3738

3839
@Override
39-
public void onCompleted(TimeUUID id, boolean isSuccess) { }
40+
public void onCompleted(TimeUUID id, @Nullable Throwable error) { }
4041
};
4142

4243
/**
@@ -50,7 +51,7 @@ public void onCompleted(TimeUUID id, boolean isSuccess) { }
5051
* Indicates that a compaction with the given id has completed.
5152
* <p/>
5253
* @param id the id of the compaction
53-
* @param isSuccess true if compaction finished without any exceptions
54+
* @param error error if compaction failed with any exceptions; or null if completed successfully
5455
*/
55-
void onCompleted(TimeUUID id, boolean isSuccess);
56+
void onCompleted(TimeUUID id, @Nullable Throwable error);
5657
}

src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1286,7 +1286,7 @@ public void onInProgress(CompactionProgress progress)
12861286
}
12871287

12881288
@Override
1289-
public void onCompleted(TimeUUID id, boolean isSuccess)
1289+
public void onCompleted(TimeUUID id, Throwable err)
12901290
{
12911291

12921292
}

src/java/org/apache/cassandra/db/compaction/SharedCompactionObserver.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.cassandra.db.compaction;
2020

21-
import java.util.concurrent.atomic.AtomicBoolean;
2221
import java.util.concurrent.atomic.AtomicInteger;
2322
import java.util.concurrent.atomic.AtomicReference;
2423

@@ -36,7 +35,7 @@
3635
public class SharedCompactionObserver implements CompactionObserver
3736
{
3837
private final AtomicInteger toReportOnComplete = new AtomicInteger(0);
39-
private final AtomicBoolean onCompleteIsSuccess = new AtomicBoolean(true);
38+
private final AtomicReference<Throwable> onCompleteException = new AtomicReference(null);
4039
private final AtomicReference<CompactionProgress> inProgressReported = new AtomicReference<>(null);
4140
private final CompactionObserver observer;
4241

@@ -62,14 +61,14 @@ public void onInProgress(CompactionProgress progress)
6261
}
6362

6463
@Override
65-
public void onCompleted(TimeUUID id, boolean isSuccess)
64+
public void onCompleted(TimeUUID id, Throwable err)
6665
{
67-
onCompleteIsSuccess.compareAndSet(true, isSuccess); // acts like AND
66+
onCompleteException.compareAndSet(null, err); // acts like AND
6867
final int remainingToComplete = toReportOnComplete.decrementAndGet();
6968
assert inProgressReported.get() != null : "onCompleted called before onInProgress";
7069
assert remainingToComplete >= 0 : "onCompleted called without corresponding registerExpectedSubtask";
7170
// The individual operation ID given here may be different from the shared ID. Pass on the shared one.
7271
if (remainingToComplete == 0)
73-
observer.onCompleted(inProgressReported.get().operationId(), onCompleteIsSuccess.get());
72+
observer.onCompleted(inProgressReported.get().operationId(), onCompleteException.get());
7473
}
7574
}

src/java/org/apache/cassandra/db/compaction/UnifiedCompactionContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -397,9 +397,9 @@ public void onInProgress(CompactionProgress progress)
397397
}
398398

399399
@Override
400-
public void onCompleted(TimeUUID id, boolean isSuccess)
400+
public void onCompleted(TimeUUID id, Throwable err)
401401
{
402-
strategy.onCompleted(id, isSuccess);
402+
strategy.onCompleted(id, err);
403403
}
404404

405405
@Override

src/java/org/apache/cassandra/db/compaction/UnifiedCompactionStrategy.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.function.BiPredicate;
3131
import java.util.regex.Matcher;
3232
import java.util.regex.Pattern;
33-
import java.util.stream.Collectors;
3433

3534
import com.google.common.annotations.VisibleForTesting;
3635
import com.google.common.base.Preconditions;

test/unit/org/apache/cassandra/db/compaction/CompactionSimulationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1382,7 +1382,7 @@ private void compactData()
13821382
//Thread.sleep(5);
13831383

13841384
// then remove the old sstables
1385-
strategy.onCompleted(id, true);
1385+
strategy.onCompleted(id, null);
13861386
counters.numCompactions.incrementAndGet();
13871387
counters.numCompactionsPending.decrementAndGet();
13881388
counters.numCompactedSSTables.addAndGet(candidates.size());

test/unit/org/apache/cassandra/db/compaction/CompactionStrategyStatisticsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -663,7 +663,7 @@ private void testCompactionStatistics(Set<SSTableReader> sstables,
663663
{
664664
Set<SSTableReader> compSSTables = pair.left;
665665
long totSSTablesLen = totUncompressedLength(compSSTables);
666-
strategy.onCompleted(pair.right, true);
666+
strategy.onCompleted(pair.right, null);
667667

668668
numCompactions--;
669669
numCompactionsInProgress--;

test/unit/org/apache/cassandra/db/compaction/CompactionTaskTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ public void testCompactionReporting()
340340

341341
verify(operationObserver, times(1)).onOperationStart(tableOpCaptor.capture());
342342
verify(compObserver, times(1)).onInProgress(compactionCaptor.capture());
343-
verify(compObserver, times(1)).onCompleted(eq(txn.opId()), eq(true));
343+
verify(compObserver, times(1)).onCompleted(eq(txn.opId()), eq(null));
344344
}
345345

346346
@Test
@@ -354,7 +354,7 @@ public void testFailCompactionTask()
354354
taskMock.addObserver(compObserver);
355355
Mockito.doThrow(new RuntimeException("Test throw")).when(taskMock).executeInternal();
356356
Assert.assertThrows(RuntimeException.class, () -> taskMock.execute());
357-
Mockito.verify(compObserver, times(1)).onCompleted(any(TimeUUID.class), eq(false));
357+
Mockito.verify(compObserver, times(1)).onCompleted(any(TimeUUID.class), any(Throwable.class));
358358
}
359359

360360
private Set<SSTableReader> generateData(int numSSTables, int numKeys)

0 commit comments

Comments
 (0)