Skip to content

bug/question: Subscription Callbacks don't seem to work with ApolloFederatedTracingHeaderForwarder #2077

@hassanzareef

Description

@hassanzareef

Please read our contributor guide before
creating an issue.

Context

Trying to implement subscriptions with Apollo Router and Netflix DGS (Integration with Spring-GraphQL), using multipart HTTP for client - router communication and HTTP callbacks for router-subgraph communication.

Using the Netflix DGS Spring GraphQL Integration and the apollo federation subscription callback support I was able to get it working. However when I add the DGS configuration for the ApolloFederatedTracingHeaderForwarder, the request never reaches the subgraph datafetcher and throws an error.

Expected behavior

Datafetcher to return the desired subscription result.

Actual behavior

Request doesn't reach subgraph datafetcher and instead throws an error.

{"@timestamp":"2024-11-26T15:36:37.642383-05:00","@version":"1","message":"Subscription terminated abnormally due to exception","logger_name":"com.apollographql.subscription.callback.SubscriptionCallbackHandler","thread_name":"loomBoundedElastic-2","level":"ERROR","level_value":40000,"stack_trace":"java.lang.NullPointerException: get(...) must not be null\n\tat com.netflix.graphql.dgs.context.DgsContext$Companion.from(DgsContext.kt:46)\n\tSuppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: \nAssembly trace from producer [reactor.core.publisher.MonoDeferContextual] :\n\treactor.core.publisher.Mono.deferContextual(Mono.java:235)\n\torg.springframework.graphql.execution.DefaultExecutionGraphQlService.execute(DefaultExecutionGraphQlService.java:90)\nError has been observed at the following site(s):\n\t*__Mono.deferContextual ⇢ at org.springframework.graphql.execution.DefaultExecutionGraphQlService.execute(DefaultExecutionGraphQlService.java:90)\n\t|_ Mono.flatMapMany ⇢ at com.apollographql.subscription.callback.SubscriptionCallbackHandler.startSubscription(SubscriptionCallbackHandler.java:111)\n\t|_ Flux.publishOn ⇢ at com.apollographql.subscription.callback.SubscriptionCallbackHandler.startSubscription(SubscriptionCallbackHandler.java:145)\n\t|_ Flux.concatMap ⇢ at com.apollographql.subscription.callback.SubscriptionCallbackHandler.startSubscription(SubscriptionCallbackHandler.java:146)\nOriginal Stack Trace:\n\t\tat com.netflix.graphql.dgs.context.DgsContext$Companion.from(DgsContext.kt:46)\n\t\tat com.netflix.graphql.dgs.context.GraphQLContextContributorInstrumentation.createState(GraphQLContextContributorInstrumentation.kt:42)\n\t\tat graphql.execution.instrumentation.SimplePerformantInstrumentation.createStateAsync(SimplePerformantInstrumentation.java:61)\n\t\tat graphql.execution.instrumentation.ChainedInstrumentation$ChainedInstrumentationState.combineAll(ChainedInstrumentation.java:382)\n\t\tat graphql.execution.instrumentation.ChainedInstrumentation.createStateAsync(ChainedInstrumentation.java:96)\n\t\tat graphql.GraphQL.executeAsync(GraphQL.java:427)\n\t\tat org.springframework.graphql.execution.DefaultExecutionGraphQlService.lambda$execute$2(DefaultExecutionGraphQlService.java:104)\n\t\tat reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:47)\n\t\tat reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)\n\t\tat reactor.core.publisher.FluxPublish.connect(FluxPublish.java:106)\n\t\tat reactor.core.publisher.FluxRefCount.subscribe(FluxRefCount.java:88)\n\t\tat reactor.core.publisher.Mono.subscribe(Mono.java:4512)\n\t\tat reactor.core.publisher.FluxTakeUntilOther.subscribeOrReturn(FluxTakeUntilOther.java:57)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8762)\n\t\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)\n\t\tat reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126)\n\t\tat reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)\n\t\tat reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:373)\n\t\tat reactor.core.publisher.FluxMerge.subscribe(FluxMerge.java:73)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8777)\n\t\tat reactor.core.publisher.Flux.subscribeWith(Flux.java:8898)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8742)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8666)\n\t\tat reactor.core.publisher.Flux.subscribe(Flux.java:8584)\n\t\tat com.apollographql.subscription.callback.SubscriptionCallbackHandler.lambda$handleSubscriptionUsingCallback$0(SubscriptionCallbackHandler.java:80)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:169)\n\t\tat reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:49)\n\t\tat reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)\n\t\tat reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)\n\t\tat reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)\n\t\tat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)\n\t\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)\n\t\tat reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)\n\t\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)\n\t\tat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2571)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)\n\t\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onSubscribeInner(MonoFlatMapMany.java:150)\n\t\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onSubscribe(MonoFlatMapMany.java:245)\n\t\tat reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onSubscribe(FluxContextWrite.java:101)\n\t\tat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)\n\t\tat reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)\n\t\tat reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)\n\t\tat reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)\n\t\tat reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:177)\n\t\tat reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:176)\n\t\tat reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:435)\n\t\tat reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:715)\n\t\tat reactor.netty.resources.DefaultPooledConnectionProvider$DisposableAcquire.onStateChange(DefaultPooledConnectionProvider.java:205)\n\t\tat reactor.netty.resources.DefaultPooledConnectionProvider$PooledConnection.onStateChange(DefaultPooledConnectionProvider.java:466)\n\t\tat reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:725)\n\t\tat reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)\n\t\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)\n\t\tat io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\t\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\t\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\t\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)\n\t\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)\n\t\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)\n\t\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\t\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\t\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\t\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t\tat java.base/java.lang.Thread.run(Thread.java:1583)\n"}

Steps to reproduce

Implement A simple federated subscription through Apollo router and netflix DGS, with HTTP callbacks for router - subgraph communication.

DataFetcher:

@DgsComponent
public class SubscriptionDataFetcher {
  @DgsSubscription
  public Publisher<String> testSub(
          @InputArgument String name
  ) {
    return  Flux.just(name + "1", name + "2", name + "3", name + "4", name + "5", name + "6", name + "7")
            .delayElements(Duration.ofMillis(200));
  }
}

GraphQL HTTP callback config:

@Configuration
public class GraphQLConfiguration {

  @Bean
  public SubscriptionCallbackHandler callbackHandler(ExecutionGraphQlService graphQlService) {
    return new SubscriptionCallbackHandler(graphQlService);
  }

  // This interceptor defaults to Ordered#LOWEST_PRECEDENCE order as it should run last in chain
  // to allow users to still apply other interceptors that handle common stuff (e.g. extracting
  // auth headers, etc).
  // You can override this behavior by specifying custom order.
  @Bean
  public CallbackWebGraphQLInterceptor callbackGraphQlInterceptor(
        SubscriptionCallbackHandler callbackHandler) {
    return new CallbackWebGraphQLInterceptor(callbackHandler);
  }
}

Question

Wondering if this is a bug with the DGS framework causing the issue, because if I remove ApolloFederatedTracingHeaderForwarder file, it works fine, and I get the desired output.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions