Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.utils;

import java.util.Arrays;

import static org.apache.fluss.utils.Preconditions.checkNotNull;

/**
* A wrapper for byte[] that provides proper equals() and hashCode() implementations for use as Map
* keys.
*
* <p>The hashCode is pre-computed at construction time for better performance when used in
* hash-based collections.
*/
public final class ByteArrayWrapper {

    private final byte[] data;
    private final int hashCode;

    /**
     * Wraps the given byte array and eagerly computes its content hash.
     *
     * <p>Note: the array is NOT copied; callers must not mutate it afterwards, otherwise the
     * cached hash code becomes stale.
     *
     * @param data the bytes to wrap; must not be null
     */
    public ByteArrayWrapper(byte[] data) {
        this.data = checkNotNull(data, "data cannot be null");
        this.hashCode = Arrays.hashCode(data);
    }

    /** Returns the wrapped array itself; no defensive copy is made. */
    public byte[] getData() {
        return data;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o instanceof ByteArrayWrapper) {
            ByteArrayWrapper that = (ByteArrayWrapper) o;
            return Arrays.equals(this.data, that.data);
        }
        return false;
    }

    @Override
    public int hashCode() {
        // Pre-computed in the constructor; cheap for repeated hash-map lookups.
        return hashCode;
    }

    @Override
    public String toString() {
        // Only the length is printed to avoid dumping raw byte contents.
        return "ByteArrayWrapper{length=" + data.length + "}";
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.utils;

import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link ByteArrayWrapper}. */
class ByteArrayWrapperTest {

    @Test
    void testEqualsAndHashCode() {
        ByteArrayWrapper a = new ByteArrayWrapper(new byte[] {1, 2, 3});
        ByteArrayWrapper b = new ByteArrayWrapper(new byte[] {1, 2, 3});
        ByteArrayWrapper c = new ByteArrayWrapper(new byte[] {1, 2, 4});

        // Equal content implies equal wrappers with equal hash codes.
        assertThat(a).isEqualTo(b);
        assertThat(a.hashCode()).isEqualTo(b.hashCode());

        // Differing content implies inequality.
        assertThat(a).isNotEqualTo(c);
    }

    @Test
    void testNullDataThrowsException() {
        // The constructor rejects null with a descriptive message.
        assertThatThrownBy(() -> new ByteArrayWrapper(null))
                .isInstanceOf(NullPointerException.class)
                .hasMessageContaining("data cannot be null");
    }

    @Test
    void testAsMapKey() {
        byte[] first = new byte[] {1, 2, 3};
        byte[] sameContent = new byte[] {1, 2, 3}; // distinct array, identical bytes
        byte[] other = new byte[] {4, 5, 6};

        Map<ByteArrayWrapper, String> map = new HashMap<>();
        map.put(new ByteArrayWrapper(first), "value1");

        // Lookup succeeds through a different array with the same content.
        assertThat(map.get(new ByteArrayWrapper(sameContent))).isEqualTo("value1");

        // Lookup with different content misses.
        assertThat(map.get(new ByteArrayWrapper(other))).isNull();

        // A put through an equal key overwrites rather than duplicates.
        map.put(new ByteArrayWrapper(sameContent), "value2");
        assertThat(map).hasSize(1);
        assertThat(map.get(new ByteArrayWrapper(first))).isEqualTo("value2");
    }

    @Test
    void testGetData() {
        byte[] bytes = new byte[] {1, 2, 3};

        // getData() hands back the very array that was wrapped — no copy.
        assertThat(new ByteArrayWrapper(bytes).getData()).isSameAs(bytes);
    }

    @Test
    void testEmptyArray() {
        // Two independent empty arrays wrap to equal keys.
        ByteArrayWrapper left = new ByteArrayWrapper(new byte[0]);
        ByteArrayWrapper right = new ByteArrayWrapper(new byte[0]);

        assertThat(left).isEqualTo(right);
        assertThat(left.hashCode()).isEqualTo(right.hashCode());
    }

