Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion be/src/storage/sstable/table_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ TableBuilder::TableBuilder(const Options& options, WritableFile* file) : rep_(ne
}

TableBuilder::~TableBuilder() {
assert(rep_->closed); // Catch errors where caller forgot to call Finish()
delete rep_->filter_block;
delete rep_;
}
Expand Down
98 changes: 77 additions & 21 deletions be/test/storage/lake/lake_primary_key_consistency_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,15 +99,15 @@ class Replayer {
}

bool check(const ChunkPtr& chunk) {
std::map<int, std::pair<int, int>> tmp_chunk;
std::map<std::string, std::pair<int, int>> tmp_chunk;
for (int i = 0; i < chunk->num_rows(); i++) {
if (tmp_chunk.count(chunk->columns()[0]->get(i).get_int32()) > 0) {
if (tmp_chunk.count(chunk->columns()[0]->get(i).get_slice().to_string()) > 0) {
// duplicate pk
LOG(ERROR) << "duplicate pk: " << chunk->columns()[0]->get(i).get_int32();
LOG(ERROR) << "duplicate pk: " << chunk->columns()[0]->get(i).get_slice().to_string();
return false;
}
tmp_chunk[chunk->columns()[0]->get(i).get_int32()] = {chunk->columns()[1]->get(i).get_int32(),
chunk->columns()[2]->get(i).get_int32()};
tmp_chunk[chunk->columns()[0]->get(i).get_slice().to_string()] = {chunk->columns()[1]->get(i).get_int32(),
chunk->columns()[2]->get(i).get_int32()};
}
if (tmp_chunk.size() != _replayer_index.size()) {
LOG(ERROR) << "inconsistency row number, actual : " << tmp_chunk.size()
Expand Down Expand Up @@ -146,24 +146,24 @@ class Replayer {
if (log.op == ReplayerOP::UPSERT) {
// Upsert
for (int i = 0; i < chunk->num_rows(); i++) {
_replayer_index[chunk->columns()[0]->get(i).get_int32()] = {
_replayer_index[chunk->columns()[0]->get(i).get_slice().to_string()] = {
chunk->columns()[1]->get(i).get_int32(), chunk->columns()[2]->get(i).get_int32()};
}
} else if (log.op == ReplayerOP::ERASE) {
// Delete
for (int i = 0; i < chunk->num_rows(); i++) {
_replayer_index.erase(chunk->columns()[0]->get(i).get_int32());
_replayer_index.erase(chunk->columns()[0]->get(i).get_slice().to_string());
}
} else if (log.op == ReplayerOP::PARTIAL_UPSERT || log.op == ReplayerOP::PARTIAL_UPDATE) {
// Partial update
for (int i = 0; i < chunk->num_rows(); i++) {
auto iter = _replayer_index.find(chunk->columns()[0]->get(i).get_int32());
auto iter = _replayer_index.find(chunk->columns()[0]->get(i).get_slice().to_string());
if (iter != _replayer_index.end()) {
_replayer_index[chunk->columns()[0]->get(i).get_int32()] = {
_replayer_index[chunk->columns()[0]->get(i).get_slice().to_string()] = {
chunk->columns()[1]->get(i).get_int32(), iter->second.second};
} else if (log.op == ReplayerOP::PARTIAL_UPSERT) {
// insert new record with default val
_replayer_index[chunk->columns()[0]->get(i).get_int32()] = {
_replayer_index[chunk->columns()[0]->get(i).get_slice().to_string()] = {
chunk->columns()[1]->get(i).get_int32(), 0};
} else {
// do nothing
Expand All @@ -186,11 +186,11 @@ class Replayer {
return false;
};
for (int i = 0; i < chunk->num_rows(); i++) {
auto iter = _replayer_index.find(chunk->columns()[0]->get(i).get_int32());
auto iter = _replayer_index.find(chunk->columns()[0]->get(i).get_slice().to_string());
if (iter == _replayer_index.end() || is_condition_meet_fn(iter->second, i)) {
// update if condition meet or not found
// insert new record
_replayer_index[chunk->columns()[0]->get(i).get_int32()] = {
_replayer_index[chunk->columns()[0]->get(i).get_slice().to_string()] = {
chunk->columns()[1]->get(i).get_int32(), chunk->columns()[2]->get(i).get_int32()};
}
}
Expand All @@ -206,7 +206,7 @@ class Replayer {
// logs for replay.
std::vector<ReplayEntry> _redo_logs;
// c0 -> <c1, c2>
std::map<int, std::pair<int, int>> _replayer_index;
std::map<std::string, std::pair<int, int>> _replayer_index;
};

class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterface<PrimaryKeyParam> {
Expand All @@ -216,8 +216,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
_tablet_metadata->set_enable_persistent_index(true);
_tablet_metadata->set_persistent_index_type(GetParam().persistent_index_type);

_slots.emplace_back(0, "c0", TypeDescriptor{LogicalType::TYPE_INT});
_partial_slots.emplace_back(0, "c0", TypeDescriptor{LogicalType::TYPE_INT});
_slots.emplace_back(0, "c0", TypeDescriptor{LogicalType::TYPE_VARCHAR});
_partial_slots.emplace_back(0, "c0", TypeDescriptor{LogicalType::TYPE_VARCHAR});
_slots.emplace_back(1, "c1", TypeDescriptor{LogicalType::TYPE_INT});
_partial_slots.emplace_back(1, "c1", TypeDescriptor{LogicalType::TYPE_INT});
_slots.emplace_back(2, "c2", TypeDescriptor{LogicalType::TYPE_INT});
Expand Down Expand Up @@ -269,13 +269,16 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
config::enable_pindex_minor_compaction = false;
_old_enable_pk_strict_memcheck = config::enable_pk_strict_memcheck;
config::enable_pk_strict_memcheck = false;
_old_pk_parallel_execution_threshold_bytes = config::pk_parallel_execution_threshold_bytes;
config::pk_parallel_execution_threshold_bytes = 1;
}

void TearDown() override {
(void)fs::remove_all(kTestGroupPath);
config::l0_max_mem_usage = _old_l0_size;
config::write_buffer_size = _old_memtable_size;
config::enable_pk_strict_memcheck = _old_enable_pk_strict_memcheck;
config::pk_parallel_execution_threshold_bytes = _old_pk_parallel_execution_threshold_bytes;
}

std::shared_ptr<TabletMetadataPB> generate_tablet_metadata(KeysType keys_type) {
Expand All @@ -287,7 +290,7 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
//
// | column | type | KEY | NULL |
// +--------+------+-----+------+
// | c0 | INT | YES | NO |
// | c0 | STRING | YES | NO |
// | c1 | INT | NO | NO |
// | c2 | INT | NO | NO |
auto schema = metadata->mutable_schema();
Expand All @@ -299,9 +302,10 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
{
c0->set_unique_id(next_id());
c0->set_name("c0");
c0->set_type("INT");
c0->set_type("VARCHAR");
c0->set_is_key(true);
c0->set_is_nullable(false);
c0->set_length(3200);
}
auto c1 = schema->add_column();
{
Expand All @@ -327,14 +331,22 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
std::pair<ChunkPtr, std::vector<uint32_t>> gen_upsert_data(bool is_upsert) {
const size_t chunk_size = (size_t)_random_generator->random_n();
std::vector<std::vector<int>> cols(3);
std::vector<std::string> key_col_str;
std::vector<Slice> key_col;
std::vector<uint8_t> v3(chunk_size, is_upsert ? TOpType::UPSERT : TOpType::DELETE);
_random_generator->random_cols(chunk_size, &cols);
for (size_t i = 0; i < chunk_size; i++) {
key_col_str.emplace_back(std::to_string(cols[0][i]));
}
for (const auto& s : key_col_str) {
key_col.emplace_back(Slice(s));
}

auto c0 = Int32Column::create();
auto c0 = BinaryColumn::create();
auto c1 = Int32Column::create();
auto c2 = Int32Column::create();
auto c3 = Int8Column::create();
c0->append_numbers(cols[0].data(), cols[0].size() * sizeof(int));
c0->append_strings(key_col.data(), key_col.size());
c1->append_numbers(cols[1].data(), cols[1].size() * sizeof(int));
c2->append_numbers(cols[2].data(), cols[2].size() * sizeof(int));
c3->append_numbers(v3.data(), v3.size() * sizeof(uint8_t));
Expand All @@ -350,12 +362,20 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
std::pair<ChunkPtr, std::vector<uint32_t>> gen_partial_update_data() {
const size_t chunk_size = (size_t)_random_generator->random_n();
std::vector<std::vector<int>> cols(2);
std::vector<std::string> key_col_str;
std::vector<Slice> key_col;
std::vector<uint8_t> v3(chunk_size, TOpType::UPSERT);
_random_generator->random_cols(chunk_size, &cols);
for (size_t i = 0; i < chunk_size; i++) {
key_col_str.emplace_back(std::to_string(cols[0][i]));
}
for (const auto& s : key_col_str) {
key_col.emplace_back(Slice(s));
}

auto c0 = Int32Column::create();
auto c0 = BinaryColumn::create();
auto c1 = Int32Column::create();
c0->append_numbers(cols[0].data(), cols[0].size() * sizeof(int));
c0->append_strings(key_col.data(), key_col.size());
c1->append_numbers(cols[1].data(), cols[1].size() * sizeof(int));
auto indexes = std::vector<uint32_t>(chunk_size);
for (uint32_t i = 0; i < chunk_size; i++) {
Expand All @@ -364,6 +384,29 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
return {std::make_shared<Chunk>(Columns{std::move(c0), std::move(c1)}, _slot_cid_map), std::move(indexes)};
}

// 5% chance to force index memtable flush
std::unique_ptr<ConfigResetGuard<int64_t>> random_force_index_mem_flush() {
std::unique_ptr<ConfigResetGuard<int64_t>> force_flush_guard;
uint32_t r = _random_generator->random() % 100;
if (r < 5) {
// 5% chance to force index memtable flush
force_flush_guard = std::make_unique<ConfigResetGuard<int64_t>>(&config::l0_max_mem_usage, 1);
}
return force_flush_guard;
}

// 20% chance to enable pk parallel execution
std::unique_ptr<ConfigResetGuard<bool>> random_pk_parallel_execution() {
std::unique_ptr<ConfigResetGuard<bool>> pk_parallel_execution_guard;
uint32_t r = _random_generator->random() % 100;
if (r < 20) {
// 20% chance to enable pk parallel execution
pk_parallel_execution_guard =
std::make_unique<ConfigResetGuard<bool>>(&config::enable_pk_parallel_execution, true);
}
return pk_parallel_execution_guard;
}

ChunkPtr read(int64_t tablet_id, int64_t version) {
ASSIGN_OR_ABORT(auto metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
auto reader = std::make_shared<TabletReader>(_tablet_mgr.get(), metadata, *_schema);
Expand All @@ -383,6 +426,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
}

Status upsert_op() {
std::unique_ptr<ConfigResetGuard<int64_t>> force_index_mem_flush_guard = random_force_index_mem_flush();
std::unique_ptr<ConfigResetGuard<bool>> pk_parallel_execution_guard = random_pk_parallel_execution();
auto txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
Expand Down Expand Up @@ -418,6 +463,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
}

Status partial_update_op(PartialUpdateMode mode) {
std::unique_ptr<ConfigResetGuard<int64_t>> force_index_mem_flush_guard = random_force_index_mem_flush();
std::unique_ptr<ConfigResetGuard<bool>> pk_parallel_execution_guard = random_pk_parallel_execution();
auto txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
Expand Down Expand Up @@ -453,6 +500,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
}

Status condition_update() {
std::unique_ptr<ConfigResetGuard<int64_t>> force_index_mem_flush_guard = random_force_index_mem_flush();
std::unique_ptr<ConfigResetGuard<bool>> pk_parallel_execution_guard = random_pk_parallel_execution();
auto txn_id = next_id();
// c2 as merge_condition
std::string merge_condition = "c2";
Expand Down Expand Up @@ -484,6 +533,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
}

Status upsert_with_batch_pub_op() {
std::unique_ptr<ConfigResetGuard<int64_t>> force_index_mem_flush_guard = random_force_index_mem_flush();
std::unique_ptr<ConfigResetGuard<bool>> pk_parallel_execution_guard = random_pk_parallel_execution();
size_t batch_cnt = std::max(_random_generator->random() % MaxBatchCnt, (size_t)1);
std::vector<int64_t> txn_ids;
for (int i = 0; i < batch_cnt; i++) {
Expand Down Expand Up @@ -525,6 +576,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
}

Status delete_op() {
std::unique_ptr<ConfigResetGuard<int64_t>> force_index_mem_flush_guard = random_force_index_mem_flush();
std::unique_ptr<ConfigResetGuard<bool>> pk_parallel_execution_guard = random_pk_parallel_execution();
auto chunk_index = gen_upsert_data(false);
auto txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
Expand All @@ -550,6 +603,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
}

Status compact_op() {
std::unique_ptr<ConfigResetGuard<int64_t>> force_index_mem_flush_guard = random_force_index_mem_flush();
std::unique_ptr<ConfigResetGuard<bool>> pk_parallel_execution_guard = random_pk_parallel_execution();
auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, _tablet_metadata->id(), _version, false,
false, nullptr);
Expand Down Expand Up @@ -658,6 +713,7 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
int64_t _old_l0_size = 0;
int64_t _old_memtable_size = 0;
bool _old_enable_pk_strict_memcheck = false;
int64_t _old_pk_parallel_execution_threshold_bytes = 0;
};

TEST_P(LakePrimaryKeyConsistencyTest, test_local_pk_consistency) {
Expand Down
Loading