@@ -24,6 +24,7 @@ import (
2424 "github.com/aliyun/aliyun-pai-featurestore-go-sdk/v2/utils"
2525 "github.com/expr-lang/expr"
2626 "github.com/expr-lang/expr/vm"
27+ flatbuffers "github.com/google/flatbuffers/go"
2728)
2829
2930const (
@@ -703,13 +704,170 @@ type FeatureDBBatchGetKKVRequest struct {
703704 WithValue bool `json:"with_value"`
704705}
705706
707+ type IndexPair struct {
708+ index int
709+ fsType constants.FSType
710+ }
711+
712+
713+ func readValueFromKKVData (data * fdbserverfb.KKVData , offset int , fsType constants.FSType ) (interface {}, error ) {
714+
715+ tab := data .Table ()
716+
717+ // ValueInt32 - 读取 int32 值
718+ valueInt32 := func (i int ) int32 {
719+ o := flatbuffers .UOffsetT (tab .Offset (12 ))
720+ if o != 0 && i + 4 <= data .ValueLength () {
721+ a := tab .Vector (o )
722+ return tab .GetInt32 (a + flatbuffers .UOffsetT (i ))
723+ }
724+ return 0
725+ }
726+
727+ // ValueInt64 - 读取 int64 值
728+ valueInt64 := func (i int ) int64 {
729+ o := flatbuffers .UOffsetT (tab .Offset (12 ))
730+ if o != 0 && i + 8 <= data .ValueLength () {
731+ a := tab .Vector (o )
732+ return tab .GetInt64 (a + flatbuffers .UOffsetT (i ))
733+ }
734+ return 0
735+ }
736+
737+ // ValueFloat32 - 读取 float32 值
738+ valueFloat32 := func (i int ) float32 {
739+ o := flatbuffers .UOffsetT (tab .Offset (12 ))
740+ if o != 0 && i + 4 <= data .ValueLength () {
741+ a := tab .Vector (o )
742+ return tab .GetFloat32 (a + flatbuffers .UOffsetT (i ))
743+ }
744+ return 0.0
745+ }
746+
747+ // ValueFloat64 - 读取 float64 值
748+ valueFloat64 := func (i int ) float64 {
749+ o := flatbuffers .UOffsetT (tab .Offset (12 ))
750+ if o != 0 && i + 8 <= data .ValueLength () {
751+ a := tab .Vector (o )
752+ return tab .GetFloat64 (a + flatbuffers .UOffsetT (i ))
753+ }
754+ return 0.0
755+ }
756+
757+ // ValueBool - 读取 bool 值
758+ valueBool := func (i int ) bool {
759+ o := flatbuffers .UOffsetT (tab .Offset (12 ))
760+ if o != 0 && i + 1 <= data .ValueLength () {
761+ a := tab .Vector (o )
762+ return tab .GetBool (a + flatbuffers .UOffsetT (i ))
763+ }
764+ return false
765+ }
766+
767+ // ValueString - 读取 string 值
768+ valueString := func (i int ) string {
769+ o := flatbuffers .UOffsetT (tab .Offset (12 ))
770+ if o != 0 && i + 4 <= data .ValueLength () {
771+ a := tab .Vector (o )
772+ // 读取长度(uint32,4字节)
773+ length := tab .GetUint32 (a + flatbuffers .UOffsetT (i ))
774+
775+ // 检查是否有足够的字节用于字符串数据
776+ if i + 4 + int (length ) <= data .ValueLength () {
777+ strBytes := make ([]byte , length )
778+ for j := 0 ; j < int (length ); j ++ {
779+ strBytes [j ] = tab .GetByte (a + flatbuffers .UOffsetT (i + 4 + j ))
780+ }
781+ return string (strBytes )
782+ }
783+ }
784+ return ""
785+ }
786+
787+ switch fsType {
788+ case constants .FS_INT32 :
789+ if offset + 4 <= data .ValueLength () {
790+ value := valueInt32 (offset )
791+ return value , nil
792+ }
793+ return int32 (0 ), fmt .Errorf ("insufficient bytes for int32" )
794+
795+ case constants .FS_INT64 :
796+ if offset + 8 <= data .ValueLength () {
797+ value := valueInt64 (offset )
798+ return value , nil
799+ }
800+ return int64 (0 ), fmt .Errorf ("insufficient bytes for int64" )
801+
802+ case constants .FS_FLOAT :
803+ if offset + 4 <= data .ValueLength () {
804+ value := valueFloat32 (offset )
805+ return value , nil
806+ }
807+ return float32 (0 ), fmt .Errorf ("insufficient bytes for float32" )
808+
809+ case constants .FS_DOUBLE :
810+ if offset + 8 <= data .ValueLength () {
811+ value := valueFloat64 (offset )
812+ return value , nil
813+ }
814+ return float64 (0 ), fmt .Errorf ("insufficient bytes for float64" )
815+
816+ case constants .FS_BOOLEAN :
817+ if offset + 1 <= data .ValueLength () {
818+ value := valueBool (offset )
819+ return value , nil
820+ }
821+ return false , fmt .Errorf ("insufficient bytes for boolean" )
822+
823+ case constants .FS_STRING :
824+ if offset + 4 <= data .ValueLength () {
825+ value := valueString (offset )
826+ return value , nil
827+ }
828+ return "" , fmt .Errorf ("insufficient bytes for string" )
829+
830+ default :
831+ // 对于未知类型,尝试按字符串处理
832+ if offset + 4 <= data .ValueLength () {
833+ value := valueString (offset )
834+ return value , nil
835+ }
836+ return "" , fmt .Errorf ("unknown type or insufficient bytes" )
837+ }
838+ }
839+
706840func (d * FeatureViewFeatureDBDao ) GetUserSequenceFeature (keys []interface {}, userIdField string , sequenceConfig api.FeatureViewSeqConfig , onlineConfig []* api.SeqConfig ) ([]map [string ]interface {}, error ) {
707841 currTime := time .Now ().Unix ()
708842 sequencePlayTimeMap := makePlayTimeMap (sequenceConfig .PlayTimeFilter )
709843
844+ seqConfigsMap := make (map [string ][]* api.SeqConfig )
845+
846+ featureViewFieldsIndexMap := make (map [string ]int )
847+ featureViewFieldsTypeMap := make (map [string ]constants.FSType )
848+
849+ seqConfigsBehaviorFieldsIndexTypeMap := make (map [string ][][]IndexPair )
850+
851+ for i , field := range d .fields {
852+ featureViewFieldsIndexMap [field ] = i
853+ }
854+ for field , curType := range d .fieldTypeMap {
855+ featureViewFieldsTypeMap [field ] = curType
856+ }
857+ for _ , seqConfig := range onlineConfig {
858+ mapKey := fmt .Sprintf ("%s:%d" , seqConfig .SeqEvent , seqConfig .SeqLen )
859+ seqConfigsMap [mapKey ] = append (seqConfigsMap [mapKey ], seqConfig )
860+ curBehaviorFields := strings .Split (seqConfig .OnlineBehaviorTableFields , "\u0001 " )
861+ curBehaviorFieldsIndexTypeList := make ([]IndexPair , len (curBehaviorFields ))
862+ for _ , field := range curBehaviorFields {
863+ curBehaviorFieldsIndexTypeList = append (curBehaviorFieldsIndexTypeList , IndexPair {index : featureViewFieldsIndexMap [field ], fsType : featureViewFieldsTypeMap [field ]})
864+ }
865+ seqConfigsBehaviorFieldsIndexTypeMap [mapKey ] = append (seqConfigsBehaviorFieldsIndexTypeMap [mapKey ], curBehaviorFieldsIndexTypeList )
866+ }
867+
710868 errChan := make (chan error , len (keys )* len (onlineConfig ))
711869
712- fetchDataFunc := func (seqEvent string , seqLen int , key interface {}) []* sequenceInfo {
870+ fetchDataFunc := func (seqEvent string , seqLen int , key interface {}, seqConfigsBehaviorFields [] IndexPair ) []* sequenceInfo {
713871 sequences := []* sequenceInfo {}
714872
715873 events := strings .Split (seqEvent , "|" )
@@ -768,6 +926,8 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
768926 }
769927
770928 reader := bufio .NewReader (response .Body )
929+ innerReader := readerPool .Get ().(* bytes.Reader )
930+ defer readerPool .Put (innerReader )
771931 for {
772932 buf , err := deserialize (reader )
773933 if err == io .EOF {
@@ -808,6 +968,8 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
808968 seq .timestamp = kkv .EventTimestamp ()
809969 seq .playTime = kkv .PlayTime ()
810970
971+ seq .onlineBehaviourTableFieldsMap = make (map [string ]string )
972+
811973 if seq .event == "" || seq .itemId == "" {
812974 continue
813975 }
@@ -817,6 +979,19 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
817979 }
818980 }
819981
982+ for _ , field := range seqConfigsBehaviorFields {
983+ // 将值转换为字符串存储在 seq.onlineBehaviourTableFieldsMap 中
984+ if d .fields != nil && field .index < len (d .fields ) {
985+ fieldName := d .fields [field .index ]
986+ rawValue , err := readValueFromKKVData (kkv , field .index , field .fsType )
987+ value := utils .ToString (rawValue , "" )
988+ if err != nil {
989+ value = ""
990+ }
991+ seq .onlineBehaviourTableFieldsMap [fieldName ] = value
992+ }
993+
994+ }
820995 sequences = append (sequences , seq )
821996 }
822997 }
@@ -827,11 +1002,6 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
8271002 results := make ([]map [string ]interface {}, 0 , len (keys ))
8281003 var outmu sync.Mutex
8291004
830- seqConfigsMap := make (map [string ][]* api.SeqConfig )
831- for _ , seqConfig := range onlineConfig {
832- mapKey := fmt .Sprintf ("%s:%d" , seqConfig .SeqEvent , seqConfig .SeqLen )
833- seqConfigsMap [mapKey ] = append (seqConfigsMap [mapKey ], seqConfig )
834- }
8351005 var wg sync.WaitGroup
8361006 for _ , key := range keys {
8371007 wg .Add (1 )
@@ -841,10 +1011,14 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
8411011 var mu sync.Mutex
8421012
8431013 var eventWg sync.WaitGroup
844- for _ , seqConfigs := range seqConfigsMap {
1014+ for k , seqConfigs := range seqConfigsMap {
8451015 if len (seqConfigs ) == 0 {
8461016 continue
8471017 }
1018+ seqConfigsBehaviorFields := make ([][]IndexPair , len (seqConfigs ))
1019+ if curFields , exits := seqConfigsBehaviorFieldsIndexTypeMap [k ]; exits {
1020+ seqConfigsBehaviorFields = curFields
1021+ }
8481022 eventWg .Add (1 )
8491023 go func (seqConfigs []* api.SeqConfig ) {
8501024 defer eventWg .Done ()
@@ -854,7 +1028,7 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
8541028 // FeatureDB has processed the integration of online sequence features and offline sequence features
8551029 // Here we put the results into onlineSequences
8561030
857- if onlineresult := fetchDataFunc (seqConfigs [0 ].SeqEvent , seqConfigs [0 ].SeqLen , key ); onlineresult != nil {
1031+ if onlineresult := fetchDataFunc (seqConfigs [0 ].SeqEvent , seqConfigs [0 ].SeqLen , key , seqConfigsBehaviorFields [ 0 ] ); onlineresult != nil {
8581032 onlineSequences = onlineresult
8591033 }
8601034
0 commit comments