Skip to content

Commit e2b381f

Browse files
committed
add support sequence feature view get behavior table features
1 parent 63103e8 commit e2b381f

File tree

4 files changed

+129
-187
lines changed

4 files changed

+129
-187
lines changed

api/model_feature_view_seq_config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ type SeqConfig struct {
2424
SeqEvent string `json:"seq_event"`
2525
SeqLen int `json:"seq_len"`
2626
OnlineSeqName string `json:"online_seq_name"`
27-
OnlineBehaviorTableFields string `json:"online_behavior_table_fields"`
27+
OnlineBehaviorTableFields []string `json:"online_behavior_table_fields"`
2828
}

dao/feature_view_dao.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func makeSequenceFeatures(offlineSequences, onlineSequences []*sequenceInfo, seq
9292

9393
//produce seqeunce feature correspond to easyrec processor
9494
sequencesValueMap := make(map[string][]string)
95-
sequenceMap := make(map[string]bool, 0)
95+
sequenceMap := make(map[string]bool)
9696

9797
for _, seq := range onlineSequences {
9898
key := fmt.Sprintf("%s#%s", seq.itemId, seq.event)

dao/feature_view_featuredb_dao.go

Lines changed: 120 additions & 184 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/utils"
2525
"github.com/expr-lang/expr"
2626
"github.com/expr-lang/expr/vm"
27-
flatbuffers "github.com/google/flatbuffers/go"
2827
)
2928

3029
const (
@@ -54,6 +53,31 @@ type FeatureViewFeatureDBDao struct {
5453
primaryKeyField string
5554
}
5655

56+
func CntSkipBytes(innerReader *bytes.Reader, fieldType constants.FSType) int {
57+
var skipBytes int = 0
58+
switch fieldType {
59+
case constants.FS_INT32:
60+
skipBytes = 4
61+
case constants.FS_INT64:
62+
skipBytes = 8
63+
case constants.FS_FLOAT:
64+
skipBytes = 4
65+
case constants.FS_DOUBLE:
66+
skipBytes = 8
67+
case constants.FS_STRING:
68+
var length uint32
69+
binary.Read(innerReader, binary.LittleEndian, &length)
70+
skipBytes = int(length)
71+
case constants.FS_BOOLEAN:
72+
skipBytes = 1
73+
default:
74+
var length uint32
75+
binary.Read(innerReader, binary.LittleEndian, &length)
76+
skipBytes = int(length)
77+
}
78+
return skipBytes
79+
}
80+
5781
func NewFeatureViewFeatureDBDao(config DaoConfig) *FeatureViewFeatureDBDao {
5882
dao := FeatureViewFeatureDBDao{
5983
database: config.FeatureDBDatabaseName,
@@ -704,170 +728,32 @@ type FeatureDBBatchGetKKVRequest struct {
704728
WithValue bool `json:"with_value"`
705729
}
706730

707-
type IndexPair struct {
708-
index int
709-
fsType constants.FSType
710-
}
711-
712-
713-
func readValueFromKKVData(data *fdbserverfb.KKVData, offset int, fsType constants.FSType) (interface{}, error) {
714-
715-
tab := data.Table()
716-
717-
// ValueInt32 - 读取 int32 值
718-
valueInt32 := func(i int) int32 {
719-
o := flatbuffers.UOffsetT(tab.Offset(12))
720-
if o != 0 && i+4 <= data.ValueLength() {
721-
a := tab.Vector(o)
722-
return tab.GetInt32(a + flatbuffers.UOffsetT(i))
723-
}
724-
return 0
725-
}
726-
727-
// ValueInt64 - 读取 int64 值
728-
valueInt64 := func(i int) int64 {
729-
o := flatbuffers.UOffsetT(tab.Offset(12))
730-
if o != 0 && i+8 <= data.ValueLength() {
731-
a := tab.Vector(o)
732-
return tab.GetInt64(a + flatbuffers.UOffsetT(i))
733-
}
734-
return 0
735-
}
736-
737-
// ValueFloat32 - 读取 float32 值
738-
valueFloat32 := func(i int) float32 {
739-
o := flatbuffers.UOffsetT(tab.Offset(12))
740-
if o != 0 && i+4 <= data.ValueLength() {
741-
a := tab.Vector(o)
742-
return tab.GetFloat32(a + flatbuffers.UOffsetT(i))
743-
}
744-
return 0.0
745-
}
746-
747-
// ValueFloat64 - 读取 float64 值
748-
valueFloat64 := func(i int) float64 {
749-
o := flatbuffers.UOffsetT(tab.Offset(12))
750-
if o != 0 && i+8 <= data.ValueLength() {
751-
a := tab.Vector(o)
752-
return tab.GetFloat64(a + flatbuffers.UOffsetT(i))
753-
}
754-
return 0.0
755-
}
756-
757-
// ValueBool - 读取 bool 值
758-
valueBool := func(i int) bool {
759-
o := flatbuffers.UOffsetT(tab.Offset(12))
760-
if o != 0 && i+1 <= data.ValueLength() {
761-
a := tab.Vector(o)
762-
return tab.GetBool(a + flatbuffers.UOffsetT(i))
763-
}
764-
return false
765-
}
766-
767-
// ValueString - 读取 string 值
768-
valueString := func(i int) string {
769-
o := flatbuffers.UOffsetT(tab.Offset(12))
770-
if o != 0 && i+4 <= data.ValueLength() {
771-
a := tab.Vector(o)
772-
// 读取长度(uint32,4字节)
773-
length := tab.GetUint32(a + flatbuffers.UOffsetT(i))
774-
775-
// 检查是否有足够的字节用于字符串数据
776-
if i+4+int(length) <= data.ValueLength() {
777-
strBytes := make([]byte, length)
778-
for j := 0; j < int(length); j++ {
779-
strBytes[j] = tab.GetByte(a + flatbuffers.UOffsetT(i+4+j))
780-
}
781-
return string(strBytes)
782-
}
783-
}
784-
return ""
785-
}
786-
787-
switch fsType {
788-
case constants.FS_INT32:
789-
if offset+4 <= data.ValueLength() {
790-
value := valueInt32(offset)
791-
return value, nil
792-
}
793-
return int32(0), fmt.Errorf("insufficient bytes for int32")
794-
795-
case constants.FS_INT64:
796-
if offset+8 <= data.ValueLength() {
797-
value := valueInt64(offset)
798-
return value, nil
799-
}
800-
return int64(0), fmt.Errorf("insufficient bytes for int64")
801-
802-
case constants.FS_FLOAT:
803-
if offset+4 <= data.ValueLength() {
804-
value := valueFloat32(offset)
805-
return value, nil
806-
}
807-
return float32(0), fmt.Errorf("insufficient bytes for float32")
808-
809-
case constants.FS_DOUBLE:
810-
if offset+8 <= data.ValueLength() {
811-
value := valueFloat64(offset)
812-
return value, nil
813-
}
814-
return float64(0), fmt.Errorf("insufficient bytes for float64")
815-
816-
case constants.FS_BOOLEAN:
817-
if offset+1 <= data.ValueLength() {
818-
value := valueBool(offset)
819-
return value, nil
820-
}
821-
return false, fmt.Errorf("insufficient bytes for boolean")
822-
823-
case constants.FS_STRING:
824-
if offset+4 <= data.ValueLength() {
825-
value := valueString(offset)
826-
return value, nil
827-
}
828-
return "", fmt.Errorf("insufficient bytes for string")
829-
830-
default:
831-
// 对于未知类型,尝试按字符串处理
832-
if offset+4 <= data.ValueLength() {
833-
value := valueString(offset)
834-
return value, nil
835-
}
836-
return "", fmt.Errorf("unknown type or insufficient bytes")
837-
}
838-
}
839-
840731
func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {
841732
currTime := time.Now().Unix()
842733
sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)
843734

844735
seqConfigsMap := make(map[string][]*api.SeqConfig)
845736

846-
featureViewFieldsIndexMap := make(map[string]int)
847-
featureViewFieldsTypeMap := make(map[string]constants.FSType)
848-
849-
seqConfigsBehaviorFieldsIndexTypeMap := make(map[string][][]IndexPair)
737+
seqConfigsBehaviorFieldsMap := make(map[string][]map[string]struct{})
850738

851-
for i, field := range d.fields {
852-
featureViewFieldsIndexMap[field] = i
853-
}
854-
for field, curType := range d.fieldTypeMap {
855-
featureViewFieldsTypeMap[field] = curType
856-
}
739+
withValue := false
857740
for _, seqConfig := range onlineConfig {
858741
mapKey := fmt.Sprintf("%s:%d", seqConfig.SeqEvent, seqConfig.SeqLen)
859742
seqConfigsMap[mapKey] = append(seqConfigsMap[mapKey], seqConfig)
860-
curBehaviorFields := strings.Split(seqConfig.OnlineBehaviorTableFields, "\u0001")
861-
curBehaviorFieldsIndexTypeList := make([]IndexPair, len(curBehaviorFields))
743+
curBehaviorFields := seqConfig.OnlineBehaviorTableFields
744+
curBehaviorFieldsMap := make(map[string]struct{})
862745
for _, field := range curBehaviorFields {
863-
curBehaviorFieldsIndexTypeList = append(curBehaviorFieldsIndexTypeList, IndexPair{index: featureViewFieldsIndexMap[field], fsType: featureViewFieldsTypeMap[field]})
746+
curBehaviorFieldsMap[field] = struct{}{}
747+
}
748+
if len(curBehaviorFieldsMap) > 0 {
749+
withValue = true
750+
seqConfigsBehaviorFieldsMap[mapKey] = append(seqConfigsBehaviorFieldsMap[mapKey], curBehaviorFieldsMap)
864751
}
865-
seqConfigsBehaviorFieldsIndexTypeMap[mapKey] = append(seqConfigsBehaviorFieldsIndexTypeMap[mapKey], curBehaviorFieldsIndexTypeList)
866752
}
867753

868754
errChan := make(chan error, len(keys)*len(onlineConfig))
869755

870-
fetchDataFunc := func(seqEvent string, seqLen int, key interface{}, seqConfigsBehaviorFields []IndexPair) []*sequenceInfo {
756+
fetchDataFunc := func(seqEvent string, seqLen int, key interface{}, selectBehaviorFieldsSet map[string]struct{}) []*sequenceInfo {
871757
sequences := []*sequenceInfo{}
872758

873759
events := strings.Split(seqEvent, "|")
@@ -876,8 +762,9 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
876762
pks = append(pks, fmt.Sprintf("%v\u001D%s", key, event))
877763
}
878764
request := FeatureDBBatchGetKKVRequest{
879-
PKs: pks,
880-
Length: seqLen,
765+
PKs: pks,
766+
Length: seqLen,
767+
WithValue: withValue,
881768
}
882769
body, _ := json.Marshal(request)
883770
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table)
@@ -978,19 +865,89 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
978865
continue
979866
}
980867
}
868+
dataBytes := kkv.ValueBytes()
869+
if len(dataBytes) < 2 {
870+
sequences = append(sequences, seq)
871+
continue
872+
}
873+
innerReader.Reset(dataBytes)
874+
// 读取版本号
875+
var protocalVersion, ifNullFlagVersion uint8
876+
binary.Read(innerReader, binary.LittleEndian, &protocalVersion)
877+
binary.Read(innerReader, binary.LittleEndian, &ifNullFlagVersion)
878+
readFeatureDBFunc_F_1 := func() (map[string]string, error) {
879+
properties := make(map[string]string)
981880

982-
for _, field := range seqConfigsBehaviorFields {
983-
// 将值转换为字符串存储在 seq.onlineBehaviourTableFieldsMap 中
984-
if d.fields != nil && field.index < len(d.fields) {
985-
fieldName := d.fields[field.index]
986-
rawValue, err := readValueFromKKVData(kkv, field.index, field.fsType)
987-
value := utils.ToString(rawValue, "")
988-
if err != nil {
989-
value = ""
881+
for _, field := range d.fields {
882+
var isNull uint8
883+
if err := binary.Read(innerReader, binary.LittleEndian, &isNull); err != nil {
884+
if err == io.EOF {
885+
break
886+
}
887+
return nil, err
888+
}
889+
if isNull == 1 {
890+
// 跳过空值
891+
continue
892+
}
893+
894+
if _, exists := selectBehaviorFieldsSet[field]; exists {
895+
switch d.fieldTypeMap[field] {
896+
case constants.FS_INT32:
897+
var int32Value int32
898+
binary.Read(innerReader, binary.LittleEndian, &int32Value)
899+
properties[field] = fmt.Sprintf("%d", int32Value)
900+
case constants.FS_INT64:
901+
var int64Value int64
902+
binary.Read(innerReader, binary.LittleEndian, &int64Value)
903+
properties[field] = fmt.Sprintf("%d", int64Value)
904+
case constants.FS_FLOAT:
905+
var float32Value float32
906+
binary.Read(innerReader, binary.LittleEndian, &float32Value)
907+
properties[field] = fmt.Sprintf("%v", float32Value)
908+
case constants.FS_DOUBLE:
909+
var float64Value float64
910+
binary.Read(innerReader, binary.LittleEndian, &float64Value)
911+
properties[field] = fmt.Sprintf("%v", float64Value)
912+
case constants.FS_STRING:
913+
var length uint32
914+
binary.Read(innerReader, binary.LittleEndian, &length)
915+
strBytes := make([]byte, length)
916+
binary.Read(innerReader, binary.LittleEndian, &strBytes)
917+
properties[field] = string(strBytes)
918+
case constants.FS_BOOLEAN:
919+
var boolValue bool
920+
binary.Read(innerReader, binary.LittleEndian, &boolValue)
921+
properties[field] = fmt.Sprintf("%v", boolValue)
922+
default:
923+
var length uint32
924+
binary.Read(innerReader, binary.LittleEndian, &length)
925+
strBytes := make([]byte, length)
926+
binary.Read(innerReader, binary.LittleEndian, &strBytes)
927+
properties[field] = string(strBytes)
928+
}
929+
} else {
930+
skipBytes := CntSkipBytes(innerReader, d.fieldTypeMap[field])
931+
if skipBytes > 0 {
932+
if _, err := innerReader.Seek(int64(skipBytes), io.SeekCurrent); err != nil {
933+
return nil, err
934+
}
935+
}
990936
}
991-
seq.onlineBehaviourTableFieldsMap[fieldName] = value
992937
}
938+
return properties, nil
939+
}
993940

941+
if protocalVersion == FeatureDB_Protocal_Version_F && ifNullFlagVersion == FeatureDB_IfNull_Flag_Version_1 {
942+
readResult, err := readFeatureDBFunc_F_1()
943+
if err != nil {
944+
errChan <- err
945+
return nil
946+
}
947+
seq.onlineBehaviourTableFieldsMap = readResult
948+
} else {
949+
errChan <- fmt.Errorf("unsupported protocal version: %d, ifNullFlagVersion: %d", protocalVersion, ifNullFlagVersion)
950+
continue
994951
}
995952
sequences = append(sequences, seq)
996953
}
@@ -1015,8 +972,8 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
1015972
if len(seqConfigs) == 0 {
1016973
continue
1017974
}
1018-
seqConfigsBehaviorFields := make([][]IndexPair, len(seqConfigs))
1019-
if curFields, exits := seqConfigsBehaviorFieldsIndexTypeMap[k]; exits {
975+
seqConfigsBehaviorFields := make([]map[string]struct{}, len(seqConfigs))
976+
if curFields, exits := seqConfigsBehaviorFieldsMap[k]; exits {
1020977
seqConfigsBehaviorFields = curFields
1021978
}
1022979
eventWg.Add(1)
@@ -1255,28 +1212,7 @@ func (d *FeatureViewFeatureDBDao) GetUserBehaviorFeature(userIds []interface{},
12551212
properties[field] = string(strBytes)
12561213
}
12571214
} else {
1258-
var skipBytes int = 0
1259-
switch d.fieldTypeMap[field] {
1260-
case constants.FS_INT32:
1261-
skipBytes = 4
1262-
case constants.FS_INT64:
1263-
skipBytes = 8
1264-
case constants.FS_FLOAT:
1265-
skipBytes = 4
1266-
case constants.FS_DOUBLE:
1267-
skipBytes = 8
1268-
case constants.FS_STRING:
1269-
var length uint32
1270-
binary.Read(innerReader, binary.LittleEndian, &length)
1271-
skipBytes = int(length)
1272-
case constants.FS_BOOLEAN:
1273-
skipBytes = 1
1274-
default:
1275-
var length uint32
1276-
binary.Read(innerReader, binary.LittleEndian, &length)
1277-
skipBytes = int(length)
1278-
}
1279-
1215+
skipBytes := CntSkipBytes(innerReader, d.fieldTypeMap[field])
12801216
if skipBytes > 0 {
12811217
if _, err := innerReader.Seek(int64(skipBytes), io.SeekCurrent); err != nil {
12821218
return nil, err

0 commit comments

Comments
 (0)