Skip to content

Commit 669f263

Browse files
committed
perf: remove mutex for fdb address management
1 parent 6a809e2 commit 669f263

File tree

3 files changed

+119
-237
lines changed

3 files changed

+119
-237
lines changed

dao/feature_view_featuredb_dao.go

Lines changed: 53 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (d *FeatureViewFeatureDBDao) GetFeatures(keys []interface{}, selectFields [
8686
if d.signature == "" {
8787
return result, errors.New("FeatureStore DB username and password are not entered, please enter them by adding client.LoginFeatureStoreDB(username, password)")
8888
}
89-
if d.featureDBClient.CurrentAddress == "" || d.featureDBClient.Token == "" {
89+
if d.featureDBClient.GetCurrentAddress(false) == "" || d.featureDBClient.Token == "" {
9090
return result, errors.New("FeatureDB datasource has not been created")
9191
}
9292

@@ -105,11 +105,7 @@ func (d *FeatureViewFeatureDBDao) GetFeatures(keys []interface{}, selectFields [
105105
pkeys = append(pkeys, utils.ToString(k, ""))
106106
}
107107
body, _ := json.Marshal(map[string]any{"keys": pkeys})
108-
d.featureDBClient.AddressMutex.RLock()
109-
currentAddress := d.featureDBClient.CurrentAddress
110-
useVpcAddress := d.featureDBClient.UseVpcAddress
111-
d.featureDBClient.AddressMutex.RUnlock()
112-
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kv2?batch_size=%d&encoder=", currentAddress, d.database, d.schema, d.table, len(pkeys))
108+
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kv2?batch_size=%d&encoder=", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, len(pkeys))
113109
requestBody := readerPool.Get().(*bytes.Reader)
114110
defer readerPool.Put(requestBody)
115111
requestBody.Reset(body)
@@ -119,33 +115,21 @@ func (d *FeatureViewFeatureDBDao) GetFeatures(keys []interface{}, selectFields [
119115
return
120116
}
121117
req.Header.Set("Content-Type", "application/json")
118+
req.Header.Set("Authorization", d.featureDBClient.Token)
122119
req.Header.Set("Auth", d.signature)
123-
if !useVpcAddress {
124-
req.Header.Set("Authorization", d.featureDBClient.Token)
125-
}
126120

127121
response, err := d.featureDBClient.Client.Do(req)
128122
if err != nil {
129-
if useVpcAddress {
130-
d.featureDBClient.CheckVpcAddress(3)
131-
d.featureDBClient.AddressMutex.RLock()
132-
currentAddress = d.featureDBClient.CurrentAddress
133-
useVpcAddress = d.featureDBClient.UseVpcAddress
134-
d.featureDBClient.AddressMutex.RUnlock()
135-
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kv2?batch_size=%d&encoder=", currentAddress, d.database, d.schema, d.table, len(pkeys))
136-
req, err = http.NewRequest("POST", url, requestBody)
137-
if err != nil {
138-
errChan <- err
139-
return
140-
}
141-
req.Header.Set("Content-Type", "application/json")
142-
if !useVpcAddress {
143-
req.Header.Set("Authorization", d.featureDBClient.Token)
144-
}
145-
req.Header.Set("Auth", d.signature)
146-
response, err = d.featureDBClient.Client.Do(req)
123+
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kv2?batch_size=%d&encoder=", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table, len(pkeys))
124+
req, err = http.NewRequest("POST", url, requestBody)
125+
if err != nil {
126+
errChan <- err
127+
return
147128
}
148-
129+
req.Header.Set("Content-Type", "application/json")
130+
req.Header.Set("Authorization", d.featureDBClient.Token)
131+
req.Header.Set("Auth", d.signature)
132+
response, err = d.featureDBClient.Client.Do(req)
149133
if err != nil {
150134
errChan <- err
151135
return
@@ -731,43 +715,28 @@ func (d *FeatureViewFeatureDBDao) GetUserSequenceFeature(keys []interface{}, use
731715
Length: seqLen,
732716
}
733717
body, _ := json.Marshal(request)
734-
d.featureDBClient.AddressMutex.RLock()
735-
currentAddress := d.featureDBClient.CurrentAddress
736-
useVpcAddress := d.featureDBClient.UseVpcAddress
737-
d.featureDBClient.AddressMutex.RUnlock()
738-
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", currentAddress, d.database, d.schema, d.table)
718+
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table)
739719
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
740720
if err != nil {
741721
errChan <- err
742722
return nil
743723
}
744724
req.Header.Set("Content-Type", "application/json")
725+
req.Header.Set("Authorization", d.featureDBClient.Token)
745726
req.Header.Set("Auth", d.signature)
746-
if !useVpcAddress {
747-
req.Header.Set("Authorization", d.featureDBClient.Token)
748-
}
749727

750728
response, err := d.featureDBClient.Client.Do(req)
751729
if err != nil {
752-
if useVpcAddress {
753-
d.featureDBClient.CheckVpcAddress(3)
754-
d.featureDBClient.AddressMutex.RLock()
755-
currentAddress = d.featureDBClient.CurrentAddress
756-
useVpcAddress = d.featureDBClient.UseVpcAddress
757-
d.featureDBClient.AddressMutex.RUnlock()
758-
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", currentAddress, d.database, d.schema, d.table)
759-
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
760-
if err != nil {
761-
errChan <- err
762-
return nil
763-
}
764-
req.Header.Set("Content-Type", "application/json")
765-
req.Header.Set("Auth", d.signature)
766-
if !useVpcAddress {
767-
req.Header.Set("Authorization", d.featureDBClient.Token)
768-
}
769-
response, err = d.featureDBClient.Client.Do(req)
730+
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table)
731+
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
732+
if err != nil {
733+
errChan <- err
734+
return nil
770735
}
736+
req.Header.Set("Content-Type", "application/json")
737+
req.Header.Set("Authorization", d.featureDBClient.Token)
738+
req.Header.Set("Auth", d.signature)
739+
response, err = d.featureDBClient.Client.Do(req)
771740

772741
if err != nil {
773742
errChan <- err
@@ -930,42 +899,28 @@ func (d *FeatureViewFeatureDBDao) GetUserBehaviorFeature(userIds []interface{},
930899
WithValue: true,
931900
}
932901
body, _ := json.Marshal(request)
933-
d.featureDBClient.AddressMutex.RLock()
934-
currentAddress := d.featureDBClient.CurrentAddress
935-
useVpcAddress := d.featureDBClient.UseVpcAddress
936-
d.featureDBClient.AddressMutex.RUnlock()
937-
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/scan_kkv", currentAddress, d.database, d.schema, d.table)
902+
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/scan_kkv", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table)
938903
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
939904
if err != nil {
940905
errChan <- err
941906
return nil
942907
}
943908
req.Header.Set("Content-Type", "application/json")
909+
req.Header.Set("Authorization", d.featureDBClient.Token)
944910
req.Header.Set("Auth", d.signature)
945-
if !useVpcAddress {
946-
req.Header.Set("Authorization", d.featureDBClient.Token)
947-
}
911+
948912
response, err = d.featureDBClient.Client.Do(req)
949913
if err != nil {
950-
if useVpcAddress {
951-
d.featureDBClient.CheckVpcAddress(3)
952-
d.featureDBClient.AddressMutex.RLock()
953-
currentAddress = d.featureDBClient.CurrentAddress
954-
useVpcAddress = d.featureDBClient.UseVpcAddress
955-
d.featureDBClient.AddressMutex.RUnlock()
956-
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/scan_kkv", currentAddress, d.database, d.schema, d.table)
957-
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
958-
if err != nil {
959-
errChan <- err
960-
return nil
961-
}
962-
req.Header.Set("Content-Type", "application/json")
963-
req.Header.Set("Auth", d.signature)
964-
if !useVpcAddress {
965-
req.Header.Set("Authorization", d.featureDBClient.Token)
966-
}
967-
response, err = d.featureDBClient.Client.Do(req)
914+
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/scan_kkv", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table)
915+
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
916+
if err != nil {
917+
errChan <- err
918+
return nil
968919
}
920+
req.Header.Set("Content-Type", "application/json")
921+
req.Header.Set("Authorization", d.featureDBClient.Token)
922+
req.Header.Set("Auth", d.signature)
923+
response, err = d.featureDBClient.Client.Do(req)
969924
if err != nil {
970925
errChan <- err
971926
return nil
@@ -981,42 +936,28 @@ func (d *FeatureViewFeatureDBDao) GetUserBehaviorFeature(userIds []interface{},
981936
WithValue: true,
982937
}
983938
body, _ := json.Marshal(request)
984-
d.featureDBClient.AddressMutex.RLock()
985-
currentAddress := d.featureDBClient.CurrentAddress
986-
useVpcAddress := d.featureDBClient.UseVpcAddress
987-
d.featureDBClient.AddressMutex.RUnlock()
988-
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", currentAddress, d.database, d.schema, d.table)
939+
url := fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table)
989940
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
990941
if err != nil {
991942
errChan <- err
992943
return nil
993944
}
994945
req.Header.Set("Content-Type", "application/json")
946+
req.Header.Set("Authorization", d.featureDBClient.Token)
995947
req.Header.Set("Auth", d.signature)
996-
if !useVpcAddress {
997-
req.Header.Set("Authorization", d.featureDBClient.Token)
998-
}
948+
999949
response, err = d.featureDBClient.Client.Do(req)
1000950
if err != nil {
1001-
if useVpcAddress {
1002-
d.featureDBClient.CheckVpcAddress(3)
1003-
d.featureDBClient.AddressMutex.RLock()
1004-
currentAddress = d.featureDBClient.CurrentAddress
1005-
useVpcAddress = d.featureDBClient.UseVpcAddress
1006-
d.featureDBClient.AddressMutex.RUnlock()
1007-
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", currentAddress, d.database, d.schema, d.table)
1008-
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
1009-
if err != nil {
1010-
errChan <- err
1011-
return nil
1012-
}
1013-
req.Header.Set("Content-Type", "application/json")
1014-
req.Header.Set("Auth", d.signature)
1015-
if !useVpcAddress {
1016-
req.Header.Set("Authorization", d.featureDBClient.Token)
1017-
}
1018-
response, err = d.featureDBClient.Client.Do(req)
951+
url = fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/batch_get_kkv", d.featureDBClient.GetCurrentAddress(true), d.database, d.schema, d.table)
952+
req, err = http.NewRequest("POST", url, bytes.NewReader(body))
953+
if err != nil {
954+
errChan <- err
955+
return nil
1019956
}
957+
req.Header.Set("Content-Type", "application/json")
958+
req.Header.Set("Authorization", d.featureDBClient.Token)
959+
req.Header.Set("Auth", d.signature)
960+
response, err = d.featureDBClient.Client.Do(req)
1020961
if err != nil {
1021962
errChan <- err
1022963
return nil
@@ -1224,19 +1165,13 @@ func (d *FeatureViewFeatureDBDao) RowCountIds(filterExpr string) ([]string, int,
12241165
}
12251166

12261167
alloc := memory.NewGoAllocator()
1227-
d.featureDBClient.AddressMutex.RLock()
1228-
currentAddress := d.featureDBClient.CurrentAddress
1229-
useVpcAddress := d.featureDBClient.UseVpcAddress
1230-
d.featureDBClient.AddressMutex.RUnlock()
12311168
req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/snapshots/%s/scan",
1232-
currentAddress, d.database, d.schema, d.table, snapshotId), bytes.NewReader(nil))
1169+
d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, snapshotId), bytes.NewReader(nil))
12331170
if err != nil {
12341171
return nil, 0, err
12351172
}
12361173
req.Header.Set("Content-Type", "application/json")
1237-
if !useVpcAddress {
1238-
req.Header.Set("Authorization", d.featureDBClient.Token)
1239-
}
1174+
req.Header.Set("Authorization", d.featureDBClient.Token)
12401175
req.Header.Set("Auth", d.signature)
12411176
response, err := d.featureDBClient.Client.Do(req)
12421177
if err != nil {
@@ -1340,19 +1275,13 @@ func (d *FeatureViewFeatureDBDao) RowCountIds(filterExpr string) ([]string, int,
13401275
}
13411276

13421277
func (d *FeatureViewFeatureDBDao) createSnapshot() (string, int64, error) {
1343-
d.featureDBClient.AddressMutex.RLock()
1344-
currentAddress := d.featureDBClient.CurrentAddress
1345-
useVpcAddress := d.featureDBClient.UseVpcAddress
1346-
d.featureDBClient.AddressMutex.RUnlock()
13471278
req, err := http.NewRequest("POST", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/snapshots",
1348-
currentAddress, d.database, d.schema, d.table), bytes.NewReader(nil))
1279+
d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table), bytes.NewReader(nil))
13491280
if err != nil {
13501281
return "", 0, err
13511282
}
13521283
req.Header.Set("Content-Type", "application/json")
1353-
if !useVpcAddress {
1354-
req.Header.Set("Authorization", d.featureDBClient.Token)
1355-
}
1284+
req.Header.Set("Authorization", d.featureDBClient.Token)
13561285
req.Header.Set("Auth", d.signature)
13571286
response, err := d.featureDBClient.Client.Do(req)
13581287
if err != nil {
@@ -1456,19 +1385,13 @@ func (d *FeatureViewFeatureDBDao) ScanAndIterateData(filter string, ch chan<- st
14561385
alloc := memory.NewGoAllocator()
14571386
for {
14581387
time.Sleep(time.Second * 5)
1459-
d.featureDBClient.AddressMutex.RLock()
1460-
currentAddress := d.featureDBClient.CurrentAddress
1461-
useVpcAddress := d.featureDBClient.UseVpcAddress
1462-
d.featureDBClient.AddressMutex.RUnlock()
14631388
req, err := http.NewRequest("GET", fmt.Sprintf("%s/api/v1/tables/%s/%s/%s/iterate_get_kv?ts=%d",
1464-
currentAddress, d.database, d.schema, d.table, ts), bytes.NewReader(nil))
1389+
d.featureDBClient.GetCurrentAddress(false), d.database, d.schema, d.table, ts), bytes.NewReader(nil))
14651390
if err != nil {
14661391
continue
14671392
}
14681393
req.Header.Set("Content-Type", "application/json")
1469-
if !useVpcAddress {
1470-
req.Header.Set("Authorization", d.featureDBClient.Token)
1471-
}
1394+
req.Header.Set("Authorization", d.featureDBClient.Token)
14721395
req.Header.Set("Auth", d.signature)
14731396
response, err := d.featureDBClient.Client.Do(req)
14741397
if err != nil {

0 commit comments

Comments
 (0)