Skip to content

Commit e732c3b

Browse files
committed
enhance: add a disk quota for the loaded binlog size to prevent load failures of querynode
Signed-off-by: Shawn Wang <[email protected]>
1 parent e18e7d3 commit e732c3b

File tree

8 files changed

+213
-4
lines changed

8 files changed

+213
-4
lines changed

configs/milvus.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,6 +1209,7 @@ quotaAndLimits:
12091209
diskProtection:
12101210
enabled: true # When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected;
12111211
diskQuota: -1 # MB, (0, +inf), default no limit
1212+
loadedDiskQuota: -1 # MB, (0, +inf), default no limit
12121213
diskQuotaPerDB: -1 # MB, (0, +inf), default no limit
12131214
diskQuotaPerCollection: -1 # MB, (0, +inf), default no limit
12141215
diskQuotaPerPartition: -1 # MB, (0, +inf), default no limit

internal/querynodev2/metrics_info.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
153153
NumFlowGraph: node.pipelineManager.Num(),
154154
},
155155
GrowingSegmentsSize: totalGrowingSize,
156+
LoadedBinlogSize: node.manager.Segment.GetLoadedBinlogSize(),
156157
Effect: metricsinfo.NodeEffect{
157158
NodeID: node.GetNodeID(),
158159
CollectionIDs: lo.Keys(collections),

internal/querynodev2/segments/manager.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"fmt"
3030
"sync"
3131

32+
"go.uber.org/atomic"
3233
"go.uber.org/zap"
3334
"golang.org/x/sync/singleflight"
3435

@@ -202,6 +203,10 @@ type SegmentManager interface {
202203
AddLogicalResource(usage ResourceUsage)
203204
SubLogicalResource(usage ResourceUsage)
204205
GetLogicalResource() ResourceUsage
206+
207+
AddLoadedBinlogSize(size int64)
208+
SubLoadedBinlogSize(size int64)
209+
GetLoadedBinlogSize() int64
205210
}
206211

207212
var _ SegmentManager = (*segmentManager)(nil)
@@ -377,6 +382,9 @@ type segmentManager struct {
377382
// only MemorySize and DiskSize are used, other fields are ignored.
378383
logicalResource ResourceUsage
379384
logicalResourceLock sync.Mutex
385+
386+
// loadedBinlogSize stats the total binlog size of all loaded segments of this querynode.
387+
loadedBinlogSize atomic.Int64
380388
}
381389

382390
func NewSegmentManager() *segmentManager {
@@ -424,6 +432,39 @@ func (mgr *segmentManager) GetLogicalResource() ResourceUsage {
424432
return mgr.logicalResource
425433
}
426434

435+
func (mgr *segmentManager) AddLoadedBinlogSize(size int64) {
436+
mgr.loadedBinlogSize.Add(size)
437+
}
438+
439+
func (mgr *segmentManager) SubLoadedBinlogSize(size int64) {
440+
// Clamp to zero to avoid negative values on concurrent or duplicate subtractions
441+
for {
442+
current := mgr.loadedBinlogSize.Load()
443+
newVal := current - size
444+
if newVal < 0 {
445+
newVal = 0
446+
}
447+
if mgr.loadedBinlogSize.CompareAndSwap(current, newVal) {
448+
if current < size {
449+
log.Warn("Loaded binlog size subtraction exceeds current value, clamped to 0",
450+
zap.Int64("current", current),
451+
zap.Int64("subtracted", size))
452+
}
453+
return
454+
}
455+
// retry on CompareAndSwap failure
456+
}
457+
}
458+
459+
func (mgr *segmentManager) GetLoadedBinlogSize() int64 {
460+
current := mgr.loadedBinlogSize.Load()
461+
if current < 0 {
462+
log.Warn("Loaded binlog size is negative, returning 0", zap.Int64("current", current))
463+
return 0
464+
}
465+
return current
466+
}
467+
427468
// put is the internal put method updating both global segments and secondary index.
428469
func (mgr *segmentManager) put(ctx context.Context, segmentType SegmentType, segment Segment) {
429470
mgr.globalSegments.Put(ctx, segmentType, segment)

internal/querynodev2/segments/mock_segment_manager.go

Lines changed: 111 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/querynodev2/segments/segment.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ type LocalSegment struct {
312312

313313
// cached results, to avoid too many CGO calls
314314
memSize *atomic.Int64
315+
binlogSize *atomic.Int64
315316
rowNum *atomic.Int64
316317
insertCount *atomic.Int64
317318

@@ -389,6 +390,7 @@ func NewSegment(ctx context.Context,
389390
fieldJSONStats: make(map[int64]*querypb.JsonStatsInfo),
390391

391392
memSize: atomic.NewInt64(-1),
393+
binlogSize: atomic.NewInt64(0),
392394
rowNum: atomic.NewInt64(-1),
393395
insertCount: atomic.NewInt64(0),
394396
}
@@ -1352,10 +1354,17 @@ func (s *LocalSegment) CreateTextIndex(ctx context.Context, fieldID int64) error
13521354
}
13531355

13541356
func (s *LocalSegment) FinishLoad() error {
1357+
err := s.csegment.FinishLoad()
1358+
if err != nil {
1359+
return err
1360+
}
13551361
// TODO: disable logical resource handling for now
13561362
// usage := s.ResourceUsageEstimate()
13571363
// s.manager.AddLogicalResource(usage)
1358-
return s.csegment.FinishLoad()
1364+
binlogSize := calculateSegmentLogSize(s.LoadInfo())
1365+
s.manager.AddLoadedBinlogSize(binlogSize)
1366+
s.binlogSize.Store(binlogSize)
1367+
return nil
13591368
}
13601369

13611370
type ReleaseScope int
@@ -1424,6 +1433,13 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
14241433
// usage := s.ResourceUsageEstimate()
14251434
// s.manager.SubLogicalResource(usage)
14261435

1436+
binlogSize := s.binlogSize.Load()
1437+
if binlogSize > 0 {
1438+
// no concurrent change to s.binlogSize, so the subtraction is safe
1439+
s.manager.SubLoadedBinlogSize(binlogSize)
1440+
s.binlogSize.Store(0)
1441+
}
1442+
14271443
log.Info("delete segment from memory")
14281444
}
14291445

internal/rootcoord/quota_center.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1378,6 +1378,22 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error {
13781378
return err
13791379
}
13801380

1381+
// check disk quota of loaded collections
1382+
totalLoadedDiskQuota := Params.QuotaConfig.LoadedDiskQuota.GetAsFloat()
1383+
totalLoaded := 0.0
1384+
for _, queryNodeMetrics := range q.queryNodeMetrics {
1385+
// for streaming node, queryNodeMetrics.LoadedBinlogSize is always 0
1386+
totalLoaded += float64(queryNodeMetrics.LoadedBinlogSize)
1387+
}
1388+
if totalLoaded >= totalLoadedDiskQuota {
1389+
log.RatedWarn(10, "cluster loaded disk quota exceeded", zap.Float64("total loaded", totalLoaded), zap.Float64("total loaded disk quota", totalLoadedDiskQuota))
1390+
err := q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, true, nil, nil, nil)
1391+
if err != nil {
1392+
log.Warn("fail to force deny writing", zap.Error(err))
1393+
}
1394+
return err
1395+
}
1396+
13811397
collectionDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
13821398
dbSizeInfo := make(map[int64]int64)
13831399
collections := make([]int64, 0)
@@ -1434,7 +1450,7 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error {
14341450
func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
14351451
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
14361452
dbIDs := make([]int64, 0)
1437-
checkDiskQuota := func(dbID, binlogSize int64, quota float64) {
1453+
appendIfExceeded := func(dbID, binlogSize int64, quota float64) {
14381454
if float64(binlogSize) >= quota {
14391455
log.RatedWarn(10, "db disk quota exceeded",
14401456
zap.Int64("db", dbID),
@@ -1451,7 +1467,7 @@ func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
14511467
if dbDiskQuotaStr := db.GetProperty(common.DatabaseDiskQuotaKey); dbDiskQuotaStr != "" {
14521468
if dbDiskQuotaBytes, err := strconv.ParseFloat(dbDiskQuotaStr, 64); err == nil {
14531469
dbDiskQuotaMB := dbDiskQuotaBytes * 1024 * 1024
1454-
checkDiskQuota(dbID, binlogSize, dbDiskQuotaMB)
1470+
appendIfExceeded(dbID, binlogSize, dbDiskQuotaMB)
14551471
continue
14561472
} else {
14571473
log.Warn("invalid configuration for diskQuota.mb",
@@ -1460,7 +1476,7 @@ func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
14601476
}
14611477
}
14621478
}
1463-
checkDiskQuota(dbID, binlogSize, Params.QuotaConfig.DiskQuotaPerDB.GetAsFloat())
1479+
appendIfExceeded(dbID, binlogSize, Params.QuotaConfig.DiskQuotaPerDB.GetAsFloat())
14641480
}
14651481
return dbIDs
14661482
}

pkg/util/metricsinfo/quota_metric.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ type QueryNodeQuotaMetrics struct {
6262
Rms []RateMetric
6363
Fgm FlowGraphMetric
6464
GrowingSegmentsSize int64
65+
LoadedBinlogSize int64
6566
Effect NodeEffect
6667
DeleteBufferInfo DeleteBufferInfo
6768
StreamingQuota *StreamingQuotaMetrics

pkg/util/paramtable/quota_param.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ type quotaConfig struct {
153153
GrowingSegmentsSizeHighWaterLevel ParamItem `refreshable:"true"`
154154
DiskProtectionEnabled ParamItem `refreshable:"true"`
155155
DiskQuota ParamItem `refreshable:"true"`
156+
LoadedDiskQuota ParamItem `refreshable:"true"`
156157
DiskQuotaPerDB ParamItem `refreshable:"true"`
157158
DiskQuotaPerCollection ParamItem `refreshable:"true"`
158159
DiskQuotaPerPartition ParamItem `refreshable:"true"`
@@ -1904,6 +1905,27 @@ but the rate will not be lower than minRateRatio * dmlRate.`,
19041905
}
19051906
p.DiskQuota.Init(base.mgr)
19061907

1908+
p.LoadedDiskQuota = ParamItem{
1909+
Key: "quotaAndLimits.limitWriting.diskProtection.loadedDiskQuota",
1910+
Version: "2.6.4",
1911+
DefaultValue: quota,
1912+
Formatter: func(v string) string {
1913+
if !p.DiskProtectionEnabled.GetAsBool() {
1914+
return max
1915+
}
1916+
level := getAsFloat(v)
1917+
// (0, +inf)
1918+
if level <= 0 {
1919+
return max
1920+
}
1921+
// megabytes to bytes
1922+
return fmt.Sprintf("%f", megaBytes2Bytes(level))
1923+
},
1924+
Doc: "MB, (0, +inf), default no limit",
1925+
Export: true,
1926+
}
1927+
p.LoadedDiskQuota.Init(base.mgr)
1928+
19071929
p.DiskQuotaPerDB = ParamItem{
19081930
Key: "quotaAndLimits.limitWriting.diskProtection.diskQuotaPerDB",
19091931
Version: "2.4.1",

0 commit comments

Comments
 (0)