Skip to content

Commit f7ebbdb

Browse files
committed
fix: Handle empty FieldsData in reduce/rerank for requery scenario (milvus-io#44917)
issue: milvus-io#44909 When requery optimization is enabled, search results contain IDs but empty FieldsData. During reduce/rerank operations, if the first shard has empty FieldsData while others have data, PrepareResultFieldData initializes an empty array, causing AppendFieldData to panic when accessing array indices. Changes: - Find first non-empty FieldsData as template in 3 functions: reduceAdvanceGroupBy, reduceSearchResultDataWithGroupBy, reduceSearchResultDataNoGroupBy - Add length check before 2 AppendFieldData calls in reduce functions to prevent panic - Improve newRerankOutputs to find first non-empty fieldData using len(FieldsData) check instead of GetSizeOfIDs - Add length check in appendResult before AppendFieldData - Add comprehensive unit tests for empty and partial empty FieldsData scenarios in both reduce and rerank functions This fix handles both pure requery (all empty) and mixed scenarios (some empty, some with data) without breaking normal search flow. The key improvement is checking FieldsData length directly rather than IDs, as requery may have IDs but empty FieldsData. Signed-off-by: Wei Liu <[email protected]>
1 parent a3006b7 commit f7ebbdb

File tree

4 files changed

+334
-8
lines changed

4 files changed

+334
-8
lines changed

internal/proxy/search_reduce_util.go

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,13 @@ func reduceAdvanceGroupBy(ctx context.Context, subSearchResultData []*schemapb.S
101101
} else {
102102
ret.GetResults().AllSearchCount = allSearchCount
103103
limit = int64(hitNum)
104-
ret.GetResults().FieldsData = typeutil.PrepareResultFieldData(subSearchResultData[0].GetFieldsData(), limit)
104+
// Find the first non-empty FieldsData as template
105+
for _, result := range subSearchResultData {
106+
if len(result.GetFieldsData()) > 0 {
107+
ret.GetResults().FieldsData = typeutil.PrepareResultFieldData(result.GetFieldsData(), limit)
108+
break
109+
}
110+
}
105111
}
106112

107113
if err := setupIdListForSearchResult(ret, pkType, limit); err != nil {
@@ -193,7 +199,7 @@ func reduceSearchResultDataWithGroupBy(ctx context.Context, subSearchResultData
193199
Results: &schemapb.SearchResultData{
194200
NumQueries: nq,
195201
TopK: topk,
196-
FieldsData: typeutil.PrepareResultFieldData(subSearchResultData[0].GetFieldsData(), limit),
202+
FieldsData: []*schemapb.FieldData{},
197203
Scores: []float32{},
198204
Ids: &schemapb.IDs{},
199205
Topks: []int64{},
@@ -211,6 +217,14 @@ func reduceSearchResultDataWithGroupBy(ctx context.Context, subSearchResultData
211217
ret.GetResults().AllSearchCount = allSearchCount
212218
}
213219

220+
// Find the first non-empty FieldsData as template
221+
for _, result := range subSearchResultData {
222+
if len(result.GetFieldsData()) > 0 {
223+
ret.GetResults().FieldsData = typeutil.PrepareResultFieldData(result.GetFieldsData(), limit)
224+
break
225+
}
226+
}
227+
214228
var (
215229
subSearchNum = len(subSearchResultData)
216230
// for results of each subSearchResultData, storing the start offset of each query of nq queries
@@ -289,7 +303,9 @@ func reduceSearchResultDataWithGroupBy(ctx context.Context, subSearchResultData
289303
groupEntities := groupByValMap[groupVal]
290304
for _, groupEntity := range groupEntities {
291305
subResData := subSearchResultData[groupEntity.subSearchIdx]
292-
retSize += typeutil.AppendFieldData(ret.Results.FieldsData, subResData.FieldsData, groupEntity.resultIdx)
306+
if len(ret.Results.FieldsData) > 0 {
307+
retSize += typeutil.AppendFieldData(ret.Results.FieldsData, subResData.FieldsData, groupEntity.resultIdx)
308+
}
293309
typeutil.AppendPKs(ret.Results.Ids, groupEntity.id)
294310
ret.Results.Scores = append(ret.Results.Scores, groupEntity.score)
295311
gpFieldBuilder.Add(groupVal)
@@ -336,7 +352,7 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
336352
Results: &schemapb.SearchResultData{
337353
NumQueries: nq,
338354
TopK: topk,
339-
FieldsData: typeutil.PrepareResultFieldData(subSearchResultData[0].GetFieldsData(), limit),
355+
FieldsData: []*schemapb.FieldData{},
340356
Scores: []float32{},
341357
Ids: &schemapb.IDs{},
342358
Topks: []int64{},
@@ -354,6 +370,14 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
354370
ret.GetResults().AllSearchCount = allSearchCount
355371
}
356372

373+
// Find the first non-empty FieldsData as template
374+
for _, result := range subSearchResultData {
375+
if len(result.GetFieldsData()) > 0 {
376+
ret.GetResults().FieldsData = typeutil.PrepareResultFieldData(result.GetFieldsData(), limit)
377+
break
378+
}
379+
}
380+
357381
subSearchNum := len(subSearchResultData)
358382
if subSearchNum == 1 && offset == 0 {
359383
// sorting is not needed if there is only one shard and no offset, assigning the result directly.
@@ -407,7 +431,9 @@ func reduceSearchResultDataNoGroupBy(ctx context.Context, subSearchResultData []
407431
}
408432
score := subSearchResultData[subSearchIdx].Scores[resultDataIdx]
409433

410-
retSize += typeutil.AppendFieldData(ret.Results.FieldsData, subSearchResultData[subSearchIdx].FieldsData, resultDataIdx)
434+
if len(ret.Results.FieldsData) > 0 {
435+
retSize += typeutil.AppendFieldData(ret.Results.FieldsData, subSearchResultData[subSearchIdx].FieldsData, resultDataIdx)
436+
}
411437
typeutil.CopyPk(ret.Results.Ids, subSearchResultData[subSearchIdx].GetIds(), int(resultDataIdx))
412438
ret.Results.Scores = append(ret.Results.Scores, score)
413439
cursors[subSearchIdx]++

internal/proxy/search_reduce_util_test.go

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,166 @@ func (struts *SearchReduceUtilTestSuite) TestReduceSearchResultWithEmtpyGroupDat
8484
struts.Nil(results.Results.GetGroupByFieldValue())
8585
}
8686

87+
// TestReduceWithEmptyFieldsData tests reduce functions when FieldsData is empty (requery scenario)
88+
func (struts *SearchReduceUtilTestSuite) TestReduceWithEmptyFieldsData() {
89+
ctx := context.Background()
90+
nq := int64(1)
91+
topK := int64(5)
92+
offset := int64(0)
93+
94+
// Create search results with empty FieldsData (simulating requery scenario)
95+
searchResultData1 := &schemapb.SearchResultData{
96+
Ids: &schemapb.IDs{
97+
IdField: &schemapb.IDs_IntId{
98+
IntId: &schemapb.LongArray{
99+
Data: []int64{1, 2, 3, 4, 5},
100+
},
101+
},
102+
},
103+
Scores: []float32{0.9, 0.8, 0.7, 0.6, 0.5},
104+
Topks: []int64{5},
105+
NumQueries: nq,
106+
TopK: topK,
107+
FieldsData: []*schemapb.FieldData{}, // Empty FieldsData for requery
108+
}
109+
110+
searchResultData2 := &schemapb.SearchResultData{
111+
Ids: &schemapb.IDs{
112+
IdField: &schemapb.IDs_IntId{
113+
IntId: &schemapb.LongArray{
114+
Data: []int64{6, 7, 8, 9, 10},
115+
},
116+
},
117+
},
118+
Scores: []float32{0.85, 0.75, 0.65, 0.55, 0.45},
119+
Topks: []int64{5},
120+
NumQueries: nq,
121+
TopK: topK,
122+
FieldsData: []*schemapb.FieldData{}, // Empty FieldsData for requery
123+
}
124+
125+
// Test reduceSearchResultDataNoGroupBy with empty FieldsData
126+
{
127+
results, err := reduceSearchResultDataNoGroupBy(ctx, []*schemapb.SearchResultData{searchResultData1, searchResultData2}, nq, topK, "L2", schemapb.DataType_Int64, offset)
128+
struts.NoError(err)
129+
struts.NotNil(results)
130+
// Should have merged results without panic
131+
struts.Equal(int64(5), results.Results.Topks[0])
132+
// FieldsData should be empty since all inputs were empty
133+
struts.Equal(0, len(results.Results.FieldsData))
134+
}
135+
136+
// Test reduceSearchResultDataWithGroupBy with empty FieldsData
137+
{
138+
// Add GroupByFieldValue to support group by
139+
searchResultData1.GroupByFieldValue = &schemapb.FieldData{
140+
Type: schemapb.DataType_VarChar,
141+
FieldName: "group",
142+
FieldId: 101,
143+
Field: &schemapb.FieldData_Scalars{
144+
Scalars: &schemapb.ScalarField{
145+
Data: &schemapb.ScalarField_StringData{
146+
StringData: &schemapb.StringArray{
147+
Data: []string{"a", "b", "c", "a", "b"},
148+
},
149+
},
150+
},
151+
},
152+
}
153+
searchResultData2.GroupByFieldValue = &schemapb.FieldData{
154+
Type: schemapb.DataType_VarChar,
155+
FieldName: "group",
156+
FieldId: 101,
157+
Field: &schemapb.FieldData_Scalars{
158+
Scalars: &schemapb.ScalarField{
159+
Data: &schemapb.ScalarField_StringData{
160+
StringData: &schemapb.StringArray{
161+
Data: []string{"c", "a", "b", "c", "a"},
162+
},
163+
},
164+
},
165+
},
166+
}
167+
168+
results, err := reduceSearchResultDataWithGroupBy(ctx, []*schemapb.SearchResultData{searchResultData1, searchResultData2}, nq, topK, "L2", schemapb.DataType_Int64, offset, int64(2))
169+
struts.NoError(err)
170+
struts.NotNil(results)
171+
// FieldsData should be empty since all inputs were empty
172+
struts.Equal(0, len(results.Results.FieldsData))
173+
}
174+
175+
// Test reduceAdvanceGroupBy with empty FieldsData
176+
{
177+
results, err := reduceAdvanceGroupBy(ctx, []*schemapb.SearchResultData{searchResultData1, searchResultData2}, nq, topK, schemapb.DataType_Int64, "L2")
178+
struts.NoError(err)
179+
struts.NotNil(results)
180+
// FieldsData should be empty since all inputs were empty
181+
struts.Equal(0, len(results.Results.FieldsData))
182+
}
183+
}
184+
185+
// TestReduceWithPartialEmptyFieldsData tests when first result has empty FieldsData but second has data
186+
func (struts *SearchReduceUtilTestSuite) TestReduceWithPartialEmptyFieldsData() {
187+
ctx := context.Background()
188+
nq := int64(1)
189+
topK := int64(3)
190+
offset := int64(0)
191+
192+
// First result with empty FieldsData
193+
searchResultData1 := &schemapb.SearchResultData{
194+
Ids: &schemapb.IDs{
195+
IdField: &schemapb.IDs_IntId{
196+
IntId: &schemapb.LongArray{
197+
Data: []int64{1, 2, 3},
198+
},
199+
},
200+
},
201+
Scores: []float32{0.9, 0.8, 0.7},
202+
Topks: []int64{3},
203+
NumQueries: nq,
204+
TopK: topK,
205+
FieldsData: []*schemapb.FieldData{}, // Empty
206+
}
207+
208+
// Second result with non-empty FieldsData
209+
searchResultData2 := &schemapb.SearchResultData{
210+
Ids: &schemapb.IDs{
211+
IdField: &schemapb.IDs_IntId{
212+
IntId: &schemapb.LongArray{
213+
Data: []int64{4, 5, 6},
214+
},
215+
},
216+
},
217+
Scores: []float32{0.85, 0.75, 0.65},
218+
Topks: []int64{3},
219+
NumQueries: nq,
220+
TopK: topK,
221+
FieldsData: []*schemapb.FieldData{
222+
{
223+
Type: schemapb.DataType_Int64,
224+
FieldName: "field1",
225+
FieldId: 100,
226+
Field: &schemapb.FieldData_Scalars{
227+
Scalars: &schemapb.ScalarField{
228+
Data: &schemapb.ScalarField_LongData{
229+
LongData: &schemapb.LongArray{
230+
Data: []int64{40, 50, 60},
231+
},
232+
},
233+
},
234+
},
235+
},
236+
},
237+
}
238+
239+
// Test: Should use the non-empty FieldsData from second result
240+
results, err := reduceSearchResultDataNoGroupBy(ctx, []*schemapb.SearchResultData{searchResultData1, searchResultData2}, nq, topK, "L2", schemapb.DataType_Int64, offset)
241+
struts.NoError(err)
242+
struts.NotNil(results)
243+
// Should have initialized FieldsData from second result
244+
struts.Greater(len(results.Results.FieldsData), 0)
245+
}
246+
87247
func TestSearchReduceUtilTestSuite(t *testing.T) {
88248
suite.Run(t, new(SearchReduceUtilTestSuite))
89249
}

internal/util/function/rerank/util.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,12 @@ func newRerankOutputs(inputs *rerankInputs, searchParams *SearchParams) *rerankO
146146
Ids: &schemapb.IDs{},
147147
Topks: []int64{},
148148
}
149-
if len(inputs.fieldData) > 0 {
150-
ret.FieldsData = typeutil.PrepareResultFieldData(inputs.fieldData[0].GetFieldsData(), searchParams.limit)
149+
// Find the first non-empty fieldData and prepare result fields
150+
for _, fieldData := range inputs.fieldData {
151+
if fieldData != nil && len(fieldData.GetFieldsData()) > 0 {
152+
ret.FieldsData = typeutil.PrepareResultFieldData(fieldData.GetFieldsData(), searchParams.limit)
153+
break
154+
}
151155
}
152156
return &rerankOutputs{ret}
153157
}
@@ -157,7 +161,7 @@ func appendResult[T PKType](inputs *rerankInputs, outputs *rerankOutputs, idScor
157161
scores := idScores.scores
158162
outputs.searchResultData.Topks = append(outputs.searchResultData.Topks, int64(len(ids)))
159163
outputs.searchResultData.Scores = append(outputs.searchResultData.Scores, scores...)
160-
if len(inputs.fieldData) > 0 {
164+
if len(inputs.fieldData) > 0 && len(outputs.searchResultData.FieldsData) > 0 {
161165
for idx := range ids {
162166
loc := idScores.locations[idx]
163167
typeutil.AppendFieldData(outputs.searchResultData.FieldsData, inputs.fieldData[loc.batchIdx].GetFieldsData(), int64(loc.offset))

0 commit comments

Comments
 (0)