-
Notifications
You must be signed in to change notification settings - Fork 325
Description
Please read our contributor guide before
creating an issue.
Context
Trying to implement subscriptions with Apollo Router and Netflix DGS (Integration with Spring-GraphQL), using multipart HTTP for client - router communication and HTTP callbacks for router-subgraph communication.
Using the Netflix DGS Spring GraphQL Integration and the apollo federation subscription callback support I was able to get it working. However when I add the DGS configuration for the ApolloFederatedTracingHeaderForwarder, the request never reaches the subgraph datafetcher and throws an error.
Expected behavior
Datafetcher to return the desired subscription result.
Actual behavior
Request doesn't reach subgraph datafetcher and instead throws an error.
{"@timestamp":"2024-11-26T15:36:37.642383-05:00","@version":"1","message":"Subscription terminated abnormally due to exception","logger_name":"com.apollographql.subscription.callback.SubscriptionCallbackHandler","thread_name":"loomBoundedElastic-2","level":"ERROR","level_value":40000,"stack_trace":"java.lang.NullPointerException: get(...) must not be null\n\tat com.netflix.graphql.dgs.context.DgsContext$Companion.from(DgsContext.kt:46)\n\tSuppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: \nAssembly trace from producer [reactor.core.publisher.MonoDeferContextual] :\n\treactor.core.publisher.Mono.deferContextual(Mono.java:235)\n\torg.springframework.graphql.execution.DefaultExecutionGraphQlService.execute(DefaultExecutionGraphQlService.java:90)\nError has been observed at the following site(s):\n\t*__Mono.deferContextual ⇢ at org.springframework.graphql.execution.DefaultExecutionGraphQlService.execute(DefaultExecutionGraphQlService.java:90)\n\t|_ Mono.flatMapMany ⇢ at com.apollographql.subscription.callback.SubscriptionCallbackHandler.startSubscription(SubscriptionCallbackHandler.java:111)\n\t|_ Flux.publishOn ⇢ at com.apollographql.subscription.callback.SubscriptionCallbackHandler.startSubscription(SubscriptionCallbackHandler.java:145)\n\t|_ Flux.concatMap ⇢ at com.apollographql.subscription.callback.SubscriptionCallbackHandler.startSubscription(SubscriptionCallbackHandler.java:146)\nOriginal Stack Trace:\n\t\tat com.netflix.graphql.dgs.context.DgsContext$Companion.from(DgsContext.kt:46)\n\t\tat com.netflix.graphql.dgs.context.GraphQLContextContributorInstrumentation.createState(GraphQLContextContributorInstrumentation.kt:42)\n\t\tat graphql.execution.instrumentation.SimplePerformantInstrumentation.createStateAsync(SimplePerformantInstrumentation.java:61)\n\t\tat graphql.execution.instrumentation.ChainedInstrumentation$ChainedInstrumentationState.combineAll(ChainedInstrumentation.java:382)\n\t\tat graphql.execution.instrumentation.ChainedInstrumentation.createStateAsync(ChainedInstrumentation.java:96)\n\t\tat graphql.GraphQL.executeAsync(GraphQL.java:427)\n\t\tat org.springframework.graphql.execution.DefaultExecutionGraphQlService.lambda$execute$2(DefaultExecutionGraphQlService.java:104)\n\t\tat reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)\n\t\tat reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)\n\t\tat reactor.core.publisher.FluxPublish.connect(FluxPublish.java:106)\n\t\tat reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:88)\n\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4512)\n\t\tat reactor.core.publisher.FluxTakeUntilOther.subscribeOrReturn(FluxTakeUntilOther.java:57)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8762)\n\t\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)\n\t\tat reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126)\n\t\tat reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)\n\t\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:373)\n\t\tat reactor.core.publisher.FluxMerge.subscribe(FluxMerge.java:73)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8777)\n\t\tat reactor.core.publisher.Flux.subscribeWith(Flux.java:8898)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8742)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8666)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8584)\n\t\tat com.apollographql.subscription.callback.SubscriptionCallbackHandler.lambda$handleSubscriptionUsingCallback$0(SubscriptionCallbackHandler.java:80)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:169)\n\t\tat reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:49)\n\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)\n\t\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)\n\t\tat reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)\n\t\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)\n\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)\n\t\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)\n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)\n\t\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)\n\t\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)\n\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\t\tat reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)\n\t\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)\n\t\tat reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)\n\t\tat reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:177)\n\t\tat reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176)\n\t\tat reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:435)\n\t\tat reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:715)\n\t\tat reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:205)\n\t\tat reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:466)\n\t\tat reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:725)\n\t\tat reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\tat java.base/java.lang.Thread.run(Thread.java:1583)\n"}
Steps to reproduce
Implement A simple federated subscription through Apollo router and netflix DGS, with HTTP callbacks for router - subgraph communication.
DataFetcher:
@DgsComponent
public class SubscriptionDataFetcher {
@DgsSubscription
public Publisher<String> testSub(
@InputArgument String name
) {
return Flux.just(name + "1", name + "2", name + "3", name + "4", name + "5", name + "6", name + "7")
.delayElements(Duration.ofMillis(200));
}
}
GraphQL HTTP callback config:
@Configuration
public class GraphQLConfiguration {
@Bean
public SubscriptionCallbackHandler callbackHandler(ExecutionGraphQlService graphQlService) {
return new SubscriptionCallbackHandler(graphQlService);
}
// This interceptor defaults to Ordered#LOWEST_PRECEDENCE order as it should run last in chain
// to allow users to still apply other interceptors that handle common stuff (e.g. extracting
// auth headers, etc).
// You can override this behavior by specifying custom order.
@Bean
public CallbackWebGraphQLInterceptor callbackGraphQlInterceptor(
SubscriptionCallbackHandler callbackHandler) {
return new CallbackWebGraphQLInterceptor(callbackHandler);
}
}
Question
Wondering if this is a bug with the DGS framework causing the issue, because if I remove ApolloFederatedTracingHeaderForwarder file, it works fine, and I get the desired output.