Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void onInProgress(CompactionProgress progress)
}

@Override
public void onCompleted(UUID id, boolean isSuccess)
public void onCompleted(UUID id, Throwable err)
{
backgroundCompactions.onCompleted(this, id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ public Throwable rejected(Throwable t)

protected Throwable cleanup(Throwable err)
{
final boolean isSuccess = err == null;
final Throwable originalError = err;
for (CompactionObserver compObserver : compObservers)
err = Throwables.perform(err, () -> compObserver.onCompleted(transaction.opId(), isSuccess));
err = Throwables.perform(err, () -> compObserver.onCompleted(transaction.opId(), originalError));

return Throwables.perform(err, () -> transaction.close());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.cassandra.db.compaction;

import java.util.UUID;
import javax.annotation.Nullable;

/**
* An observer of a compaction operation. It is notified when a compaction operation is started.
Expand All @@ -40,7 +41,7 @@ public interface CompactionObserver
* Indicates that a compaction with the given id has completed.
* <p/>
* @param id the id of the compaction
* @param isSuccess true if compaction finished without any exceptions
* @param error error if compaction failed with any exceptions; or null if completed successfully
*/
void onCompleted(UUID id, boolean isSuccess);
void onCompleted(UUID id, @Nullable Throwable error);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1152,7 +1152,7 @@ public void onInProgress(CompactionProgress progress)
}

@Override
public void onCompleted(UUID id, boolean isSuccess)
public void onCompleted(UUID id, Throwable err)
{

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
public class SharedCompactionObserver implements CompactionObserver
{
private final AtomicInteger toReportOnComplete = new AtomicInteger(0);
private final AtomicBoolean onCompleteIsSuccess = new AtomicBoolean(true);
private final AtomicReference<Throwable> onCompleteException = new AtomicReference(null);
private final AtomicReference<CompactionProgress> inProgressReported = new AtomicReference<>(null);
private final CompactionObserver observer;

Expand All @@ -61,14 +61,14 @@ public void onInProgress(CompactionProgress progress)
}

@Override
public void onCompleted(UUID id, boolean isSuccess)
public void onCompleted(UUID id, Throwable err)
{
onCompleteIsSuccess.compareAndSet(true, isSuccess); // acts like AND
onCompleteException.compareAndSet(null, err); // acts like AND
final int remainingToComplete = toReportOnComplete.decrementAndGet();
assert inProgressReported.get() != null : "onCompleted called before onInProgress";
assert remainingToComplete >= 0 : "onCompleted called without corresponding registerExpectedSubtask";
// The individual operation ID given here may be different from the shared ID. Pass on the shared one.
if (remainingToComplete == 0)
observer.onCompleted(inProgressReported.get().operationId(), onCompleteIsSuccess.get());
observer.onCompleted(inProgressReported.get().operationId(), onCompleteException.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -367,9 +367,9 @@ public void onInProgress(CompactionProgress progress)
}

@Override
public void onCompleted(UUID id, boolean isSuccess)
public void onCompleted(UUID id, Throwable err)
{
strategy.onCompleted(id, isSuccess);
strategy.onCompleted(id, err);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -215,14 +216,24 @@ public synchronized CompactionTasks getUserDefinedTasks(Collection<? extends Com
/// same effect as compacting all of the sstables in the arena together in one operation.
public synchronized List<CompactionAggregate.UnifiedAggregate> getMaximalAggregates()
{
maybeUpdateSelector();
return getMaximalAggregates(realm.getLiveSSTables());
}

public synchronized List<CompactionAggregate.UnifiedAggregate> getMaximalAggregates(Collection<? extends CompactionSSTable> sstables)
{
maybeUpdateSelector(); // must be called before computing compaction arenas
return getMaximalAggregatesWithArenas(getCompactionArenas(sstables, UnifiedCompactionStrategy::isSuitableForCompaction));
}

private synchronized List<CompactionAggregate.UnifiedAggregate> getMaximalAggregatesWithArenas(Collection<Arena> compactionArenas)
{
// The aggregates are split into arenas by repair status and disk, as well as in non-overlapping sections to
// enable some parallelism and efficient use of extra space. The result will be split across shards according to
// its density.
// Depending on the parallelism, the operation may require up to 100% extra space to complete.
List<CompactionAggregate.UnifiedAggregate> aggregates = new ArrayList<>();

for (Arena arena : getCompactionArenas(realm.getLiveSSTables(), UnifiedCompactionStrategy::isSuitableForCompaction))
for (Arena arena : compactionArenas)
{
// If possible, we want to issue separate compactions for non-overlapping sets of sstables, to allow
// for smaller extra space requirements. However, if the sharding configuration has changed, a major
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,7 @@ private void compactData()
//Thread.sleep(5);

// then remove the old sstables
strategy.onCompleted(id, true);
strategy.onCompleted(id, null);
counters.numCompactions.incrementAndGet();
counters.numCompactionsPending.decrementAndGet();
counters.numCompactedSSTables.addAndGet(candidates.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ private void testCompactionStatistics(Set<SSTableReader> sstables,
{
Set<SSTableReader> compSSTables = pair.left;
long totSSTablesLen = totUncompressedLength(compSSTables);
strategy.onCompleted(pair.right, true);
strategy.onCompleted(pair.right, null);

numCompactions--;
numCompactionsInProgress--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public void testCompactionReporting()

verify(operationObserver, times(1)).onOperationStart(tableOpCaptor.capture());
verify(compObserver, times(1)).onInProgress(compactionCaptor.capture());
verify(compObserver, times(1)).onCompleted(eq(txn.opId()), eq(true));
verify(compObserver, times(1)).onCompleted(eq(txn.opId()), eq(null));
}

@Test
Expand All @@ -308,7 +308,7 @@ public void testFailCompactionTask()
taskMock.addObserver(compObserver);
Mockito.doThrow(new RuntimeException("Test throw")).when(taskMock).executeInternal();
Assert.assertThrows(RuntimeException.class, () -> taskMock.execute());
Mockito.verify(compObserver, times(1)).onCompleted(any(UUID.class), eq(false));
Mockito.verify(compObserver, times(1)).onCompleted(any(UUID.class), any(Throwable.class));
}

private Set<SSTableReader> generateData(int numSSTables, int numKeys)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,19 @@ public void testOnCompleted()
{
sharedCompactionObserver.registerExpectedSubtask();
sharedCompactionObserver.onInProgress(mockProgress);
sharedCompactionObserver.onCompleted(operationId, true);
verify(mockObserver, times(1)).onCompleted(operationId, true);
sharedCompactionObserver.onCompleted(operationId, null);
verify(mockObserver, times(1)).onCompleted(operationId, null);
}

@Test
public void testOnCompletedFailure()
{
sharedCompactionObserver.registerExpectedSubtask();
sharedCompactionObserver.onInProgress(mockProgress);
sharedCompactionObserver.onCompleted(operationId, false);
verify(mockObserver, times(1)).onCompleted(operationId, false);

Exception err = new RuntimeException();
sharedCompactionObserver.onCompleted(operationId, err);
verify(mockObserver, times(1)).onCompleted(operationId, err);
}

@Test
Expand All @@ -89,11 +91,11 @@ public void testMultipleSubtasksCompletion()
sharedCompactionObserver.onInProgress(mockProgress);
sharedCompactionObserver.onInProgress(mockProgress);

sharedCompactionObserver.onCompleted(operationId, true);
verify(mockObserver, never()).onCompleted(any(UUID.class), anyBoolean());
sharedCompactionObserver.onCompleted(operationId, null);
verify(mockObserver, never()).onCompleted(any(UUID.class), any());

sharedCompactionObserver.onCompleted(operationId, true);
verify(mockObserver, times(1)).onCompleted(operationId, true);
sharedCompactionObserver.onCompleted(operationId, null);
verify(mockObserver, times(1)).onCompleted(operationId, null);
}

@Test
Expand All @@ -102,12 +104,12 @@ public void testMultipleSubtasksInProgressAfterCompletion()
sharedCompactionObserver.registerExpectedSubtask();
sharedCompactionObserver.registerExpectedSubtask();
sharedCompactionObserver.onInProgress(mockProgress);
sharedCompactionObserver.onCompleted(operationId, true);
verify(mockObserver, never()).onCompleted(any(UUID.class), anyBoolean());
sharedCompactionObserver.onCompleted(operationId, null);
verify(mockObserver, never()).onCompleted(any(UUID.class), any());

sharedCompactionObserver.onInProgress(mockProgress);
sharedCompactionObserver.onCompleted(operationId, true);
verify(mockObserver, times(1)).onCompleted(operationId, true);
sharedCompactionObserver.onCompleted(operationId, null);
verify(mockObserver, times(1)).onCompleted(operationId, null);
}

@Test
Expand All @@ -118,11 +120,12 @@ public void testMultipleSubtasksCompletionWithFailure()
sharedCompactionObserver.onInProgress(mockProgress);
sharedCompactionObserver.onInProgress(mockProgress);

sharedCompactionObserver.onCompleted(operationId, true);
verify(mockObserver, never()).onCompleted(any(UUID.class), anyBoolean());
sharedCompactionObserver.onCompleted(operationId, null);
verify(mockObserver, never()).onCompleted(any(UUID.class), any());

sharedCompactionObserver.onCompleted(operationId, false);
verify(mockObserver, times(1)).onCompleted(operationId, false);
Exception err = new RuntimeException();
sharedCompactionObserver.onCompleted(operationId, err);
verify(mockObserver, times(1)).onCompleted(operationId, err);
}

@Test
Expand All @@ -143,7 +146,7 @@ public void testConcurrentAccess() throws InterruptedException, ExecutionExcepti
sharedCompactionObserver.onInProgress(mockProgress);
if (ThreadLocalRandom.current().nextBoolean())
FBUtilities.sleepQuietly(ThreadLocalRandom.current().nextInt(1));
sharedCompactionObserver.onCompleted(operationId, true);
sharedCompactionObserver.onCompleted(operationId, null);
latch.countDown();
}));
}
Expand All @@ -152,23 +155,23 @@ public void testConcurrentAccess() throws InterruptedException, ExecutionExcepti
future.get();

verify(mockObserver, times(1)).onInProgress(mockProgress);
verify(mockObserver, times(1)).onCompleted(operationId, true);
verify(mockObserver, times(1)).onCompleted(operationId, null);
executor.shutdown();
}

@Test
public void testErrorNoRegister()
{
Util.assumeAssertsEnabled();
Assert.assertThrows(AssertionError.class, () -> sharedCompactionObserver.onCompleted(operationId, true));
Assert.assertThrows(AssertionError.class, () -> sharedCompactionObserver.onCompleted(operationId, null));
}

@Test
public void testErrorNoInProgress()
{
Util.assumeAssertsEnabled();
sharedCompactionObserver.registerExpectedSubtask();
Assert.assertThrows(AssertionError.class, () -> sharedCompactionObserver.onCompleted(operationId, true));
Assert.assertThrows(AssertionError.class, () -> sharedCompactionObserver.onCompleted(operationId, null));
}

@Test
Expand Down