Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,35 @@ public static CommitLogReplayer construct(CommitLog commitLog, UUID localHostId)
cfPersisted.put(cfs.metadata.id, filter);
}
CommitLogPosition globalPosition = firstNotCovered(cfPersisted.values());
logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPersisted));

// Limit the amount of column family data logged to prevent massive log lines
if (logger.isDebugEnabled())
{
int maxColumnFamiliesToLog = 10;
int cfCount = cfPersisted.size();
if (cfCount <= maxColumnFamiliesToLog)
{
logger.debug("Global replay position is {} from {} columnfamilies: {}",
globalPosition, cfCount, FBUtilities.toString(cfPersisted));
}
else
{
// For large numbers of column families, just log the count and a sample
Map<TableId, IntervalSet<CommitLogPosition>> sample = new HashMap<>();
int count = 0;
for (Map.Entry<TableId, IntervalSet<CommitLogPosition>> entry : cfPersisted.entrySet())
{
if (count++ >= maxColumnFamiliesToLog)
break;
sample.put(entry.getKey(), entry.getValue());
}
logger.debug("Global replay position is {} from {} columnfamilies (showing first {}): {}",
globalPosition, cfCount, maxColumnFamiliesToLog, FBUtilities.toString(sample));
logger.debug("Use TRACE level to see all {} columnfamilies", cfCount);
logger.trace("Full columnfamilies list: {}", FBUtilities.toString(cfPersisted));
}
}

return new CommitLogReplayer(commitLog, globalPosition, cfPersisted, replayFilter);
}

Expand Down Expand Up @@ -459,7 +487,25 @@ public static IntervalSet<CommitLogPosition> persistedIntervals(Iterable<SSTable

if (!skippedSSTables.isEmpty()) {
logger.warn("Origin of {} sstables is unknown or doesn't match the local node; commitLogIntervals for them were ignored", skippedSSTables.size());
logger.debug("Ignored commitLogIntervals from the following sstables: {}", skippedSSTables);

// Limit the number of SSTable names logged to prevent massive log lines
int maxSSTablesToLog = 100;
if (skippedSSTables.size() <= maxSSTablesToLog) {
logger.debug("Ignored commitLogIntervals from the following sstables: {}", skippedSSTables);
} else {
List<String> sample = new ArrayList<>();
int count = 0;
for (String sstable : skippedSSTables)
{
if (count++ >= maxSSTablesToLog)
break;
sample.add(sstable);
}
logger.debug("Ignored commitLogIntervals from {} sstables (showing first {}): {}",
skippedSSTables.size(), maxSSTablesToLog, sample);
logger.debug("Use TRACE level to see all {} skipped sstables", skippedSSTables.size());
logger.trace("Full list of ignored sstables: {}", skippedSSTables);
}
}

if (truncatedAt != null)
Expand Down
207 changes: 207 additions & 0 deletions test/unit/org/apache/cassandra/db/commitlog/CommitLogReplayerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,40 @@

package org.apache.cassandra.db.commitlog;


import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.AppenderBase;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.utils.concurrent.Future;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.streamhist.TombstoneHistogram;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;

import static org.junit.Assert.assertTrue;

import org.apache.cassandra.utils.ByteBufferUtil;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -60,4 +80,191 @@ public void testTrackingSegmentsWhenMutationFails()
Assert.assertTrue(!replayer.getSegmentWithInvalidOrFailedMutations().isEmpty());
Assert.assertTrue(replayer.getSegmentWithInvalidOrFailedMutations().contains(failedSegment));
}

/**
* Test that when there are few skipped SSTables (<= 100), all are logged in debug mode.
* This tests the fix for CNDB-15157.
*/
@Test
public void testPersistedIntervalsLogsAllSSTablesWhenFew()
{
ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(CommitLogReplayer.class);
InMemoryAppender appender = new InMemoryAppender();
logger.addAppender(appender);
Level originalLevel = logger.getLevel();
logger.setLevel(Level.DEBUG);

try
{
// Create 50 mock SSTables with different host IDs (so they'll be skipped)
List<SSTableReader> sstables = new ArrayList<>();
UUID differentHostId = UUID.randomUUID();
for (int i = 0; i < 50; i++)
{
SSTableReader reader = mock(SSTableReader.class);
StatsMetadata metadata = createMinimalStatsMetadata(differentHostId);
when(reader.getSSTableMetadata()).thenReturn(metadata);
when(reader.getFilename()).thenReturn("sstable-" + i + ".db");
sstables.add(reader);
}

UUID localhostId = UUID.randomUUID();
CommitLogReplayer.persistedIntervals(sstables, null, localhostId);

// Verify that debug log contains all sstables (not truncated)
List<ILoggingEvent> debugEvents = appender.getEventsForLevel(Level.DEBUG);
boolean foundFullList = false;
for (ILoggingEvent event : debugEvents)
{
String message = event.getFormattedMessage();
if (message.contains("Ignored commitLogIntervals from the following sstables:"))
{
foundFullList = true;
// Should NOT contain the "showing first" message when count is <= 100
Assert.assertFalse("Should not limit logging when <= 100 SSTables",
message.contains("showing first"));
break;
}
}
assertTrue("Should have logged the full sstables list", foundFullList);
}
finally
{
logger.detachAppender(appender);
logger.setLevel(originalLevel);
}
}

