15
15
*/
16
16
package com .google .cloud .bigtable .hbase .wrappers .veneer ;
17
17
18
- import com .google .api .gax .core .BackgroundResource ;
19
18
import com .google .api .gax .core .CredentialsProvider ;
20
- import com .google .api .gax .core .FixedCredentialsProvider ;
21
- import com .google .api .gax .core .FixedExecutorProvider ;
19
+ import com .google .api .gax .core .NoCredentialsProvider ;
22
20
import com .google .api .gax .rpc .ClientContext ;
23
- import com .google .api .gax .rpc .FixedHeaderProvider ;
24
- import com .google .api .gax .rpc .FixedTransportChannelProvider ;
25
- import com .google .api .gax .rpc .FixedWatchdogProvider ;
26
21
import com .google .cloud .bigtable .data .v2 .BigtableDataClient ;
22
+ import com .google .cloud .bigtable .data .v2 .BigtableDataClientFactory ;
27
23
import com .google .cloud .bigtable .data .v2 .BigtableDataSettings ;
28
- import com .google .cloud .bigtable .data .v2 .BigtableDataSettings .Builder ;
29
24
import com .google .cloud .bigtable .data .v2 .stub .EnhancedBigtableStubSettings ;
30
25
import com .google .cloud .bigtable .hbase .wrappers .DataClientWrapper ;
31
26
import com .google .cloud .bigtable .metrics .BigtableClientMetrics ;
41
36
* <p>This class is meant to support channel pool caching feature.
42
37
*/
43
38
class SharedDataClientWrapperFactory {
44
- private final Map <Key , ClientContext > cachedContexts = new HashMap <>();
39
+ private final Map <Key , BigtableDataClientFactory > cachedContexts = new HashMap <>();
45
40
private final Map <Key , Integer > refCounts = new HashMap <>();
46
41
47
42
private final Map <Key , Integer > channelPoolSizes = new HashMap <>();
@@ -52,13 +47,14 @@ synchronized DataClientWrapper createDataClient(BigtableHBaseVeneerSettings sett
52
47
Key key = Key .createFromSettings (settings .getDataSettings ());
53
48
54
49
// Get or create ClientContext that will contained the shared resources
55
- ClientContext sharedCtx = cachedContexts .get (key );
50
+ BigtableDataClientFactory sharedCtx = cachedContexts .get (key );
51
+
56
52
if (sharedCtx == null ) {
57
- EnhancedBigtableStubSettings stubSettings = settings .getDataSettings ().getStubSettings ();
58
- sharedCtx = ClientContext .create (stubSettings );
53
+ sharedCtx = BigtableDataClientFactory .create (settings .getDataSettings ());
59
54
cachedContexts .put (key , sharedCtx );
60
55
refCounts .put (key , 0 );
61
- int channelPoolSize = BigtableVeneerApi .getChannelPoolSize (stubSettings );
56
+ int channelPoolSize =
57
+ BigtableVeneerApi .getChannelPoolSize (settings .getDataSettings ().getStubSettings ());
62
58
for (int i = 0 ; i < channelPoolSize ; i ++) {
63
59
BigtableClientMetrics .counter (MetricLevel .Info , "grpc.channel.active" ).inc ();
64
60
}
@@ -68,25 +64,17 @@ synchronized DataClientWrapper createDataClient(BigtableHBaseVeneerSettings sett
68
64
refCounts .put (key , refCounts .get (key ) + 1 );
69
65
70
66
try {
71
- // Patch settings to use shared resources
72
- Builder builder = settings .getDataSettings ().toBuilder ();
73
- builder
74
- .stubSettings ()
75
- .setRefreshingChannel (false )
76
- .setTransportChannelProvider (
77
- FixedTransportChannelProvider .create (sharedCtx .getTransportChannel ()))
78
- .setCredentialsProvider (FixedCredentialsProvider .create (sharedCtx .getCredentials ()))
79
- .setExecutorProvider (FixedExecutorProvider .create (sharedCtx .getExecutor ()))
80
- .setStreamWatchdogProvider (FixedWatchdogProvider .create (sharedCtx .getStreamWatchdog ()))
81
- .setHeaderProvider (FixedHeaderProvider .create (sharedCtx .getHeaders ()))
82
- .setClock (sharedCtx .getClock ());
83
-
84
- BigtableDataSettings data = builder .build ();
67
+ final BigtableDataClient client ;
68
+ if (settings .getAppProfileId () == null ) {
69
+ client = sharedCtx .createForInstance (settings .getProjectId (), settings .getInstanceId ());
70
+ } else {
71
+ client =
72
+ sharedCtx .createForInstance (
73
+ settings .getProjectId (), settings .getInstanceId (), settings .getAppProfileId ());
74
+ }
85
75
// Create a reference counted client wrapper
86
76
return new SharedDataClientWrapper (
87
- this ,
88
- key ,
89
- new DataClientVeneerApi (BigtableDataClient .create (data ), settings .getClientTimeouts ()));
77
+ this , key , new DataClientVeneerApi (client , settings .getClientTimeouts ()));
90
78
} catch (IOException | RuntimeException e ) {
91
79
release (key );
92
80
throw e ;
@@ -101,13 +89,15 @@ synchronized void release(Key key) {
101
89
}
102
90
103
91
refCounts .remove (key );
104
- ClientContext clientContext = cachedContexts .remove (key );
92
+ BigtableDataClientFactory clientContext = cachedContexts .remove (key );
105
93
for (int i = 0 ; i < channelPoolSizes .get (key ); i ++) {
106
94
BigtableClientMetrics .counter (MetricLevel .Info , "grpc.channel.active" ).dec ();
107
95
}
108
96
channelPoolSizes .remove (key );
109
- for (BackgroundResource resource : clientContext .getBackgroundResources ()) {
110
- resource .shutdown ();
97
+ try {
98
+ clientContext .close ();
99
+ } catch (Exception e ) {
100
+ throw new RuntimeException (e );
111
101
}
112
102
}
113
103
@@ -119,23 +109,24 @@ synchronized void release(Key key) {
119
109
* compatible with a ClientContext required by {@link BigtableDataSettings}.
120
110
*/
121
111
static final class Key {
112
+ private static final NoCredentialsProvider NO_CREDENTIALS_PROVIDER_INSTANCE =
113
+ NoCredentialsProvider .create ();
122
114
private final String endpoint ;
123
- private final Map <String , String > headers ;
124
115
private final CredentialsProvider credentialsProvider ;
125
116
126
117
static Key createFromSettings (BigtableDataSettings settings ) {
127
118
EnhancedBigtableStubSettings stubSettings = settings .getStubSettings ();
119
+ CredentialsProvider effectiveCredProvider = stubSettings .getCredentialsProvider ();
120
+ // NoCredentialsProvider doesnt implement equals, but all instances are equivalent
121
+ if (effectiveCredProvider instanceof NoCredentialsProvider ) {
122
+ effectiveCredProvider = NO_CREDENTIALS_PROVIDER_INSTANCE ;
123
+ }
128
124
129
- return new Key (
130
- stubSettings .getEndpoint (),
131
- stubSettings .getHeaderProvider ().getHeaders (),
132
- stubSettings .getCredentialsProvider ());
125
+ return new Key (stubSettings .getEndpoint (), effectiveCredProvider );
133
126
}
134
127
135
- private Key (
136
- String endpoint , Map <String , String > headers , CredentialsProvider credentialsProvider ) {
128
+ private Key (String endpoint , CredentialsProvider credentialsProvider ) {
137
129
this .endpoint = endpoint ;
138
- this .headers = headers ;
139
130
this .credentialsProvider = credentialsProvider ;
140
131
}
141
132
@@ -149,13 +140,12 @@ public boolean equals(Object o) {
149
140
}
150
141
Key key = (Key ) o ;
151
142
return Objects .equal (endpoint , key .endpoint )
152
- && Objects .equal (headers , key .headers )
153
143
&& Objects .equal (credentialsProvider , key .credentialsProvider );
154
144
}
155
145
156
146
@ Override
157
147
public int hashCode () {
158
- return Objects .hashCode (endpoint , headers , credentialsProvider );
148
+ return Objects .hashCode (endpoint , credentialsProvider );
159
149
}
160
150
}
161
151
}
0 commit comments