@@ -99,15 +99,15 @@ class Replayer {
9999 }
100100
101101 bool check (const ChunkPtr& chunk) {
102- std::map<int , std::pair<int , int >> tmp_chunk;
102+ std::map<std::string , std::pair<int , int >> tmp_chunk;
103103 for (int i = 0 ; i < chunk->num_rows (); i++) {
104- if (tmp_chunk.count (chunk->columns ()[0 ]->get (i).get_int32 ()) > 0 ) {
104+ if (tmp_chunk.count (chunk->columns ()[0 ]->get (i).get_slice (). to_string ()) > 0 ) {
105105 // duplicate pk
106- LOG (ERROR) << " duplicate pk: " << chunk->columns ()[0 ]->get (i).get_int32 ();
106+ LOG (ERROR) << " duplicate pk: " << chunk->columns ()[0 ]->get (i).get_slice (). to_string ();
107107 return false ;
108108 }
109- tmp_chunk[chunk->columns ()[0 ]->get (i).get_int32 ()] = {chunk->columns ()[1 ]->get (i).get_int32 (),
110- chunk->columns ()[2 ]->get (i).get_int32 ()};
109+ tmp_chunk[chunk->columns ()[0 ]->get (i).get_slice (). to_string ()] = {chunk->columns ()[1 ]->get (i).get_int32 (),
110+ chunk->columns ()[2 ]->get (i).get_int32 ()};
111111 }
112112 if (tmp_chunk.size () != _replayer_index.size ()) {
113113 LOG (ERROR) << " inconsistency row number, actual : " << tmp_chunk.size ()
@@ -146,24 +146,24 @@ class Replayer {
146146 if (log.op == ReplayerOP::UPSERT) {
147147 // Upsert
148148 for (int i = 0 ; i < chunk->num_rows (); i++) {
149- _replayer_index[chunk->columns ()[0 ]->get (i).get_int32 ()] = {
149+ _replayer_index[chunk->columns ()[0 ]->get (i).get_slice (). to_string ()] = {
150150 chunk->columns ()[1 ]->get (i).get_int32 (), chunk->columns ()[2 ]->get (i).get_int32 ()};
151151 }
152152 } else if (log.op == ReplayerOP::ERASE) {
153153 // Delete
154154 for (int i = 0 ; i < chunk->num_rows (); i++) {
155- _replayer_index.erase (chunk->columns ()[0 ]->get (i).get_int32 ());
155+ _replayer_index.erase (chunk->columns ()[0 ]->get (i).get_slice (). to_string ());
156156 }
157157 } else if (log.op == ReplayerOP::PARTIAL_UPSERT || log.op == ReplayerOP::PARTIAL_UPDATE) {
158158 // Partial update
159159 for (int i = 0 ; i < chunk->num_rows (); i++) {
160- auto iter = _replayer_index.find (chunk->columns ()[0 ]->get (i).get_int32 ());
160+ auto iter = _replayer_index.find (chunk->columns ()[0 ]->get (i).get_slice (). to_string ());
161161 if (iter != _replayer_index.end ()) {
162- _replayer_index[chunk->columns ()[0 ]->get (i).get_int32 ()] = {
162+ _replayer_index[chunk->columns ()[0 ]->get (i).get_slice (). to_string ()] = {
163163 chunk->columns ()[1 ]->get (i).get_int32 (), iter->second .second };
164164 } else if (log.op == ReplayerOP::PARTIAL_UPSERT) {
165165 // insert new record with default val
166- _replayer_index[chunk->columns ()[0 ]->get (i).get_int32 ()] = {
166+ _replayer_index[chunk->columns ()[0 ]->get (i).get_slice (). to_string ()] = {
167167 chunk->columns ()[1 ]->get (i).get_int32 (), 0 };
168168 } else {
169169 // do nothing
@@ -186,11 +186,11 @@ class Replayer {
186186 return false ;
187187 };
188188 for (int i = 0 ; i < chunk->num_rows (); i++) {
189- auto iter = _replayer_index.find (chunk->columns ()[0 ]->get (i).get_int32 ());
189+ auto iter = _replayer_index.find (chunk->columns ()[0 ]->get (i).get_slice (). to_string ());
190190 if (iter == _replayer_index.end () || is_condition_meet_fn (iter->second , i)) {
191191 // update if condition meet or not found
192192 // insert new record
193- _replayer_index[chunk->columns ()[0 ]->get (i).get_int32 ()] = {
193+ _replayer_index[chunk->columns ()[0 ]->get (i).get_slice (). to_string ()] = {
194194 chunk->columns ()[1 ]->get (i).get_int32 (), chunk->columns ()[2 ]->get (i).get_int32 ()};
195195 }
196196 }
@@ -206,7 +206,7 @@ class Replayer {
206206 // logs for replay.
207207 std::vector<ReplayEntry> _redo_logs;
208208 // c0 -> <c1, c2>
209- std::map<int , std::pair<int , int >> _replayer_index;
209+ std::map<std::string , std::pair<int , int >> _replayer_index;
210210};
211211
212212class LakePrimaryKeyConsistencyTest : public TestBase , testing::WithParamInterface<PrimaryKeyParam> {
@@ -216,8 +216,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
216216 _tablet_metadata->set_enable_persistent_index (true );
217217 _tablet_metadata->set_persistent_index_type (GetParam ().persistent_index_type );
218218
219- _slots.emplace_back (0 , " c0" , TypeDescriptor{LogicalType::TYPE_INT });
220- _partial_slots.emplace_back (0 , " c0" , TypeDescriptor{LogicalType::TYPE_INT });
219+ _slots.emplace_back (0 , " c0" , TypeDescriptor{LogicalType::TYPE_VARCHAR });
220+ _partial_slots.emplace_back (0 , " c0" , TypeDescriptor{LogicalType::TYPE_VARCHAR });
221221 _slots.emplace_back (1 , " c1" , TypeDescriptor{LogicalType::TYPE_INT});
222222 _partial_slots.emplace_back (1 , " c1" , TypeDescriptor{LogicalType::TYPE_INT});
223223 _slots.emplace_back (2 , " c2" , TypeDescriptor{LogicalType::TYPE_INT});
@@ -269,13 +269,16 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
269269 config::enable_pindex_minor_compaction = false ;
270270 _old_enable_pk_strict_memcheck = config::enable_pk_strict_memcheck;
271271 config::enable_pk_strict_memcheck = false ;
272+ _old_pk_parallel_execution_threshold_bytes = config::pk_parallel_execution_threshold_bytes;
273+ config::pk_parallel_execution_threshold_bytes = 1 ;
272274 }
273275
274276 void TearDown () override {
275277 (void )fs::remove_all (kTestGroupPath );
276278 config::l0_max_mem_usage = _old_l0_size;
277279 config::write_buffer_size = _old_memtable_size;
278280 config::enable_pk_strict_memcheck = _old_enable_pk_strict_memcheck;
281+ config::pk_parallel_execution_threshold_bytes = _old_pk_parallel_execution_threshold_bytes;
279282 }
280283
281284 std::shared_ptr<TabletMetadataPB> generate_tablet_metadata (KeysType keys_type) {
@@ -287,7 +290,7 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
287290 //
288291 // | column | type | KEY | NULL |
289292 // +--------+------+-----+------+
290- // | c0 | INT | YES | NO |
293+ // | c0 | STRING | YES | NO |
291294 // | c1 | INT | NO | NO |
292295 // | c2 | INT | NO | NO |
293296 auto schema = metadata->mutable_schema ();
@@ -299,9 +302,10 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
299302 {
300303 c0->set_unique_id (next_id ());
301304 c0->set_name (" c0" );
302- c0->set_type (" INT " );
305+ c0->set_type (" VARCHAR " );
303306 c0->set_is_key (true );
304307 c0->set_is_nullable (false );
308+ c0->set_length (3200 );
305309 }
306310 auto c1 = schema->add_column ();
307311 {
@@ -327,14 +331,22 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
327331 std::pair<ChunkPtr, std::vector<uint32_t >> gen_upsert_data (bool is_upsert) {
328332 const size_t chunk_size = (size_t )_random_generator->random_n ();
329333 std::vector<std::vector<int >> cols (3 );
334+ std::vector<std::string> key_col_str;
335+ std::vector<Slice> key_col;
330336 std::vector<uint8_t > v3 (chunk_size, is_upsert ? TOpType::UPSERT : TOpType::DELETE);
331337 _random_generator->random_cols (chunk_size, &cols);
338+ for (size_t i = 0 ; i < chunk_size; i++) {
339+ key_col_str.emplace_back (std::to_string (cols[0 ][i]));
340+ }
341+ for (const auto & s : key_col_str) {
342+ key_col.emplace_back (Slice (s));
343+ }
332344
333- auto c0 = Int32Column ::create ();
345+ auto c0 = BinaryColumn ::create ();
334346 auto c1 = Int32Column::create ();
335347 auto c2 = Int32Column::create ();
336348 auto c3 = Int8Column::create ();
337- c0->append_numbers (cols[ 0 ] .data (), cols[ 0 ] .size () * sizeof ( int ));
349+ c0->append_strings (key_col .data (), key_col .size ());
338350 c1->append_numbers (cols[1 ].data (), cols[1 ].size () * sizeof (int ));
339351 c2->append_numbers (cols[2 ].data (), cols[2 ].size () * sizeof (int ));
340352 c3->append_numbers (v3.data (), v3.size () * sizeof (uint8_t ));
@@ -350,12 +362,20 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
350362 std::pair<ChunkPtr, std::vector<uint32_t >> gen_partial_update_data () {
351363 const size_t chunk_size = (size_t )_random_generator->random_n ();
352364 std::vector<std::vector<int >> cols (2 );
365+ std::vector<std::string> key_col_str;
366+ std::vector<Slice> key_col;
353367 std::vector<uint8_t > v3 (chunk_size, TOpType::UPSERT);
354368 _random_generator->random_cols (chunk_size, &cols);
369+ for (size_t i = 0 ; i < chunk_size; i++) {
370+ key_col_str.emplace_back (std::to_string (cols[0 ][i]));
371+ }
372+ for (const auto & s : key_col_str) {
373+ key_col.emplace_back (Slice (s));
374+ }
355375
356- auto c0 = Int32Column ::create ();
376+ auto c0 = BinaryColumn ::create ();
357377 auto c1 = Int32Column::create ();
358- c0->append_numbers (cols[ 0 ] .data (), cols[ 0 ] .size () * sizeof ( int ));
378+ c0->append_strings (key_col .data (), key_col .size ());
359379 c1->append_numbers (cols[1 ].data (), cols[1 ].size () * sizeof (int ));
360380 auto indexes = std::vector<uint32_t >(chunk_size);
361381 for (uint32_t i = 0 ; i < chunk_size; i++) {
@@ -364,6 +384,29 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
364384 return {std::make_shared<Chunk>(Columns{std::move (c0), std::move (c1)}, _slot_cid_map), std::move (indexes)};
365385 }
366386
387+ // 5% chance to force index memtable flush
388+ std::unique_ptr<ConfigResetGuard<int64_t >> random_force_index_mem_flush () {
389+ std::unique_ptr<ConfigResetGuard<int64_t >> force_flush_guard;
390+ uint32_t r = _random_generator->random () % 100 ;
391+ if (r < 5 ) {
392+ // 5% chance to force index memtable flush
393+ force_flush_guard = std::make_unique<ConfigResetGuard<int64_t >>(&config::l0_max_mem_usage, 1 );
394+ }
395+ return force_flush_guard;
396+ }
397+
398+ // 20% chance to enable pk parallel execution
399+ std::unique_ptr<ConfigResetGuard<bool >> random_pk_parallel_execution () {
400+ std::unique_ptr<ConfigResetGuard<bool >> pk_parallel_execution_guard;
401+ uint32_t r = _random_generator->random () % 100 ;
402+ if (r < 20 ) {
403+ // 20% chance to enable pk parallel execution
404+ pk_parallel_execution_guard =
405+ std::make_unique<ConfigResetGuard<bool >>(&config::enable_pk_parallel_execution, true );
406+ }
407+ return pk_parallel_execution_guard;
408+ }
409+
367410 ChunkPtr read (int64_t tablet_id, int64_t version) {
368411 ASSIGN_OR_ABORT (auto metadata, _tablet_mgr->get_tablet_metadata (tablet_id, version));
369412 auto reader = std::make_shared<TabletReader>(_tablet_mgr.get (), metadata, *_schema);
@@ -383,6 +426,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
383426 }
384427
385428 Status upsert_op () {
429+ std::unique_ptr<ConfigResetGuard<int64_t >> force_index_mem_flush_guard = random_force_index_mem_flush ();
430+ std::unique_ptr<ConfigResetGuard<bool >> pk_parallel_execution_guard = random_pk_parallel_execution ();
386431 auto txn_id = next_id ();
387432 ASSIGN_OR_ABORT (auto delta_writer, DeltaWriterBuilder ()
388433 .set_tablet_manager (_tablet_mgr.get ())
@@ -418,6 +463,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
418463 }
419464
420465 Status partial_update_op (PartialUpdateMode mode) {
466+ std::unique_ptr<ConfigResetGuard<int64_t >> force_index_mem_flush_guard = random_force_index_mem_flush ();
467+ std::unique_ptr<ConfigResetGuard<bool >> pk_parallel_execution_guard = random_pk_parallel_execution ();
421468 auto txn_id = next_id ();
422469 ASSIGN_OR_ABORT (auto delta_writer, DeltaWriterBuilder ()
423470 .set_tablet_manager (_tablet_mgr.get ())
@@ -453,6 +500,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
453500 }
454501
455502 Status condition_update () {
503+ std::unique_ptr<ConfigResetGuard<int64_t >> force_index_mem_flush_guard = random_force_index_mem_flush ();
504+ std::unique_ptr<ConfigResetGuard<bool >> pk_parallel_execution_guard = random_pk_parallel_execution ();
456505 auto txn_id = next_id ();
457506 // c2 as merge_condition
458507 std::string merge_condition = " c2" ;
@@ -484,6 +533,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
484533 }
485534
486535 Status upsert_with_batch_pub_op () {
536+ std::unique_ptr<ConfigResetGuard<int64_t >> force_index_mem_flush_guard = random_force_index_mem_flush ();
537+ std::unique_ptr<ConfigResetGuard<bool >> pk_parallel_execution_guard = random_pk_parallel_execution ();
487538 size_t batch_cnt = std::max (_random_generator->random () % MaxBatchCnt, (size_t )1 );
488539 std::vector<int64_t > txn_ids;
489540 for (int i = 0 ; i < batch_cnt; i++) {
@@ -525,6 +576,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
525576 }
526577
527578 Status delete_op () {
579+ std::unique_ptr<ConfigResetGuard<int64_t >> force_index_mem_flush_guard = random_force_index_mem_flush ();
580+ std::unique_ptr<ConfigResetGuard<bool >> pk_parallel_execution_guard = random_pk_parallel_execution ();
528581 auto chunk_index = gen_upsert_data (false );
529582 auto txn_id = next_id ();
530583 ASSIGN_OR_ABORT (auto delta_writer, DeltaWriterBuilder ()
@@ -550,6 +603,8 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
550603 }
551604
552605 Status compact_op () {
606+ std::unique_ptr<ConfigResetGuard<int64_t >> force_index_mem_flush_guard = random_force_index_mem_flush ();
607+ std::unique_ptr<ConfigResetGuard<bool >> pk_parallel_execution_guard = random_pk_parallel_execution ();
553608 auto txn_id = next_id ();
554609 auto task_context = std::make_unique<CompactionTaskContext>(txn_id, _tablet_metadata->id (), _version, false ,
555610 false , nullptr );
@@ -658,6 +713,7 @@ class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterfa
658713 int64_t _old_l0_size = 0 ;
659714 int64_t _old_memtable_size = 0 ;
660715 bool _old_enable_pk_strict_memcheck = false ;
716+ int64_t _old_pk_parallel_execution_threshold_bytes = 0 ;
661717};
662718
663719TEST_P (LakePrimaryKeyConsistencyTest, test_local_pk_consistency) {
0 commit comments