Skip to content

Commit 9351608

Browse files
authored
enhance: add a disk quota for the loaded binlog size to prevent load failure of querynode (#44893)
issue: #41435 --------- Signed-off-by: Shawn Wang <[email protected]>
1 parent ac062c6 commit 9351608

File tree

11 files changed

+288
-8
lines changed

11 files changed

+288
-8
lines changed

configs/milvus.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1209,6 +1209,7 @@ quotaAndLimits:
12091209
diskProtection:
12101210
enabled: true # When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected;
12111211
diskQuota: -1 # MB, (0, +inf), default no limit
1212+
loadedDiskQuota: -1 # MB, (0, +inf), default no limit
12121213
diskQuotaPerDB: -1 # MB, (0, +inf), default no limit
12131214
diskQuotaPerCollection: -1 # MB, (0, +inf), default no limit
12141215
diskQuotaPerPartition: -1 # MB, (0, +inf), default no limit

internal/querynodev2/metrics_info.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
153153
NumFlowGraph: node.pipelineManager.Num(),
154154
},
155155
GrowingSegmentsSize: totalGrowingSize,
156+
LoadedBinlogSize: node.manager.Segment.GetLoadedBinlogSize(),
156157
Effect: metricsinfo.NodeEffect{
157158
NodeID: node.GetNodeID(),
158159
CollectionIDs: lo.Keys(collections),

internal/querynodev2/segments/manager.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"fmt"
3030
"sync"
3131

32+
"go.uber.org/atomic"
3233
"go.uber.org/zap"
3334
"golang.org/x/sync/singleflight"
3435

@@ -202,6 +203,10 @@ type SegmentManager interface {
202203
AddLogicalResource(usage ResourceUsage)
203204
SubLogicalResource(usage ResourceUsage)
204205
GetLogicalResource() ResourceUsage
206+
207+
AddLoadedBinlogSize(size int64)
208+
SubLoadedBinlogSize(size int64)
209+
GetLoadedBinlogSize() int64
205210
}
206211

207212
var _ SegmentManager = (*segmentManager)(nil)
@@ -377,6 +382,9 @@ type segmentManager struct {
377382
// only MemorySize and DiskSize are used, other fields are ignored.
378383
logicalResource ResourceUsage
379384
logicalResourceLock sync.Mutex
385+
386+
// loadedBinlogSize stats the total binlog size of all loaded segments of this querynode.
387+
loadedBinlogSize atomic.Int64
380388
}
381389

382390
func NewSegmentManager() *segmentManager {
@@ -424,6 +432,39 @@ func (mgr *segmentManager) GetLogicalResource() ResourceUsage {
424432
return mgr.logicalResource
425433
}
426434

435+
func (mgr *segmentManager) AddLoadedBinlogSize(size int64) {
436+
mgr.loadedBinlogSize.Add(size)
437+
}
438+
439+
func (mgr *segmentManager) SubLoadedBinlogSize(size int64) {
440+
// Clamp to zero to avoid negative values on concurrent or duplicate subtractions
441+
for {
442+
current := mgr.loadedBinlogSize.Load()
443+
newVal := current - size
444+
if newVal < 0 {
445+
newVal = 0
446+
}
447+
if mgr.loadedBinlogSize.CompareAndSwap(current, newVal) {
448+
if current < size {
449+
log.Warn("Loaded binlog size subtraction exceeds current value, clamped to 0",
450+
zap.Int64("current", current),
451+
zap.Int64("subtracted", size))
452+
}
453+
return
454+
}
455+
// retry on CompareAndSwap failure
456+
}
457+
}
458+
459+
func (mgr *segmentManager) GetLoadedBinlogSize() int64 {
460+
current := mgr.loadedBinlogSize.Load()
461+
if current < 0 {
462+
log.Warn("Loaded binlog size is negative, returning 0", zap.Int64("current", current))
463+
return 0
464+
}
465+
return current
466+
}
467+
427468
// put is the internal put method updating both global segments and secondary index.
428469
func (mgr *segmentManager) put(ctx context.Context, segmentType SegmentType, segment Segment) {
429470
mgr.globalSegments.Put(ctx, segmentType, segment)

internal/querynodev2/segments/manager_test.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,3 +190,30 @@ func (s *ManagerSuite) TestIncreaseVersion() {
190190
func TestManager(t *testing.T) {
191191
suite.Run(t, new(ManagerSuite))
192192
}
193+
194+
func TestLoadedBinlogSizeAccounting(t *testing.T) {
195+
m := NewSegmentManager()
196+
if got := m.GetLoadedBinlogSize(); got != 0 {
197+
t.Fatalf("expected initial 0, got %d", got)
198+
}
199+
200+
m.AddLoadedBinlogSize(100)
201+
if got := m.GetLoadedBinlogSize(); got != 100 {
202+
t.Fatalf("expected 100 after add, got %d", got)
203+
}
204+
205+
m.AddLoadedBinlogSize(50)
206+
if got := m.GetLoadedBinlogSize(); got != 150 {
207+
t.Fatalf("expected 150 after add, got %d", got)
208+
}
209+
210+
m.SubLoadedBinlogSize(20)
211+
if got := m.GetLoadedBinlogSize(); got != 130 {
212+
t.Fatalf("expected 130 after sub, got %d", got)
213+
}
214+
215+
m.SubLoadedBinlogSize(1000)
216+
if got := m.GetLoadedBinlogSize(); got != 0 {
217+
t.Fatalf("expected clamp to 0, got %d", got)
218+
}
219+
}

internal/querynodev2/segments/mock_segment_manager.go

Lines changed: 111 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/querynodev2/segments/segment.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ type LocalSegment struct {
312312

313313
// cached results, to avoid too many CGO calls
314314
memSize *atomic.Int64
315+
binlogSize *atomic.Int64
315316
rowNum *atomic.Int64
316317
insertCount *atomic.Int64
317318

@@ -389,6 +390,7 @@ func NewSegment(ctx context.Context,
389390
fieldJSONStats: make(map[int64]*querypb.JsonStatsInfo),
390391

391392
memSize: atomic.NewInt64(-1),
393+
binlogSize: atomic.NewInt64(0),
392394
rowNum: atomic.NewInt64(-1),
393395
insertCount: atomic.NewInt64(0),
394396
}
@@ -1352,10 +1354,17 @@ func (s *LocalSegment) CreateTextIndex(ctx context.Context, fieldID int64) error
13521354
}
13531355

13541356
func (s *LocalSegment) FinishLoad() error {
1357+
err := s.csegment.FinishLoad()
1358+
if err != nil {
1359+
return err
1360+
}
13551361
// TODO: disable logical resource handling for now
13561362
// usage := s.ResourceUsageEstimate()
13571363
// s.manager.AddLogicalResource(usage)
1358-
return s.csegment.FinishLoad()
1364+
binlogSize := calculateSegmentMemorySize(s.LoadInfo())
1365+
s.manager.AddLoadedBinlogSize(binlogSize)
1366+
s.binlogSize.Store(binlogSize)
1367+
return nil
13591368
}
13601369

13611370
type ReleaseScope int
@@ -1424,6 +1433,13 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
14241433
// usage := s.ResourceUsageEstimate()
14251434
// s.manager.SubLogicalResource(usage)
14261435

1436+
binlogSize := s.binlogSize.Load()
1437+
if binlogSize > 0 {
1438+
// no concurrent change to s.binlogSize, so the subtraction is safe
1439+
s.manager.SubLoadedBinlogSize(binlogSize)
1440+
s.binlogSize.Store(0)
1441+
}
1442+
14271443
log.Info("delete segment from memory")
14281444
}
14291445

internal/querynodev2/segments/utils.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,24 @@ func calculateSegmentLogSize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 {
244244
return segmentSize
245245
}
246246

247+
func calculateSegmentMemorySize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 {
248+
segmentSize := int64(0)
249+
250+
for _, fieldBinlog := range segmentLoadInfo.BinlogPaths {
251+
segmentSize += getBinlogDataMemorySize(fieldBinlog)
252+
}
253+
254+
for _, fieldBinlog := range segmentLoadInfo.Statslogs {
255+
segmentSize += getBinlogDataMemorySize(fieldBinlog)
256+
}
257+
258+
for _, fieldBinlog := range segmentLoadInfo.Deltalogs {
259+
segmentSize += getBinlogDataMemorySize(fieldBinlog)
260+
}
261+
262+
return segmentSize
263+
}
264+
247265
func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 {
248266
fieldSize := int64(0)
249267
for _, binlog := range fieldBinlog.Binlogs {

internal/rootcoord/quota_center.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1378,6 +1378,22 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error {
13781378
return err
13791379
}
13801380

1381+
// check disk quota of loaded collections
1382+
totalLoadedDiskQuota := Params.QuotaConfig.LoadedDiskQuota.GetAsFloat()
1383+
totalLoaded := 0.0
1384+
for _, queryNodeMetrics := range q.queryNodeMetrics {
1385+
// for streaming node, queryNodeMetrics.LoadedBinlogSize is always 0
1386+
totalLoaded += float64(queryNodeMetrics.LoadedBinlogSize)
1387+
}
1388+
if totalLoaded >= totalLoadedDiskQuota {
1389+
log.RatedWarn(10, "cluster loaded disk quota exceeded", zap.Float64("total loaded", totalLoaded), zap.Float64("total loaded disk quota", totalLoadedDiskQuota))
1390+
err := q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, true, nil, nil, nil)
1391+
if err != nil {
1392+
log.Warn("fail to force deny writing", zap.Error(err))
1393+
}
1394+
return err
1395+
}
1396+
13811397
collectionDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
13821398
dbSizeInfo := make(map[int64]int64)
13831399
collections := make([]int64, 0)
@@ -1434,7 +1450,7 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error {
14341450
func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
14351451
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
14361452
dbIDs := make([]int64, 0)
1437-
checkDiskQuota := func(dbID, binlogSize int64, quota float64) {
1453+
appendIfExceeded := func(dbID, binlogSize int64, quota float64) {
14381454
if float64(binlogSize) >= quota {
14391455
log.RatedWarn(10, "db disk quota exceeded",
14401456
zap.Int64("db", dbID),
@@ -1451,7 +1467,7 @@ func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
14511467
if dbDiskQuotaStr := db.GetProperty(common.DatabaseDiskQuotaKey); dbDiskQuotaStr != "" {
14521468
if dbDiskQuotaBytes, err := strconv.ParseFloat(dbDiskQuotaStr, 64); err == nil {
14531469
dbDiskQuotaMB := dbDiskQuotaBytes * 1024 * 1024
1454-
checkDiskQuota(dbID, binlogSize, dbDiskQuotaMB)
1470+
appendIfExceeded(dbID, binlogSize, dbDiskQuotaMB)
14551471
continue
14561472
} else {
14571473
log.Warn("invalid configuration for diskQuota.mb",
@@ -1460,7 +1476,7 @@ func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
14601476
}
14611477
}
14621478
}
1463-
checkDiskQuota(dbID, binlogSize, Params.QuotaConfig.DiskQuotaPerDB.GetAsFloat())
1479+
appendIfExceeded(dbID, binlogSize, Params.QuotaConfig.DiskQuotaPerDB.GetAsFloat())
14641480
}
14651481
return dbIDs
14661482
}

0 commit comments

Comments
 (0)