Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.http.client;

import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.google.common.base.Strings;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.shade.org.apache.commons.lang3.exception.ExceptionUtils;

Expand Down Expand Up @@ -67,7 +67,6 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Slf4j
public class HttpClientProvider implements AutoCloseable {
Expand Down Expand Up @@ -128,13 +127,7 @@ public HttpResponse execute(
Map<String, Object> bodyMap = new HashMap<>();
// If body is set but bodyMap is not, convert body to bodyMap
if (!Strings.isNullOrEmpty(body)) {
bodyMap =
ConfigFactory.parseString(body).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().unwrapped(),
(v1, v2) -> v2));
bodyMap = parseBodyToMap(body);
}

// convert method option to uppercase
Expand Down Expand Up @@ -522,6 +515,18 @@ private void addBody(HttpEntityEnclosingRequestBase request, String body) {
request.setEntity(entity);
}

private static Map<String, Object> parseBodyToMap(String body) {
if (Strings.isNullOrEmpty(body)) {
return Collections.emptyMap();
}
try {
return JsonUtils.parseObject(body, new TypeReference<Map<String, Object>>() {});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that body is a list?

} catch (Exception e) {
log.warn("Failed to parse body as JSON, treating as empty map. Body: {}", body, e);
return Collections.emptyMap();
}
}

@Override
public void close() throws IOException {
if (Objects.nonNull(httpClient)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -84,4 +85,60 @@ void addBody() throws Exception {
// ensure no manually set content type or encoding
Assertions.assertNull(post.getEntity().getContentEncoding());
}

@Test
void testFixedBodyParsingPreservesNestedJsonStructure() throws Exception {
// Given: a nested JSON body with object, array, and primitive
String body =
"{\n"
+ " \"user\": {\n"
+ " \"name\": \"Alice\",\n"
+ " \"age\": 30,\n"
+ " \"address\": {\n"
+ " \"city\": \"Beijing\",\n"
+ " \"country\": \"China\"\n"
+ " }\n"
+ " },\n"
+ " \"active\": true,\n"
+ " \"scores\": [95, 87, 92]\n"
+ " }";

;

// When: parsing using the FIXED logic
Method parseMethod =
HttpClientProvider.class.getDeclaredMethod("parseBodyToMap", String.class);
parseMethod.setAccessible(true);
@SuppressWarnings("unchecked")
Map<String, Object> result = (Map<String, Object>) parseMethod.invoke(null, body);

// Then: nested structure is fully preserved
Assertions.assertTrue(result.containsKey("user"));
Assertions.assertTrue(result.containsKey("active"));
Assertions.assertTrue(result.containsKey("scores"));

// Ensure NO flattened keys exist
Assertions.assertFalse(result.containsKey("user.name"));
Assertions.assertFalse(result.containsKey("user.age"));
Assertions.assertFalse(result.containsKey("user.address.city"));

// Validate nested objects
@SuppressWarnings("unchecked")
Map<String, Object> user = (Map<String, Object>) result.get("user");
Assertions.assertEquals("Alice", user.get("name"));
Assertions.assertEquals(30, user.get("age"));

@SuppressWarnings("unchecked")
Map<String, Object> address = (Map<String, Object>) user.get("address");
Assertions.assertEquals("Beijing", address.get("city"));
Assertions.assertEquals("China", address.get("country"));

// Validate array
@SuppressWarnings("unchecked")
java.util.List<Integer> scores = (java.util.List<Integer>) result.get("scores");
Assertions.assertEquals(java.util.Arrays.asList(95, 87, 92), scores);

// Validate primitive
Assertions.assertEquals(true, result.get("active"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc;

import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.shade.org.apache.commons.lang3.tuple.Pair;

import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.e2e.common.container.seatunnel.SeaTunnelContainer;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand Down