Skip to content

Commit 19a838f

Browse files
authored
Merge pull request #55 from aliyun/feature/add_sequence_fv_behavior_field
feat: add support sequence feature view behavior fields
2 parents 49bbae6 + e2b381f commit 19a838f

File tree

5 files changed

+163
-42
lines changed

5 files changed

+163
-42
lines changed

api/model_feature_view_seq_config.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ type FeatureViewSeqConfig struct {
2020
}
2121

2222
type SeqConfig struct {
23-
OfflineSeqName string `json:"offline_seq_name"`
24-
SeqEvent string `json:"seq_event"`
25-
SeqLen int `json:"seq_len"`
26-
OnlineSeqName string `json:"online_seq_name"`
23+
OfflineSeqName string `json:"offline_seq_name"`
24+
SeqEvent string `json:"seq_event"`
25+
SeqLen int `json:"seq_len"`
26+
OnlineSeqName string `json:"online_seq_name"`
27+
OnlineBehaviorTableFields []string `json:"online_behavior_table_fields"`
2728
}

dao/feature_view_dao.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func makeSequenceFeatures(offlineSequences, onlineSequences []*sequenceInfo, seq
9292

9393
//produce seqeunce feature correspond to easyrec processor
9494
sequencesValueMap := make(map[string][]string)
95-
sequenceMap := make(map[string]bool, 0)
95+
sequenceMap := make(map[string]bool)
9696

9797
for _, seq := range onlineSequences {
9898
key := fmt.Sprintf("%s#%s", seq.itemId, seq.event)
@@ -105,6 +105,9 @@ func makeSequenceFeatures(offlineSequences, onlineSequences []*sequenceInfo, seq
105105
sequencesValueMap[sequenceConfig.PlayTimeField] = append(sequencesValueMap[sequenceConfig.PlayTimeField], fmt.Sprintf("%.2f", seq.playTime))
106106
}
107107
sequencesValueMap["ts"] = append(sequencesValueMap["ts"], fmt.Sprintf("%d", currTime-seq.timestamp))
108+
for k, v := range seq.onlineBehaviourTableFieldsMap {
109+
sequencesValueMap[k] = append(sequencesValueMap[k], v)
110+
}
108111
}
109112
}
110113

dao/feature_view_featuredb_dao.go

Lines changed: 142 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,31 @@ type FeatureViewFeatureDBDao struct {
5353
primaryKeyField string
5454
}
5555

