diff --git a/configs/milvus.yaml b/configs/milvus.yaml index f499e577531c8..46a5fd778ad51 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -1209,6 +1209,7 @@ quotaAndLimits: diskProtection: enabled: true # When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected; diskQuota: -1 # MB, (0, +inf), default no limit + loadedDiskQuota: -1 # MB, (0, +inf), default no limit diskQuotaPerDB: -1 # MB, (0, +inf), default no limit diskQuotaPerCollection: -1 # MB, (0, +inf), default no limit diskQuotaPerPartition: -1 # MB, (0, +inf), default no limit diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 6b37b266cef9a..59fbd48dfe161 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -153,6 +153,7 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error NumFlowGraph: node.pipelineManager.Num(), }, GrowingSegmentsSize: totalGrowingSize, + LoadedBinlogSize: node.manager.Segment.GetLoadedBinlogSize(), Effect: metricsinfo.NodeEffect{ NodeID: node.GetNodeID(), CollectionIDs: lo.Keys(collections), diff --git a/internal/querynodev2/segments/manager.go b/internal/querynodev2/segments/manager.go index 2226f98d34845..6371009f1c605 100644 --- a/internal/querynodev2/segments/manager.go +++ b/internal/querynodev2/segments/manager.go @@ -29,6 +29,7 @@ import ( "fmt" "sync" + "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/singleflight" @@ -202,6 +203,10 @@ type SegmentManager interface { AddLogicalResource(usage ResourceUsage) SubLogicalResource(usage ResourceUsage) GetLogicalResource() ResourceUsage + + AddLoadedBinlogSize(size int64) + SubLoadedBinlogSize(size int64) + GetLoadedBinlogSize() int64 } var _ SegmentManager = (*segmentManager)(nil) @@ -377,6 +382,9 @@ type segmentManager struct { // only MemorySize and DiskSize are used, other fields are ignored. logicalResource ResourceUsage logicalResourceLock sync.Mutex + + // loadedBinlogSize stats the total binlog size of all loaded segments of this querynode. + loadedBinlogSize atomic.Int64 } func NewSegmentManager() *segmentManager { @@ -424,6 +432,39 @@ func (mgr *segmentManager) GetLogicalResource() ResourceUsage { return mgr.logicalResource } +func (mgr *segmentManager) AddLoadedBinlogSize(size int64) { + mgr.loadedBinlogSize.Add(size) +} + +func (mgr *segmentManager) SubLoadedBinlogSize(size int64) { + // Clamp to zero to avoid negative values on concurrent or duplicate subtractions + for { + current := mgr.loadedBinlogSize.Load() + newVal := current - size + if newVal < 0 { + newVal = 0 + } + if mgr.loadedBinlogSize.CompareAndSwap(current, newVal) { + if current < size { + log.Warn("Loaded binlog size subtraction exceeds current value, clamped to 0", + zap.Int64("current", current), + zap.Int64("subtracted", size)) + } + return + } + // retry on CompareAndSwap failure + } +} + +func (mgr *segmentManager) GetLoadedBinlogSize() int64 { + current := mgr.loadedBinlogSize.Load() + if current < 0 { + log.Warn("Loaded binlog size is negative, returning 0", zap.Int64("current", current)) + return 0 + } + return current +} + // put is the internal put method updating both global segments and secondary index. func (mgr *segmentManager) put(ctx context.Context, segmentType SegmentType, segment Segment) { mgr.globalSegments.Put(ctx, segmentType, segment) diff --git a/internal/querynodev2/segments/manager_test.go b/internal/querynodev2/segments/manager_test.go index 0082f9d01dc49..3014d63d1099a 100644 --- a/internal/querynodev2/segments/manager_test.go +++ b/internal/querynodev2/segments/manager_test.go @@ -190,3 +190,30 @@ func (s *ManagerSuite) TestIncreaseVersion() { func TestManager(t *testing.T) { suite.Run(t, new(ManagerSuite)) } + +func TestLoadedBinlogSizeAccounting(t *testing.T) { + m := NewSegmentManager() + if got := m.GetLoadedBinlogSize(); got != 0 { + t.Fatalf("expected initial 0, got %d", got) + } + + m.AddLoadedBinlogSize(100) + if got := m.GetLoadedBinlogSize(); got != 100 { + t.Fatalf("expected 100 after add, got %d", got) + } + + m.AddLoadedBinlogSize(50) + if got := m.GetLoadedBinlogSize(); got != 150 { + t.Fatalf("expected 150 after add, got %d", got) + } + + m.SubLoadedBinlogSize(20) + if got := m.GetLoadedBinlogSize(); got != 130 { + t.Fatalf("expected 130 after sub, got %d", got) + } + + m.SubLoadedBinlogSize(1000) + if got := m.GetLoadedBinlogSize(); got != 0 { + t.Fatalf("expected clamp to 0, got %d", got) + } +} diff --git a/internal/querynodev2/segments/mock_segment_manager.go b/internal/querynodev2/segments/mock_segment_manager.go index ebebbcd3c6c9a..ff20e277fc4c8 100644 --- a/internal/querynodev2/segments/mock_segment_manager.go +++ b/internal/querynodev2/segments/mock_segment_manager.go @@ -25,6 +25,39 @@ func (_m *MockSegmentManager) EXPECT() *MockSegmentManager_Expecter { return &MockSegmentManager_Expecter{mock: &_m.Mock} } +// AddLoadedBinlogSize provides a mock function with given fields: size +func (_m *MockSegmentManager) AddLoadedBinlogSize(size int64) { + _m.Called(size) +} + +// MockSegmentManager_AddLoadedBinlogSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddLoadedBinlogSize' +type MockSegmentManager_AddLoadedBinlogSize_Call struct { + *mock.Call +} + +// AddLoadedBinlogSize is a helper method to define mock.On call +// - size int64 +func (_e *MockSegmentManager_Expecter) AddLoadedBinlogSize(size interface{}) *MockSegmentManager_AddLoadedBinlogSize_Call { + return &MockSegmentManager_AddLoadedBinlogSize_Call{Call: _e.mock.On("AddLoadedBinlogSize", size)} +} + +func (_c *MockSegmentManager_AddLoadedBinlogSize_Call) Run(run func(size int64)) *MockSegmentManager_AddLoadedBinlogSize_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockSegmentManager_AddLoadedBinlogSize_Call) Return() *MockSegmentManager_AddLoadedBinlogSize_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSegmentManager_AddLoadedBinlogSize_Call) RunAndReturn(run func(int64)) *MockSegmentManager_AddLoadedBinlogSize_Call { + _c.Run(run) + return _c +} + // AddLogicalResource provides a mock function with given fields: usage func (_m *MockSegmentManager) AddLogicalResource(usage ResourceUsage) { _m.Called(usage) @@ -484,6 +517,51 @@ func (_c *MockSegmentManager_GetGrowing_Call) RunAndReturn(run func(int64) Segme return _c } +// GetLoadedBinlogSize provides a mock function with no fields +func (_m *MockSegmentManager) GetLoadedBinlogSize() int64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetLoadedBinlogSize") + } + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// MockSegmentManager_GetLoadedBinlogSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLoadedBinlogSize' +type MockSegmentManager_GetLoadedBinlogSize_Call struct { + *mock.Call +} + +// GetLoadedBinlogSize is a helper method to define mock.On call +func (_e *MockSegmentManager_Expecter) GetLoadedBinlogSize() *MockSegmentManager_GetLoadedBinlogSize_Call { + return &MockSegmentManager_GetLoadedBinlogSize_Call{Call: _e.mock.On("GetLoadedBinlogSize")} +} + +func (_c *MockSegmentManager_GetLoadedBinlogSize_Call) Run(run func()) *MockSegmentManager_GetLoadedBinlogSize_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSegmentManager_GetLoadedBinlogSize_Call) Return(_a0 int64) *MockSegmentManager_GetLoadedBinlogSize_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSegmentManager_GetLoadedBinlogSize_Call) RunAndReturn(run func() int64) *MockSegmentManager_GetLoadedBinlogSize_Call { + _c.Call.Return(run) + return _c +} + // GetLogicalResource provides a mock function with no fields func (_m *MockSegmentManager) GetLogicalResource() ResourceUsage { ret := _m.Called() @@ -804,6 +882,39 @@ func (_c *MockSegmentManager_RemoveBy_Call) RunAndReturn(run func(context.Contex return _c } +// SubLoadedBinlogSize provides a mock function with given fields: size +func (_m *MockSegmentManager) SubLoadedBinlogSize(size int64) { + _m.Called(size) +} + +// MockSegmentManager_SubLoadedBinlogSize_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SubLoadedBinlogSize' +type MockSegmentManager_SubLoadedBinlogSize_Call struct { + *mock.Call +} + +// SubLoadedBinlogSize is a helper method to define mock.On call +// - size int64 +func (_e *MockSegmentManager_Expecter) SubLoadedBinlogSize(size interface{}) *MockSegmentManager_SubLoadedBinlogSize_Call { + return &MockSegmentManager_SubLoadedBinlogSize_Call{Call: _e.mock.On("SubLoadedBinlogSize", size)} +} + +func (_c *MockSegmentManager_SubLoadedBinlogSize_Call) Run(run func(size int64)) *MockSegmentManager_SubLoadedBinlogSize_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockSegmentManager_SubLoadedBinlogSize_Call) Return() *MockSegmentManager_SubLoadedBinlogSize_Call { + _c.Call.Return() + return _c +} + +func (_c *MockSegmentManager_SubLoadedBinlogSize_Call) RunAndReturn(run func(int64)) *MockSegmentManager_SubLoadedBinlogSize_Call { + _c.Run(run) + return _c +} + // SubLogicalResource provides a mock function with given fields: usage func (_m *MockSegmentManager) SubLogicalResource(usage ResourceUsage) { _m.Called(usage) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index bf1426af9296b..196f7e4a1e902 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -312,6 +312,7 @@ type LocalSegment struct { // cached results, to avoid too many CGO calls memSize *atomic.Int64 + binlogSize *atomic.Int64 rowNum *atomic.Int64 insertCount *atomic.Int64 @@ -389,6 +390,7 @@ func NewSegment(ctx context.Context, fieldJSONStats: make(map[int64]*querypb.JsonStatsInfo), memSize: atomic.NewInt64(-1), + binlogSize: atomic.NewInt64(0), rowNum: atomic.NewInt64(-1), insertCount: atomic.NewInt64(0), } @@ -1352,10 +1354,17 @@ func (s *LocalSegment) CreateTextIndex(ctx context.Context, fieldID int64) error } func (s *LocalSegment) FinishLoad() error { + err := s.csegment.FinishLoad() + if err != nil { + return err + } // TODO: disable logical resource handling for now // usage := s.ResourceUsageEstimate() // s.manager.AddLogicalResource(usage) - return s.csegment.FinishLoad() + binlogSize := calculateSegmentMemorySize(s.LoadInfo()) + s.manager.AddLoadedBinlogSize(binlogSize) + s.binlogSize.Store(binlogSize) + return nil } type ReleaseScope int @@ -1424,6 +1433,13 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) { // usage := s.ResourceUsageEstimate() // s.manager.SubLogicalResource(usage) + binlogSize := s.binlogSize.Load() + if binlogSize > 0 { + // no concurrent change to s.binlogSize, so the subtraction is safe + s.manager.SubLoadedBinlogSize(binlogSize) + s.binlogSize.Store(0) + } + log.Info("delete segment from memory") } diff --git a/internal/querynodev2/segments/utils.go b/internal/querynodev2/segments/utils.go index 018fc2879ba32..ba8f540772771 100644 --- a/internal/querynodev2/segments/utils.go +++ b/internal/querynodev2/segments/utils.go @@ -244,6 +244,24 @@ func calculateSegmentLogSize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 { return segmentSize } +func calculateSegmentMemorySize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 { + segmentSize := int64(0) + + for _, fieldBinlog := range segmentLoadInfo.BinlogPaths { + segmentSize += getBinlogDataMemorySize(fieldBinlog) + } + + for _, fieldBinlog := range segmentLoadInfo.Statslogs { + segmentSize += getBinlogDataMemorySize(fieldBinlog) + } + + for _, fieldBinlog := range segmentLoadInfo.Deltalogs { + segmentSize += getBinlogDataMemorySize(fieldBinlog) + } + + return segmentSize +} + func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 { fieldSize := int64(0) for _, binlog := range fieldBinlog.Binlogs { diff --git a/internal/rootcoord/quota_center.go b/internal/rootcoord/quota_center.go index 2102a06dfca5d..de871c5bafa16 100644 --- a/internal/rootcoord/quota_center.go +++ b/internal/rootcoord/quota_center.go @@ -1378,6 +1378,22 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error { return err } + // check disk quota of loaded collections + totalLoadedDiskQuota := Params.QuotaConfig.LoadedDiskQuota.GetAsFloat() + totalLoaded := 0.0 + for _, queryNodeMetrics := range q.queryNodeMetrics { + // for streaming node, queryNodeMetrics.LoadedBinlogSize is always 0 + totalLoaded += float64(queryNodeMetrics.LoadedBinlogSize) + } + if totalLoaded >= totalLoadedDiskQuota { + log.RatedWarn(10, "cluster loaded disk quota exceeded", zap.Float64("total loaded", totalLoaded), zap.Float64("total loaded disk quota", totalLoadedDiskQuota)) + err := q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, true, nil, nil, nil) + if err != nil { + log.Warn("fail to force deny writing", zap.Error(err)) + } + return err + } + collectionDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat() dbSizeInfo := make(map[int64]int64) collections := make([]int64, 0) @@ -1434,7 +1450,7 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error { func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 { log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0) dbIDs := make([]int64, 0) - checkDiskQuota := func(dbID, binlogSize int64, quota float64) { + appendIfExceeded := func(dbID, binlogSize int64, quota float64) { if float64(binlogSize) >= quota { log.RatedWarn(10, "db disk quota exceeded", zap.Int64("db", dbID), @@ -1451,7 +1467,7 @@ func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 { if dbDiskQuotaStr := db.GetProperty(common.DatabaseDiskQuotaKey); dbDiskQuotaStr != "" { if dbDiskQuotaBytes, err := strconv.ParseFloat(dbDiskQuotaStr, 64); err == nil { dbDiskQuotaMB := dbDiskQuotaBytes * 1024 * 1024 - checkDiskQuota(dbID, binlogSize, dbDiskQuotaMB) + appendIfExceeded(dbID, binlogSize, dbDiskQuotaMB) continue } else { log.Warn("invalid configuration for diskQuota.mb", @@ -1460,7 +1476,7 @@ func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 { } } } - checkDiskQuota(dbID, binlogSize, Params.QuotaConfig.DiskQuotaPerDB.GetAsFloat()) + appendIfExceeded(dbID, binlogSize, Params.QuotaConfig.DiskQuotaPerDB.GetAsFloat()) } return dbIDs } diff --git a/internal/rootcoord/quota_center_test.go b/internal/rootcoord/quota_center_test.go index 645778a70fc70..ccfc299352ab7 100644 --- a/internal/rootcoord/quota_center_test.go +++ b/internal/rootcoord/quota_center_test.go @@ -884,7 +884,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter := NewQuotaCenter(pcm, dc, core.tsoAllocator, meta) quotaCenter.checkDiskQuota(nil) - checkLimiter := func(notEquals ...int64) { + checkCollectionLimiter := func(notEquals ...int64) { for db, collections := range quotaCenter.writableCollections { for collection := range collections { limiters := quotaCenter.rateLimiter.GetCollectionLimiters(db, collection).GetLimiters() @@ -907,11 +907,21 @@ func TestQuotaCenter(t *testing.T) { } } + checkClusterLimiter := func() { + root := quotaCenter.rateLimiter.GetRootLimiters().GetLimiters() + a, _ := root.Get(internalpb.RateType_DMLInsert) + assert.Equal(t, Limit(0), a.Limit()) + b, _ := root.Get(internalpb.RateType_DMLUpsert) + assert.Equal(t, Limit(0), b.Limit()) + c, _ := root.Get(internalpb.RateType_DMLDelete) + assert.NotEqual(t, Limit(0), c.Limit()) + } + // total DiskQuota exceeded paramtable.Get().Save(Params.QuotaConfig.DiskQuota.Key, "99") paramtable.Get().Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, "90") quotaCenter.dataCoordMetrics = &metricsinfo.DataCoordQuotaMetrics{ - TotalBinlogSize: 10 * 1024 * 1024, + TotalBinlogSize: 300 * 1024 * 1024, CollectionBinlogSize: map[int64]int64{ 1: 100 * 1024 * 1024, 2: 100 * 1024 * 1024, @@ -925,7 +935,7 @@ func TestQuotaCenter(t *testing.T) { quotaCenter.collectionIDToDBID = collectionIDToDBID quotaCenter.resetAllCurrentRates() quotaCenter.checkDiskQuota(nil) - checkLimiter() + checkClusterLimiter() paramtable.Get().Reset(Params.QuotaConfig.DiskQuota.Key) paramtable.Get().Reset(Params.QuotaConfig.DiskQuotaPerCollection.Key) @@ -940,8 +950,24 @@ func TestQuotaCenter(t *testing.T) { } quotaCenter.resetAllCurrentRates() quotaCenter.checkDiskQuota(nil) - checkLimiter(1) + checkCollectionLimiter(1) paramtable.Get().Save(Params.QuotaConfig.DiskQuotaPerCollection.Key, colQuotaBackup) + + // loaded DiskQuota exceeded + loadedQuotaBackup := Params.QuotaConfig.LoadedDiskQuota.GetValue() + paramtable.Get().Save(Params.QuotaConfig.LoadedDiskQuota.Key, "99") + quotaCenter.queryNodeMetrics = map[UniqueID]*metricsinfo.QueryNodeQuotaMetrics{ + 1: { + LoadedBinlogSize: 100 * 1024 * 1024, // 100MB + }, + } + quotaCenter.writableCollections = map[int64]map[int64][]int64{ + 0: collectionIDToPartitionIDs, + } + quotaCenter.resetAllCurrentRates() + quotaCenter.checkDiskQuota(nil) + checkClusterLimiter() + paramtable.Get().Save(Params.QuotaConfig.LoadedDiskQuota.Key, loadedQuotaBackup) }) t.Run("test setRates", func(t *testing.T) { diff --git a/pkg/util/metricsinfo/quota_metric.go b/pkg/util/metricsinfo/quota_metric.go index a36d8f171a125..797924cb019a7 100644 --- a/pkg/util/metricsinfo/quota_metric.go +++ b/pkg/util/metricsinfo/quota_metric.go @@ -62,6 +62,7 @@ type QueryNodeQuotaMetrics struct { Rms []RateMetric Fgm FlowGraphMetric GrowingSegmentsSize int64 + LoadedBinlogSize int64 Effect NodeEffect DeleteBufferInfo DeleteBufferInfo StreamingQuota *StreamingQuotaMetrics diff --git a/pkg/util/paramtable/quota_param.go b/pkg/util/paramtable/quota_param.go index edc9d3f8f76fc..38bee539ae633 100644 --- a/pkg/util/paramtable/quota_param.go +++ b/pkg/util/paramtable/quota_param.go @@ -153,6 +153,7 @@ type quotaConfig struct { GrowingSegmentsSizeHighWaterLevel ParamItem `refreshable:"true"` DiskProtectionEnabled ParamItem `refreshable:"true"` DiskQuota ParamItem `refreshable:"true"` + LoadedDiskQuota ParamItem `refreshable:"true"` DiskQuotaPerDB ParamItem `refreshable:"true"` DiskQuotaPerCollection ParamItem `refreshable:"true"` DiskQuotaPerPartition ParamItem `refreshable:"true"` @@ -1904,6 +1905,27 @@ but the rate will not be lower than minRateRatio * dmlRate.`, } p.DiskQuota.Init(base.mgr) + p.LoadedDiskQuota = ParamItem{ + Key: "quotaAndLimits.limitWriting.diskProtection.loadedDiskQuota", + Version: "2.6.4", + DefaultValue: quota, + Formatter: func(v string) string { + if !p.DiskProtectionEnabled.GetAsBool() { + return max + } + level := getAsFloat(v) + // (0, +inf) + if level <= 0 { + return max + } + // megabytes to bytes + return fmt.Sprintf("%f", megaBytes2Bytes(level)) + }, + Doc: "MB, (0, +inf), default no limit", + Export: true, + } + p.LoadedDiskQuota.Init(base.mgr) + p.DiskQuotaPerDB = ParamItem{ Key: "quotaAndLimits.limitWriting.diskProtection.diskQuotaPerDB", Version: "2.4.1",