Skip to content

Commit 90ef90f

Browse files
authored
Merge pull request #97 from wayblink/segment-check
Tolerate incorrect segments returned by flush
2 parents 071c280 + ec4264d commit 90ef90f

File tree

1 file changed

+42
-28
lines changed

1 file changed

+42
-28
lines changed

core/backup_context.go

Lines changed: 42 additions & 28 deletions
Original file line number | Diff line number | Diff line change
@@ -460,6 +460,14 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
460460
}
461461

462462
// Flush
463+
segmentEntitiesBeforeFlush, err := b.milvusClient.GetPersistentSegmentInfo(ctx, collection.GetCollectionName())
464+
if err != nil {
465+
return backupInfo, err
466+
}
467+
log.Info("GetPersistentSegmentInfo before flush from milvus",
468+
zap.String("collectionName", collection.GetCollectionName()),
469+
zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush)))
470+
463471
newSealedSegmentIDs, flushedSegmentIDs, timeOfSeal, err := b.milvusClient.Flush(ctx, collection.GetCollectionName(), false)
464472
if err != nil {
465473
log.Error(fmt.Sprintf("fail to flush the collection: %s", collection.GetCollectionName()))
@@ -473,48 +481,54 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
473481
collection.BackupTimestamp = utils.ComposeTS(timeOfSeal, 0)
474482
collection.BackupPhysicalTimestamp = uint64(timeOfSeal)
475483

476-
flushSegments := append(newSealedSegmentIDs, flushedSegmentIDs...)
477-
segmentEntities, err := b.milvusClient.GetPersistentSegmentInfo(ctx, collection.GetCollectionName())
484+
flushSegmentIDs := append(newSealedSegmentIDs, flushedSegmentIDs...)
485+
segmentEntitiesAfterFlush, err := b.milvusClient.GetPersistentSegmentInfo(ctx, collection.GetCollectionName())
478486
if err != nil {
479487
return backupInfo, err
480488
}
481-
log.Info("GetPersistentSegmentInfo from milvus",
489+
log.Info("GetPersistentSegmentInfo after flush from milvus",
482490
zap.String("collectionName", collection.GetCollectionName()),
483-
zap.Int("segmentNum", len(segmentEntities)))
484-
485-
checkSegmentsFunc := func(flushSegmentIds []int64, segmentEntities []*entity.Segment) ([]*entity.Segment, error) {
486-
segmentDict := utils.ArrayToMap(flushSegmentIds)
487-
checkedSegments := make([]*entity.Segment, 0)
488-
for _, seg := range segmentEntities {
489-
sid := seg.ID
490-
if _, ok := segmentDict[sid]; ok {
491-
delete(segmentDict, sid)
492-
checkedSegments = append(checkedSegments, seg)
493-
} else {
494-
log.Warn("this may be new segments after flush, skip it", zap.Int64("id", sid))
495-
}
491+
zap.Int("segmentNumBeforeFlush", len(segmentEntitiesBeforeFlush)),
492+
zap.Int("segmentNumAfterFlush", len(segmentEntitiesAfterFlush)))
493+
494+
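// fill segments: match the segment IDs returned by Flush against the after-flush segment list first, then against the before-flush list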
495+
filledSegments := make([]*entity.Segment, 0)
496+
segmentDict := utils.ArrayToMap(flushSegmentIDs)
497+
for _, seg := range segmentEntitiesAfterFlush {
498+
sid := seg.ID
499+
if _, ok := segmentDict[sid]; ok {
500+
delete(segmentDict, sid)
501+
filledSegments = append(filledSegments, seg)
502+
} else {
503+
log.Warn("this may be new segments after flush, skip it", zap.Int64("id", sid))
496504
}
497-
if len(segmentDict) > 0 {
498-
errorMsg := "Segment return in Flush not exist in GetPersistentSegmentInfo. segment ids: " + fmt.Sprint(utils.MapKeyArray(segmentDict))
499-
log.Warn(errorMsg)
500-
return checkedSegments, errors.New(errorMsg)
505+
}
506+
for _, seg := range segmentEntitiesBeforeFlush {
507+
sid := seg.ID
508+
if _, ok := segmentDict[sid]; ok {
509+
delete(segmentDict, sid)
510+
filledSegments = append(filledSegments, seg)
511+
} else {
512+
log.Warn("this may be old segments before flush, skip it", zap.Int64("id", sid))
501513
}
502-
return checkedSegments, nil
503514
}
504-
checkedSegments, err := checkSegmentsFunc(flushSegments, segmentEntities)
515+
if len(segmentDict) > 0 {
516+
// very rare situation: segments returned by flush exist in neither segmentEntitiesBeforeFlush nor segmentEntitiesAfterFlush
517+
errorMsg := "Segment return in Flush not exist in GetPersistentSegmentInfo. segment ids: " + fmt.Sprint(utils.MapKeyArray(segmentDict))
518+
log.Warn(errorMsg)
519+
}
520+
505521
if err != nil {
506522
collection.StateCode = backuppb.BackupTaskStateCode_BACKUP_FAIL
507523
collection.ErrorMessage = err.Error()
508524
return backupInfo, err
509525
}
510-
log.Info("Finished segment check",
511-
zap.String("collectionName", collection.GetCollectionName()),
512-
zap.Int("before check", len(segmentEntities)),
513-
zap.Int("after check", len(checkedSegments)))
526+
log.Info("Finished fill segment",
527+
zap.String("collectionName", collection.GetCollectionName()))
514528

515529
segmentBackupInfos := make([]*backuppb.SegmentBackupInfo, 0)
516530
partSegInfoMap := make(map[int64][]*backuppb.SegmentBackupInfo)
517-
for _, segment := range checkedSegments {
531+
for _, segment := range filledSegments {
518532
segmentInfo, err := b.readSegmentInfo(ctx, segment.CollectionID, segment.ParititionID, segment.ID, segment.NumRows)
519533
if err != nil {
520534
return backupInfo, err
@@ -528,7 +542,7 @@ func (b BackupContext) executeCreateBackup(ctx context.Context, request *backupp
528542
}
529543
log.Info("readSegmentInfo from storage",
530544
zap.String("collectionName", collection.GetCollectionName()),
531-
zap.Int("segmentNum", len(checkedSegments)))
545+
zap.Int("segmentNum", len(filledSegments)))
532546

533547
leveledBackupInfo.segmentLevel = &backuppb.SegmentLevelBackupInfo{
534548
Infos: segmentLevelBackupInfos,

0 commit comments

Comments
 (0)