Skip to content

Commit e446787

Browse files
authored
Merge pull request #43 from aliyun/feature/fdb_pvl
feat: add FeatureDB vpc privatelink support
2 parents 20a0b79 + 3d19639 commit e446787

File tree

8 files changed

+251
-75
lines changed

8 files changed

+251
-75
lines changed

api/api_datasource.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ func (a *DatasourceApiService) DatasourceDatasourceIdGet(datasourceId int, holog
7474
datasource.VpcAddress = config["fdb_vpc_address"]
7575
datasource.PublicAddress = config["fdb_public_address"]
7676
datasource.Token = config["token"]
77+
datasource.FdbVpcAddress = config["fdb_vpc_plk_address"]
7778
}
7879
}
7980

@@ -82,7 +83,7 @@ func (a *DatasourceApiService) DatasourceDatasourceIdGet(datasourceId int, holog
8283
return localVarReturnValue, nil
8384
}
8485

85-
func (a *DatasourceApiService) GetFeatureDBDatasourceInfo(isTestMode bool, workspaceId string) (string, string, error) {
86+
func (a *DatasourceApiService) GetFeatureDBDatasourceInfo(isTestMode bool, workspaceId string) (string, string, string, error) {
8687

8788
featureDBType := "FeatureDB"
8889
request := paifeaturestore.ListDatasourcesRequest{
@@ -91,26 +92,26 @@ func (a *DatasourceApiService) GetFeatureDBDatasourceInfo(isTestMode bool, works
9192
}
9293
listDatasourcesResponse, err := a.client.ListDatasources(&a.client.instanceId, &request)
9394
if err != nil {
94-
return "", "", err
95+
return "", "", "", err
9596
}
9697

9798
for _, datasource := range listDatasourcesResponse.Body.Datasources {
9899
if _, err := strconv.Atoi(*datasource.DatasourceId); err == nil {
99100
response, err := a.client.GetDatasource(&a.client.instanceId, datasource.DatasourceId)
100101
if err != nil {
101-
return "", "", err
102+
return "", "", "", err
102103
}
103104
var config map[string]string
104105
if err := json.Unmarshal([]byte(*response.Body.Config), &config); err == nil {
105106
if isTestMode {
106-
return config["fdb_public_address"], config["token"], nil
107+
return config["fdb_public_address"], config["token"], "", nil
107108
} else {
108-
return config["fdb_vpc_address"], config["token"], nil
109+
return config["fdb_vpc_address"], config["token"], config["fdb_vpc_plk_address"], nil
109110
}
110111
}
111112
}
112113
}
113114

114-
return "", "", nil
115+
return "", "", "", nil
115116

116117
}

api/model_datasource.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ type Datasource struct {
1616
WorkspaceId string `json:"workspace_id"`
1717
VpcAddress string `json:"vpc_address,omitempty"`
1818
PublicAddress string `json:"public_address,omitempty"`
19+
FdbVpcAddress string `json:"fdb_vpc_address,omitempty"`
1920
Project string `json:"project,omitempty"`
2021
Database string `json:"database,omitempty"`
2122
Token string `json:"token,omitempty"`

api/model_project.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ type Project struct {
1616
OfflineDataSource *Datasource `json:"offline_datasource,omitempty"`
1717
OnlineDataSource *Datasource `json:"online_datasource,omitempty"`
1818

19-
InstanceId string `json:"instance_id,omitempty"`
20-
Signature string `json:"-"`
21-
FeatureDBAddress string `json:"-"`
22-
FeatureDBToken string `json:"-"`
19+
InstanceId string `json:"instance_id,omitempty"`
20+
Signature string `json:"-"`
21+
FeatureDBAddress string `json:"-"`
22+
FeatureDBToken string `json:"-"`
23+
FeatureDBVpcAddress string `json:"-"`
2324
}

dao/feature_view_featuredb_dao.go

Lines changed: 87 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,10 @@ func init() {
4343

4444
type FeatureViewFeatureDBDao struct {
4545
UnimplementedFeatureViewDao
46-
featureDBClient *http.Client
46+
featureDBClient *featuredb.FeatureDBClient
4747
database string
4848
schema string
4949
table string
50-
address string
51-
token string
5250
fieldTypeMap map[string]constants.FSType
5351
fields []string
5452
signature string
@@ -70,10 +68,7 @@ func NewFeatureViewFeatureDBDao(config DaoConfig) *FeatureViewFeatureDBDao {
7068
return nil
7169
}
7270

73-
dao.featureDBClient = client.Client
74-
75-
dao.address = client.Address
76-
dao.token = client.Token
71+
dao.featureDBClient = client
7772

7873
return &dao
7974
}
@@ -91,7 +86,7 @@ func (d *FeatureViewFeatureDBDao) GetFeatures(keys []interface{}, selectFields [
9186
if d.signature == "" {
9287
return result, errors.New("FeatureStore DB username and password are not entered, please enter them by adding client.LoginFeatureStoreDB(username, password)")
9388
}
94-
if d.address == "" || d.token == "" {
89+
if d.featureDBClient.GetCurrentAddress(false) == "" || d.featureDBClient.Token == "" {
9590
return result, errors.New("FeatureDB datasource has not been created")
9691
}
9792

@@ -110,23 +105,35 @@ func (d *FeatureViewFeatureDBDao) GetFeatures(keys []interface{}, selectFields [
110105
pkeys = append(pkeys, utils.ToString(k, ""))
111106
}
112107
body, _ := json.Marshal(map[string]any{"keys": pkeys})
108+
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kv2?batch_size=%d&encoder=", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, len(pkeys))
113109
requestBody := readerPool.Get().(*bytes.Reader)
114110
defer readerPool.Put(requestBody)
115111
requestBody.Reset(body)
116-
req, err := http.NewRequest("POST", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kv2?batch_size=%d&encoder=",
117-
d.address, d.database, d.schema, d.table, len(pkeys)), requestBody)
112+
req, err := http.NewRequest("POST", url, requestBody)
118113
if err != nil {
119114
errChan <- err
120115
return
121116
}
122117
req.Header.Set("Content-Type", "application/json")
123-
req.Header.Set("Authorization", d.token)
118+
req.Header.Set("Authorization", d.featureDBClient.Token)
124119
req.Header.Set("Auth", d.signature)
125120

126-
response, err := d.featureDBClient.Do(req)
121+
response, err := d.featureDBClient.Client.Do(req)
127122
if err != nil {
128-
errChan <- err
129-
return
123+
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kv2?batch_size=%d&encoder=", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table, len(pkeys))
124+
req, err = http.NewRequest("POST", url, requestBody)
125+
if err != nil {
126+
errChan <- err
127+
return
128+
}
129+
req.Header.Set("Content-Type", "application/json")
130+
req.Header.Set("Authorization", d.featureDBClient.Token)
131+
req.Header.Set("Auth", d.signature)
132+
response, err = d.featureDBClient.Client.Do(req)
133+
if err != nil {
134+
errChan <- err
135+
return
136+
}
130137
}
131138
defer response.Body.Close() // 确保关闭response.Body
132139
// 检查状态码
@@ -708,20 +715,33 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
708715
Length: seqLen,
709716
}
710717
body, _ := json.Marshal(request)
711-
req, err := http.NewRequest("POST", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv",
712-
d.address, d.database, d.schema, d.table), bytes.NewReader(body))
718+
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table)
719+
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
713720
if err != nil {
714721
errChan <- err
715722
return nil
716723
}
717724
req.Header.Set("Content-Type", "application/json")
718-
req.Header.Set("Authorization", d.token)
725+
req.Header.Set("Authorization", d.featureDBClient.Token)
719726
req.Header.Set("Auth", d.signature)
720727

721-
response, err := d.featureDBClient.Do(req)
728+
response, err := d.featureDBClient.Client.Do(req)
722729
if err != nil {
723-
errChan <- err
724-
return nil
730+
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table)
731+
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
732+
if err != nil {
733+
errChan <- err
734+
return nil
735+
}
736+
req.Header.Set("Content-Type", "application/json")
737+
req.Header.Set("Authorization", d.featureDBClient.Token)
738+
req.Header.Set("Auth", d.signature)
739+
response, err = d.featureDBClient.Client.Do(req)
740+
741+
if err != nil {
742+
errChan <- err
743+
return nil
744+
}
725745
}
726746
defer response.Body.Close() // 确保关闭response.Body
727747
// 检查状态码
@@ -879,19 +899,32 @@ func (d *FeatureViewFeatureDBDao) GetUserBehaviorFeature(userIds []interface{},
879899
WithValue: true,
880900
}
881901
body, _ := json.Marshal(request)
882-
req, err := http.NewRequest("POST", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/scan_kkv",
883-
d.address, d.database, d.schema, d.table), bytes.NewReader(body))
902+
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/scan_kkv", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table)
903+
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
884904
if err != nil {
885905
errChan <- err
886906
return nil
887907
}
888908
req.Header.Set("Content-Type", "application/json")
889-
req.Header.Set("Authorization", d.token)
909+
req.Header.Set("Authorization", d.featureDBClient.Token)
890910
req.Header.Set("Auth", d.signature)
891-
response, err = d.featureDBClient.Do(req)
911+
912+
response, err = d.featureDBClient.Client.Do(req)
892913
if err != nil {
893-
errChan <- err
894-
return nil
914+
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/scan_kkv", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table)
915+
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
916+
if err != nil {
917+
errChan <- err
918+
return nil
919+
}
920+
req.Header.Set("Content-Type", "application/json")
921+
req.Header.Set("Authorization", d.featureDBClient.Token)
922+
req.Header.Set("Auth", d.signature)
923+
response, err = d.featureDBClient.Client.Do(req)
924+
if err != nil {
925+
errChan <- err
926+
return nil
927+
}
895928
}
896929
} else {
897930
pks := make([]string, 0, len(events))
@@ -903,19 +936,32 @@ func (d *FeatureViewFeatureDBDao) GetUserBehaviorFeature(userIds []interface{},
903936
WithValue: true,
904937
}
905938
body, _ := json.Marshal(request)
906-
req, err := http.NewRequest("POST", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv",
907-
d.address, d.database, d.schema, d.table), bytes.NewReader(body))
939+
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table)
940+
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
908941
if err != nil {
909942
errChan <- err
910943
return nil
911944
}
912945
req.Header.Set("Content-Type", "application/json")
913-
req.Header.Set("Authorization", d.token)
946+
req.Header.Set("Authorization", d.featureDBClient.Token)
914947
req.Header.Set("Auth", d.signature)
915-
response, err = d.featureDBClient.Do(req)
948+
949+
response, err = d.featureDBClient.Client.Do(req)
916950
if err != nil {
917-
errChan <- err
918-
return nil
951+
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table)
952+
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
953+
if err != nil {
954+
errChan <- err
955+
return nil
956+
}
957+
req.Header.Set("Content-Type", "application/json")
958+
req.Header.Set("Authorization", d.featureDBClient.Token)
959+
req.Header.Set("Auth", d.signature)
960+
response, err = d.featureDBClient.Client.Do(req)
961+
if err != nil {
962+
errChan <- err
963+
return nil
964+
}
919965
}
920966
}
921967

