Skip to content

The first part of a composed function that ends with a Consumer gets invoked twice #1302

@rmarianni

Description

@rmarianni

As stated in this StackOverflow question, when an application has a function definition that composes two other functions — a reactive Function followed by an imperative Consumer —, the first function gets invoked twice, even though the Consumer is invoked only once, as expected.

The issue can be reproduced (and debugged) with this sample project.

As @artembilan has already looked into this by answering the question, I'll transcribe his answer for convenience:

So, after making this change into your code:

LOGGER.info("Will transform this message's payload to upper case: {}", message);

I see this in logs:

2025-08-28T12:18:26.201-04:00  INFO 37308 --- [function-composition-test] [flux-http-nio-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this message's payload to upper case: GenericMessage [payload=functional composition test, headers={Accept=application/json, application/*+json, host=localhost:57957, id=c2e496dc-90e1-bbca-31ea-bc890f9de5bd, Content-Length=27, uri=http://localhost:57957/test/uppercase,log, accept-encoding=gzip, user-agent=ReactorNetty/1.2.9, Content-Type=text/plain, timestamp=1756397906201}]
2025-08-28T12:18:26.223-04:00  INFO 37308 --- [function-composition-test] [flux-http-nio-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this message's payload to upper case: GenericMessage [payload=functional composition test, headers={Accept=application/json, application/*+json, host=localhost:57957, id=919af0da-5461-b6ab-ddaf-40cd2b77e80a, Content-Length=27, uri=http://localhost:57957/test/uppercase,log, accept-encoding=gzip, user-agent=ReactorNetty/1.2.9, Content-Type=text/plain, timestamp=1756397906223}]

That means that we have two different messages with the same payload.
After some debugging, I see that we have subscribe to the input Flux twice:

  1. SimpleFunctionRegistry.ConsumerWrapper:
public void accept(Flux messageFlux) {
     messageFlux.doOnNext(this.targetConsumer).subscribe();
 }
  1. FunctionWebRequestProcessingHelper:
Object result = function.apply(inputMessage);
 if (function.isConsumer()) {
     if (result instanceof Publisher) {
         Mono.from((Publisher) result).subscribe();
     }
     return "DELETE".equals(wrapper.getMethod()) ?
             Mono.empty() : Mono.just(ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers, ignoredHeaders, requestOnlyHeaders)).build());
 }

There is some disconnection between return from the reactive function call and that supplier invocation:

                result = fluxInput
                        .transform(flux -> {
                            flux =  Flux.from((Publisher) flux).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v));
                            ((Consumer) this.target).accept(flux);
                            return Mono.ignoreElements((Flux) flux);
                        }).then();

That's, probably, why we don't see a a double processing on the consumer side, but the flux in the function is still called twice because of those two subscriptions.

Feels like a bug somewhere in the SimpleFunctionRegistry.invokeConsumer().

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions