Skip to content

Commit 473321f

Browse files
committed
Update ChangeStreamDao to query a different TVF for PostgreSQL based on
the change stream partition mode. For MUTABLE_KEY_RANGE change streams, use read_proto_bytes_; otherwise use read_json_.
1 parent 8481373 commit 473321f

File tree

2 files changed

+379
-60
lines changed

2 files changed

+379
-60
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/ChangeStreamDao.java

Lines changed: 161 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -22,31 +22,50 @@
2222
import com.google.cloud.spanner.Dialect;
2323
import com.google.cloud.spanner.Options;
2424
import com.google.cloud.spanner.Options.RpcPriority;
25+
import com.google.cloud.spanner.ReadOnlyTransaction;
2526
import com.google.cloud.spanner.ResultSet;
2627
import com.google.cloud.spanner.Statement;
2728
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.InitialPartition;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2831

2932
/**
30-
* Responsible for making change stream queries for a given partition. The result will be given back
33+
* Responsible for making change stream queries for a given partition. The
34+
* result will be given back
3135
* as a {@link ResultSet}, which can be consumed until the stream is finished.
3236
*/
3337
public class ChangeStreamDao {
3438

39+
private static final Logger LOG = LoggerFactory.getLogger(ChangeStreamDao.class);
40+
41+
// new enum for partition mode
42+
private enum PartitionMode {
43+
UNKNOWN,
44+
MUTABLE_KEY_RANGE,
45+
IMMUTABLE_KEY_RANGE
46+
}
47+
3548
private final String changeStreamName;
3649
private final DatabaseClient databaseClient;
3750
private final RpcPriority rpcPriority;
3851
private final String jobName;
3952
private final Dialect dialect;
4053

54+
// Always non-null to satisfy nullness checker.
55+
// Start UNKNOWN until we fetch and cache the real mode.
56+
private volatile PartitionMode partitionMode = PartitionMode.UNKNOWN;
57+
4158
/**
42-
* Constructs a change stream dao. All the queries performed by this class will be for the given
43-
* change stream name with the specified rpc priority. The job name will be used to tag all the
59+
* Constructs a change stream dao. All the queries performed by this class will
60+
* be for the given
61+
* change stream name with the specified rpc priority. The job name will be used
62+
* to tag all the
4463
* queries made.
4564
*
4665
* @param changeStreamName the name of the change stream to be queried
47-
* @param databaseClient a spanner {@link DatabaseClient}
48-
* @param rpcPriority the priority to be used for the change stream queries
49-
* @param jobName the name of the job performing the query
66+
* @param databaseClient a spanner {@link DatabaseClient}
67+
* @param rpcPriority the priority to be used for the change stream queries
68+
* @param jobName the name of the job performing the query
5069
*/
5170
ChangeStreamDao(
5271
String changeStreamName,
@@ -62,80 +81,162 @@ public class ChangeStreamDao {
6281
}
6382

6483
/**
65-
* Performs a change stream query. If the partition token given is the initial partition null will
66-
* be used in the query instead. The change stream query will be tagged as following: {@code
67-
* "job=<jobName>"}. The result will be given as a {@link ChangeStreamResultSet} which can be
68-
* consumed as a stream, yielding records until no more are available for the query made. Note
69-
* that one needs to call {@link ChangeStreamResultSet#next()} to initiate the change stream
84+
* Performs a change stream query. If the partition token given is the initial
85+
* partition null will
86+
* be used in the query instead. The change stream query will be tagged as
87+
* following: {@code
88+
* "job=<jobName>"}. The result will be given as a {@link ChangeStreamResultSet}
89+
* which can be
90+
* consumed as a stream, yielding records until no more are available for the
91+
* query made. Note
92+
* that one needs to call {@link ChangeStreamResultSet#next()} to initiate the
93+
* change stream
7094
* query.
7195
*
72-
* @param partitionToken the unique partition token to be queried. If {@link
73-
* InitialPartition#PARTITION_TOKEN} is given, null will be used in the change stream query
74-
* instead.
75-
* @param startTimestamp the inclusive start time for the change stream query
76-
* @param endTimestamp the inclusive end time for the change stream query
77-
* @param heartbeatMillis the number of milliseconds after the stream is idle, which a heartbeat
78-
* record will be emitted in the change stream query
79-
* @return a {@link ChangeStreamResultSet} that will produce a stream of records for the change
80-
* stream query
96+
* @param partitionToken the unique partition token to be queried. If {@link
97+
* InitialPartition#PARTITION_TOKEN} is given, null will
98+
* be used in the change stream query
99+
* instead.
100+
* @param startTimestamp the inclusive start time for the change stream query
101+
* @param endTimestamp the inclusive end time for the change stream query
102+
* @param heartbeatMillis the number of milliseconds after the stream is idle,
103+
* which a heartbeat
104+
* record will be emitted in the change stream query
105+
* @return a {@link ChangeStreamResultSet} that will produce a stream of records
106+
* for the change
107+
* stream query
81108
*/
82109
public ChangeStreamResultSet changeStreamQuery(
83110
String partitionToken,
84111
Timestamp startTimestamp,
85112
Timestamp endTimestamp,
86113
long heartbeatMillis) {
87114
// For the initial partition we query with a null partition token
88-
final String partitionTokenOrNull =
89-
InitialPartition.isInitialPartition(partitionToken) ? null : partitionToken;
115+
final String partitionTokenOrNull = InitialPartition.isInitialPartition(partitionToken) ? null : partitionToken;
90116

91117
String query = "";
92118
Statement statement;
93119
if (this.isPostgres()) {
94-
query =
95-
"SELECT * FROM \"spanner\".\"read_json_" + changeStreamName + "\"($1, $2, $3, $4, null)";
96-
statement =
97-
Statement.newBuilder(query)
98-
.bind("p1")
99-
.to(startTimestamp)
100-
.bind("p2")
101-
.to(endTimestamp)
102-
.bind("p3")
103-
.to(partitionTokenOrNull)
104-
.bind("p4")
105-
.to(heartbeatMillis)
106-
.build();
120+
// Ensure we have determined whether change stream uses mutable key range
121+
boolean isMutable = isMutableKeyRangeChangeStream();
122+
123+
if (isMutable) {
124+
query = "SELECT * FROM \"spanner\".\"read_proto_bytes_"
125+
+ changeStreamName
126+
+ "\"($1, $2, $3, $4, null)";
127+
} else {
128+
query = "SELECT * FROM \"spanner\".\"read_json_"
129+
+ changeStreamName
130+
+ "\"($1, $2, $3, $4, null)";
131+
}
132+
statement = Statement.newBuilder(query)
133+
.bind("p1")
134+
.to(startTimestamp)
135+
.bind("p2")
136+
.to(endTimestamp)
137+
.bind("p3")
138+
.to(partitionTokenOrNull)
139+
.bind("p4")
140+
.to(heartbeatMillis)
141+
.build();
107142
} else {
108-
query =
109-
"SELECT * FROM READ_"
110-
+ changeStreamName
111-
+ "("
112-
+ " start_timestamp => @startTimestamp,"
113-
+ " end_timestamp => @endTimestamp,"
114-
+ " partition_token => @partitionToken,"
115-
+ " read_options => null,"
116-
+ " heartbeat_milliseconds => @heartbeatMillis"
117-
+ ")";
118-
statement =
119-
Statement.newBuilder(query)
120-
.bind("startTimestamp")
121-
.to(startTimestamp)
122-
.bind("endTimestamp")
123-
.to(endTimestamp)
124-
.bind("partitionToken")
125-
.to(partitionTokenOrNull)
126-
.bind("heartbeatMillis")
127-
.to(heartbeatMillis)
128-
.build();
143+
query = "SELECT * FROM READ_"
144+
+ changeStreamName
145+
+ "("
146+
+ " start_timestamp => @startTimestamp,"
147+
+ " end_timestamp => @endTimestamp,"
148+
+ " partition_token => @partitionToken,"
149+
+ " read_options => null,"
150+
+ " heartbeat_milliseconds => @heartbeatMillis"
151+
+ ")";
152+
statement = Statement.newBuilder(query)
153+
.bind("startTimestamp")
154+
.to(startTimestamp)
155+
.bind("endTimestamp")
156+
.to(endTimestamp)
157+
.bind("partitionToken")
158+
.to(partitionTokenOrNull)
159+
.bind("heartbeatMillis")
160+
.to(heartbeatMillis)
161+
.build();
129162
}
130-
final ResultSet resultSet =
131-
databaseClient
132-
.singleUse()
133-
.executeQuery(statement, Options.priority(rpcPriority), Options.tag("job=" + jobName));
163+
final ResultSet resultSet = databaseClient
164+
.singleUse()
165+
.executeQuery(statement, Options.priority(rpcPriority), Options.tag("job=" + jobName));
134166

135167
return new ChangeStreamResultSet(resultSet);
136168
}
137169

138170
private boolean isPostgres() {
139171
return this.dialect == Dialect.POSTGRESQL;
140172
}
173+
174+
// Returns the PartitionMode, fetching from Spanner on first call and caching.
175+
protected PartitionMode getPartitionMode() {
176+
if (this.partitionMode != PartitionMode.UNKNOWN) {
177+
return this.partitionMode;
178+
}
179+
synchronized (this) {
180+
if (this.partitionMode == PartitionMode.UNKNOWN) {
181+
String fetchedPartitionMode = fetchPartitionMode(this.databaseClient, this.dialect, this.changeStreamName);
182+
if (fetchedPartitionMode.isEmpty()
183+
|| fetchedPartitionMode.equalsIgnoreCase("IMMUTABLE_KEY_RANGE")) {
184+
this.partitionMode = PartitionMode.IMMUTABLE_KEY_RANGE;
185+
} else {
186+
this.partitionMode = PartitionMode.MUTABLE_KEY_RANGE;
187+
}
188+
}
189+
}
190+
return this.partitionMode;
191+
}
192+
193+
// Convenience boolean method kept for compatibility
194+
protected boolean isMutableKeyRangeChangeStream() {
195+
return getPartitionMode() == PartitionMode.MUTABLE_KEY_RANGE;
196+
}
197+
198+
// Returns the partition_mode option value for the given change stream.
199+
private static String fetchPartitionMode(
200+
DatabaseClient databaseClient, Dialect dialect, String changeStreamName) {
201+
try (ReadOnlyTransaction tx = databaseClient.readOnlyTransaction()) {
202+
Statement statement;
203+
if (dialect == Dialect.POSTGRESQL) {
204+
statement = Statement.newBuilder(
205+
"select option_name, option_value\n"
206+
+ "from information_schema.change_stream_options\n"
207+
+ "where change_stream_name = $1")
208+
.bind("p1")
209+
.to(changeStreamName)
210+
.build();
211+
} else {
212+
statement = Statement.newBuilder(
213+
"select option_name, option_value\n"
214+
+ "from information_schema.change_stream_options\n"
215+
+ "where change_stream_name = @changeStreamName")
216+
.bind("changeStreamName")
217+
.to(changeStreamName)
218+
.build();
219+
}
220+
ResultSet resultSet = tx.executeQuery(statement);
221+
while (resultSet.next()) {
222+
String optionName = resultSet.getString(0);
223+
if ("partition_mode".equalsIgnoreCase(optionName)) {
224+
String value = resultSet.getString(1);
225+
if (value != null) {
226+
return value;
227+
}
228+
}
229+
}
230+
return "";
231+
} catch (RuntimeException e) {
232+
// Log the failure (with stack trace) but rethrow so the caller still observes
233+
// the error.
234+
LOG.warn(
235+
"Failed to fetch partition_mode for change stream '{}', dialect={} - will propagate exception",
236+
changeStreamName,
237+
dialect,
238+
e);
239+
throw e;
240+
}
241+
}
141242
}

0 commit comments

Comments
 (0)