Skip to content

Retries only attempted on some connectivity issues #29

@jakedorne

Description

@jakedorne

Hey there,

We're seeing transient network issues cause some unexpected behavior with this connector. While I'm not sure of the exact network issues causing this, I can reproduce using a local docker compose and playing around the docker networks.

I have a setup where I've spun up a redis instance and kafka broker + connect and if I do the following:

docker stop redis
docker start redis

I see instant attempts to reconnect in the connector logs (as expected):

[2023-10-10 00:24:45,127] INFO Reconnecting, last destination was redis:6379 (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ConnectionWatchdog)
[2023-10-10 00:24:45,128] WARN Cannot reconnect to [redis:6379]: redis (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ConnectionWatchdog)
java.net.UnknownHostException: redis
	at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
	at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1524)
	at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1382)
	at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
	at java.base/java.net.InetAddress.getByName(InetAddress.java:1256)
	at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:156)
	at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:153)
	at java.base/java.security.AccessController.doPrivileged(Native Method)
	at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:153)
	at com.redis.kafka.connect.shaded.io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:41)
	at com.redis.kafka.connect.shaded.io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:61)
	at com.redis.kafka.connect.shaded.io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:53)
	at com.redis.kafka.connect.shaded.io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:55)
	at com.redis.kafka.connect.shaded.io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:31)
	at com.redis.kafka.connect.shaded.io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:106)
	at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:206)
	at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
	at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:180)
	at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:166)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
	at com.redis.kafka.connect.shaded.io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990)
	at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
	at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
	at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at com.redis.kafka.connect.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at com.redis.kafka.connect.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at com.redis.kafka.connect.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:829)

when redis is started again the connector will then reconnect and carry on as normal

[2023-10-10 00:25:25,028] INFO Reconnected to redis:6379 (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ReconnectionHandler)
[2023-10-10 00:25:22,876] INFO Wrote 0 records (com.redis.kafka.connect.sink.RedisSinkTask)

However, if I disconnect redis from the docker network I'm using for connectivity between the redis instance and connect, restart redis, and re-connect to the network:

docker network disconnect kafka-connect-local_redis redis
docker stop redis
docker start redis
docker network connect kafka-connect-local_redis redis

then the connector carries on as if nothing is wrong until the next time a poll returns records, at which point the connector fails:

[2023-10-10 20:12:56,941] WARN Could not write 1 records (com.redis.kafka.connect.sink.RedisSinkTask)
io.lettuce.core.RedisException: java.io.IOException: Connection reset by peer
	at io.lettuce.core.internal.Exceptions.bubble(Exceptions.java:83)
	at io.lettuce.core.internal.Exceptions.fromSynchronization(Exceptions.java:109)
	at io.lettuce.core.internal.Futures.awaitAll(Futures.java:226)
	at io.lettuce.core.LettuceFutures.awaitAll(LettuceFutures.java:59)
	at com.redis.spring.batch.RedisItemWriter.write(RedisItemWriter.java:74)
	at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:304)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Connection reset by peer
	at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:259)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more
[2023-10-10 20:12:56,947] ERROR WorkerSinkTask{id=debug_redis_sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.io.IOException: Connection reset by peer (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.lettuce.core.RedisException: java.io.IOException: Connection reset by peer
	at io.lettuce.core.internal.Exceptions.bubble(Exceptions.java:83)
	at io.lettuce.core.internal.Futures.awaitOrCancel(Futures.java:250)
	at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75)
	at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
	at com.sun.proxy.$Proxy52.mset(Unknown Source)
	at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:335)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Connection reset by peer
	at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:259)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more

My hope would be that the connector behaves the same in both scenarios and attempts to reconnect before failing. I've had a look at the sink connector code and to be honest I'm not quite sure where a fix would belong for this (whether it should be a fix to this repo, lettuce, or spring-batch-redis.

Any help appreciated, cheers. 😃

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions