Skip to content

Commit ee7c197

Browse files
committed
fix: Filter empty search results to prevent rerank panic
issue: milvus-io#44909 pr: milvus-io#44917 This change adds validation to filter out empty search results before reranking operations in both regular and advanced search scenarios. When QueryNode returns empty search results (with no IDs or fields), the rerank process would panic when trying to access result data. This happens in hybrid search scenarios where some sub-requests may return empty results. Changes include: - Add empty result filtering in reduceResults method - Add empty result filtering in PostExecute for advanced search - Add empty result filtering in all reduce utility functions (reduceAdvanceGroupBY, reduceSearchResultDataWithGroupBy, reduceSearchResultDataNoGroupBy, rankSearchResultDataByGroup, rankSearchResultDataByPk) - Add unit tests to verify empty result handling This fix ensures empty results are filtered out before being passed to ranking functions, preventing the panic. Signed-off-by: Wei Liu <[email protected]>
1 parent 82081eb commit ee7c197

File tree

3 files changed

+363
-38
lines changed

3 files changed

+363
-38
lines changed

internal/proxy/search_reduce_util.go

Lines changed: 83 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,21 @@ func reduceAdvanceGroupBY(ctx context.Context, subSearchResultData []*schemapb.S
7373
nq int64, topK int64, pkType schemapb.DataType, metricType string,
7474
) (*milvuspb.SearchResults, error) {
7575
log.Ctx(ctx).Debug("reduceAdvanceGroupBY", zap.Int("len(subSearchResultData)", len(subSearchResultData)), zap.Int64("nq", nq))
76+
77+
validSubSearchResultData := []*schemapb.SearchResultData{}
78+
for _, result := range subSearchResultData {
79+
size := typeutil.GetSizeOfIDs(result.GetIds())
80+
if result == nil || len(result.GetFieldsData()) == 0 || size == 0 {
81+
continue
82+
}
83+
validSubSearchResultData = append(validSubSearchResultData, result)
84+
}
85+
7686
// for advance group by, offset is not applied, so just return when there's only one channel
77-
if len(subSearchResultData) == 1 {
87+
if len(validSubSearchResultData) == 1 {
7888
return &milvuspb.SearchResults{
7989
Status: merr.Success(),
80-
Results: subSearchResultData[0],
90+
Results: validSubSearchResultData[0],
8191
}, nil
8292
}
8393

@@ -93,40 +103,40 @@ func reduceAdvanceGroupBY(ctx context.Context, subSearchResultData []*schemapb.S
93103
}
94104

95105
var limit int64
96-
if allSearchCount, hitNum, err := checkResultDatas(ctx, subSearchResultData, nq, topK); err != nil {
106+
if allSearchCount, hitNum, err := checkResultDatas(ctx, validSubSearchResultData, nq, topK); err != nil {
97107
log.Ctx(ctx).Warn("invalid search results", zap.Error(err))
98108
return ret, err
99109
} else {
100110
ret.GetResults().AllSearchCount = allSearchCount
101111
limit = int64(hitNum)
102-
ret.GetResults().FieldsData = typeutil.PrepareResultFieldData(subSearchResultData[0].GetFieldsData(), limit)
112+
ret.GetResults().FieldsData = typeutil.PrepareResultFieldData(validSubSearchResultData[0].GetFieldsData(), limit)
103113
}
104114

105115
if err := setupIdListForSearchResult(ret, pkType, limit); err != nil {
106116
return ret, nil
107117
}
108118

109119
var (
110-
subSearchNum = len(subSearchResultData)
120+
subSearchNum = len(validSubSearchResultData)
111121
// for results of each subSearchResultData, storing the start offset of each query of nq queries
112122
subSearchNqOffset = make([][]int64, subSearchNum)
113123
)
114124
for i := 0; i < subSearchNum; i++ {
115-
subSearchNqOffset[i] = make([]int64, subSearchResultData[i].GetNumQueries())
125+
subSearchNqOffset[i] = make([]int64, validSubSearchResultData[i].GetNumQueries())
116126
for j := int64(1); j < nq; j++ {
117-
subSearchNqOffset[i][j] = subSearchNqOffset[i][j-1] + subSearchResultData[i].Topks[j-1]
127+
subSearchNqOffset[i][j] = subSearchNqOffset[i][j-1] + validSubSearchResultData[i].Topks[j-1]
118128
}
119129
}
120130
// reducing nq * topk results
121131
for nqIdx := int64(0); nqIdx < nq; nqIdx++ {
122132
dataCount := int64(0)
123133
for subIdx := 0; subIdx < subSearchNum; subIdx += 1 {
124-
subData := subSearchResultData[subIdx]
134+
subData := validSubSearchResultData[subIdx]
125135
subPks := subData.GetIds()
126136
subScores := subData.GetScores()
127137
subGroupByVals := subData.GetGroupByFieldValue()
128138

129-
nqTopK := subData.Topks[nqIdx]
139+
nqTopK := validSubSearchResultData[subIdx].Topks[nqIdx]
130140
for i := int64(0); i < nqTopK; i++ {
131141
innerIdx := subSearchNqOffset[subIdx][nqIdx] + i
132142
pk := typeutil.GetPK(subPks, innerIdx)
@@ -181,12 +191,21 @@ func reduceSearchResultDataWithGroupBy(ctx context.Context, subSearchResultData
181191
zap.Int64("limit", limit),
182192
zap.String("metricType", metricType))
183193

194+
validSubSearchResultData := []*schemapb.SearchResultData{}
195+
for _, result := range subSearchResultData {
196+
size := typeutil.GetSizeOfIDs(result.GetIds())
197+
if result == nil || len(result.GetFieldsData()) == 0 || size == 0 {
198+
continue
199+
}
200+
validSubSearchResultData = append(validSubSearchResultData, result)
201+
}
202+
184203
ret := &milvuspb.SearchResults{
185204
Status: merr.Success(),
186205
Results: &schemapb.SearchResultData{
187206
NumQueries: nq,
188207
TopK: topk,
189-
FieldsData: typeutil.PrepareResultFieldData(subSearchResultData[0].GetFieldsData(), limit),
208+
FieldsData: typeutil.PrepareResultFieldData(validSubSearchResultData[0].GetFieldsData(), limit),
190209
Scores: []float32{},
191210
Ids: &schemapb.IDs{},
192211
Topks: []int64{},
@@ -197,23 +216,23 @@ func reduceSearchResultDataWithGroupBy(ctx context.Context, subSearchResultData
197216
return ret, nil
198217
}
199218

200-
if allSearchCount, _, err := checkResultDatas(ctx, subSearchResultData, nq, topk); err != nil {
219+
if allSearchCount, _, err := checkResultDatas(ctx, validSubSearchResultData, nq, topk); err != nil {
201220
log.Ctx(ctx).Warn("invalid search results", zap.Error(err))
202221
return ret, err
203222
} else {
204223
ret.GetResults().AllSearchCount = allSearchCount
205224
}
206225

207226
var (
208-
subSearchNum = len(subSearchResultData)
227+
subSearchNum = len(validSubSearchResultData)
209228
// for results of each subSearchResultData, storing the start offset of each query of nq queries
210229
subSearchNqOffset = make([][]int64, subSearchNum)
211230
totalResCount int64 = 0
212231
)
213232
for i := 0; i < subSearchNum; i++ {
214-
subSearchNqOffset[i] = make([]int64, subSearchResultData[i].GetNumQueries())
233+
subSearchNqOffset[i] = make([]int64, validSubSearchResultData[i].GetNumQueries())
215234
for j := int64(1); j < nq; j++ {
216-
subSearchNqOffset[i][j] = subSearchNqOffset[i][j-1] + subSearchResultData[i].Topks[j-1]
235+
subSearchNqOffset[i][j] = subSearchNqOffset[i][j-1] + validSubSearchResultData[i].Topks[j-1]
217236
}
218237
totalResCount += subSearchNqOffset[i][nq-1]
219238
}
@@ -237,11 +256,11 @@ func reduceSearchResultDataWithGroupBy(ctx context.Context, subSearchResultData
237256
)
238257

239258
for j = 0; j < groupBound; {
240-
subSearchIdx, resultDataIdx := selectHighestScoreIndex(ctx, subSearchResultData, subSearchNqOffset, cursors, i)
259+
subSearchIdx, resultDataIdx := selectHighestScoreIndex(ctx, validSubSearchResultData, subSearchNqOffset, cursors, i)
241260
if subSearchIdx == -1 {
242261
break
243262
}
244-
subSearchRes := subSearchResultData[subSearchIdx]
263+
subSearchRes := validSubSearchResultData[subSearchIdx]
245264

246265
id := typeutil.GetPK(subSearchRes.GetIds(), resultDataIdx)
247266
score := subSearchRes.GetScores()[resultDataIdx]
@@ -279,7 +298,7 @@ func reduceSearchResultDataWithGroupBy(ctx context.Context, subSearchResultData
279298
if groupVal != nil {
280299
groupEntities := groupByValMap[groupVal]
281300
for _, groupEntity := range groupEntities {
282-
subResData := subSearchResultData[groupEntity.subSearchIdx]
301+
subResData := validSubSearchResultData[groupEntity.subSearchIdx]
283302
retSize += typeutil.AppendFieldData(ret.Results.FieldsData, subResData.FieldsData, groupEntity.resultIdx)
284303
typeutil.AppendPKs(ret.Results.Ids, groupEntity.id)
285304
ret.Results.Scores = append(ret.Results.Scores, groupEntity.score)
@@ -325,12 +344,21 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
325344
zap.Int64("limit", limit),
326345
zap.String("metricType", metricType))
327346

347+
validSubSearchResultData := []*schemapb.SearchResultData{}
348+
for _, result := range subSearchResultData {
349+
size := typeutil.GetSizeOfIDs(result.GetIds())
350+
if result == nil || len(result.GetFieldsData()) == 0 || size == 0 {
351+
continue
352+
}
353+
validSubSearchResultData = append(validSubSearchResultData, result)
354+
}
355+
328356
ret := &milvuspb.SearchResults{
329357
Status: merr.Success(),
330358
Results: &schemapb.SearchResultData{
331359
NumQueries: nq,
332360
TopK: topk,
333-
FieldsData: typeutil.PrepareResultFieldData(subSearchResultData[0].GetFieldsData(), limit),
361+
FieldsData: typeutil.PrepareResultFieldData(validSubSearchResultData[0].GetFieldsData(), limit),
334362
Scores: []float32{},
335363
Ids: &schemapb.IDs{},
336364
Topks: []int64{},
@@ -341,20 +369,20 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
341369
return ret, nil
342370
}
343371

344-
if allSearchCount, _, err := checkResultDatas(ctx, subSearchResultData, nq, topk); err != nil {
372+
if allSearchCount, _, err := checkResultDatas(ctx, validSubSearchResultData, nq, topk); err != nil {
345373
log.Ctx(ctx).Warn("invalid search results", zap.Error(err))
346374
return ret, err
347375
} else {
348376
ret.GetResults().AllSearchCount = allSearchCount
349377
}
350378

351-
subSearchNum := len(subSearchResultData)
379+
subSearchNum := len(validSubSearchResultData)
352380
if subSearchNum == 1 && offset == 0 {
353381
// sorting is not needed if there is only one shard and no offset, assigning the result directly.
354382
// we still need to adjust the scores later.
355-
ret.Results = subSearchResultData[0]
383+
ret.Results = validSubSearchResultData[0]
356384
// realTopK is the topK of the nq-th query, it is used in proxy but not handled by delegator.
357-
topks := subSearchResultData[0].Topks
385+
topks := validSubSearchResultData[0].Topks
358386
if len(topks) > 0 {
359387
ret.Results.TopK = topks[len(topks)-1]
360388
}
@@ -365,9 +393,9 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
365393
// for results of each subSearchResultData, storing the start offset of each query of nq queries
366394
subSearchNqOffset := make([][]int64, subSearchNum)
367395
for i := 0; i < subSearchNum; i++ {
368-
subSearchNqOffset[i] = make([]int64, subSearchResultData[i].GetNumQueries())
396+
subSearchNqOffset[i] = make([]int64, validSubSearchResultData[i].GetNumQueries())
369397
for j := int64(1); j < nq; j++ {
370-
subSearchNqOffset[i][j] = subSearchNqOffset[i][j-1] + subSearchResultData[i].Topks[j-1]
398+
subSearchNqOffset[i][j] = subSearchNqOffset[i][j-1] + validSubSearchResultData[i].Topks[j-1]
371399
}
372400
}
373401
maxOutputSize := paramtable.Get().QuotaConfig.MaxOutputSize.GetAsInt64()
@@ -382,7 +410,7 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
382410

383411
// skip offset results
384412
for k := int64(0); k < offset; k++ {
385-
subSearchIdx, _ := selectHighestScoreIndex(ctx, subSearchResultData, subSearchNqOffset, cursors, i)
413+
subSearchIdx, _ := selectHighestScoreIndex(ctx, validSubSearchResultData, subSearchNqOffset, cursors, i)
386414
if subSearchIdx == -1 {
387415
break
388416
}
@@ -395,14 +423,14 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
395423
// From all the sub-query result sets of the i-th query vector,
396424
// find the sub-query result set index of the score j-th data,
397425
// and the index of the data in schemapb.SearchResultData
398-
subSearchIdx, resultDataIdx := selectHighestScoreIndex(ctx, subSearchResultData, subSearchNqOffset, cursors, i)
426+
subSearchIdx, resultDataIdx := selectHighestScoreIndex(ctx, validSubSearchResultData, subSearchNqOffset, cursors, i)
399427
if subSearchIdx == -1 {
400428
break
401429
}
402-
score := subSearchResultData[subSearchIdx].Scores[resultDataIdx]
430+
score := validSubSearchResultData[subSearchIdx].Scores[resultDataIdx]
403431

404-
retSize += typeutil.AppendFieldData(ret.Results.FieldsData, subSearchResultData[subSearchIdx].FieldsData, resultDataIdx)
405-
typeutil.CopyPk(ret.Results.Ids, subSearchResultData[subSearchIdx].GetIds(), int(resultDataIdx))
432+
retSize += typeutil.AppendFieldData(ret.Results.FieldsData, validSubSearchResultData[subSearchIdx].FieldsData, resultDataIdx)
433+
typeutil.CopyPk(ret.Results.Ids, validSubSearchResultData[subSearchIdx].GetIds(), int(resultDataIdx))
406434
ret.Results.Scores = append(ret.Results.Scores, score)
407435
cursors[subSearchIdx]++
408436
}
@@ -510,13 +538,22 @@ func rankSearchResultDataByGroup(ctx context.Context,
510538
zap.Int64("offset", offset),
511539
zap.Int64("limit", limit))
512540

541+
validSearchResults := []*milvuspb.SearchResults{}
542+
for _, result := range searchResults {
543+
size := typeutil.GetSizeOfIDs(result.GetResults().GetIds())
544+
if result == nil || len(result.GetResults().GetFieldsData()) == 0 || size == 0 {
545+
continue
546+
}
547+
validSearchResults = append(validSearchResults, result)
548+
}
549+
513550
var ret *milvuspb.SearchResults
514-
if ret = initSearchResults(nq, limit); len(searchResults) == 0 {
551+
if ret = initSearchResults(nq, limit); len(validSearchResults) == 0 {
515552
return ret, nil
516553
}
517554

518555
// init FieldsData
519-
ret.Results.FieldsData = typeutil.PrepareResultFieldData(searchResults[0].GetResults().GetFieldsData(), limit)
556+
ret.Results.FieldsData = typeutil.PrepareResultFieldData(validSearchResults[0].GetResults().GetFieldsData(), limit)
520557

521558
totalCount := limit * groupSize
522559
if err := setupIdListForSearchResult(ret, pkType, totalCount); err != nil {
@@ -540,7 +577,7 @@ func rankSearchResultDataByGroup(ctx context.Context,
540577
}
541578

542579
groupByDataType := searchResults[0].GetResults().GetGroupByFieldValue().GetType()
543-
for ri, result := range searchResults {
580+
for ri, result := range validSearchResults {
544581
scores := result.GetResults().GetScores()
545582
start := 0
546583
// milvus has limits for the value range of nq and limit
@@ -643,7 +680,7 @@ func rankSearchResultDataByGroup(ctx context.Context,
643680
}
644681
ret.Results.Scores = append(ret.Results.Scores, score)
645682
loc := pk2DataOffset[i][group.idList[idx]]
646-
typeutil.AppendFieldData(ret.Results.FieldsData, searchResults[loc.resultIdx].GetResults().GetFieldsData(), int64(loc.offset))
683+
typeutil.AppendFieldData(ret.Results.FieldsData, validSearchResults[loc.resultIdx].GetResults().GetFieldsData(), int64(loc.offset))
647684
typeutil.AppendGroupByValue(ret.Results, group.groupVal, groupByDataType)
648685
}
649686
returnedRowNum += len(group.idList)
@@ -707,13 +744,22 @@ func rankSearchResultDataByPk(ctx context.Context,
707744
zap.Int64("offset", offset),
708745
zap.Int64("limit", limit))
709746

747+
validSearchResults := []*milvuspb.SearchResults{}
748+
for _, result := range searchResults {
749+
size := typeutil.GetSizeOfIDs(result.GetResults().GetIds())
750+
if result == nil || len(result.GetResults().GetFieldsData()) == 0 || size == 0 {
751+
continue
752+
}
753+
validSearchResults = append(validSearchResults, result)
754+
}
755+
710756
var ret *milvuspb.SearchResults
711-
if ret = initSearchResults(nq, limit); len(searchResults) == 0 {
757+
if ret = initSearchResults(nq, limit); len(validSearchResults) == 0 {
712758
return ret, nil
713759
}
714760

715761
// init FieldsData
716-
ret.Results.FieldsData = typeutil.PrepareResultFieldData(searchResults[0].GetResults().GetFieldsData(), limit)
762+
ret.Results.FieldsData = typeutil.PrepareResultFieldData(validSearchResults[0].GetResults().GetFieldsData(), limit)
717763

718764
if err := setupIdListForSearchResult(ret, pkType, limit); err != nil {
719765
return ret, nil
@@ -731,7 +777,7 @@ func rankSearchResultDataByPk(ctx context.Context,
731777
pk2DataOffset[i] = make(map[any]dataLoc)
732778
}
733779

734-
for ri, result := range searchResults {
780+
for ri, result := range validSearchResults {
735781
scores := result.GetResults().GetScores()
736782
start := int64(0)
737783
for i := int64(0); i < nq; i++ {
@@ -783,7 +829,7 @@ func rankSearchResultDataByPk(ctx context.Context,
783829
}
784830
ret.Results.Scores = append(ret.Results.Scores, score)
785831
loc := pk2DataOffset[i][keys[index]]
786-
typeutil.AppendFieldData(ret.Results.FieldsData, searchResults[loc.resultIdx].GetResults().GetFieldsData(), loc.offset)
832+
typeutil.AppendFieldData(ret.Results.FieldsData, validSearchResults[loc.resultIdx].GetResults().GetFieldsData(), loc.offset)
787833
}
788834
}
789835

internal/proxy/task_search.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -664,12 +664,21 @@ func (t *searchTask) reduceResults(ctx context.Context, toReduceResults []*inter
664664

665665
log := log.Ctx(ctx)
666666
// Decode all search results
667-
validSearchResults, err := decodeSearchResults(ctx, toReduceResults)
667+
decodedSearchResults, err := decodeSearchResults(ctx, toReduceResults)
668668
if err != nil {
669669
log.Warn("failed to decode search results", zap.Error(err))
670670
return nil, err
671671
}
672672

673+
validSearchResults := []*schemapb.SearchResultData{}
674+
for _, result := range decodedSearchResults {
675+
size := typeutil.GetSizeOfIDs(result.GetIds())
676+
if result == nil || len(result.GetFieldsData()) == 0 || size == 0 {
677+
continue
678+
}
679+
validSearchResults = append(validSearchResults, result)
680+
}
681+
673682
if len(validSearchResults) <= 0 {
674683
return fillInEmptyResult(nq), nil
675684
}
@@ -783,6 +792,13 @@ func (t *searchTask) PostExecute(ctx context.Context) error {
783792
if err != nil {
784793
return err
785794
}
795+
796+
// filter out empty results
797+
size := typeutil.GetSizeOfIDs(result.GetResults().GetIds())
798+
if result == nil || len(result.GetResults().GetFieldsData()) == 0 || size == 0 {
799+
continue
800+
}
801+
786802
t.reScorers[index].setMetricType(subMetricType)
787803
t.reScorers[index].reScore(result)
788804
multipleMilvusResults[index] = result

0 commit comments

Comments
 (0)