Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions internal/datacoord/compaction_policy_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,22 @@ func (policy *clusteringCompactionPolicy) Trigger(ctx context.Context) (map[Comp

events := make(map[CompactionTriggerType][]CompactionView, 0)
views := make([]CompactionView, 0)
partitionKeySortViews := make([]CompactionView, 0)
for _, collection := range collections {
collectionViews, _, err := policy.triggerOneCollection(ctx, collection.ID, false)
if err != nil {
// not throw this error because no need to fail because of one collection
log.Warn("fail to trigger collection clustering compaction", zap.Int64("collectionID", collection.ID), zap.Error(err))
}
views = append(views, collectionViews...)
isPartitionKeySorted := IsPartitionKeySortCompactionEnabled(collection.Properties)
if isPartitionKeySorted {
partitionKeySortViews = append(partitionKeySortViews, collectionViews...)
} else {
views = append(views, collectionViews...)
}
}
events[TriggerTypeClustering] = views
events[TriggerTypeClusteringPartitionKeySort] = partitionKeySortViews
return events, nil
}

Expand Down Expand Up @@ -120,12 +127,14 @@ func (policy *clusteringCompactionPolicy) triggerOneCollection(ctx context.Conte
}

partSegments := GetSegmentsChanPart(policy.meta, collectionID, SegmentFilterFunc(func(segment *SegmentInfo) bool {
isPartitionKeySorted := IsPartitionKeySortCompactionEnabled(collection.Properties)
return isSegmentHealthy(segment) &&
isFlushed(segment) &&
!segment.isCompacting && // not compacting now
!segment.GetIsImporting() && // not importing now
segment.GetLevel() != datapb.SegmentLevel_L0 && // ignore level zero segments
!segment.GetIsInvisible()
!segment.GetIsInvisible() &&
(!isPartitionKeySorted || segment.IsPartitionKeySorted)
}))

views := make([]CompactionView, 0)
Expand Down
50 changes: 38 additions & 12 deletions internal/datacoord/compaction_policy_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/datacoord/allocator"
"github.com/milvus-io/milvus/pkg/v2/common"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
)