56+
func CntSkipBytes(innerReader *bytes.Reader, fieldType constants.FSType) int {
57+
var skipBytes int = 0
58+
switch fieldType {
59+
case constants.FS_INT32:
60+
skipBytes = 4
61+
case constants.FS_INT64:
62+
skipBytes = 8
63+
case constants.FS_FLOAT:
64+
skipBytes = 4
65+
case constants.FS_DOUBLE:
66+
skipBytes = 8
67+
case constants.FS_STRING:
68+
var length uint32
69+
binary.Read(innerReader, binary.LittleEndian, &length)
70+
skipBytes = int(length)
71+
case constants.FS_BOOLEAN:
72+
skipBytes = 1
73+
default:
74+
var length uint32
75+
binary.Read(innerReader, binary.LittleEndian, &length)
76+
skipBytes = int(length)
77+
}
78+
return skipBytes
79+
}
80+
5681
func NewFeatureViewFeatureDBDao(config DaoConfig) *FeatureViewFeatureDBDao {
5782
dao := FeatureViewFeatureDBDao{
5883
database: config.FeatureDBDatabaseName,
@@ -707,9 +732,28 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
707732
currTime := time.Now().Unix()
708733
sequencePlayTimeMap := makePlayTimeMap(sequenceConfig.PlayTimeFilter)
709734

735+
seqConfigsMap := make(map[string][]*api.SeqConfig)
736+
737+
seqConfigsBehaviorFieldsMap := make(map[string][]map[string]struct{})
738+
739+
withValue := false
740+
for _, seqConfig := range onlineConfig {
741+
mapKey := fmt.Sprintf("%s:%d", seqConfig.SeqEvent, seqConfig.SeqLen)
742+
seqConfigsMap[mapKey] = append(seqConfigsMap[mapKey], seqConfig)
743+
curBehaviorFields := seqConfig.OnlineBehaviorTableFields
744+
curBehaviorFieldsMap := make(map[string]struct{})
745+
for _, field := range curBehaviorFields {
746+
curBehaviorFieldsMap[field] = struct{}{}
747+
}
748+
if len(curBehaviorFieldsMap) > 0 {
749+
withValue = true
750+
seqConfigsBehaviorFieldsMap[mapKey] = append(seqConfigsBehaviorFieldsMap[mapKey], curBehaviorFieldsMap)
751+
}
752+
}
753+
710754
errChan := make(chan error, len(keys)*len(onlineConfig))
711755

712-
fetchDataFunc := func(seqEvent string, seqLen int, key interface{}) []*sequenceInfo {
756+
fetchDataFunc := func(seqEvent string, seqLen int, key interface{}, selectBehaviorFieldsSet map[string]struct{}) []*sequenceInfo {
713757
sequences := []*sequenceInfo{}
714758

715759
events := strings.Split(seqEvent, "|")
@@ -718,8 +762,9 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
718762
pks = append(pks, fmt.Sprintf("%v\u001D%s", key, event))
719763
}
720764
request := FeatureDBBatchGetKKVRequest{
721-
PKs: pks,
722-
Length: seqLen,
765+
PKs: pks,
766+
Length: seqLen,
767+
WithValue: withValue,
723768
}
724769
body, _ := json.Marshal(request)
725770
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table)
@@ -768,6 +813,8 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
768813
}
769814

770815
reader := bufio.NewReader(response.Body)
816+
innerReader := readerPool.Get().(*bytes.Reader)
817+
defer readerPool.Put(innerReader)
771818
for {
772819
buf, err := deserialize(reader)
773820
if err == io.EOF {
@@ -808,6 +855,8 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
808855
seq.timestamp = kkv.EventTimestamp()
809856
seq.playTime = kkv.PlayTime()
810857

858+
seq.onlineBehaviourTableFieldsMap = make(map[string]string)
859+
811860
if seq.event == "" || seq.itemId == "" {
812861
continue
813862
}
@@ -816,7 +865,90 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
816865
continue
817866
}
818867
}
868+
dataBytes := kkv.ValueBytes()
869+
if len(dataBytes) < 2 {
870+
sequences = append(sequences, seq)
871+
continue
872+
}
873+
innerReader.Reset(dataBytes)
874+
// 读取版本号
875+
var protocalVersion, ifNullFlagVersion uint8
876+
binary.Read(innerReader, binary.LittleEndian, &protocalVersion)
877+
binary.Read(innerReader, binary.LittleEndian, &ifNullFlagVersion)
878+
readFeatureDBFunc_F_1 := func() (map[string]string, error) {
879+
properties := make(map[string]string)
880+
881+
for _, field := range d.fields {
882+
var isNull uint8
883+
if err := binary.Read(innerReader, binary.LittleEndian, &isNull); err != nil {
884+
if err == io.EOF {
885+
break
886+
}
887+
return nil, err
888+
}
889+
if isNull == 1 {
890+
// 跳过空值
891+
continue
892+
}
893+
894+
if _, exists := selectBehaviorFieldsSet[field]; exists {
895+
switch d.fieldTypeMap[field] {
896+
case constants.FS_INT32:
897+
var int32Value int32
898+
binary.Read(innerReader, binary.LittleEndian, &int32Value)
899+
properties[field] = fmt.Sprintf("%d", int32Value)
900+
case constants.FS_INT64:
901+
var int64Value int64
902+
binary.Read(innerReader, binary.LittleEndian, &int64Value)
903+
properties[field] = fmt.Sprintf("%d", int64Value)
904+
case constants.FS_FLOAT:
905+
var float32Value float32
906+
binary.Read(innerReader, binary.LittleEndian, &float32Value)
907+
properties[field] = fmt.Sprintf("%v", float32Value)
908+
case constants.FS_DOUBLE:
909+
var float64Value float64
910+
binary.Read(innerReader, binary.LittleEndian, &float64Value)
911+
properties[field] = fmt.Sprintf("%v", float64Value)
912+
case constants.FS_STRING:
913+
var length uint32
914+
binary.Read(innerReader, binary.LittleEndian, &length)
915+
strBytes := make([]byte, length)
916+
binary.Read(innerReader, binary.LittleEndian, &strBytes)
917+
properties[field] = string(strBytes)
918+
case constants.FS_BOOLEAN:
919+
var boolValue bool
920+
binary.Read(innerReader, binary.LittleEndian, &boolValue)
921+
properties[field] = fmt.Sprintf("%v", boolValue)
922+
default:
923+
var length uint32
924+
binary.Read(innerReader, binary.LittleEndian, &length)
925+
strBytes := make([]byte, length)
926+
binary.Read(innerReader, binary.LittleEndian, &strBytes)
927+
properties[field] = string(strBytes)
928+
}
929+
} else {
930+
skipBytes := CntSkipBytes(innerReader, d.fieldTypeMap[field])
931+
if skipBytes > 0 {
932+
if _, err := innerReader.Seek(int64(skipBytes), io.SeekCurrent); err != nil {
933+
return nil, err
934+
}
935+
}
936+
}
937+
}
938+
return properties, nil
939+
}
819940

941+
if protocalVersion == FeatureDB_Protocal_Version_F && ifNullFlagVersion == FeatureDB_IfNull_Flag_Version_1 {
942+
readResult, err := readFeatureDBFunc_F_1()
943+
if err != nil {
944+
errChan <- err
945+
return nil
946+
}
947+
seq.onlineBehaviourTableFieldsMap = readResult
948+
} else {
949+
errChan <- fmt.Errorf("unsupported protocal version: %d, ifNullFlagVersion: %d", protocalVersion, ifNullFlagVersion)
950+
continue
951+
}
820952
sequences = append(sequences, seq)
821953
}
822954
}
@@ -827,11 +959,6 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
827959
results := make([]map[string]interface{}, 0, len(keys))
828960
var outmu sync.Mutex
829961

