From 8ac7270a776e5e519342f90695bc5c2b1134b88f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexandre=20L=C3=A9tourneau?= Date: Tue, 28 Oct 2025 17:02:40 -0400 Subject: [PATCH] in_http: add fluentd like add_remote_addr functionality MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Alexandre Létourneau --- plugins/in_http/http.c | 12 ++ plugins/in_http/http.h | 4 + plugins/in_http/http_prot.c | 248 +++++++++++++++++++++++++++++++++--- tests/runtime/in_http.c | 124 ++++++++++++++++++ 4 files changed, 372 insertions(+), 16 deletions(-) diff --git a/plugins/in_http/http.c b/plugins/in_http/http.c index ff9687ed04c..b2c357d08be 100644 --- a/plugins/in_http/http.c +++ b/plugins/in_http/http.c @@ -207,6 +207,18 @@ static struct flb_config_map config_map[] = { NULL }, + { + FLB_CONFIG_MAP_BOOL, "add_remote_addr", "false", + 0, FLB_TRUE, offsetof(struct flb_http, add_remote_addr), + "Adds REMOTE_ADDR field to the record. The value of REMOTE_ADDR is the client's address." + }, + + { + FLB_CONFIG_MAP_STR, "remote_addr_key", REMOTE_ADDR_KEY, + 0, FLB_TRUE, offsetof(struct flb_http, remote_addr_key), + "Key name for the remote address field added to the record." + }, + { FLB_CONFIG_MAP_SIZE, "buffer_max_size", HTTP_BUFFER_MAX_SIZE, 0, FLB_TRUE, offsetof(struct flb_http, buffer_max_size), diff --git a/plugins/in_http/http.h b/plugins/in_http/http.h index 2e3796798c9..daf28adb743 100644 --- a/plugins/in_http/http.h +++ b/plugins/in_http/http.h @@ -32,6 +32,7 @@ #define HTTP_BUFFER_MAX_SIZE "4M" #define HTTP_BUFFER_CHUNK_SIZE "512K" +#define REMOTE_ADDR_KEY "REMOTE_ADDR" struct flb_http { int successful_response_code; @@ -47,6 +48,9 @@ struct flb_http { struct flb_input_instance *ins; + int add_remote_addr; + const char *remote_addr_key; + /* New gen HTTP server */ int enable_http2; struct flb_http_server http_server; diff --git a/plugins/in_http/http_prot.c b/plugins/in_http/http_prot.c index e92220a51d9..86a20753560 100644 --- a/plugins/in_http/http_prot.c +++ b/plugins/in_http/http_prot.c @@ -39,6 +39,11 @@ #define HTTP_CONTENT_JSON 0 #define HTTP_CONTENT_URLENCODED 1 +static int http_header_lookup(int version, void *ptr, char *key, + char **val, size_t *val_len); +static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, + char *buf, size_t size, void *request); + static inline char hex2nibble(char c) { if ((c >= 0x30) && (c <= '9')) { @@ -215,6 +220,95 @@ static flb_sds_t tag_key(struct flb_http *ctx, msgpack_object *map) return tag; } +/* Extract the client IP address from the X-Forwarded-For header */ +static flb_sds_t get_remote_addr(void *request, int version) +{ + int ret; + char *ptr = NULL; + size_t len = 0; + flb_sds_t remote_addr; + + ret = http_header_lookup(version, + request, + "X-Forwarded-For", + &ptr, &len); + + if (ret != 0) { + return NULL; + } + + remote_addr = flb_sds_create_len(ptr, len); + if (!remote_addr) { + return NULL; + } + + ptr = strchr(remote_addr, ','); + if (ptr) { + *ptr = '\0'; + flb_sds_len_set(remote_addr, ptr - remote_addr); + } + + /* need to trim spaces due to mk_http_header */ + if (flb_sds_trim(remote_addr) <= 0) { + flb_sds_destroy(remote_addr); + return NULL; + } + + return remote_addr; +} + +static int append_remote_addr( + msgpack_object *obj, + msgpack_unpacked *unpck, + msgpack_sbuffer *sbuf, + struct flb_http *ctx, + char *remote_addr) +{ + msgpack_object *key; + msgpack_packer pck; + size_t off = 0; + size_t key_len; + int i; + + /* check if remote_addr_key already exists */ + key_len = strlen(ctx->remote_addr_key); + for (i = 0; i < obj->via.map.size; i++) { + key = &obj->via.map.ptr[i].key; + if (key->type == MSGPACK_OBJECT_STR && + key->via.str.size == key_len && + memcmp(key->via.str.ptr, ctx->remote_addr_key, key_len) == 0) { + flb_plg_warn(ctx->ins, "remote_addr_key already present in record, skipping injection"); + return -1; + } + } + + msgpack_sbuffer_clear(sbuf); + msgpack_packer_init(&pck, sbuf, msgpack_sbuffer_write); + + /* create new map with +1 size */ + msgpack_pack_map(&pck, obj->via.map.size + 1); + + /* copy existing map entries */ + for (i = 0; i < obj->via.map.size; i++) { + msgpack_pack_object(&pck, obj->via.map.ptr[i].key); + msgpack_pack_object(&pck, obj->via.map.ptr[i].val); + } + + /* append REMOTE_ADDR entry */ + msgpack_pack_str(&pck, key_len); + msgpack_pack_str_body(&pck, ctx->remote_addr_key, key_len); + msgpack_pack_str(&pck, strlen(remote_addr)); + msgpack_pack_str_body(&pck, remote_addr, strlen(remote_addr)); + + /* unpack the new record */ + if (msgpack_unpack_next(unpck, sbuf->data, sbuf->size, &off) != MSGPACK_UNPACK_SUCCESS) { + flb_plg_debug(ctx->ins, "error repacking record with remote_addr"); + return -1; + } + + return 0; +} + static int process_pack_record(struct flb_http *ctx, struct flb_time *tm, flb_sds_t tag, msgpack_object *record) @@ -265,7 +359,7 @@ static int process_pack_record(struct flb_http *ctx, struct flb_time *tm, return 0; } -int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) +int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size, void *request) { int ret; size_t off = 0; @@ -275,17 +369,40 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) msgpack_object *obj; msgpack_object record; flb_sds_t tag_from_record = NULL; + + flb_sds_t remote_addr = NULL; + msgpack_unpacked appended_result; + msgpack_sbuffer appended_sbuf; + int appended_initialized = 0; flb_time_get(&tm); + if (ctx->add_remote_addr == FLB_TRUE && ctx->remote_addr_key != NULL) { + remote_addr = get_remote_addr(request, HTTP_PROTOCOL_VERSION_11); + } + msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { - obj = &result.data; - if (result.data.type == MSGPACK_OBJECT_MAP) { + obj = &result.data; + if (remote_addr != NULL && flb_sds_len(remote_addr) > 0) { + if (!appended_initialized) { + /* doing this only once, since it can be cleared and reused */ + msgpack_sbuffer_init(&appended_sbuf); + appended_initialized = 1; + } else if (appended_result.zone != NULL) { + msgpack_unpacked_destroy(&appended_result); + } + + /* if we fail to append, we just continue with the original object */ + msgpack_unpacked_init(&appended_result); + if (append_remote_addr(obj, &appended_result, &appended_sbuf, ctx, remote_addr) == 0) { + obj = &appended_result.data; + } + } + tag_from_record = NULL; if (ctx->tag_key) { - obj = &result.data; tag_from_record = tag_key(ctx, obj); } @@ -307,8 +424,25 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) flb_log_event_encoder_reset(&ctx->log_encoder); } else if (result.data.type == MSGPACK_OBJECT_ARRAY) { + obj = &result.data; for (i = 0; i < obj->via.array.size; i++) { record = obj->via.array.ptr[i]; + if (record.type == MSGPACK_OBJECT_MAP && + remote_addr != NULL && flb_sds_len(remote_addr) > 0) { + if (!appended_initialized) { + /* doing this only once, since it can be cleared and reused */ + msgpack_sbuffer_init(&appended_sbuf); + appended_initialized = 1; + } else if (appended_result.zone != NULL) { + msgpack_unpacked_destroy(&appended_result); + } + + /* if we fail to append, we just continue with the original object */ + msgpack_unpacked_init(&appended_result); + if (append_remote_addr(&record, &appended_result, &appended_sbuf, ctx, remote_addr) == 0) { + record = appended_result.data; + } + } tag_from_record = NULL; if (ctx->tag_key) { @@ -347,22 +481,41 @@ int process_pack(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) result.data.type); msgpack_unpacked_destroy(&result); + if (remote_addr) { + flb_sds_destroy(remote_addr); + } + return -1; } } msgpack_unpacked_destroy(&result); + if (appended_initialized) { + msgpack_unpacked_destroy(&appended_result); + msgpack_sbuffer_destroy(&appended_sbuf); + } + + if(remote_addr) { + flb_sds_destroy(remote_addr); + } return 0; log_event_error: msgpack_unpacked_destroy(&result); + if (appended_initialized) { + msgpack_unpacked_destroy(&appended_result); + msgpack_sbuffer_destroy(&appended_sbuf); + } + if (remote_addr) { + flb_sds_destroy(remote_addr); + } flb_plg_error(ctx->ins, "Error encoding record : %d", ret); return ret; } static ssize_t parse_payload_json(struct flb_http *ctx, flb_sds_t tag, - char *payload, size_t size) + char *payload, size_t size, void *request) { int ret; int out_size; @@ -392,14 +545,14 @@ static ssize_t parse_payload_json(struct flb_http *ctx, flb_sds_t tag, } /* Process the packaged JSON and return the last byte used */ - ret = process_pack(ctx, tag, pack, out_size); + ret = process_pack(ctx, tag, pack, out_size, request); flb_free(pack); return ret; } static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag, - char *payload, size_t size) + char *payload, size_t size, void *request) { int i; int idx = 0; @@ -494,7 +647,12 @@ static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag, msgpack_pack_str_body(&pck, vals[i], flb_sds_len(vals[i])); } - ret = process_pack(ctx, tag, sbuf.data, sbuf.size); + if(ctx->enable_http2) { + ret = process_pack_ng(ctx, tag, sbuf.data, sbuf.size, request); + } + else { + ret = process_pack(ctx, tag, sbuf.data, sbuf.size, request); + } decode_error: for (idx = 0; idx < mk_list_size(kvs); idx++) { @@ -515,7 +673,6 @@ static ssize_t parse_payload_urlencoded(struct flb_http *ctx, flb_sds_t tag, return ret; } - /* * We use two backends for HTTP parsing and it depends on the version of the * protocol: @@ -583,7 +740,6 @@ static int http_header_lookup(int version, void *ptr, char *key, return -1; } - static \ int uncompress_zlib(struct flb_http *ctx, char **output_buffer, @@ -813,10 +969,10 @@ static int process_payload(struct flb_http *ctx, struct http_conn *conn, } if (type == HTTP_CONTENT_JSON) { - ret = parse_payload_json(ctx, tag, request->data.data, request->data.len); + ret = parse_payload_json(ctx, tag, request->data.data, request->data.len, request); } else if (type == HTTP_CONTENT_URLENCODED) { - ret = parse_payload_urlencoded(ctx, tag, request->data.data, request->data.len); + ret = parse_payload_urlencoded(ctx, tag, request->data.data, request->data.len, request); } if (uncompressed_data != NULL) { @@ -1038,7 +1194,7 @@ static int send_response_ng(struct flb_http_response *response, return 0; } -static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size) +static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_t size, void *request) { int ret; size_t off = 0; @@ -1049,13 +1205,38 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ msgpack_object record; flb_sds_t tag_from_record = NULL; + flb_sds_t remote_addr = NULL; + msgpack_unpacked appended_result; + msgpack_sbuffer appended_sbuf; + int appended_initialized = 0; + flb_time_get(&tm); + if (ctx->add_remote_addr == FLB_TRUE && ctx->remote_addr_key != NULL) { + remote_addr = get_remote_addr(request, HTTP_PROTOCOL_VERSION_20); + } + msgpack_unpacked_init(&result); while (msgpack_unpack_next(&result, buf, size, &off) == MSGPACK_UNPACK_SUCCESS) { if (result.data.type == MSGPACK_OBJECT_MAP) { - tag_from_record = NULL; obj = &result.data; + if (remote_addr != NULL && flb_sds_len(remote_addr) > 0) { + if (!appended_initialized) { + /* doing this only once, since it can be cleared and reused */ + msgpack_sbuffer_init(&appended_sbuf); + appended_initialized = 1; + } else if (appended_result.zone != NULL) { + msgpack_unpacked_destroy(&appended_result); + } + + /* if we fail to append, we just continue with the original object */ + msgpack_unpacked_init(&appended_result); + if (append_remote_addr(obj, &appended_result, &appended_sbuf, ctx, remote_addr) == 0) { + obj = &appended_result.data; + } + } + + tag_from_record = NULL; if (ctx->tag_key) { tag_from_record = tag_key(ctx, obj); @@ -1083,6 +1264,22 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ for (i = 0; i < obj->via.array.size; i++) { record = obj->via.array.ptr[i]; + if (record.type == MSGPACK_OBJECT_MAP && + remote_addr != NULL && flb_sds_len(remote_addr) > 0) { + if (!appended_initialized) { + /* doing this only once, since it can be cleared and reused */ + msgpack_sbuffer_init(&appended_sbuf); + appended_initialized = 1; + } else if (appended_result.zone != NULL) { + msgpack_unpacked_destroy(&appended_result); + } + + /* if we fail to append, we just continue with the original object */ + msgpack_unpacked_init(&appended_result); + if (append_remote_addr(&record, &appended_result, &appended_sbuf, ctx, remote_addr) == 0) { + record = appended_result.data; + } + } tag_from_record = NULL; if (ctx->tag_key) { @@ -1121,17 +1318,36 @@ static int process_pack_ng(struct flb_http *ctx, flb_sds_t tag, char *buf, size_ result.data.type); msgpack_unpacked_destroy(&result); + if (remote_addr) { + flb_sds_destroy(remote_addr); + } return -1; } } msgpack_unpacked_destroy(&result); + if (appended_initialized) { + msgpack_unpacked_destroy(&appended_result); + msgpack_sbuffer_destroy(&appended_sbuf); + } + if (remote_addr) { + flb_sds_destroy(remote_addr); + } + return 0; log_event_error: flb_plg_error(ctx->ins, "Error encoding record : %d", ret); msgpack_unpacked_destroy(&result); + if (appended_initialized) { + msgpack_unpacked_destroy(&appended_result); + msgpack_sbuffer_destroy(&appended_sbuf); + } + if (remote_addr) { + flb_sds_destroy(remote_addr); + } + return -1; } @@ -1172,7 +1388,7 @@ static ssize_t parse_payload_json_ng(flb_sds_t tag, } /* Process the packaged JSON and return the last byte used */ - ret = process_pack_ng(ctx, tag, pack, out_size); + ret = process_pack_ng(ctx, tag, pack, out_size, request); flb_free(pack); return ret; @@ -1217,7 +1433,7 @@ static int process_payload_ng(flb_sds_t tag, ctx = (struct flb_http *) request->stream->user_data; payload = (char *) request->body; if (payload) { - return parse_payload_urlencoded(ctx, tag, payload, cfl_sds_len(payload)); + return parse_payload_urlencoded(ctx, tag, payload, cfl_sds_len(payload), request); } } diff --git a/tests/runtime/in_http.c b/tests/runtime/in_http.c index 66ddaea5230..feaf9fce23a 100644 --- a/tests/runtime/in_http.c +++ b/tests/runtime/in_http.c @@ -671,6 +671,125 @@ void flb_test_http_tag_key_with_array_input() test_http_tag_key("[{\"tag\":\"new_tag\",\"test\":\"msg\"}]"); } +void test_http_add_remote_addr(char *input, char *xff_content, char *expected_ip, char *http2_cfg) +{ + struct flb_lib_out_cb cb_data; + struct test_ctx *ctx; + struct flb_http_client *c; + int ret; + int num; + size_t b_sent; + char expected_buffer[64]; + + char *buf = input; + + clear_output_num(); + + cb_data.cb = cb_check_result_json; + ret = snprintf(expected_buffer, sizeof(expected_buffer),"\"test\":\"msg\",\"REMOTE_ADDR\":\"%s\"", expected_ip); + + if(!TEST_CHECK(ret > 0)) { + TEST_MSG("snprintf failed"); + exit(EXIT_FAILURE); + } + cb_data.data = expected_buffer; + + ctx = test_ctx_create(&cb_data); + if (!TEST_CHECK(ctx != NULL)) { + TEST_MSG("test_ctx_create failed"); + exit(EXIT_FAILURE); + } + + ret = flb_input_set(ctx->flb, ctx->i_ffd, + "add_remote_addr", "true", + "http2", http2_cfg, + NULL); + TEST_CHECK(ret == 0); + + ret = flb_output_set(ctx->flb, ctx->o_ffd, + "match", "*", + "format", "json", + NULL); + TEST_CHECK(ret == 0); + + /* Start the engine */ + ret = flb_start(ctx->flb); + TEST_CHECK(ret == 0); + + ctx->httpc = http_client_ctx_create(); + TEST_CHECK(ctx->httpc != NULL); + + c = flb_http_client(ctx->httpc->u_conn, FLB_HTTP_POST, "/", buf, strlen(buf), + "127.0.0.1", 9880, NULL, 0); + ret = flb_http_add_header(c, FLB_HTTP_HEADER_CONTENT_TYPE, strlen(FLB_HTTP_HEADER_CONTENT_TYPE), + JSON_CONTENT_TYPE, strlen(JSON_CONTENT_TYPE)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed"); + exit(EXIT_FAILURE); + } + + /* Add XFF header (undefined in the flb_http_client) */ + ret = flb_http_add_header(c, "X-Forwarded-For", strlen("X-Forwarded-For"), + xff_content, strlen(xff_content)); + TEST_CHECK(ret == 0); + if (!TEST_CHECK(c != NULL)) { + TEST_MSG("http_client failed (XFF header)"); + exit(EXIT_FAILURE); + } + + ret = flb_http_do(c, &b_sent); + if (!TEST_CHECK(ret == 0)) { + TEST_MSG("ret error. ret=%d\n", ret); + } + else if (!TEST_CHECK(b_sent > 0)){ + TEST_MSG("b_sent size error. b_sent = %lu\n", b_sent); + } + else if (!TEST_CHECK(c->resp.status == 201)) { + TEST_MSG("http response code error. expect: 201, got: %d\n", c->resp.status); + } + + /* waiting to flush */ + flb_time_msleep(1500); + + num = get_output_num(); + if (!TEST_CHECK(num > 0)) { + TEST_MSG("no outputs"); + } + flb_http_client_destroy(c); + flb_upstream_conn_release(ctx->httpc->u_conn); + test_ctx_destroy(ctx); +} + +/* Test if remote_addr injection is skipped if remote_addr_key is already present */ +void flb_test_http_remote_addr_skip_colliding_ng() +{ + test_http_add_remote_addr("{\"test\":\"msg\",\"REMOTE_ADDR\":\"old\"}", "1.2.3.4, 5.6.7.8", "old", "true"); +} + +/* Test flow through next gen http server */ +void flb_test_http_remote_addr_map_ng() +{ + test_http_add_remote_addr("{\"test\":\"msg\"}", "1.2.3.4, 5.6.7.8", "1.2.3.4", "true"); +} + +void flb_test_http_remote_addr_array_ng() +{ + test_http_add_remote_addr("[{\"test\":\"msg\"}]", "1.2.3.4, 5.6.7.8", "1.2.3.4", "true"); +} + +/* Test flow through legacy http server (monkey) */ +void flb_test_http_remote_addr_map() +{ + test_http_add_remote_addr("{\"test\":\"msg\"}", "1.2.3.4, 5.6.7.8", "1.2.3.4", "false"); +} + +void flb_test_http_remote_addr_array() +{ + test_http_add_remote_addr("[{\"test\":\"msg\"}]", "1.2.3.4, 5.6.7.8", "1.2.3.4", "false"); +} + + TEST_LIST = { {"http", flb_test_http}, {"successful_response_code_200", flb_test_http_successful_response_code_200}, @@ -679,5 +798,10 @@ TEST_LIST = { {"failure_response_code_400_bad_disk_write", flb_test_http_failure_400_bad_disk_write}, {"tag_key_with_map_input", flb_test_http_tag_key_with_map_input}, {"tag_key_with_array_input", flb_test_http_tag_key_with_array_input}, + {"add_remote_addr_skip_colliding_ng", flb_test_http_remote_addr_skip_colliding_ng}, + {"add_remote_addr_map_ng", flb_test_http_remote_addr_map_ng}, + {"add_remote_addr_array_ng", flb_test_http_remote_addr_array_ng}, + {"add_remote_addr_map", flb_test_http_remote_addr_map}, + {"add_remote_addr_array", flb_test_http_remote_addr_array}, {NULL, NULL} };