Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
* [ENHANCEMENT] Ingester: Instrument Ingester CPU profile with userID for read APIs. #7184
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
* [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
* [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132
Expand Down
4 changes: 2 additions & 2 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,9 +1000,9 @@ func TestBlocksCleaner_CleanPartitionedGroupInfo(t *testing.T) {
require.NoError(t, err)
require.False(t, partitionedGroupFileExists)

partitionedGroupFileExists, err = userBucket.Exists(ctx, visitMarker.GetVisitMarkerFilePath())
visitMarkerExists, err := userBucket.Exists(ctx, visitMarker.GetVisitMarkerFilePath())
require.NoError(t, err)
require.False(t, partitionedGroupFileExists)
require.False(t, visitMarkerExists)
}

func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/partition_compaction_grouper.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ func (g *PartitionCompactionGrouper) pickPartitionCompactionJob(partitionCompact
partitionCount := partitionedGroup.partitionedGroupInfo.PartitionCount
partitionID := partitionedGroup.partition.PartitionID
partitionedGroupLogger := log.With(g.logger, "rangeStart", partitionedGroup.rangeStartTime().String(), "rangeEnd", partitionedGroup.rangeEndTime().String(), "rangeDuration", partitionedGroup.rangeDuration().String(), "partitioned_group_id", partitionedGroupID, "partition_id", partitionID, "partition_count", partitionCount, "group_hash", groupHash)
visitMarker := newPartitionVisitMarker(g.ringLifecyclerID, partitionedGroupID, partitionID)
visitMarker := newPartitionVisitMarker(g.ringLifecyclerID, partitionedGroupID, partitionedGroup.partitionedGroupInfo.CreationTime, partitionID)
visitMarkerManager := NewVisitMarkerManager(g.bkt, g.logger, g.ringLifecyclerID, visitMarker)
if isVisited, err := g.isGroupVisited(partitionID, visitMarkerManager); err != nil {
level.Warn(partitionedGroupLogger).Log("msg", "unable to check if partition is visited", "err", err, "group", partitionedGroup.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/compactor/partition_compaction_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (p *PartitionCompactionPlanner) PlanWithPartition(_ context.Context, metasB
// claimed same partition in grouper at same time.
time.Sleep(p.plannerDelay)

visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionID)
visitMarker := newPartitionVisitMarker(p.ringLifecyclerID, partitionedGroupID, partitionInfo.PartitionedGroupCreationTime, partitionID)
visitMarkerManager := NewVisitMarkerManager(p.bkt, p.logger, p.ringLifecyclerID, visitMarker)
existingPartitionVisitMarker := &partitionVisitMarker{}
err := visitMarkerManager.ReadVisitMarker(p.ctx, existingPartitionVisitMarker)
Expand Down
14 changes: 9 additions & 5 deletions pkg/compactor/partition_visit_marker.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,22 @@ type partitionVisitMarker struct {
CompactorID string `json:"compactorID"`
Status VisitStatus `json:"status"`
PartitionedGroupID uint32 `json:"partitionedGroupID"`
PartitionID int `json:"partitionID"`
// VisitTime is a unix timestamp of when the partitioning group plan was created, in order to validate if the marker
// is referring to the latest version of the group plan
PartitionedGroupCreationTime int64 `json:"partitionedGroupCreationTime"`
PartitionID int `json:"partitionID"`
// VisitTime is a unix timestamp of when the partition was visited (mark updated).
VisitTime int64 `json:"visitTime"`
// Version of the file.
Version int `json:"version"`
}

func newPartitionVisitMarker(compactorID string, partitionedGroupID uint32, partitionID int) *partitionVisitMarker {
func newPartitionVisitMarker(compactorID string, partitionedGroupID uint32, partitioned_group_creation_time int64, partitionID int) *partitionVisitMarker {
return &partitionVisitMarker{
CompactorID: compactorID,
PartitionedGroupID: partitionedGroupID,
PartitionID: partitionID,
CompactorID: compactorID,
PartitionedGroupID: partitionedGroupID,
PartitionedGroupCreationTime: partitioned_group_creation_time,
PartitionID: partitionID,
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/compactor/partitioned_group_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,8 @@ func (p *PartitionedGroupInfo) getPartitionedGroupStatus(
status.PendingPartitions++
allPartitionCompleted = false
status.PendingOrFailedPartitions = append(status.PendingOrFailedPartitions, partition)
} else if visitMarker.VisitTime < p.CreationTime {
} else if visitMarker.VisitTime < p.CreationTime ||
(visitMarker.PartitionedGroupCreationTime > 0 && visitMarker.PartitionedGroupCreationTime < p.CreationTime) {
status.VisitMarkersToDelete = append(status.VisitMarkersToDelete, visitMarker)
allPartitionCompleted = false
} else if (visitMarker.GetStatus() == Pending || visitMarker.GetStatus() == InProgress) && !visitMarker.IsExpired(partitionVisitMarkerTimeout) {
Expand Down
Loading