Skip to content
This repository was archived by the owner on Mar 31, 2021. It is now read-only.

Commit 50bd69b

Browse files
authored
Cursor integration (#76)
* Add cursor to JSONQueryResponse * Add fetchSize property to connection configuration * Use user fetchSize to retrieve query results * Add fetchSize property documentation * Add jacoco test coverage * Add unit tests
1 parent 5044d64 commit 50bd69b

27 files changed

+746
-31
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ To setup a connection, the driver requires a JDBC connection URL. The connection
5858
| ------------- |-------------| -----|---------|
5959
| user | Connection username. mandatory if `auth` property selects a authentication scheme that mandates a username value | any string | `null` |
6060
| password | Connection password. mandatory if `auth` property selects a authentication scheme that mandates a password value | any string | `null` |
61+
| fetchSize | Cursor page size | positive integer value. Max value is limited by `index.max_result_window` Elasticsearch setting | `0` (for non-paginated response) |
6162
| logOutput | location where driver logs should be emitted | a valid file path | `null` (logs are disabled) |
6263
| logLevel | severity level for which driver logs should be emitted | in order from highest(least logging) to lowest(most logging): OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL | OFF (logs are disabled) |
6364
| auth | authentication mechanism to use | `NONE` (no auth), `BASIC` (HTTP Basic), `AWS_SIGV4` (AWS SIGV4) | `basic` if username and/or password is specified, `NONE` otherwise |

build.gradle

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,3 +184,25 @@ signing {
184184
sign publishing.publications.shadow
185185
}
186186

187+
jacoco {
188+
toolVersion = "0.8.3"
189+
}
190+
191+
jacocoTestReport {
192+
reports {
193+
html.enabled true
194+
}
195+
}
196+
test.finalizedBy(project.tasks.jacocoTestReport)
197+
198+
jacocoTestCoverageVerification {
199+
violationRules {
200+
rule {
201+
limit {
202+
minimum = 0.4
203+
}
204+
}
205+
}
206+
}
207+
208+
check.dependsOn jacocoTestCoverageVerification

src/main/java/com/amazon/opendistroforelasticsearch/jdbc/ConnectionImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class ConnectionImpl implements ElasticsearchConnection, JdbcWrapper, Log
6060
private String url;
6161
private String user;
6262
private Logger log;
63+
private int fetchSize;
6364
private boolean open = false;
6465
private Transport transport;
6566
private Protocol protocol;
@@ -74,6 +75,7 @@ public ConnectionImpl(ConnectionConfig connectionConfig, TransportFactory transp
7475
this.log = log;
7576
this.url = connectionConfig.getUrl();
7677
this.user = connectionConfig.getUser();
78+
this.fetchSize = connectionConfig.getFetchSize();
7779

7880
try {
7981
this.transport = transportFactory.getTransport(connectionConfig, log, getUserAgent());
@@ -101,6 +103,10 @@ public String getUser() {
101103
return user;
102104
}
103105

106+
public int getFetchSize() {
107+
return fetchSize;
108+
}
109+
104110
@Override
105111
public Statement createStatement() throws SQLException {
106112
log.debug(() -> logEntry("createStatement()"));

src/main/java/com/amazon/opendistroforelasticsearch/jdbc/PreparedStatementImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,14 @@ public PreparedStatementImpl(ConnectionImpl connection, String sql, Logger log)
7575
public ResultSet executeQuery() throws SQLException {
7676
log.debug(() -> logEntry("executeQuery()"));
7777
checkOpen();
78-
ResultSet rs = executeQueryX();
78+
ResultSet rs = executeQueryX(getFetchSize());
7979
log.debug(() -> logExit("executeQuery", rs));
8080
return rs;
8181
}
8282

83-
protected ResultSet executeQueryX() throws SQLException {
83+
protected ResultSet executeQueryX(int fetchSize) throws SQLException {
8484
checkParamsFilled();
85-
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql);
85+
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql, fetchSize);
8686
jdbcQueryRequest.setParameters(Arrays.asList(parameters));
8787
return executeQueryRequest(jdbcQueryRequest);
8888
}
@@ -293,7 +293,7 @@ private int javaToSqlType(Object x) throws SQLException {
293293
public boolean execute() throws SQLException {
294294
log.debug(() -> logEntry("execute()"));
295295
checkOpen();
296-
executeQueryX();
296+
executeQueryX(getFetchSize());
297297
log.debug(() -> logExit("execute", true));
298298
return true;
299299
}

src/main/java/com/amazon/opendistroforelasticsearch/jdbc/ResultSetImpl.java

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,17 @@
2626
import com.amazon.opendistroforelasticsearch.jdbc.protocol.ColumnDescriptor;
2727
import com.amazon.opendistroforelasticsearch.jdbc.internal.JdbcWrapper;
2828
import com.amazon.opendistroforelasticsearch.jdbc.protocol.QueryResponse;
29+
import com.amazon.opendistroforelasticsearch.jdbc.protocol.exceptions.InternalServerErrorException;
30+
import com.amazon.opendistroforelasticsearch.jdbc.protocol.exceptions.ResponseException;
31+
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JdbcCursorQueryRequest;
32+
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JsonCursorHttpProtocol;
33+
import com.amazon.opendistroforelasticsearch.jdbc.protocol.http.JsonCursorHttpProtocolFactory;
34+
import com.amazon.opendistroforelasticsearch.jdbc.transport.http.HttpTransport;
2935
import com.amazon.opendistroforelasticsearch.jdbc.types.TypeConverter;
3036
import com.amazon.opendistroforelasticsearch.jdbc.types.TypeConverters;
3137
import com.amazon.opendistroforelasticsearch.jdbc.types.UnrecognizedElasticsearchTypeException;
3238

39+
import java.io.IOException;
3340
import java.io.InputStream;
3441
import java.io.Reader;
3542
import java.math.BigDecimal;
@@ -71,18 +78,24 @@ public class ResultSetImpl implements ResultSet, JdbcWrapper, LoggingSource {
7178

7279
private StatementImpl statement;
7380
protected Cursor cursor;
81+
private String cursorId;
7482
private boolean open = false;
7583
private boolean wasNull = false;
7684
private boolean afterLast = false;
7785
private boolean beforeFirst = true;
7886
private Logger log;
7987

8088
public ResultSetImpl(StatementImpl statement, QueryResponse queryResponse, Logger log) throws SQLException {
81-
this(statement, queryResponse.getColumnDescriptors(), queryResponse.getDatarows(), log);
89+
this(statement, queryResponse.getColumnDescriptors(), queryResponse.getDatarows(), queryResponse.getCursor(), log);
8290
}
8391

8492
public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> columnDescriptors,
8593
List<List<Object>> dataRows, Logger log) throws SQLException {
94+
this(statement, columnDescriptors, dataRows, null, log);
95+
}
96+
97+
public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> columnDescriptors,
98+
List<List<Object>> dataRows, String cursorId, Logger log) throws SQLException {
8699
this.statement = statement;
87100
this.log = log;
88101

@@ -93,12 +106,10 @@ public ResultSetImpl(StatementImpl statement, List<? extends ColumnDescriptor> c
93106
.map(ColumnMetaData::new)
94107
.collect(Collectors.toList()));
95108

96-
List<Row> rows = dataRows
97-
.parallelStream()
98-
.map(Row::new)
99-
.collect(Collectors.toList());
109+
List<Row> rows = getRowsFromDataRows(dataRows);
100110

101111
this.cursor = new Cursor(schema, rows);
112+
this.cursorId = cursorId;
102113
this.open = true;
103114

104115
} catch (UnrecognizedElasticsearchTypeException ex) {
@@ -112,15 +123,63 @@ public boolean next() throws SQLException {
112123
log.debug(() -> logEntry("next()"));
113124
checkOpen();
114125
boolean next = cursor.next();
126+
127+
if (!next && this.cursorId != null) {
128+
log.debug(() -> logEntry("buildNextPageFromCursorId()"));
129+
buildNextPageFromCursorId();
130+
log.debug(() -> logExit("buildNextPageFromCursorId()"));
131+
next = cursor.next();
132+
}
133+
115134
if (next) {
116135
beforeFirst = false;
117136
} else {
118137
afterLast = true;
119138
}
120-
log.debug(() -> logExit("next", next));
139+
boolean finalNext = next;
140+
log.debug(() -> logExit("next", finalNext));
121141
return next;
122142
}
123143

144+
/**
145+
* TODO: Refactor as suggested https://github.com/opendistro-for-elasticsearch/sql-jdbc/pull/76#discussion_r421571383
146+
*
147+
* This method has side effects. It creates a new Cursor to hold rows from new pages.
148+
* Ideally fetching next set of rows using cursorId should be delegated to Cursor.
149+
* In addition, the cursor should be final.
150+
*
151+
**/
152+
protected void buildNextPageFromCursorId() throws SQLException {
153+
try {
154+
JdbcCursorQueryRequest jdbcCursorQueryRequest = new JdbcCursorQueryRequest(this.cursorId);
155+
JsonCursorHttpProtocolFactory protocolFactory = JsonCursorHttpProtocolFactory.INSTANCE;
156+
ConnectionImpl connection = (ConnectionImpl) statement.getConnection();
157+
158+
JsonCursorHttpProtocol protocol = protocolFactory.getProtocol(null, (HttpTransport) connection.getTransport());
159+
QueryResponse queryResponse = protocol.execute(jdbcCursorQueryRequest);
160+
161+
if (queryResponse.getError() != null) {
162+
throw new InternalServerErrorException(
163+
queryResponse.getError().getReason(),
164+
queryResponse.getError().getType(),
165+
queryResponse.getError().getDetails());
166+
}
167+
168+
cursor = new Cursor(cursor.getSchema(), getRowsFromDataRows(queryResponse.getDatarows()));
169+
cursorId = queryResponse.getCursor();
170+
171+
} catch (ResponseException | IOException ex) {
172+
logAndThrowSQLException(log, new SQLException("Error executing cursor query", ex));
173+
}
174+
}
175+
176+
private List<Row> getRowsFromDataRows(List<List<Object>> dataRows) {
177+
return dataRows
178+
.parallelStream()
179+
.map(Row::new)
180+
.collect(Collectors.toList());
181+
}
182+
124183
@Override
125184
public void close() throws SQLException {
126185
log.debug(() -> logEntry("close()"));

src/main/java/com/amazon/opendistroforelasticsearch/jdbc/StatementImpl.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,26 +38,28 @@ public class StatementImpl implements Statement, JdbcWrapper, LoggingSource {
3838

3939
protected ConnectionImpl connection;
4040
protected boolean open = false;
41+
protected int fetchSize;
4142
protected ResultSetImpl resultSet;
4243
protected Logger log;
4344
private boolean closeOnCompletion;
4445

4546
public StatementImpl(ConnectionImpl connection, Logger log) {
4647
this.connection = connection;
4748
this.open = true;
49+
this.fetchSize = connection.getFetchSize();
4850
this.log = log;
4951
}
5052

5153
@Override
5254
public ResultSet executeQuery(String sql) throws SQLException {
5355
log.debug(()-> logEntry("executeQuery (%s)", sql));
54-
ResultSet rs = executeQueryX(sql);
56+
ResultSet rs = executeQueryX(sql, fetchSize);
5557
log.debug(()-> logExit("executeQuery", rs));
5658
return rs;
5759
}
5860

59-
protected ResultSet executeQueryX(String sql) throws SQLException {
60-
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql);
61+
protected ResultSet executeQueryX(String sql, int fetchSize) throws SQLException {
62+
JdbcQueryRequest jdbcQueryRequest = new JdbcQueryRequest(sql, fetchSize);
6163
return executeQueryRequest(jdbcQueryRequest);
6264
}
6365

@@ -167,7 +169,7 @@ public void setCursorName(String name) throws SQLException {
167169
public boolean execute(String sql) throws SQLException {
168170
log.debug(()->logEntry("execute (%s)", sql));
169171
checkOpen();
170-
executeQueryX(sql);
172+
executeQueryX(sql, fetchSize);
171173
log.debug(() -> logExit("execute", true));
172174
return true;
173175
}
@@ -205,12 +207,12 @@ public int getFetchDirection() throws SQLException {
205207

206208
@Override
207209
public void setFetchSize(int rows) throws SQLException {
208-
210+
fetchSize = rows;
209211
}
210212

211213
@Override
212214
public int getFetchSize() throws SQLException {
213-
return 0;
215+
return fetchSize;
214216
}
215217

216218
@Override
@@ -275,7 +277,7 @@ public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
275277
if (autoGeneratedKeys != Statement.NO_GENERATED_KEYS) {
276278
throw new SQLNonTransientException("Auto generated keys are not supported");
277279
}
278-
executeQueryX(sql);
280+
executeQueryX(sql, fetchSize);
279281
log.debug(() -> logExit("execute", true));
280282
return true;
281283
}

src/main/java/com/amazon/opendistroforelasticsearch/jdbc/config/ConnectionConfig.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class ConnectionConfig {
3434
private String url;
3535
private String host;
3636
private int port;
37+
private int fetchSize;
3738
private String path;
3839
private boolean useSSL;
3940
private int loginTimeout;
@@ -60,6 +61,7 @@ private ConnectionConfig(Builder builder) {
6061
this.url = builder.getUrl();
6162
this.host = builder.getHostProperty().getValue();
6263
this.port = builder.getPortProperty().getValue();
64+
this.fetchSize = builder.getFetchSizeProperty().getValue();
6365
this.path = builder.getPathProperty().getValue();
6466
this.useSSL = builder.getUseSSLProperty().getValue();
6567

@@ -106,6 +108,10 @@ public int getPort() {
106108
return port;
107109
}
108110

111+
public int getFetchSize() {
112+
return fetchSize;
113+
}
114+
109115
public String getPath() {
110116
return path;
111117
}
@@ -192,6 +198,7 @@ public String toString() {
192198
"url='" + url + '\'' +
193199
", host='" + host + '\'' +
194200
", port=" + port +
201+
", fetchSize=" + fetchSize +
195202
", path='" + path + '\'' +
196203
", useSSL=" + useSSL +
197204
", loginTimeout=" + loginTimeout +
@@ -223,6 +230,7 @@ public static class Builder {
223230

224231
private HostConnectionProperty hostProperty = new HostConnectionProperty();
225232
private PortConnectionProperty portProperty = new PortConnectionProperty();
233+
private FetchSizeProperty fetchSizeProperty = new FetchSizeProperty();
226234
private LoginTimeoutConnectionProperty loginTimeoutProperty = new LoginTimeoutConnectionProperty();
227235
private UseSSLConnectionProperty useSSLProperty = new UseSSLConnectionProperty();
228236
private PathConnectionProperty pathProperty = new PathConnectionProperty();
@@ -261,6 +269,7 @@ public static class Builder {
261269
ConnectionProperty[] connectionProperties = new ConnectionProperty[]{
262270
hostProperty,
263271
portProperty,
272+
fetchSizeProperty,
264273
loginTimeoutProperty,
265274
useSSLProperty,
266275
pathProperty,
@@ -302,6 +311,10 @@ public PortConnectionProperty getPortProperty() {
302311
return portProperty;
303312
}
304313

314+
public FetchSizeProperty getFetchSizeProperty() {
315+
return fetchSizeProperty;
316+
}
317+
305318
public LoginTimeoutConnectionProperty getLoginTimeoutProperty() {
306319
return loginTimeoutProperty;
307320
}
@@ -519,6 +532,11 @@ private void validateConfig() throws ConnectionPropertyException {
519532
// change the default port to use to 443
520533
portProperty.setRawValue(443);
521534
}
535+
536+
if (fetchSizeProperty.getValue() < 0) {
537+
throw new ConnectionPropertyException(fetchSizeProperty.getKey(),
538+
"Cursor fetch size value should be greater or equal to zero");
539+
}
522540
}
523541

524542
/**
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package com.amazon.opendistroforelasticsearch.jdbc.config;
2+
3+
public class FetchSizeProperty extends IntConnectionProperty {
4+
5+
public static final String KEY = "fetchSize";
6+
7+
public FetchSizeProperty() {
8+
super(KEY);
9+
}
10+
}

0 commit comments

Comments
 (0)