Skip to content

Commit 88ea37b

Browse files
committed
Update ChangeStreamDao to query a different TVF for PostgreSQL based on
the change stream partition mode. For a MUTABLE_KEY_RANGE change stream, use read_proto_bytes_; otherwise use read_json_.
1 parent 8481373 commit 88ea37b

File tree

2 files changed

+320
-2
lines changed

2 files changed

+320
-2
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java

Lines changed: 102 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,38 @@
2222
import com.google.cloud.spanner.Dialect;
2323
import com.google.cloud.spanner.Options;
2424
import com.google.cloud.spanner.Options.RpcPriority;
25+
import com.google.cloud.spanner.ReadOnlyTransaction;
2526
import com.google.cloud.spanner.ResultSet;
2627
import com.google.cloud.spanner.Statement;
2728
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2831

2932
/**
3033
* Responsible for making change stream queries for a given partition. The result will be given back
3134
* as a {@link ResultSet}, which can be consumed until the stream is finished.
3235
*/
3336
public class ChangeStreamDao {
3437

38+
// Class-wide SLF4J logger; fetchPartitionMode uses it to report failed partition_mode lookups.
private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamDao.class);
39+
40+
// Partition mode of the change stream as cached by this DAO. The value decides which
// PostgreSQL table-valued function changeStreamQuery reads from.
41+
private enum PartitionMode {
42+
// Initial value of the cache field: the mode has not been fetched yet.
UNKNOWN,
43+
// The PostgreSQL query reads from the read_proto_bytes_ TVF.
MUTABLE_KEY_RANGE,
44+
// The PostgreSQL query reads from the read_json_ TVF; also chosen when the fetched value is empty.
IMMUTABLE_KEY_RANGE
45+
}
46+
3547
// Name of the change stream; used to build the TVF name and to look up the stream's options.
private final String changeStreamName;
3648
// Spanner client used to run this DAO's queries.
private final DatabaseClient databaseClient;
3749
private final RpcPriority rpcPriority;
3850
private final String jobName;
3951
// Database dialect; selects between the PostgreSQL and the GoogleSQL query text.
private final Dialect dialect;
4052