830-
seqConfigsMap := make(map[string][]*api.SeqConfig)
831-
for _, seqConfig := range onlineConfig {
832-
mapKey := fmt.Sprintf("%s:%d", seqConfig.SeqEvent, seqConfig.SeqLen)
833-
seqConfigsMap[mapKey] = append(seqConfigsMap[mapKey], seqConfig)
834-
}
835962
var wg sync.WaitGroup
836963
for _, key := range keys {
837964
wg.Add(1)
@@ -841,10 +968,14 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
841968
var mu sync.Mutex
842969

843970
var eventWg sync.WaitGroup
844-
for _, seqConfigs := range seqConfigsMap {
971+
for k, seqConfigs := range seqConfigsMap {
845972
if len(seqConfigs) == 0 {
846973
continue
847974
}
975+
seqConfigsBehaviorFields := make([]map[string]struct{}, len(seqConfigs))
976+
if curFields, exits := seqConfigsBehaviorFieldsMap[k]; exits {
977+
seqConfigsBehaviorFields = curFields
978+
}
848979
eventWg.Add(1)
849980
go func(seqConfigs []*api.SeqConfig) {
850981
defer eventWg.Done()
@@ -854,7 +985,7 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
854985
// FeatureDB has processed the integration of online sequence features and offline sequence features
855986
// Here we put the results into onlineSequences
856987

857-
if onlineresult := fetchDataFunc(seqConfigs[0].SeqEvent, seqConfigs[0].SeqLen, key); onlineresult != nil {
988+
if onlineresult := fetchDataFunc(seqConfigs[0].SeqEvent, seqConfigs[0].SeqLen, key, seqConfigsBehaviorFields[0]); onlineresult != nil {
858989
onlineSequences = onlineresult
859990
}
860991

@@ -1081,28 +1212,7 @@ func (d *FeatureViewFeatureDBDao) GetUserBehaviorFeature(userIds []interface{},
10811212
properties[field] = string(strBytes)
10821213
}
10831214
} else {
1084-
var skipBytes int = 0
1085-
switch d.fieldTypeMap[field] {
1086-
case constants.FS_INT32:
1087-
skipBytes = 4
1088-
case constants.FS_INT64:
1089-
skipBytes = 8
1090-
case constants.FS_FLOAT:
1091-
skipBytes = 4
1092-
case constants.FS_DOUBLE:
1093-
skipBytes = 8
1094-
case constants.FS_STRING:
1095-
var length uint32
1096-
binary.Read(innerReader, binary.LittleEndian, &length)
1097-
skipBytes = int(length)
1098-
case constants.FS_BOOLEAN:
1099-
skipBytes = 1
1100-
default:
1101-
var length uint32
1102-
binary.Read(innerReader, binary.LittleEndian, &length)
1103-
skipBytes = int(length)
1104-
}
1105-
1215+
skipBytes := CntSkipBytes(innerReader, d.fieldTypeMap[field])
11061216
if skipBytes > 0 {
11071217
if _, err := innerReader.Seek(int64(skipBytes), io.SeekCurrent); err != nil {
11081218
return nil, err

dao/feature_view_hologres_dao.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -121,10 +121,11 @@ func (d *FeatureViewHologresDao) GetFeatures(keys []interface{}, selectFields []
121121
}
122122

123123
type sequenceInfo struct {
124-
itemId string
125-
event string
126-
playTime float64
127-
timestamp int64
124+
itemId string
125+
event string
126+
playTime float64
127+
timestamp int64
128+
onlineBehaviourTableFieldsMap map[string]string
128129
}
129130

130131
func (d *FeatureViewHologresDao) GetUserSequenceFeature(keys []interface{}, userIdField string, sequenceConfig api.FeatureViewSeqConfig, onlineConfig []*api.SeqConfig) ([]map[string]interface{}, error) {

domain/sequence_feature_view.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,19 @@ func NewSequenceFeatureView(view *api.FeatureView, p *Project, entity *FeatureEn
5757
}
5858

5959
seen := make(map[string]bool)
60+
nameSeqConfigsMap := make(map[string]*api.SeqConfig)
6061
var uniqueSeqConfigs []*api.SeqConfig
6162
for _, seqConfig := range sequenceFeatureView.sequenceConfig.SeqConfig {
6263
if !seen[seqConfig.OnlineSeqName] {
63-
uniqueSeqConfigs = append(uniqueSeqConfigs, seqConfig)
64+
nameSeqConfigsMap[seqConfig.OnlineSeqName] = seqConfig
6465
seen[seqConfig.OnlineSeqName] = true
66+
} else if len(seqConfig.OnlineBehaviorTableFields) > 0 {
67+
nameSeqConfigsMap[seqConfig.OnlineSeqName].OnlineBehaviorTableFields = append(nameSeqConfigsMap[seqConfig.OnlineSeqName].OnlineBehaviorTableFields, seqConfig.OnlineBehaviorTableFields...)
6568
}
6669
}
70+
for _, seqConfig := range nameSeqConfigsMap {
71+
uniqueSeqConfigs = append(uniqueSeqConfigs, seqConfig)
72+
}
6773
sequenceFeatureView.sequenceConfig.SeqConfig = uniqueSeqConfigs
6874
}
6975

0 commit comments

Comments
 (0)