@@ -75,11 +75,25 @@ func reduceAdvanceGroupBy(ctx context.Context, subSearchResultData []*schemapb.S
7575 nq int64 , topK int64 , pkType schemapb.DataType , metricType string ,
7676) (* milvuspb.SearchResults , error ) {
7777 log .Ctx (ctx ).Debug ("reduceAdvanceGroupBY" , zap .Int ("len(subSearchResultData)" , len (subSearchResultData )), zap .Int64 ("nq" , nq ))
78+
79+ validSubSearchResultData := []* schemapb.SearchResultData {}
80+ for _ , result := range subSearchResultData {
81+ size := typeutil .GetSizeOfIDs (result .GetIds ())
82+ if result == nil || size == 0 {
83+ continue
84+ }
85+ validSubSearchResultData = append (validSubSearchResultData , result )
86+ }
87+
88+ if len (validSubSearchResultData ) == 0 {
89+ return fillInEmptyResult (nq ), nil
90+ }
91+
7892 // for advance group by, offset is not applied, so just return when there's only one channel
79- if len (subSearchResultData ) == 1 {
93+ if len (validSubSearchResultData ) == 1 {
8094 return & milvuspb.SearchResults {
8195 Status : merr .Success (),
82- Results : subSearchResultData [0 ],
96+ Results : validSubSearchResultData [0 ],
8397 }, nil
8498 }
8599
@@ -95,45 +109,45 @@ func reduceAdvanceGroupBy(ctx context.Context, subSearchResultData []*schemapb.S
95109 }
96110
97111 var limit int64
98- if allSearchCount , hitNum , err := checkResultDatas (ctx , subSearchResultData , nq , topK ); err != nil {
112+ if allSearchCount , hitNum , err := checkResultDatas (ctx , validSubSearchResultData , nq , topK ); err != nil {
99113 log .Ctx (ctx ).Warn ("invalid search results" , zap .Error (err ))
100114 return ret , err
101115 } else {
102116 ret .GetResults ().AllSearchCount = allSearchCount
103117 limit = int64 (hitNum )
104- ret .GetResults ().FieldsData = typeutil .PrepareResultFieldData (subSearchResultData [0 ].GetFieldsData (), limit )
118+ ret .GetResults ().FieldsData = typeutil .PrepareResultFieldData (validSubSearchResultData [0 ].GetFieldsData (), limit )
105119 }
106120
107121 if err := setupIdListForSearchResult (ret , pkType , limit ); err != nil {
108122 return ret , nil
109123 }
110124
111125 var (
112- subSearchNum = len (subSearchResultData )
126+ subSearchNum = len (validSubSearchResultData )
113127 // for results of each subSearchResultData, storing the start offset of each query of nq queries
114128 subSearchNqOffset = make ([][]int64 , subSearchNum )
115129 )
116130 for i := 0 ; i < subSearchNum ; i ++ {
117- subSearchNqOffset [i ] = make ([]int64 , subSearchResultData [i ].GetNumQueries ())
131+ subSearchNqOffset [i ] = make ([]int64 , validSubSearchResultData [i ].GetNumQueries ())
118132 for j := int64 (1 ); j < nq ; j ++ {
119- subSearchNqOffset [i ][j ] = subSearchNqOffset [i ][j - 1 ] + subSearchResultData [i ].Topks [j - 1 ]
133+ subSearchNqOffset [i ][j ] = subSearchNqOffset [i ][j - 1 ] + validSubSearchResultData [i ].Topks [j - 1 ]
120134 }
121135 }
122136
123- gpFieldBuilder , err := typeutil .NewFieldDataBuilder (subSearchResultData [0 ].GetGroupByFieldValue ().GetType (), true , int (limit ))
137+ gpFieldBuilder , err := typeutil .NewFieldDataBuilder (validSubSearchResultData [0 ].GetGroupByFieldValue ().GetType (), true , int (limit ))
124138 if err != nil {
125139 return ret , merr .WrapErrServiceInternal ("failed to construct group by field data builder, this is abnormal as segcore should always set up a group by field, no matter data status, check code on qn" , err .Error ())
126140 }
127141 // reducing nq * topk results
128142 for nqIdx := int64 (0 ); nqIdx < nq ; nqIdx ++ {
129143 dataCount := int64 (0 )
130144 for subIdx := 0 ; subIdx < subSearchNum ; subIdx += 1 {
131- subData := subSearchResultData [subIdx ]
145+ subData := validSubSearchResultData [subIdx ]
132146 subPks := subData .GetIds ()
133147 subScores := subData .GetScores ()
134148 subGroupByVals := subData .GetGroupByFieldValue ()
135149
136- nqTopK := subData .Topks [nqIdx ]
150+ nqTopK := validSubSearchResultData [ subIdx ] .Topks [nqIdx ]
137151 groupByValIterator := typeutil .GetDataIterator (subGroupByVals )
138152
139153 for i := int64 (0 ); i < nqTopK ; i ++ {
@@ -188,12 +202,25 @@ func reduceSearchResultDataWithGroupBy(ctx context.Context, subSearchResultData
188202 zap .Int64 ("limit" , limit ),
189203 zap .String ("metricType" , metricType ))
190204
205+ validSubSearchResultData := []* schemapb.SearchResultData {}
206+ for _ , result := range subSearchResultData {
207+ size := typeutil .GetSizeOfIDs (result .GetIds ())
208+ if result == nil || size == 0 {
209+ continue
210+ }
211+ validSubSearchResultData = append (validSubSearchResultData , result )
212+ }
213+
214+ if len (validSubSearchResultData ) == 0 {
215+ return fillInEmptyResult (nq ), nil
216+ }
217+
191218 ret := & milvuspb.SearchResults {
192219 Status : merr .Success (),
193220 Results : & schemapb.SearchResultData {
194221 NumQueries : nq ,
195222 TopK : topk ,
196- FieldsData : typeutil .PrepareResultFieldData (subSearchResultData [0 ].GetFieldsData (), limit ),
223+ FieldsData : typeutil .PrepareResultFieldData (validSubSearchResultData [0 ].GetFieldsData (), limit ),
197224 Scores : []float32 {},
198225 Ids : & schemapb.IDs {},
199226 Topks : []int64 {},
@@ -204,30 +231,30 @@ func reduceSearchResultDataWithGroupBy(ctx context.Context, subSearchResultData
204231 return ret , err
205232 }
206233
207- if allSearchCount , _ , err := checkResultDatas (ctx , subSearchResultData , nq , topk ); err != nil {
234+ if allSearchCount , _ , err := checkResultDatas (ctx , validSubSearchResultData , nq , topk ); err != nil {
208235 log .Ctx (ctx ).Warn ("invalid search results" , zap .Error (err ))
209236 return ret , err
210237 } else {
211238 ret .GetResults ().AllSearchCount = allSearchCount
212239 }
213240
214241 var (
215- subSearchNum = len (subSearchResultData )
242+ subSearchNum = len (validSubSearchResultData )
216243 // for results of each subSearchResultData, storing the start offset of each query of nq queries
217244 subSearchNqOffset = make ([][]int64 , subSearchNum )
218245 totalResCount int64 = 0
219246 subSearchGroupByValIterator = make ([]func (int ) any , subSearchNum )
220247 )
221248 for i := 0 ; i < subSearchNum ; i ++ {
222- subSearchNqOffset [i ] = make ([]int64 , subSearchResultData [i ].GetNumQueries ())
249+ subSearchNqOffset [i ] = make ([]int64 , validSubSearchResultData [i ].GetNumQueries ())
223250 for j := int64 (1 ); j < nq ; j ++ {
224- subSearchNqOffset [i ][j ] = subSearchNqOffset [i ][j - 1 ] + subSearchResultData [i ].Topks [j - 1 ]
251+ subSearchNqOffset [i ][j ] = subSearchNqOffset [i ][j - 1 ] + validSubSearchResultData [i ].Topks [j - 1 ]
225252 }
226253 totalResCount += subSearchNqOffset [i ][nq - 1 ]
227- subSearchGroupByValIterator [i ] = typeutil .GetDataIterator (subSearchResultData [i ].GetGroupByFieldValue ())
254+ subSearchGroupByValIterator [i ] = typeutil .GetDataIterator (validSubSearchResultData [i ].GetGroupByFieldValue ())
228255 }
229256
230- gpFieldBuilder , err := typeutil .NewFieldDataBuilder (subSearchResultData [0 ].GetGroupByFieldValue ().GetType (), true , int (limit ))
257+ gpFieldBuilder , err := typeutil .NewFieldDataBuilder (validSubSearchResultData [0 ].GetGroupByFieldValue ().GetType (), true , int (limit ))
231258 if err != nil {
232259 return ret , merr .WrapErrServiceInternal ("failed to construct group by field data builder, this is abnormal as segcore should always set up a group by field, no matter data status, check code on qn" , err .Error ())
233260 }
@@ -251,11 +278,11 @@ func reduceSearchResultDataWithGroupBy(ctx context.Context, subSearchResultData
251278 )
252279
253280 for j = 0 ; j < groupBound ; {
254- subSearchIdx , resultDataIdx := selectHighestScoreIndex (ctx , subSearchResultData , subSearchNqOffset , cursors , i )
281+ subSearchIdx , resultDataIdx := selectHighestScoreIndex (ctx , validSubSearchResultData , subSearchNqOffset , cursors , i )
255282 if subSearchIdx == - 1 {
256283 break
257284 }
258- subSearchRes := subSearchResultData [subSearchIdx ]
285+ subSearchRes := validSubSearchResultData [subSearchIdx ]
259286
260287 id := typeutil .GetPK (subSearchRes .GetIds (), resultDataIdx )
261288 score := subSearchRes .GetScores ()[resultDataIdx ]
@@ -288,7 +315,7 @@ func reduceSearchResultDataWithGroupBy(ctx context.Context, subSearchResultData
288315 for _ , groupVal := range groupByValList {
289316 groupEntities := groupByValMap [groupVal ]
290317 for _ , groupEntity := range groupEntities {
291- subResData := subSearchResultData [groupEntity .subSearchIdx ]
318+ subResData := validSubSearchResultData [groupEntity .subSearchIdx ]
292319 retSize += typeutil .AppendFieldData (ret .Results .FieldsData , subResData .FieldsData , groupEntity .resultIdx )
293320 typeutil .AppendPKs (ret .Results .Ids , groupEntity .id )
294321 ret .Results .Scores = append (ret .Results .Scores , groupEntity .score )
@@ -331,12 +358,25 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
331358 zap .Int64 ("limit" , limit ),
332359 zap .String ("metricType" , metricType ))
333360
361+ validSubSearchResultData := []* schemapb.SearchResultData {}
362+ for _ , result := range subSearchResultData {
363+ size := typeutil .GetSizeOfIDs (result .GetIds ())
364+ if result == nil || size == 0 {
365+ continue
366+ }
367+ validSubSearchResultData = append (validSubSearchResultData , result )
368+ }
369+
370+ if len (validSubSearchResultData ) == 0 {
371+ return fillInEmptyResult (nq ), nil
372+ }
373+
334374 ret := & milvuspb.SearchResults {
335375 Status : merr .Success (),
336376 Results : & schemapb.SearchResultData {
337377 NumQueries : nq ,
338378 TopK : topk ,
339- FieldsData : typeutil .PrepareResultFieldData (subSearchResultData [0 ].GetFieldsData (), limit ),
379+ FieldsData : typeutil .PrepareResultFieldData (validSubSearchResultData [0 ].GetFieldsData (), limit ),
340380 Scores : []float32 {},
341381 Ids : & schemapb.IDs {},
342382 Topks : []int64 {},
@@ -347,20 +387,20 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
347387 return ret , nil
348388 }
349389
350- if allSearchCount , _ , err := checkResultDatas (ctx , subSearchResultData , nq , topk ); err != nil {
390+ if allSearchCount , _ , err := checkResultDatas (ctx , validSubSearchResultData , nq , topk ); err != nil {
351391 log .Ctx (ctx ).Warn ("invalid search results" , zap .Error (err ))
352392 return ret , err
353393 } else {
354394 ret .GetResults ().AllSearchCount = allSearchCount
355395 }
356396
357- subSearchNum := len (subSearchResultData )
397+ subSearchNum := len (validSubSearchResultData )
358398 if subSearchNum == 1 && offset == 0 {
359399 // sorting is not needed if there is only one shard and no offset, assigning the result directly.
360400 // we still need to adjust the scores later.
361- ret .Results = subSearchResultData [0 ]
401+ ret .Results = validSubSearchResultData [0 ]
362402 // realTopK is the topK of the nq-th query, it is used in proxy but not handled by delegator.
363- topks := subSearchResultData [0 ].Topks
403+ topks := validSubSearchResultData [0 ].Topks
364404 if len (topks ) > 0 {
365405 ret .Results .TopK = topks [len (topks )- 1 ]
366406 }
@@ -371,9 +411,9 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
371411 // for results of each subSearchResultData, storing the start offset of each query of nq queries
372412 subSearchNqOffset := make ([][]int64 , subSearchNum )
373413 for i := 0 ; i < subSearchNum ; i ++ {
374- subSearchNqOffset [i ] = make ([]int64 , subSearchResultData [i ].GetNumQueries ())
414+ subSearchNqOffset [i ] = make ([]int64 , validSubSearchResultData [i ].GetNumQueries ())
375415 for j := int64 (1 ); j < nq ; j ++ {
376- subSearchNqOffset [i ][j ] = subSearchNqOffset [i ][j - 1 ] + subSearchResultData [i ].Topks [j - 1 ]
416+ subSearchNqOffset [i ][j ] = subSearchNqOffset [i ][j - 1 ] + validSubSearchResultData [i ].Topks [j - 1 ]
377417 }
378418 }
379419 maxOutputSize := paramtable .Get ().QuotaConfig .MaxOutputSize .GetAsInt64 ()
@@ -388,7 +428,7 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
388428
389429 // skip offset results
390430 for k := int64 (0 ); k < offset ; k ++ {
391- subSearchIdx , _ := selectHighestScoreIndex (ctx , subSearchResultData , subSearchNqOffset , cursors , i )
431+ subSearchIdx , _ := selectHighestScoreIndex (ctx , validSubSearchResultData , subSearchNqOffset , cursors , i )
392432 if subSearchIdx == - 1 {
393433 break
394434 }
@@ -401,14 +441,14 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
401441 // From all the sub-query result sets of the i-th query vector,
402442 // find the sub-query result set index of the score j-th data,
403443 // and the index of the data in schemapb.SearchResultData
404- subSearchIdx , resultDataIdx := selectHighestScoreIndex (ctx , subSearchResultData , subSearchNqOffset , cursors , i )
444+ subSearchIdx , resultDataIdx := selectHighestScoreIndex (ctx , validSubSearchResultData , subSearchNqOffset , cursors , i )
405445 if subSearchIdx == - 1 {
406446 break
407447 }
408- score := subSearchResultData [subSearchIdx ].Scores [resultDataIdx ]
448+ score := validSubSearchResultData [subSearchIdx ].Scores [resultDataIdx ]
409449
410- retSize += typeutil .AppendFieldData (ret .Results .FieldsData , subSearchResultData [subSearchIdx ].FieldsData , resultDataIdx )
411- typeutil .CopyPk (ret .Results .Ids , subSearchResultData [subSearchIdx ].GetIds (), int (resultDataIdx ))
450+ retSize += typeutil .AppendFieldData (ret .Results .FieldsData , validSubSearchResultData [subSearchIdx ].FieldsData , resultDataIdx )
451+ typeutil .CopyPk (ret .Results .Ids , validSubSearchResultData [subSearchIdx ].GetIds (), int (resultDataIdx ))
412452 ret .Results .Scores = append (ret .Results .Scores , score )
413453 cursors [subSearchIdx ]++
414454 }
0 commit comments