53+
// Always non-null to satisfy nullness checker.
54+
// Start UNKNOWN until we fetch and cache the real mode.
55+
// Declared volatile because getPartitionMode() first reads it outside its synchronized block.
private volatile PartitionMode partitionMode = PartitionMode.UNKNOWN;
56+
4157
/**
4258
* Constructs a change stream dao. All the queries performed by this class will be for the given
4359
* change stream name with the specified rpc priority. The job name will be used to tag all the
@@ -91,8 +107,20 @@ public ChangeStreamResultSet changeStreamQuery(
91107
String query = "";
92108
Statement statement;
93109
if (this.isPostgres()) {
94-
query =
95-
"SELECT * FROM \"spanner\".\"read_json_" + changeStreamName + "\"($1, $2, $3, $4, null)";
110+
// Ensure we have determined whether change stream uses mutable key range
111+
boolean isMutable = isMutableKeyRangeChangeStream();
112+
113+
if (isMutable) {
114+
query =
115+
"SELECT * FROM \"spanner\".\"read_proto_bytes_"
116+
+ changeStreamName
117+
+ "\"($1, $2, $3, $4, null)";
118+
} else {
119+
query =
120+
"SELECT * FROM \"spanner\".\"read_json_"
121+
+ changeStreamName
122+
+ "\"($1, $2, $3, $4, null)";
123+
}
96124
statement =
97125
Statement.newBuilder(query)
98126
.bind("p1")
@@ -138,4 +166,76 @@ public ChangeStreamResultSet changeStreamQuery(
138166
private boolean isPostgres() {
139167
return this.dialect == Dialect.POSTGRESQL;
140168
}
169+
170+
// Returns the PartitionMode, fetching from Spanner on first call and caching.
171+
protected PartitionMode getPartitionMode() {
172+
if (this.partitionMode != PartitionMode.UNKNOWN) {
173+
return this.partitionMode;
174+
}
175+
synchronized (this) {
176+
if (this.partitionMode == PartitionMode.UNKNOWN) {
177+
String fetchedPartitionMode =
178+
fetchPartitionMode(this.databaseClient, this.dialect, this.changeStreamName);
179+
if (fetchedPartitionMode.isEmpty()
180+
|| fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
181+
this.partitionMode = PartitionMode.IMMUTABLE_KEY_RANGE;
182+
} else {
183+
this.partitionMode = PartitionMode.MUTABLE_KEY_RANGE;
184+
}
185+
}
186+
}
187+
return this.partitionMode;
188+
}
189+
190+
// Convenience boolean method kept for compatibility
191+
protected boolean isMutableKeyRangeChangeStream() {
192+
return getPartitionMode() == PartitionMode.MUTABLE_KEY_RANGE;
193+
}
194+
195+
// Returns the partition_mode option value for the given change stream.
196+
private static String fetchPartitionMode(
197+
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
198+
try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
199+
Statement statement;
200+
if (dialect == Dialect.POSTGRESQL) {
201+
statement =
202+
Statement.newBuilder(
203+
"select option_name, option_value\n"
204+
+ "from information_schema.change_stream_options\n"
205+
+ "where change_stream_name = $1")
206+
.bind("p1")
207+
.to(changeStreamName)
208+
.build();
209+
} else {
210+
statement =
211+
Statement.newBuilder(
212+
"select option_name, option_value\n"
213+
+ "from information_schema.change_stream_options\n"
214+
+ "where change_stream_name = @changeStreamName")
215+
.bind("changeStreamName")
216+
.to(changeStreamName)
217+
.build();
218+
}
219+
ResultSet resultSet = tx.executeQuery(statement);
220+
while (resultSet.next()) {
221+
String optionName = resultSet.getString(0);
222+
if ("partition_mode".equalsIgnoreCase(optionName)) {
223+
String value = resultSet.getString(1);
224+
if (value != null) {
225+
return value;
226+
}
227+
}
228+
}
229+
return "";
230+
} catch (RuntimeException e) {
231+
// Log the failure (with stack trace) but rethrow so the caller still observes
232+
// the error.
233+
LOG.warn(
234+
"Failed to fetch partition_mode for change stream '{}', dialect={} - will propagate exception",
235+
changeStreamName,
236+
dialect,
237+
e);
238+
throw e;
239+
}
240+
}
141241
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao;
19+
20+
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertTrue;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.times;
25+
import static org.mockito.Mockito.verify;
26+
import static org.mockito.Mockito.when;
27+
28+
import com.google.cloud.spanner.DatabaseClient;
29+
import com.google.cloud.spanner.Dialect;
30+
import com.google.cloud.spanner.Options.RpcPriority;
31+
import com.google.cloud.spanner.ReadOnlyTransaction;
32+
import com.google.cloud.spanner.ResultSet;
33+
import com.google.cloud.spanner.Statement;
34+
import org.junit.Before;
35+
import org.junit.Test;
36+
import org.mockito.ArgumentCaptor;
37+
38+
public class ChangeStreamDaoTest {
39+
private DatabaseClient databaseClient;
40+
private RpcPriority rpcPriority;
41+
private ChangeStreamDao changeStreamDao;
42+
private static final String CHANGE_STREAM_NAME = "testCS";
43+
44+
@Before
45+
public void setUp() {
46+
databaseClient = mock(DatabaseClient.class);
47+
rpcPriority = mock(RpcPriority.class);
48+
}
49+
50+
@Test
51+
public void testPartitionOptionMutable() {
52+
ReadOnlyTransaction readOnlyTransaction = mock(ReadOnlyTransaction.class);
53+
when(databaseClient.readOnlyTransaction()).thenReturn(readOnlyTransaction);
54+
55+
ResultSet resultSet = mock(ResultSet.class);
56+
when(readOnlyTransaction.executeQuery(any())).thenReturn(resultSet);
57+
when(resultSet.next()).thenReturn(true).thenReturn(false);
58+
when(resultSet.getString(0)).thenReturn("partition_mode");
59+
when(resultSet.getString(1)).thenReturn("MUTABLE_KEY_RANGE");
60+
61+
ChangeStreamDao changeStreamDao =
62+
new ChangeStreamDao(
63+
CHANGE_STREAM_NAME,
64+
databaseClient,
65+
rpcPriority,
66+
"testjob",
67+
Dialect.GOOGLE_STANDARD_SQL);
68+
69+
assertEquals(true, changeStreamDao.isMutableKeyRangeChangeStream());
70+
}
71+
72+
@Test
73+
public void testPartitionOptionImmutable() {
74+
ReadOnlyTransaction readOnlyTransaction = mock(ReadOnlyTransaction.class);
75+
when(databaseClient.readOnlyTransaction()).thenReturn(readOnlyTransaction);
76+
77+
ResultSet resultSet = mock(ResultSet.class);
78+
when(readOnlyTransaction.executeQuery(any())).thenReturn(resultSet);
79+
when(resultSet.next()).thenReturn(true).thenReturn(false);
80+
when(resultSet.getString(0)).thenReturn("partition_mode");
81+
when(resultSet.getString(1)).thenReturn("IMMUTABLE_KEY_RANGE");
82+
83+
ChangeStreamDao changeStreamDao =
84+
new ChangeStreamDao(
85+
CHANGE_STREAM_NAME,
86+
databaseClient,
87+
rpcPriority,
88+
"testjob",
89+
Dialect.GOOGLE_STANDARD_SQL);
90+
91+
assertEquals(false, changeStreamDao.isMutableKeyRangeChangeStream());
92+
}
93+
94+
@Test
95+
public void testPartitionOptionEmpty() {
96+
ReadOnlyTransaction readOnlyTransaction = mock(ReadOnlyTransaction.class);
97+
when(databaseClient.readOnlyTransaction()).thenReturn(readOnlyTransaction);
98+
99+
ResultSet resultSet = mock(ResultSet.class);
100+
when(readOnlyTransaction.executeQuery(any())).thenReturn(resultSet);
101+
when(resultSet.next()).thenReturn(true).thenReturn(false);
102+
when(resultSet.getString(0)).thenReturn("partition_mode");
103+
when(resultSet.getString(1)).thenReturn("");
104+
105+
ChangeStreamDao changeStreamDao =
106+
new ChangeStreamDao(
107+
CHANGE_STREAM_NAME,
108+
databaseClient,
109+
rpcPriority,
110+
"testjob",
111+
Dialect.GOOGLE_STANDARD_SQL);
112+
113+
assertEquals(false, changeStreamDao.isMutableKeyRangeChangeStream());
114+
}
115+
116+
@Test
117+
public void testPartitionModeCaching() {
118+
ReadOnlyTransaction readOnlyTransaction = mock(ReadOnlyTransaction.class);
119+
when(databaseClient.readOnlyTransaction()).thenReturn(readOnlyTransaction);
120+
121+
ResultSet resultSet = mock(ResultSet.class);
122+
when(readOnlyTransaction.executeQuery(any())).thenReturn(resultSet);
123+
when(resultSet.next()).thenReturn(true).thenReturn(false);
124+
when(resultSet.getString(0)).thenReturn("partition_mode");
125+
when(resultSet.getString(1)).thenReturn("MUTABLE_KEY_RANGE");
126+
127+
ChangeStreamDao changeStreamDao =
128+
new ChangeStreamDao(
129+
CHANGE_STREAM_NAME,
130+
databaseClient,
131+
rpcPriority,
132+
"testjob",
133+
Dialect.GOOGLE_STANDARD_SQL);
134+
135+
// first call triggers the read
136+
assertEquals(true, changeStreamDao.isMutableKeyRangeChangeStream());
137+
// second call should use cached partitionMode and NOT call
138+
// databaseClient.readOnlyTransaction() again
139+
assertEquals(true, changeStreamDao.isMutableKeyRangeChangeStream());
140+
141+
verify(databaseClient, times(1)).readOnlyTransaction();
142+
}
143+
144+
// New tests for PostgreSQL branch to verify the chosen TVF in the generated
145+
// SQL.
146+
@Test
147+
public void testChangeStreamQueryPostgresMutable() {
148+
// Arrange: metadata read returning MUTABLE_KEY_RANGE
149+
ReadOnlyTransaction metaTx = mock(ReadOnlyTransaction.class);
150+
when(databaseClient.readOnlyTransaction()).thenReturn(metaTx);
151+
152+
ResultSet metaResult = mock(ResultSet.class);
153+
when(metaTx.executeQuery(any())).thenReturn(metaResult);
154+
when(metaResult.next()).thenReturn(true).thenReturn(false);
155+
when(metaResult.getString(0)).thenReturn("partition_mode");
156+
when(metaResult.getString(1)).thenReturn("MUTABLE_KEY_RANGE");
157+
158+
// Arrange: single-use transaction for the actual change stream query
159+
ReadOnlyTransaction singleUseTx = mock(ReadOnlyTransaction.class);
160+
when(databaseClient.singleUse()).thenReturn(singleUseTx);
161+
162+
ResultSet queryResult = mock(ResultSet.class);
163+
// We don't need queryResult to return rows; just to satisfy the call.
164+
when(singleUseTx.executeQuery(any(), any(), any())).thenReturn(queryResult);
165+
166+
ChangeStreamDao changeStreamDao =
167+
new ChangeStreamDao(
168+
CHANGE_STREAM_NAME, databaseClient, rpcPriority, "testjob", Dialect.POSTGRESQL);
169+
170+
// Act: call the method that constructs and executes the statement
171+
changeStreamDao.changeStreamQuery(null, null, null, 0L);
172+
173+
// Assert: capture the Statement passed to singleUse().executeQuery and verify
174+
// SQL
175+
ArgumentCaptor<Statement> captor = ArgumentCaptor.forClass(Statement.class);
176+
verify(singleUseTx).executeQuery(captor.capture(), any(), any());
177+
Statement captured = captor.getValue();
178+
String sql = captured.getSql(); // adjust if different accessor is used
179+
assertTrue(
180+
"Expected SQL to contain read_proto_bytes_",
181+
sql.contains("read_proto_bytes_" + CHANGE_STREAM_NAME));
182+
}
183+
184+
@Test
185+
public void testChangeStreamQueryPostgresImmutable() {
186+
// Arrange: metadata read returning IMMUTABLE_KEY_RANGE
187+
ReadOnlyTransaction metaTx = mock(ReadOnlyTransaction.class);
188+
when(databaseClient.readOnlyTransaction()).thenReturn(metaTx);
189+
190+
ResultSet metaResult = mock(ResultSet.class);
191+
when(metaTx.executeQuery(any())).thenReturn(metaResult);
192+
when(metaResult.next()).thenReturn(true).thenReturn(false);
193+
when(metaResult.getString(0)).thenReturn("partition_mode");
194+
when(metaResult.getString(1)).thenReturn("IMMUTABLE_KEY_RANGE");
195+
196+
// Arrange: single-use transaction for the actual change stream query
197+
ReadOnlyTransaction singleUseTx = mock(ReadOnlyTransaction.class);
198+
when(databaseClient.singleUse()).thenReturn(singleUseTx);
199+
200+
ResultSet queryResult = mock(ResultSet.class);
201+
when(singleUseTx.executeQuery(any(), any(), any())).thenReturn(queryResult);
202+
203+
ChangeStreamDao changeStreamDao =
204+
new ChangeStreamDao(
205+
CHANGE_STREAM_NAME, databaseClient, rpcPriority, "testjob", Dialect.POSTGRESQL);
206+
207+
// Act
208+
changeStreamDao.changeStreamQuery(null, null, null, 0L);
209+
210+
// Assert
211+
ArgumentCaptor<Statement> captor = ArgumentCaptor.forClass(Statement.class);
212+
verify(singleUseTx).executeQuery(captor.capture(), any(), any());
213+
Statement captured = captor.getValue();
214+
String sql = captured.getSql();
215+
assertTrue(
216+
"Expected SQL to contain read_json_", sql.contains("read_json_" + CHANGE_STREAM_NAME));
217+
}
218+
}

0 commit comments

Comments
 (0)