@@ -1120,14 +1166,14 @@ func (d *FeatureViewFeatureDBDao) RowCountIds(filterExpr string) ([]string, int,
11201166

11211167
alloc := memory.NewGoAllocator()
11221168
req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/snapshots/%s/scan",
1123-
d.address, d.database, d.schema, d.table, snapshotId), bytes.NewReader(nil))
1169+
d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, snapshotId), bytes.NewReader(nil))
11241170
if err != nil {
11251171
return nil, 0, err
11261172
}
11271173
req.Header.Set("Content-Type", "application/json")
1128-
req.Header.Set("Authorization", d.token)
1174+
req.Header.Set("Authorization", d.featureDBClient.Token)
11291175
req.Header.Set("Auth", d.signature)
1130-
response, err := d.featureDBClient.Do(req)
1176+
response, err := d.featureDBClient.Client.Do(req)
11311177
if err != nil {
11321178
return nil, 0, err
11331179
}
@@ -1230,14 +1276,14 @@ func (d *FeatureViewFeatureDBDao) RowCountIds(filterExpr string) ([]string, int,
12301276

12311277
func (d *FeatureViewFeatureDBDao) createSnapshot() (string, int64, error) {
12321278
req, err := http.NewRequest("POST", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/snapshots",
1233-
d.address, d.database, d.schema, d.table), bytes.NewReader(nil))
1279+
d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table), bytes.NewReader(nil))
12341280
if err != nil {
12351281
return "", 0, err
12361282
}
12371283
req.Header.Set("Content-Type", "application/json")
1238-
req.Header.Set("Authorization", d.token)
1284+
req.Header.Set("Authorization", d.featureDBClient.Token)
12391285
req.Header.Set("Auth", d.signature)
1240-
response, err := d.featureDBClient.Do(req)
1286+
response, err := d.featureDBClient.Client.Do(req)
12411287
if err != nil {
12421288
return "", 0, err
12431289
}
@@ -1340,14 +1386,14 @@ func (d *FeatureViewFeatureDBDao) ScanAndIterateData(filter string, ch chan<- st
13401386
for {
13411387
time.Sleep(time.Second * 5)
13421388
req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/iterate_get_kv?ts=%d",
1343-
d.address, d.database, d.schema, d.table, ts), bytes.NewReader(nil))
1389+
d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, ts), bytes.NewReader(nil))
13441390
if err != nil {
13451391
continue
13461392
}
13471393
req.Header.Set("Content-Type", "application/json")
1348-
req.Header.Set("Authorization", d.token)
1394+
req.Header.Set("Authorization", d.featureDBClient.Token)
13491395
req.Header.Set("Auth", d.signature)
1350-
response, err := d.featureDBClient.Do(req)
1396+
response, err := d.featureDBClient.Client.Do(req)
13511397
if err != nil {
13521398
continue
13531399
}

0 commit comments

Comments
 (0)