Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ default void subscribeFromBeginning(int bucket) {
*/
void unsubscribe(long partitionId, int bucket);

/**
 * Unsubscribes from the given bucket of a non-partitioned table dynamically.
 *
 * <p>For a partitioned table, use {@link #unsubscribe(long, int)} instead.
 *
 * @param bucket the bucket of the non-partitioned table to unsubscribe from.
 * @throws java.lang.IllegalStateException if the table is a partitioned table.
 */
void unsubscribe(int bucket);

/**
* Subscribe to the given partitioned table bucket from beginning dynamically. If the table
* bucket is already subscribed, the start offset will be updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,23 @@ public void unsubscribe(long partitionId, int bucket) {
}
}

@Override
public void unsubscribe(int bucket) {
    // Addressing a bucket without a partition id is only meaningful for
    // non-partitioned tables; reject the call otherwise.
    if (isPartitionedTable) {
        throw new IllegalStateException(
                "The table is a partitioned table, please use "
                        + "\"unsubscribe(long partitionId, int bucket)\" to "
                        + "unsubscribe a partitioned bucket instead.");
    }
    acquireAndEnsureOpen();
    try {
        // Non-partitioned buckets are identified by (tableId, bucket) alone.
        TableBucket target = new TableBucket(tableId, bucket);
        logScannerStatus.unassignScanBuckets(Collections.singletonList(target));
    } finally {
        release();
    }
}

@Override
public void wakeup() {
logFetcher.wakeup();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.utils;

import java.util.Arrays;

import static org.apache.fluss.utils.Preconditions.checkNotNull;

/**
 * An immutable wrapper around a {@code byte[]} providing value-based {@code equals()} and {@code
 * hashCode()}, so byte arrays can be used as keys in hash-based collections.
 *
 * <p>The hash code is computed once in the constructor and cached, avoiding repeated rehashing of
 * the array on every map lookup.
 */
public final class ByteArrayWrapper {

    private final byte[] data;
    private final int hashCode;

    public ByteArrayWrapper(byte[] data) {
        this.data = checkNotNull(data, "data cannot be null");
        // Cache the content hash up-front; the wrapper is treated as immutable afterwards,
        // so mutating the wrapped array would break the hashCode/equals contract.
        this.hashCode = Arrays.hashCode(data);
    }

    /** Returns the wrapped array itself (no defensive copy); callers must not mutate it. */
    public byte[] getData() {
        return data;
    }

    @Override
    public boolean equals(Object other) {
        if (other == this) {
            return true;
        }
        return other instanceof ByteArrayWrapper
                && Arrays.equals(((ByteArrayWrapper) other).data, data);
    }

    @Override
    public int hashCode() {
        return hashCode;
    }

    @Override
    public String toString() {
        return "ByteArrayWrapper{length=" + data.length + "}";
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.utils;

import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link ByteArrayWrapper}. */
class ByteArrayWrapperTest {

    @Test
    void testEqualsAndHashCode() {
        ByteArrayWrapper a = new ByteArrayWrapper(new byte[] {1, 2, 3});
        ByteArrayWrapper b = new ByteArrayWrapper(new byte[] {1, 2, 3});
        ByteArrayWrapper c = new ByteArrayWrapper(new byte[] {1, 2, 4});

        // Equal content (even from distinct arrays) means equal wrappers and hashes.
        assertThat(a).isEqualTo(b);
        assertThat(a.hashCode()).isEqualTo(b.hashCode());

        // A single differing byte breaks equality.
        assertThat(a).isNotEqualTo(c);
    }

    @Test
    void testNullDataThrowsException() {
        assertThatThrownBy(() -> new ByteArrayWrapper(null))
                .isInstanceOf(NullPointerException.class)
                .hasMessageContaining("data cannot be null");
    }

    @Test
    void testAsMapKey() {
        Map<ByteArrayWrapper, String> map = new HashMap<>();
        map.put(new ByteArrayWrapper(new byte[] {1, 2, 3}), "value1");

        // Lookup with a fresh array of the same content must hit the entry.
        assertThat(map.get(new ByteArrayWrapper(new byte[] {1, 2, 3}))).isEqualTo("value1");

        // Different content must miss.
        assertThat(map.get(new ByteArrayWrapper(new byte[] {4, 5, 6}))).isNull();

        // Re-putting with equal content overwrites instead of adding a second entry.
        map.put(new ByteArrayWrapper(new byte[] {1, 2, 3}), "value2");
        assertThat(map).hasSize(1);
        assertThat(map.get(new ByteArrayWrapper(new byte[] {1, 2, 3}))).isEqualTo("value2");
    }

    @Test
    void testGetData() {
        byte[] data = new byte[] {1, 2, 3};

        // getData() exposes the wrapped array itself, not a copy.
        assertThat(new ByteArrayWrapper(data).getData()).isSameAs(data);
    }

    @Test
    void testEmptyArray() {
        ByteArrayWrapper first = new ByteArrayWrapper(new byte[0]);
        ByteArrayWrapper second = new ByteArrayWrapper(new byte[0]);

        assertThat(first).isEqualTo(second);
        assertThat(first.hashCode()).isEqualTo(second.hashCode());
    }

    @Test
    void testToString() {
        assertThat(new ByteArrayWrapper(new byte[] {1, 2, 3}).toString()).contains("length=3");
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.flink.sink.writer.undo;

import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.utils.ByteArrayWrapper;

import java.util.HashSet;
import java.util.Set;

/**
 * Recovery state for a single table bucket.
 *
 * <p>This class tracks:
 *
 * <ul>
 *   <li>The bucket being recovered
 *   <li>The checkpoint offset (start point for reading changelog)
 *   <li>The log end offset (end point for reading changelog)
 *   <li>Processed primary keys for deduplication (streaming execution)
 *   <li>Progress tracking during changelog scanning
 * </ul>
 */
public class BucketRecoveryContext {

    private final TableBucket bucket;
    private final long checkpointOffset;
    private final long logEndOffset;

    // Primary keys already seen during the scan, used for deduplication.
    private final Set<ByteArrayWrapper> processedKeys;
    private long lastProcessedOffset;
    private int totalRecordsProcessed;

    public BucketRecoveryContext(TableBucket bucket, long checkpointOffset, long logEndOffset) {
        this.bucket = bucket;
        this.checkpointOffset = checkpointOffset;
        this.logEndOffset = logEndOffset;
        this.processedKeys = new HashSet<>();
        // Before any record is processed, progress sits at the checkpoint offset.
        this.lastProcessedOffset = checkpointOffset;
        this.totalRecordsProcessed = 0;
    }

    public TableBucket getBucket() {
        return bucket;
    }

    public long getCheckpointOffset() {
        return checkpointOffset;
    }

    public long getLogEndOffset() {
        return logEndOffset;
    }

    public Set<ByteArrayWrapper> getProcessedKeys() {
        return processedKeys;
    }

    /**
     * Checks if this bucket needs recovery.
     *
     * @return true if checkpoint offset is less than log end offset
     */
    public boolean needsRecovery() {
        return checkpointOffset < logEndOffset;
    }

    /**
     * Checks if changelog scanning is complete for this bucket: either no recovery is needed, or
     * the last processed offset has reached {@code logEndOffset - 1}.
     *
     * <p>NOTE(review): lastProcessedOffset is initialized to checkpointOffset, so when
     * checkpointOffset == logEndOffset - 1 this reports complete before any record is processed —
     * confirm whether the checkpoint offset denotes the next-to-read or last-processed record.
     *
     * @return true if changelog scanning is complete
     */
    public boolean isComplete() {
        return !needsRecovery() || lastProcessedOffset >= logEndOffset - 1;
    }

    /**
     * Records that a changelog record has been processed.
     *
     * @param offset the offset of the processed record
     */
    public void recordProcessed(long offset) {
        lastProcessedOffset = offset;
        totalRecordsProcessed++;
    }

    public int getTotalRecordsProcessed() {
        return totalRecordsProcessed;
    }

    public long getLastProcessedOffset() {
        return lastProcessedOffset;
    }

    @Override
    public String toString() {
        return "BucketRecoveryContext{"
                + "bucket=" + bucket
                + ", checkpointOffset=" + checkpointOffset
                + ", logEndOffset=" + logEndOffset
                + ", processedKeys=" + processedKeys.size()
                + ", complete=" + isComplete()
                + '}';
    }
}
Loading