Description
As stated in this StackOverflow question, when an application has a function definition that composes two other functions (a reactive Function followed by an imperative Consumer), the first function gets invoked twice, even though the Consumer is invoked only once, as expected.
The issue can be reproduced (and debugged) with this sample project.
As @artembilan has already looked into this by answering the question, I'll transcribe his answer for convenience:
So, after making this change to your code:
LOGGER.info("Will transform this message's payload to upper case: {}", message);
I see this in logs:
2025-08-28T12:18:26.201-04:00 INFO 37308 --- [function-composition-test] [flux-http-nio-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this message's payload to upper case: GenericMessage [payload=functional composition test, headers={Accept=application/json, application/*+json, host=localhost:57957, id=c2e496dc-90e1-bbca-31ea-bc890f9de5bd, Content-Length=27, uri=http://localhost:57957/test/uppercase,log, accept-encoding=gzip, user-agent=ReactorNetty/1.2.9, Content-Type=text/plain, timestamp=1756397906201}]
2025-08-28T12:18:26.223-04:00 INFO 37308 --- [function-composition-test] [flux-http-nio-2] j.j.s.i.s.FunctionCompositionApplication : Will transform this message's payload to upper case: GenericMessage [payload=functional composition test, headers={Accept=application/json, application/*+json, host=localhost:57957, id=919af0da-5461-b6ab-ddaf-40cd2b77e80a, Content-Length=27, uri=http://localhost:57957/test/uppercase,log, accept-encoding=gzip, user-agent=ReactorNetty/1.2.9, Content-Type=text/plain, timestamp=1756397906223}]
That means that we have two different messages with the same payload.
After some debugging, I see that we subscribe to the inputFlux twice:
SimpleFunctionRegistry.ConsumerWrapper:
    public void accept(Flux messageFlux) {
        messageFlux.doOnNext(this.targetConsumer).subscribe();
    }
FunctionWebRequestProcessingHelper:
    Object result = function.apply(inputMessage);
    if (function.isConsumer()) {
        if (result instanceof Publisher) {
            Mono.from((Publisher) result).subscribe();
        }
        return "DELETE".equals(wrapper.getMethod()) ?
                Mono.empty() : Mono.just(ResponseEntity.accepted().headers(HeaderUtils.sanitize(headers, ignoredHeaders, requestOnlyHeaders)).build());
    }
There is some disconnection between the return from the reactive function call and that supplier invocation:
    result = fluxInput
        .transform(flux -> {
            flux = Flux.from((Publisher) flux).map(v -> this.extractValueFromOriginalValueHolderIfNecessary(v));
            ((Consumer) this.target).accept(flux);
            return Mono.ignoreElements((Flux) flux);
        }).then();
That's probably why we don't see double processing on the consumer side, but the flux in the function is still called twice because of those two subscriptions.
Feels like a bug somewhere in SimpleFunctionRegistry.invokeConsumer().
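To make the double-subscription effect described above easier to see in isolation, here is a minimal plain-Java sketch. It does not use Reactor or Spring Cloud Function: `ColdSource` is a hypothetical stand-in for a cold publisher, and `DoubleSubscriptionSketch` is an illustrative name, not something from the sample project. The only assumption it encodes is that a cold pipeline re-runs its upstream operators once per subscription, which is how the two `subscribe()` calls quoted above would produce two function invocations but only one consumer invocation.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for a cold publisher: nothing runs until subscribe()
// is called, and every subscribe() call replays the pipeline from the source.
interface ColdSource<T> {
    void subscribe(Consumer<? super T> subscriber);

    // Each subscription to the mapped source re-applies the mapper.
    default <R> ColdSource<R> map(Function<? super T, ? extends R> mapper) {
        return subscriber -> this.subscribe(value -> subscriber.accept(mapper.apply(value)));
    }

    static <T> ColdSource<T> just(T value) {
        return subscriber -> subscriber.accept(value);
    }
}

final class DoubleSubscriptionSketch {

    // Returns {functionCalls, consumerCalls}.
    static int[] run() {
        AtomicInteger functionCalls = new AtomicInteger();
        AtomicInteger consumerCalls = new AtomicInteger();

        // Stands in for the reactive "uppercase" function.
        ColdSource<String> uppercased = ColdSource.just("functional composition test")
            .map(payload -> {
                functionCalls.incrementAndGet();
                return payload.toUpperCase();
            });

        // Subscription 1: analogous to ConsumerWrapper.accept(), where the
        // consumer is attached and the flux is subscribed.
        uppercased.subscribe(payload -> consumerCalls.incrementAndGet());

        // Subscription 2: analogous to the web helper subscribing to the
        // returned publisher, which reaches the same upstream without the consumer.
        uppercased.subscribe(payload -> { });

        return new int[] {functionCalls.get(), consumerCalls.get()};
    }

    public static void main(String[] args) {
        int[] counts = run();
        // prints "function calls: 2, consumer calls: 1"
        System.out.println("function calls: " + counts[0] + ", consumer calls: " + counts[1]);
    }
}
```

In this sketch, removing either subscription brings the function count back to one, which is consistent with the suspicion that the fix belongs wherever the consumer path subscribes in addition to the caller.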