Skip to content

Commit 2906920

Browse files
authored
support maxShardNum parameter in restore request (#428)
Signed-off-by: wayblink <[email protected]>
1 parent ab69696 commit 2906920

File tree

7 files changed

+285
-222
lines changed

7 files changed

+285
-222
lines changed

core/backup_impl_create_backup.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ func (b *BackupContext) backupCollectionPrepare(ctx context.Context, backupInfo
470470
}
471471

472472
newSegIDs := lo.Map(unfilledSegments, func(segment *entity.Segment, _ int) int64 { return segment.ID })
473-
log.Info("Finished fill segment",
473+
log.Debug("Finished fill segment",
474474
zap.String("databaseName", collectionBackup.GetDbName()),
475475
zap.String("collectionName", collectionBackup.GetCollectionName()),
476476
zap.Int64s("segments", newSegIDs))
@@ -741,7 +741,7 @@ func (b *BackupContext) executeCreateBackup(ctx context.Context, request *backup
741741
backupInfo.ErrorMessage = err.Error()
742742
return err
743743
}
744-
log.Info("finish executeCreateBackup",
744+
log.Info("finish backup all collections",
745745
zap.String("requestId", request.GetRequestId()),
746746
zap.String("backupName", request.GetBackupName()),
747747
zap.Strings("collections", request.GetCollectionNames()),

core/backup_impl_restore_backup.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
4242
zap.String("bucketName", request.GetBucketName()),
4343
zap.String("path", request.GetPath()),
4444
zap.String("databaseCollections", utils.GetRestoreDBCollections(request)),
45-
zap.Bool("skipDiskQuotaCheck", request.GetSkipImportDiskQuotaCheck()))
45+
zap.Bool("skipDiskQuotaCheck", request.GetSkipImportDiskQuotaCheck()),
46+
zap.Int32("maxShardNum", request.GetMaxShardNum()))
4647

4748
resp := &backuppb.RestoreBackupResponse{
4849
RequestId: request.GetRequestId(),
@@ -311,6 +312,7 @@ func (b *BackupContext) RestoreBackup(ctx context.Context, request *backuppb.Res
311312
DropExistIndex: request.GetDropExistIndex(),
312313
SkipCreateCollection: request.GetSkipCreateCollection(),
313314
SkipDiskQuotaCheck: request.GetSkipImportDiskQuotaCheck(),
315+
MaxShardNum: request.GetMaxShardNum(),
314316
}
315317
restoreCollectionTasks = append(restoreCollectionTasks, restoreCollectionTask)
316318
task.CollectionRestoreTasks = restoreCollectionTasks
@@ -388,7 +390,15 @@ func (b *BackupContext) executeRestoreBackupTask(ctx context.Context, backupBuck
388390
return task, err
389391
}
390392

391-
b.meta.UpdateRestoreTask(id, setRestoreStateCode(backuppb.RestoreTaskStateCode_SUCCESS), setRestoreEndTime(time.Now().Unix()))
393+
endTime := time.Now().Unix()
394+
task.EndTime = endTime
395+
b.meta.UpdateRestoreTask(id, setRestoreStateCode(backuppb.RestoreTaskStateCode_SUCCESS), setRestoreEndTime(endTime))
396+
397+
log.Info("finish restore all collections",
398+
zap.String("backupName", backup.GetName()),
399+
zap.Int("collections", len(backup.GetCollectionBackups())),
400+
zap.String("taskID", task.GetId()),
401+
zap.Int64("duration in seconds", task.GetEndTime()-task.GetStartTime()))
392402
return task, nil
393403
}
394404

@@ -401,7 +411,8 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
401411
zap.String("backup_collection_name", task.GetCollBackup().DbName),
402412
zap.String("target_db_name", targetDBName),
403413
zap.String("target_collection_name", targetCollectionName),
404-
zap.Bool("skipDiskQuotaCheck", task.GetSkipDiskQuotaCheck()))
414+
zap.Bool("skipDiskQuotaCheck", task.GetSkipDiskQuotaCheck()),
415+
zap.Int32("maxShardNum", task.GetMaxShardNum()))
405416
log.Info("start restore",
406417
zap.String("backupBucketName", backupBucketName),
407418
zap.String("backupPath", backupPath))
@@ -460,21 +471,28 @@ func (b *BackupContext) executeRestoreCollectionTask(ctx context.Context, backup
460471
//so here it is necessary to be compatible with the situation where SkipCreateCollection and DropExistCollection are enabled at the same time.
461472
if !task.GetSkipCreateCollection() || task.GetDropExistCollection() {
462473
err := retry.Do(ctx, func() error {
474+
// overwrite shardNum by request parameter
475+
shardNum := task.GetCollBackup().GetShardsNum()
476+
if shardNum > task.GetMaxShardNum() && task.GetMaxShardNum() != 0 {
477+
shardNum = task.GetMaxShardNum()
478+
log.Info("overwrite shardNum by request parameter", zap.Int32("oldShardNum", task.GetCollBackup().GetShardsNum()), zap.Int32("newShardNum", shardNum))
479+
480+
}
463481
if hasPartitionKey {
464482
partitionNum := len(task.GetCollBackup().GetPartitionBackups())
465483
return b.getMilvusClient().CreateCollection(
466484
ctx,
467485
targetDBName,
468486
collectionSchema,
469-
task.GetCollBackup().GetShardsNum(),
487+
shardNum,
470488
gomilvus.WithConsistencyLevel(entity.ConsistencyLevel(task.GetCollBackup().GetConsistencyLevel())),
471489
gomilvus.WithPartitionNum(int64(partitionNum)))
472490
}
473491
return b.getMilvusClient().CreateCollection(
474492
ctx,
475493
targetDBName,
476494
collectionSchema,
477-
task.GetCollBackup().GetShardsNum(),
495+
shardNum,
478496
gomilvus.WithConsistencyLevel(entity.ConsistencyLevel(task.GetCollBackup().GetConsistencyLevel())))
479497
}, retry.Attempts(10), retry.Sleep(1*time.Second))
480498
if err != nil {

core/proto/backup.proto

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,8 @@ message RestoreBackupRequest {
283283
bool skipImportDiskQuotaCheck = 17;
284284
// whether restore RBAC
285285
bool rbac = 18;
286+
// target max shard number
287+
int32 maxShardNum = 19;
286288
}
287289

288290
message RestorePartitionTask {
@@ -321,6 +323,8 @@ message RestoreCollectionTask {
321323
// if true will skip create collections
322324
bool skipCreateCollection = 18;
323325
bool skipDiskQuotaCheck = 19;
326+
// target max shard number
327+
int32 maxShardNum = 20;
324328
}
325329

326330
message RestoreBackupTask {

0 commit comments

Comments
 (0)