Skip to content

Commit 32ccde1

Browse files
committed
Add safeguard for multi-batch execution (#205)
(cherry picked from commit 8c6b9c5) (cherry picked from commit 7b6c6d0)
1 parent 00b8e28 commit 32ccde1

File tree

1 file changed

+18
-7
lines changed

1 file changed

+18
-7
lines changed

flink-sql-runner/src/main/java/com/datasqrl/flinkrunner/SqlExecutor.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,12 @@
4747
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
4848
class SqlExecutor {
4949

50-
private static final Pattern SET_STATEMENT_PATTERN =
50+
private static final Pattern SET_PATTERN =
5151
Pattern.compile("SET\\s+'(\\S+)'\\s*=\\s*'(.+)';?", Pattern.CASE_INSENSITIVE);
5252

53+
private static final Pattern STATEMENT_SET_PATTERN =
54+
Pattern.compile("EXECUTE\\s+STATEMENT\\s+SET", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
55+
5356
private final TableEnvironment tEnv;
5457

5558
SqlExecutor(Configuration config, @Nullable String udfPath) {
@@ -115,8 +118,11 @@ void setupSystemFunctions() {
115118
TableResult executeScript(String script) {
116119
var statements = SqlUtils.parseStatements(script);
117120
TableResult tableResult = null;
118-
for (String statement : statements) {
119-
tableResult = executeStatement(statement);
121+
122+
var it = statements.iterator();
123+
while (it.hasNext()) {
124+
var statement = it.next();
125+
tableResult = executeStatement(statement, it.hasNext());
120126
}
121127

122128
return tableResult;
@@ -141,24 +147,29 @@ TableResult executeCompiledPlan(String planJson) {
141147
}
142148
}
143149

144-
private TableResult executeStatement(String statement) {
150+
private TableResult executeStatement(String statement, boolean intermediate) {
145151
TableResult tableResult = null;
146152
try {
147-
var setMatcher = SET_STATEMENT_PATTERN.matcher(statement.trim());
153+
var setMatcher = SET_PATTERN.matcher(statement.trim());
154+
var statementSetMatcher = STATEMENT_SET_PATTERN.matcher(statement.trim());
148155

149156
if (setMatcher.matches()) {
150157
// Handle SET statements
151158
var key = setMatcher.group(1);
152159
var value = setMatcher.group(2);
153160
tEnv.getConfig().getConfiguration().setString(key, value);
154161
log.info("Set configuration: {} = {}", key, value);
162+
155163
} else {
156164
log.info("Executing statement:\n{}", statement);
157165
tableResult = tEnv.executeSql(statement);
166+
if (statementSetMatcher.find() && intermediate) {
167+
log.debug("Make sure to wait intermediate statement set to finish...");
168+
tableResult.await();
169+
}
158170
}
159171
} catch (Exception e) {
160-
e.addSuppressed(new RuntimeException("Error while executing stmt: " + statement));
161-
throw e;
172+
throw new RuntimeException("Error while executing stmt: " + statement, e);
162173
}
163174
return tableResult;
164175
}

0 commit comments

Comments
 (0)