Skip to content

Commit 21d703e

Browse files
authored
Merge pull request #12 from aliyun/features/fix_featuredb_selectfields
[fix]:support selectfields for featuredb
2 parents afcd6a2 + 240ced6 commit 21d703e

File tree

4 files changed

+91
-40
lines changed

4 files changed

+91
-40
lines changed

dao/dao_config.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,10 @@ type DaoConfig struct {
4949
SaveOriginalField bool
5050

5151
FieldMap map[string]string
52-
// redis, tablestore
52+
// redis, tablestore, featuredb
5353
FieldTypeMap map[string]constants.FSType
5454

55-
// redis
55+
// redis, featuredb
5656
Fields []string
5757

5858
// hologres sequence tables

dao/feature_view_featuredb_dao.go

Lines changed: 84 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type FeatureViewFeatureDBDao struct {
3232
address string
3333
token string
3434
fieldTypeMap map[string]constants.FSType
35+
fields []string
3536
signature string
3637
primaryKeyField string
3738
}
@@ -44,6 +45,7 @@ func NewFeatureViewFeatureDBDao(config DaoConfig) *FeatureViewFeatureDBDao {
4445
fieldTypeMap: config.FieldTypeMap,
4546
signature: config.FeatureDBSignature,
4647
primaryKeyField: config.PrimaryKeyField,
48+
fields: config.Fields,
4749
}
4850
client, err := featuredb.GetFeatureDBClient()
4951
if err != nil {
@@ -60,6 +62,11 @@ func NewFeatureViewFeatureDBDao(config DaoConfig) *FeatureViewFeatureDBDao {
6062

6163
func (d *FeatureViewFeatureDBDao) GetFeatures(keys []interface{}, selectFields []string) ([]map[string]interface{}, error) {
6264
result := make([]map[string]interface{}, 0, len(keys))
65+
selectFieldsSet := make(map[string]struct{})
66+
for _, selectField := range selectFields {
67+
selectFieldsSet[selectField] = struct{}{}
68+
}
69+
6370
var wg sync.WaitGroup
6471
var mu sync.Mutex
6572
const groupSize = 200
@@ -70,6 +77,7 @@ func (d *FeatureViewFeatureDBDao) GetFeatures(keys []interface{}, selectFields [
7077
return result, errors.New("FeatureDB datasource has not been created")
7178
}
7279

80+
errChan := make(chan error, len(keys)/groupSize+1)
7381
for i := 0; i < len(keys); i += groupSize {
7482
end := i + groupSize
7583
if end > len(keys) {
@@ -148,58 +156,90 @@ func (d *FeatureViewFeatureDBDao) GetFeatures(keys []interface{}, selectFields [
148156
binary.Read(innerReader, binary.LittleEndian, &protocalVersion)
149157
binary.Read(innerReader, binary.LittleEndian, &ifNullFlagVersion)
150158

151-
readFeatureDBFunc_F_1 := func() map[string]interface{} {
159+
readFeatureDBFunc_F_1 := func() (map[string]interface{}, error) {
152160
properties := make(map[string]interface{})
153161

154-
for _, field := range selectFields {
155-
if field == d.primaryKeyField {
156-
continue
157-
}
162+
for _, field := range d.fields {
158163
var isNull uint8
159-
binary.Read(innerReader, binary.LittleEndian, &isNull)
164+
if err := binary.Read(innerReader, binary.LittleEndian, &isNull); err != nil {
165+
return nil, err
166+
}
160167

161168
if isNull == 1 {
162169
// 跳过空值
163170
continue
164171
}
165-
switch d.fieldTypeMap[field] {
166-
case constants.FS_DOUBLE:
167-
var float64Value float64
168-
binary.Read(innerReader, binary.LittleEndian, &float64Value)
169-
properties[field] = float64Value
170-
case constants.FS_FLOAT:
171-
var float32Value float32
172-
binary.Read(innerReader, binary.LittleEndian, &float32Value)
173-
properties[field] = float32Value
174-
case constants.FS_INT64:
175-
var int64Value int64
176-
binary.Read(innerReader, binary.LittleEndian, &int64Value)
177-
properties[field] = int64Value
178-
case constants.FS_INT32:
179-
var int32Value int32
180-
binary.Read(innerReader, binary.LittleEndian, &int32Value)
181-
properties[field] = int32Value
182-
case constants.FS_BOOLEAN:
183-
var booleanValue bool
184-
binary.Read(innerReader, binary.LittleEndian, &booleanValue)
185-
properties[field] = booleanValue
186-
default:
187-
var length uint32
188-
binary.Read(innerReader, binary.LittleEndian, &length)
189-
strBytes := make([]byte, length)
190-
binary.Read(innerReader, binary.LittleEndian, &strBytes)
191-
properties[field] = string(strBytes)
172+
if _, exists := selectFieldsSet[field]; exists {
173+
switch d.fieldTypeMap[field] {
174+
case constants.FS_DOUBLE:
175+
var float64Value float64
176+
binary.Read(innerReader, binary.LittleEndian, &float64Value)
177+
properties[field] = float64Value
178+
case constants.FS_FLOAT:
179+
var float32Value float32
180+
binary.Read(innerReader, binary.LittleEndian, &float32Value)
181+
properties[field] = float32Value
182+
case constants.FS_INT64:
183+
var int64Value int64
184+
binary.Read(innerReader, binary.LittleEndian, &int64Value)
185+
properties[field] = int64Value
186+
case constants.FS_INT32:
187+
var int32Value int32
188+
binary.Read(innerReader, binary.LittleEndian, &int32Value)
189+
properties[field] = int32Value
190+
case constants.FS_BOOLEAN:
191+
var booleanValue bool
192+
binary.Read(innerReader, binary.LittleEndian, &booleanValue)
193+
properties[field] = booleanValue
194+
default:
195+
var length uint32
196+
binary.Read(innerReader, binary.LittleEndian, &length)
197+
strBytes := make([]byte, length)
198+
binary.Read(innerReader, binary.LittleEndian, &strBytes)
199+
properties[field] = string(strBytes)
200+
}
201+
} else {
202+
var skipBytes int
203+
switch d.fieldTypeMap[field] {
204+
case constants.FS_DOUBLE:
205+
skipBytes = 8
206+
case constants.FS_FLOAT:
207+
skipBytes = 4
208+
case constants.FS_INT64:
209+
skipBytes = 8
210+
case constants.FS_INT32:
211+
skipBytes = 4
212+
case constants.FS_BOOLEAN:
213+
skipBytes = 1
214+
default:
215+
var length uint32
216+
binary.Read(innerReader, binary.LittleEndian, &length)
217+
skipBytes = int(length)
218+
}
219+
220+
skipData := make([]byte, skipBytes)
221+
if _, err := io.ReadFull(innerReader, skipData); err != nil {
222+
return nil, err
223+
}
192224
}
193225
}
194226
properties[d.primaryKeyField] = ks[keyStartIdx+i]
195227

196-
return properties
197-
}()
228+
return properties, nil
229+
}
198230

199231
if protocalVersion == FeatureDB_Protocal_Version_F && ifNullFlagVersion == FeatureDB_IfNull_Flag_Version_1 {
200-
innerResult = append(innerResult, readFeatureDBFunc_F_1)
232+
readResult, err := readFeatureDBFunc_F_1()
233+
if err != nil {
234+
errChan <- err
235+
fmt.Println(err)
236+
return
237+
}
238+
innerResult = append(innerResult, readResult)
201239
} else {
202-
panic(fmt.Sprintf("protocalVersion %v or ifNullFlagVersion %d is not supported\n", protocalVersion, ifNullFlagVersion))
240+
errChan <- fmt.Errorf("FeatureDB read key %v error: protocalVersion %v or ifNullFlagVersion %d is not supported", ks[keyStartIdx+i], protocalVersion, ifNullFlagVersion)
241+
fmt.Printf("FeatureDB read key %v error: protocalVersion %v or ifNullFlagVersion %d is not supported", ks[keyStartIdx+i], protocalVersion, ifNullFlagVersion)
242+
return
203243
}
204244
}
205245
keyStartIdx += recordBlock.ValuesLength()
@@ -211,6 +251,13 @@ func (d *FeatureViewFeatureDBDao) GetFeatures(keys []interface{}, selectFields [
211251

212252
}
213253
wg.Wait()
254+
close(errChan)
255+
256+
for err := range errChan {
257+
if err != nil {
258+
return nil, err
259+
}
260+
}
214261

215262
return result, nil
216263
}

dao/feature_view_tablestore_dao.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ func (d *FeatureViewTableStoreDao) GetFeatures(keys []interface{}, selectFields
101101
log.Println(errors.New(rowResult.Error.Message))
102102
return
103103
}
104+
if rowResult.PrimaryKey.PrimaryKeys == nil {
105+
continue
106+
}
104107
newMap := make(map[string]interface{})
105108
for _, pkValue := range rowResult.PrimaryKey.PrimaryKeys {
106109
newMap[pkValue.ColumnName] = pkValue.Value

domain/base_feature_view.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func NewBaseFeatureView(view *api.FeatureView, p *Project, entity *FeatureEntity
5353
daoConfig.FeatureDBTableName = featureView.Name
5454
daoConfig.FeatureDBSignature = p.Signature
5555

56-
fieldTypeMap := make(map[string]constants.FSType, len(view.Fields)-1)
56+
fieldTypeMap := make(map[string]constants.FSType, len(view.Fields))
5757
for _, field := range view.Fields {
5858
if field.IsPartition {
5959
continue
@@ -62,6 +62,7 @@ func NewBaseFeatureView(view *api.FeatureView, p *Project, entity *FeatureEntity
6262
}
6363
}
6464
daoConfig.FieldTypeMap = fieldTypeMap
65+
daoConfig.Fields = featureView.featureFields
6566
} else {
6667
switch p.OnlineDatasourceType {
6768
case constants.Datasource_Type_Hologres:

0 commit comments

Comments
 (0)