// singleCompactionPolicy is to compact one segment with too many delta logs
Expand All @@ -52,17 +54,23 @@ func (policy *singleCompactionPolicy) Trigger(ctx context.Context) (map[Compacti
events := make(map[CompactionTriggerType][]CompactionView, 0)
views := make([]CompactionView, 0)
sortViews := make([]CompactionView, 0)
partitionKeySortViews := make([]CompactionView, 0)
for _, collection := range collections {
collectionViews, collectionSortViews, _, err := policy.triggerOneCollection(ctx, collection.ID, false)
if err != nil {
// not throw this error because no need to fail because of one collection
log.Warn("fail to trigger single compaction", zap.Int64("collectionID", collection.ID), zap.Error(err))
}
views = append(views, collectionViews...)
sortViews = append(sortViews, collectionSortViews...)
if IsPartitionKeySortCompactionEnabled(collection.Properties) {
partitionKeySortViews = append(partitionKeySortViews, collectionSortViews...)
} else {
sortViews = append(sortViews, collectionSortViews...)
}
}
events[TriggerTypeSingle] = views
events[TriggerTypeSort] = sortViews
events[TriggerTypePartitionKeySort] = partitionKeySortViews
return events, nil
}

Expand All @@ -80,16 +88,6 @@ func (policy *singleCompactionPolicy) triggerSegmentSortCompaction(
log.Warn("fail to apply triggerSegmentSortCompaction, segment not healthy")
return nil
}
if !canTriggerSortCompaction(segment) {
log.Warn("fail to apply triggerSegmentSortCompaction",
zap.String("state", segment.GetState().String()),
zap.String("level", segment.GetLevel().String()),
zap.Bool("isSorted", segment.GetIsSorted()),
zap.Bool("isImporting", segment.GetIsImporting()),
zap.Bool("isCompacting", segment.isCompacting),
zap.Bool("isInvisible", segment.GetIsInvisible()))
return nil
}

collection, err := policy.handler.GetCollection(ctx, segment.GetCollectionID())
if err != nil {
Expand All @@ -101,6 +99,18 @@ func (policy *singleCompactionPolicy) triggerSegmentSortCompaction(
log.Warn("fail to apply triggerSegmentSortCompaction, collection not exist")
return nil
}
isPartitionIsolationEnabled := IsPartitionKeySortCompactionEnabled(collection.Properties)
if !canTriggerSortCompaction(segment, isPartitionIsolationEnabled) {
log.Warn("fail to apply triggerSegmentSortCompaction",
zap.String("state", segment.GetState().String()),
zap.String("level", segment.GetLevel().String()),
zap.Bool("isSorted", segment.GetIsSorted()),
zap.Bool("isImporting", segment.GetIsImporting()),
zap.Bool("isCompacting", segment.isCompacting),
zap.Bool("isInvisible", segment.GetIsInvisible()))
return nil
}

collectionTTL, err := getCollectionTTL(collection.Properties)
if err != nil {
log.Warn("failed to apply triggerSegmentSortCompaction, get collection ttl failed")
Expand All @@ -126,6 +136,11 @@ func (policy *singleCompactionPolicy) triggerSegmentSortCompaction(
return view
}

func IsPartitionKeySortCompactionEnabled(properties map[string]string) bool {
iso, _ := common.IsPartitionKeyIsolationPropEnabled(properties)
return Params.CommonCfg.EnableNamespace.GetAsBool() && iso
}

func (policy *singleCompactionPolicy) triggerSortCompaction(
ctx context.Context,
triggerID int64,
Expand All @@ -139,9 +154,20 @@ func (policy *singleCompactionPolicy) triggerSortCompaction(
}
views := make([]CompactionView, 0)

collection, err := policy.handler.GetCollection(ctx, collectionID)
if err != nil {
log.Warn("fail to apply triggerSegmentSortCompaction, unable to get collection from handler",
zap.Error(err))
return nil, err
}
if collection == nil {
log.Warn("fail to apply triggerSegmentSortCompaction, collection not exist")
return nil, merr.WrapErrCollectionNotFound(collectionID)
}
isPartitionIsolationEnabled := IsPartitionKeySortCompactionEnabled(collection.Properties)
triggerableSegments := policy.meta.SelectSegments(ctx, WithCollection(collectionID),
SegmentFilterFunc(func(seg *SegmentInfo) bool {
return canTriggerSortCompaction(seg)
return canTriggerSortCompaction(seg, isPartitionIsolationEnabled)
}))
if len(triggerableSegments) == 0 {
log.RatedInfo(20, "no triggerable segments")
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,10 +814,10 @@ func getExpandedSize(size int64) int64 {
return int64(float64(size) * Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat())
}

func canTriggerSortCompaction(segment *SegmentInfo) bool {
func canTriggerSortCompaction(segment *SegmentInfo, isPartitionIsolationEnabled bool) bool {
return segment.GetState() == commonpb.SegmentState_Flushed &&
segment.GetLevel() != datapb.SegmentLevel_L0 &&
!segment.GetIsSorted() &&
(!segment.GetIsSorted() || (isPartitionIsolationEnabled && !segment.GetIsPartitionKeySorted())) &&
!segment.GetIsImporting() &&
!segment.isCompacting
}
28 changes: 26 additions & 2 deletions internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const (
TriggerTypeClustering
TriggerTypeSingle
TriggerTypeSort
TriggerTypePartitionKeySort
TriggerTypeClusteringPartitionKeySort
)

func (t CompactionTriggerType) String() string {
Expand All @@ -61,6 +63,10 @@ func (t CompactionTriggerType) String() string {
return "Single"
case TriggerTypeSort:
return "Sort"
case TriggerTypePartitionKeySort:
return "PartitionKeySort"
case TriggerTypeClusteringPartitionKeySort:
return "ClusteringPartitionKeySort"
default:
return ""
}
Expand Down Expand Up @@ -289,7 +295,17 @@ func (m *CompactionTriggerManager) loop(ctx context.Context) {
log.Warn("segment no need to do sort compaction", zap.Int64("segmentID", segID))
continue
}
m.notify(ctx, TriggerTypeSort, []CompactionView{view})
segment := m.meta.GetSegment(ctx, segID)
if segment == nil {
log.Warn("segment not found", zap.Int64("segmentID", segID))
continue
}
collection := m.meta.GetCollection(segment.GetCollectionID())
if !IsPartitionKeySortCompactionEnabled(collection.Properties) {
m.notify(ctx, TriggerTypeSort, []CompactionView{view})
} else {
m.notify(ctx, TriggerTypePartitionKeySort, []CompactionView{view})
}
}
}
}
Expand Down Expand Up @@ -358,6 +374,10 @@ func (m *CompactionTriggerManager) notify(ctx context.Context, eventType Compact
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_MixCompaction)
case TriggerTypeSort:
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_SortCompaction)
case TriggerTypePartitionKeySort:
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_PartitionKeySortCompaction)
case TriggerTypeClusteringPartitionKeySort:
m.SubmitSingleViewToScheduler(ctx, outView, datapb.CompactionType_ClusteringPartitionKeySortCompaction)
}
}
}
Expand Down Expand Up @@ -527,13 +547,17 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
log.Warn("pre-allocate result segments failed", zap.String("view", view.String()), zap.Error(err))
return
}
typ := datapb.CompactionType_ClusteringCompaction
if IsPartitionKeySortCompactionEnabled(collection.Properties) {
typ = datapb.CompactionType_MixCompaction
}
task := &datapb.CompactionTask{
PlanID: taskID,
TriggerID: view.(*ClusteringSegmentsView).triggerID,
State: datapb.CompactionTaskState_pipelining,
StartTime: time.Now().Unix(),
CollectionTtl: view.(*ClusteringSegmentsView).collectionTTL.Nanoseconds(),
Type: datapb.CompactionType_ClusteringCompaction,
Type: typ,
CollectionID: view.GetGroupLabel().CollectionID,
PartitionID: view.GetGroupLabel().PartitionID,
Channel: view.GetGroupLabel().Channel,
Expand Down
18 changes: 18 additions & 0 deletions internal/datanode/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,24 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl
compactionParams,
[]int64{pk.GetFieldID()},
)
case datapb.CompactionType_PartitionKeySortCompaction:
if req.GetPreAllocatedSegmentIDs() == nil || req.GetPreAllocatedSegmentIDs().GetBegin() == 0 {
return merr.Status(merr.WrapErrParameterInvalidMsg("invalid pre-allocated segmentID range")), nil
}
pk, err := typeutil.GetPartitionKeyFieldSchema(req.GetSchema())
partitionkey, err := typeutil.GetPartitionKeyFieldSchema(req.GetSchema())
if err != nil {
return merr.Status(err), err
}
task = compactor.NewSortCompactionTask(
taskCtx,
binlogIO,
req,
compactionParams,
[]int64{partitionkey.GetFieldID(), pk.GetFieldID()},
)
case datapb.CompactionType_ClusteringPartitionKeySortCompaction:
//TODO
default:
log.Warn("Unknown compaction type", zap.String("type", req.GetType().String()))
return merr.Status(merr.WrapErrParameterInvalidMsg("Unknown compaction type: %v", req.GetType().String())), nil
Expand Down
3 changes: 3 additions & 0 deletions pkg/proto/data_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ message SegmentInfo {
// A segment generated by datacoord of old arch, will be false.
// After the growing segment is full managed by streamingnode, the true value can never be seen at coordinator.
bool is_created_by_streaming = 30;
bool is_partition_key_sorted = 31;
}

message SegmentStartPosition {
Expand Down Expand Up @@ -628,6 +629,8 @@ enum CompactionType {
Level0DeleteCompaction = 7;
ClusteringCompaction = 8;
SortCompaction = 9;
PartitionKeySortCompaction = 10;
ClusteringPartitionKeySortCompaction = 11;
}

message CompactionStateRequest {
Expand Down
Loading
Loading