Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 30 additions & 16 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import com.clickhouse.data.ClickHouseDataType;
import com.clickhouse.data.ClickHouseFormat;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import net.jpountz.lz4.LZ4Factory;
import org.apache.hc.core5.concurrent.DefaultThreadFactory;
import org.apache.hc.core5.http.ClassicHttpResponse;
Expand All @@ -56,7 +55,6 @@
import java.lang.reflect.InvocationTargetException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
Expand Down Expand Up @@ -186,7 +184,6 @@ private Client(Collection<Endpoint> endpoints, Map<String,String> configuration,
}

this.endpoints = tmpEndpoints.build();
this.httpClientHelper = new HttpAPIClientHelper(this.configuration, metricsRegistry, initSslContext);

String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey());
this.retries = retry == null ? 0 : Integer.parseInt(retry);
Expand All @@ -197,6 +194,7 @@ private Client(Collection<Endpoint> endpoints, Map<String,String> configuration,
this.lz4Factory = LZ4Factory.fastestJavaInstance();
}

this.httpClientHelper = new HttpAPIClientHelper(this.configuration, metricsRegistry, initSslContext, lz4Factory);
this.serverVersion = configuration.getOrDefault(ClientConfigProperties.SERVER_VERSION.getKey(), "unknown");
this.dbUser = configuration.getOrDefault(ClientConfigProperties.USER.getKey(), ClientConfigProperties.USER.getDefObjVal());
this.typeHintMapping = (Map<ClickHouseDataType, Class<?>>) this.configuration.get(ClientConfigProperties.TYPE_HINT_MAPPING.getKey());
Expand Down Expand Up @@ -1073,6 +1071,20 @@ public Builder sslSocketSNI(String sni) {
return this;
}

/**
* Make sending statement parameters as HTTP Form data (in the body of a request).
* Note: work only with Server side compression. If client compression is enabled it will be disabled
* for query requests with parameters. It is because each parameter is sent as part of multipart content
* what would require compressions of them separately.
*
* @param enable - if feature enabled
* @return this builder instance
*/
public Builder useHttpFormDataForQuery(boolean enable) {
this.configuration.put(ClientConfigProperties.HTTP_SEND_PARAMS_IN_BODY.getKey(), String.valueOf(enable));
return this;
}

public Client build() {
// check if endpoint are empty. so can not initiate client
if (this.endpoints.isEmpty()) {
Expand Down Expand Up @@ -1279,7 +1291,7 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
for (int i = 0; i <= maxRetries; i++) {
// Execute request
try (ClassicHttpResponse httpResponse =
httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(), lz4Factory,
httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(),
out -> {
out.write("INSERT INTO ".getBytes());
out.write(tableName.getBytes());
Expand Down Expand Up @@ -1496,7 +1508,7 @@ public CompletableFuture<InsertResponse> insert(String tableName,
for (int i = 0; i <= retries; i++) {
// Execute request
try (ClassicHttpResponse httpResponse =
httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(), lz4Factory,
httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(),
out -> {
writer.onOutput(out);
out.close();
Expand Down Expand Up @@ -1607,25 +1619,27 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
ClientStatisticsHolder clientStats = new ClientStatisticsHolder();
clientStats.start(ClientMetrics.OP_DURATION);

Supplier<QueryResponse> responseSupplier;
if (queryParams != null) {
requestSettings.setOption(HttpAPIClientHelper.KEY_STATEMENT_PARAMS, queryParams);
}

if (queryParams != null) {
requestSettings.setOption(HttpAPIClientHelper.KEY_STATEMENT_PARAMS, queryParams);
}
responseSupplier = () -> {
Supplier<QueryResponse> responseSupplier = () -> {
long startTime = System.nanoTime();
// Selecting some node
Endpoint selectedEndpoint = getNextAliveNode();
RuntimeException lastException = null;
for (int i = 0; i <= retries; i++) {
ClassicHttpResponse httpResponse = null;
try {
httpResponse =
httpClientHelper.executeRequest(selectedEndpoint, requestSettings.getAllSettings(), lz4Factory, output -> {
output.write(sqlQuery.getBytes(StandardCharsets.UTF_8));
output.close();
});

boolean useMultipart = ClientConfigProperties.HTTP_SEND_PARAMS_IN_BODY.getOrDefault(requestSettings.getAllSettings());
if (queryParams != null && useMultipart) {
httpResponse = httpClientHelper.executeMultiPartRequest(selectedEndpoint,
requestSettings.getAllSettings(), sqlQuery);
} else {
httpResponse = httpClientHelper.executeRequest(selectedEndpoint,
requestSettings.getAllSettings(),
sqlQuery);
}
// Check response
if (httpResponse.getCode() == HttpStatus.SC_SERVICE_UNAVAILABLE) {
LOG.warn("Failed to get response. Server returned {}. Retrying. (Duration: {})", httpResponse.getCode(), durationSince(startTime));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@ public Object parseValue(String value) {
* SNI SSL parameter that will be set for each outbound SSL socket.
*/
SSL_SOCKET_SNI("ssl_socket_sni", String.class,""),

/**
* If parameters should be sent in request body.
* Note: work only with Server side compression. If compression is enabled on client level it will be disabled
* for query requests with parameters.
*/
HTTP_SEND_PARAMS_IN_BODY("client.http.use_form_request_for_query", Boolean.class, "false"),

;

private static final Logger LOG = LoggerFactory.getLogger(ClientConfigProperties.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public void writeTo(OutputStream outStream) throws IOException {
throw new UnsupportedOperationException("Unsupported: writing compressed response to elsewhere");
}

try {
httpEntity.writeTo(compressorStreamFactory.createCompressorOutputStream(compressionAlgo, outStream));
try (OutputStream compressingStream = compressorStreamFactory.createCompressorOutputStream(compressionAlgo, outStream)){
httpEntity.writeTo(compressingStream);
} catch (CompressorException e) {
throw new IOException("Failed to create compressing output stream", e);
}
Expand All @@ -75,7 +75,8 @@ public void close() throws IOException {

@Override
public long getContentLength() {
return httpEntity.getContentLength();
// compressed request length is unknown even if it is a byte[]
return isResponse ? httpEntity.getContentLength() : -1;
}

@Override
Expand Down
Loading
Loading