Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1209,6 +1209,7 @@ quotaAndLimits:
diskProtection:
enabled: true # When the total file size of object storage is greater than `diskQuota`, all dml requests would be rejected;
diskQuota: -1 # MB, (0, +inf), default no limit
loadedDiskQuota: -1 # MB, (0, +inf), limit on the total binlog size of segments loaded on query nodes; dml requests are rejected when it is exceeded; default no limit
diskQuotaPerDB: -1 # MB, (0, +inf), default no limit
diskQuotaPerCollection: -1 # MB, (0, +inf), default no limit
diskQuotaPerPartition: -1 # MB, (0, +inf), default no limit
Expand Down
1 change: 1 addition & 0 deletions internal/querynodev2/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error
NumFlowGraph: node.pipelineManager.Num(),
},
GrowingSegmentsSize: totalGrowingSize,
LoadedBinlogSize: node.manager.Segment.GetLoadedBinlogSize(),
Effect: metricsinfo.NodeEffect{
NodeID: node.GetNodeID(),
CollectionIDs: lo.Keys(collections),
Expand Down
41 changes: 41 additions & 0 deletions internal/querynodev2/segments/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"fmt"
"sync"

"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"

Expand Down Expand Up @@ -202,6 +203,10 @@ type SegmentManager interface {
AddLogicalResource(usage ResourceUsage)
SubLogicalResource(usage ResourceUsage)
GetLogicalResource() ResourceUsage

AddLoadedBinlogSize(size int64)
SubLoadedBinlogSize(size int64)
GetLoadedBinlogSize() int64
}

var _ SegmentManager = (*segmentManager)(nil)
Expand Down Expand Up @@ -377,6 +382,9 @@ type segmentManager struct {
// only MemorySize and DiskSize are used, other fields are ignored.
logicalResource ResourceUsage
logicalResourceLock sync.Mutex

// loadedBinlogSize stats the total binlog size of all loaded segments of this querynode.
loadedBinlogSize atomic.Int64
}

func NewSegmentManager() *segmentManager {
Expand Down Expand Up @@ -424,6 +432,39 @@ func (mgr *segmentManager) GetLogicalResource() ResourceUsage {
return mgr.logicalResource
}

// AddLoadedBinlogSize atomically increases the tracked total binlog size of
// all loaded segments on this querynode by size.
func (mgr *segmentManager) AddLoadedBinlogSize(size int64) {
	mgr.loadedBinlogSize.Add(size)
}

// SubLoadedBinlogSize atomically decreases the tracked total binlog size of
// loaded segments by size. The counter is clamped at zero; a clamp indicates
// a duplicate or mismatched subtraction and is reported via a warning log.
func (mgr *segmentManager) SubLoadedBinlogSize(size int64) {
	for {
		prev := mgr.loadedBinlogSize.Load()
		next := prev - size
		clamped := next < 0
		if clamped {
			next = 0
		}
		if !mgr.loadedBinlogSize.CompareAndSwap(prev, next) {
			// another goroutine updated the counter; retry with the fresh value
			continue
		}
		if clamped {
			log.Warn("Loaded binlog size subtraction exceeds current value, clamped to 0",
				zap.Int64("current", prev),
				zap.Int64("subtracted", size))
		}
		return
	}
}

// GetLoadedBinlogSize returns the tracked total binlog size of all loaded
// segments. A negative value should be impossible (subtractions clamp at
// zero) but is defensively reported and normalized to 0.
func (mgr *segmentManager) GetLoadedBinlogSize() int64 {
	size := mgr.loadedBinlogSize.Load()
	if size >= 0 {
		return size
	}
	log.Warn("Loaded binlog size is negative, returning 0", zap.Int64("current", size))
	return 0
}

// put is the internal put method updating both global segments and secondary index.
func (mgr *segmentManager) put(ctx context.Context, segmentType SegmentType, segment Segment) {
mgr.globalSegments.Put(ctx, segmentType, segment)
Expand Down
27 changes: 27 additions & 0 deletions internal/querynodev2/segments/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,30 @@ func (s *ManagerSuite) TestIncreaseVersion() {
func TestManager(t *testing.T) {
suite.Run(t, new(ManagerSuite))
}

// TestLoadedBinlogSizeAccounting verifies the add/sub/clamp semantics of the
// loaded-binlog-size counter on a fresh segment manager.
func TestLoadedBinlogSizeAccounting(t *testing.T) {
	m := NewSegmentManager()

	// check asserts the current counter value; context describes the step.
	check := func(want int64, context string) {
		t.Helper()
		if got := m.GetLoadedBinlogSize(); got != want {
			t.Fatalf("expected %s, got %d", context, got)
		}
	}

	check(0, "initial 0")

	m.AddLoadedBinlogSize(100)
	check(100, "100 after add")

	m.AddLoadedBinlogSize(50)
	check(150, "150 after add")

	m.SubLoadedBinlogSize(20)
	check(130, "130 after sub")

	// Over-subtraction must clamp to zero rather than go negative.
	m.SubLoadedBinlogSize(1000)
	check(0, "clamp to 0")
}
111 changes: 111 additions & 0 deletions internal/querynodev2/segments/mock_segment_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 17 additions & 1 deletion internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ type LocalSegment struct {

// cached results, to avoid too many CGO calls
memSize *atomic.Int64
binlogSize *atomic.Int64
rowNum *atomic.Int64
insertCount *atomic.Int64

Expand Down Expand Up @@ -389,6 +390,7 @@ func NewSegment(ctx context.Context,
fieldJSONStats: make(map[int64]*querypb.JsonStatsInfo),

memSize: atomic.NewInt64(-1),
binlogSize: atomic.NewInt64(0),
rowNum: atomic.NewInt64(-1),
insertCount: atomic.NewInt64(0),
}
Expand Down Expand Up @@ -1352,10 +1354,17 @@ func (s *LocalSegment) CreateTextIndex(ctx context.Context, fieldID int64) error
}

// FinishLoad finalizes loading of the segment in the core and, on success,
// registers the segment's binlog size with the node-wide counter and caches
// it on the segment so Release can subtract the same amount later.
func (s *LocalSegment) FinishLoad() error {
	if err := s.csegment.FinishLoad(); err != nil {
		return err
	}
	// TODO: disable logical resource handling for now
	// usage := s.ResourceUsageEstimate()
	// s.manager.AddLogicalResource(usage)
	size := calculateSegmentMemorySize(s.LoadInfo())
	s.manager.AddLoadedBinlogSize(size)
	s.binlogSize.Store(size)
	return nil
}

type ReleaseScope int
Expand Down Expand Up @@ -1424,6 +1433,13 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
// usage := s.ResourceUsageEstimate()
// s.manager.SubLogicalResource(usage)

binlogSize := s.binlogSize.Load()
if binlogSize > 0 {
// no concurrent change to s.binlogSize, so the subtraction is safe
s.manager.SubLoadedBinlogSize(binlogSize)
s.binlogSize.Store(0)
}

log.Info("delete segment from memory")
}

Expand Down
18 changes: 18 additions & 0 deletions internal/querynodev2/segments/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,24 @@ func calculateSegmentLogSize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 {
return segmentSize
}

// calculateSegmentMemorySize sums getBinlogDataMemorySize over every field
// binlog of the segment: insert binlogs, statslogs and deltalogs.
func calculateSegmentMemorySize(segmentLoadInfo *querypb.SegmentLoadInfo) int64 {
	var total int64
	for _, group := range [][]*datapb.FieldBinlog{
		segmentLoadInfo.BinlogPaths,
		segmentLoadInfo.Statslogs,
		segmentLoadInfo.Deltalogs,
	} {
		for _, fieldBinlog := range group {
			total += getBinlogDataMemorySize(fieldBinlog)
		}
	}
	return total
}

func getFieldSizeFromFieldBinlog(fieldBinlog *datapb.FieldBinlog) int64 {
fieldSize := int64(0)
for _, binlog := range fieldBinlog.Binlogs {
Expand Down
22 changes: 19 additions & 3 deletions internal/rootcoord/quota_center.go
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,22 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error {
return err
}

// check disk quota of loaded collections
totalLoadedDiskQuota := Params.QuotaConfig.LoadedDiskQuota.GetAsFloat()
totalLoaded := 0.0
for _, queryNodeMetrics := range q.queryNodeMetrics {
// for streaming node, queryNodeMetrics.LoadedBinlogSize is always 0
totalLoaded += float64(queryNodeMetrics.LoadedBinlogSize)
}
if totalLoaded >= totalLoadedDiskQuota {
log.RatedWarn(10, "cluster loaded disk quota exceeded", zap.Float64("total loaded", totalLoaded), zap.Float64("total loaded disk quota", totalLoadedDiskQuota))
err := q.forceDenyWriting(commonpb.ErrorCode_DiskQuotaExhausted, true, nil, nil, nil)
if err != nil {
log.Warn("fail to force deny writing", zap.Error(err))
}
return err
}

collectionDiskQuota := Params.QuotaConfig.DiskQuotaPerCollection.GetAsFloat()
dbSizeInfo := make(map[int64]int64)
collections := make([]int64, 0)
Expand Down Expand Up @@ -1434,7 +1450,7 @@ func (q *QuotaCenter) checkDiskQuota(denyWritingDBs map[int64]struct{}) error {
func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
log := log.Ctx(context.Background()).WithRateGroup("rootcoord.QuotaCenter", 1.0, 60.0)
dbIDs := make([]int64, 0)
checkDiskQuota := func(dbID, binlogSize int64, quota float64) {
appendIfExceeded := func(dbID, binlogSize int64, quota float64) {
if float64(binlogSize) >= quota {
log.RatedWarn(10, "db disk quota exceeded",
zap.Int64("db", dbID),
Expand All @@ -1451,7 +1467,7 @@ func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
if dbDiskQuotaStr := db.GetProperty(common.DatabaseDiskQuotaKey); dbDiskQuotaStr != "" {
if dbDiskQuotaBytes, err := strconv.ParseFloat(dbDiskQuotaStr, 64); err == nil {
dbDiskQuotaMB := dbDiskQuotaBytes * 1024 * 1024
checkDiskQuota(dbID, binlogSize, dbDiskQuotaMB)
appendIfExceeded(dbID, binlogSize, dbDiskQuotaMB)
continue
} else {
log.Warn("invalid configuration for diskQuota.mb",
Expand All @@ -1460,7 +1476,7 @@ func (q *QuotaCenter) checkDBDiskQuota(dbSizeInfo map[int64]int64) []int64 {
}
}
}
checkDiskQuota(dbID, binlogSize, Params.QuotaConfig.DiskQuotaPerDB.GetAsFloat())
appendIfExceeded(dbID, binlogSize, Params.QuotaConfig.DiskQuotaPerDB.GetAsFloat())
}
return dbIDs
}
Expand Down
Loading
Loading