    @Test
    void testToString() {
        // toString() reports the wrapped length, not the raw bytes.
        assertThat(new ByteArrayWrapper(new byte[] {1, 2, 3}).toString()).contains("length=3");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.sink.writer.undo;

import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.utils.ByteArrayWrapper;

import java.util.HashSet;
import java.util.Set;

/**
* Encapsulates the recovery state for a single bucket.
*
* <p>This class tracks:
*
* <ul>
* <li>The bucket being recovered
* <li>The checkpoint offset (start point for reading changelog)
* <li>The log end offset (end point for reading changelog)
* <li>Processed primary keys for deduplication (streaming execution)
* <li>Progress tracking during changelog scanning
* </ul>
*/
public class BucketRecoveryContext {

    /** The bucket being recovered. */
    private final TableBucket bucket;

    /** Start point for reading the changelog (offset at the last checkpoint). */
    private final long checkpointOffset;

    /** End point for reading the changelog; -1 until {@link #setLogEndOffset(long)} is called. */
    private long logEndOffset;

    /** Primary keys already seen, used for deduplication in streaming execution. */
    private final Set<ByteArrayWrapper> processedKeys;

    /** Offset of the most recently processed changelog record. */
    private long lastProcessedOffset;

    /** Count of changelog records processed so far. */
    private int totalRecordsProcessed;

    public BucketRecoveryContext(TableBucket bucket, long checkpointOffset) {
        this.bucket = bucket;
        this.checkpointOffset = checkpointOffset;
        // Unknown until the log end offset is fetched and set explicitly.
        this.logEndOffset = -1;
        this.processedKeys = new HashSet<>();
        this.lastProcessedOffset = checkpointOffset;
        this.totalRecordsProcessed = 0;
    }

    public TableBucket getBucket() {
        return bucket;
    }

    public long getCheckpointOffset() {
        return checkpointOffset;
    }

    public long getLogEndOffset() {
        return logEndOffset;
    }

    public void setLogEndOffset(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    /** Returns the live (mutable) set of processed keys; callers add to it for deduplication. */
    public Set<ByteArrayWrapper> getProcessedKeys() {
        return processedKeys;
    }

    /**
     * Checks if this bucket needs recovery.
     *
     * @return true if checkpoint offset is less than log end offset
     */
    public boolean needsRecovery() {
        return checkpointOffset < logEndOffset;
    }

    /**
     * Checks if changelog scanning is complete for this bucket.
     *
     * <p>Complete means either:
     *
     * <ul>
     *   <li>No recovery is needed (checkpointOffset >= logEndOffset), or
     *   <li>All expected records have been processed (totalRecordsProcessed >= logEndOffset -
     *       checkpointOffset)
     * </ul>
     *
     * <p>This implementation assumes that changelog offsets are contiguous (no gaps), so the
     * number of records to process equals logEndOffset - checkpointOffset.
     *
     * <p><b>Edge case:</b> a record may be processed (read and evaluated) without contributing a
     * key to {@link #getProcessedKeys()} — e.g. a skipped UPDATE_AFTER. That still counts toward
     * completion: it was processed, it just required no undo operation.
     *
     * @return true if changelog scanning is complete
     */
    public boolean isComplete() {
        // Nothing to recover => trivially complete; otherwise compare against the
        // expected record count (contiguous offsets, no gaps).
        return !needsRecovery() || totalRecordsProcessed >= logEndOffset - checkpointOffset;
    }

    /**
     * Records that a changelog record has been processed.
     *
     * @param offset the offset of the processed record
     */
    public void recordProcessed(long offset) {
        lastProcessedOffset = offset;
        totalRecordsProcessed++;
    }

    public int getTotalRecordsProcessed() {
        return totalRecordsProcessed;
    }

    public long getLastProcessedOffset() {
        return lastProcessedOffset;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("BucketRecoveryContext{");
        sb.append("bucket=").append(bucket);
        sb.append(", checkpointOffset=").append(checkpointOffset);
        sb.append(", logEndOffset=").append(logEndOffset);
        sb.append(", processedKeys=").append(processedKeys.size());
        sb.append(", complete=").append(isComplete());
        sb.append('}');
        return sb.toString();
    }
}
Loading