/**
* Test that when there are many skipped SSTables (> 100), only the first 100 are logged.
* This tests the fix for CNDB-15157.
*/
@Test
public void testPersistedIntervalsLimitsLoggingWhenMany()
{
ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(CommitLogReplayer.class);
InMemoryAppender appender = new InMemoryAppender();
logger.addAppender(appender);
Level originalLevel = logger.getLevel();
logger.setLevel(Level.TRACE); // Need TRACE level to capture the full list message

try
{
// Create 150 mock SSTables with different host IDs (so they'll be skipped)
List<SSTableReader> sstables = new ArrayList<>();
UUID differentHostId = UUID.randomUUID();
for (int i = 0; i < 150; i++)
{
SSTableReader reader = mock(SSTableReader.class);
StatsMetadata metadata = createMinimalStatsMetadata(differentHostId);
when(reader.getSSTableMetadata()).thenReturn(metadata);
when(reader.getFilename()).thenReturn("sstable-" + i + ".db");
sstables.add(reader);
}

UUID localhostId = UUID.randomUUID();
CommitLogReplayer.persistedIntervals(sstables, null, localhostId);

// Verify that debug log is limited
List<ILoggingEvent> debugEvents = appender.getEventsForLevel(Level.DEBUG);
boolean foundLimitedList = false;
boolean foundTraceMessage = false;
for (ILoggingEvent event : debugEvents)
{
String message = event.getFormattedMessage();
if (message.contains("Ignored commitLogIntervals from 150 sstables (showing first 100)"))
{
foundLimitedList = true;
}
if (message.contains("Use TRACE level to see all 150 skipped sstables"))
{
foundTraceMessage = true;
}
}
assertTrue("Should have logged limited sstables list", foundLimitedList);
assertTrue("Should have logged message about TRACE level", foundTraceMessage);

// Verify trace log has full list
List<ILoggingEvent> traceEvents = appender.getEventsForLevel(Level.TRACE);
boolean foundFullListInTrace = false;
for (ILoggingEvent event : traceEvents)
{
String message = event.getFormattedMessage();
if (message.contains("Full list of ignored sstables:"))
{
foundFullListInTrace = true;
break;
}
}
assertTrue("Should have logged full list at TRACE level", foundFullListInTrace);
}
finally
{
logger.detachAppender(appender);
logger.setLevel(originalLevel);
}
}

/**
* Creates a minimal StatsMetadata with only the fields needed for testing persistedIntervals.
*/
private static StatsMetadata createMinimalStatsMetadata(UUID originatingHostId)
{
return new StatsMetadata(new EstimatedHistogram(150, true),
new EstimatedHistogram(150, true),
IntervalSet.empty(),
0L, // minTimestamp
0L, // maxTimestamp
Long.MAX_VALUE, // minLocalDeletionTime
Long.MAX_VALUE, // maxLocalDeletionTime
Integer.MAX_VALUE, // minTTL
Integer.MAX_VALUE, // maxTTL
0.0, // compressionRatio
TombstoneHistogram.createDefault(), // estimatedTombstoneDropTime
0, // sstableLevel
null, // clusteringTypes
null, // coveredClustering
false, // hasLegacyCounterShards
0L, // repairedAt
0L, // totalColumnsSet
0L, // totalRows
0.0, // tokenSpaceCoverage
originatingHostId,
null, // pendingRepair
false, // isTransient
false, //boolean hasPartitionLevelDeletions,
ByteBufferUtil.EMPTY_BYTE_BUFFER, //ByteBuffer firstKey,
ByteBufferUtil.EMPTY_BYTE_BUFFER, //ByteBuffer lastKey,
Collections.emptyMap(), // maxColumnValueLengths
null); // zeroCopyMetadata
}

private static class InMemoryAppender extends AppenderBase<ILoggingEvent>
{
private final List<ILoggingEvent> events = new ArrayList<>();

private InMemoryAppender()
{
start();
}

@Override
protected synchronized void append(ILoggingEvent event)
{
events.add(event);
}

public synchronized List<ILoggingEvent> getEventsForLevel(Level level)
{
List<ILoggingEvent> result = new ArrayList<>();
for (ILoggingEvent event : events)
{
if (event.getLevel() == level)
{
result.add(event);
}
}
return result;
}
}
}