-
Notifications
You must be signed in to change notification settings - Fork 31
Description
Hey there,
We're seeing transient network issues cause some unexpected behavior with this connector. While I'm not sure of the exact network issues causing this, I can reproduce using a local docker compose and playing around the docker networks.
I have a setup where I've spun up a redis instance and kafka broker + connect and if I do the following:
docker stop redis
docker start redis
I see instant attempts to reconnect in the connector logs (as expected):
[2023-10-10 00:24:45,127] INFO Reconnecting, last destination was redis:6379 (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ConnectionWatchdog)
[2023-10-10 00:24:45,128] WARN Cannot reconnect to [redis:6379]: redis (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ConnectionWatchdog)
java.net.UnknownHostException: redis
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1524)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1382)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
at java.base/java.net.InetAddress.getByName(InetAddress.java:1256)
at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:156)
at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils$8.run(SocketUtils.java:153)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at com.redis.kafka.connect.shaded.io.netty.util.internal.SocketUtils.addressByName(SocketUtils.java:153)
at com.redis.kafka.connect.shaded.io.netty.resolver.DefaultNameResolver.doResolve(DefaultNameResolver.java:41)
at com.redis.kafka.connect.shaded.io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:61)
at com.redis.kafka.connect.shaded.io.netty.resolver.SimpleNameResolver.resolve(SimpleNameResolver.java:53)
at com.redis.kafka.connect.shaded.io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:55)
at com.redis.kafka.connect.shaded.io.netty.resolver.InetSocketAddressResolver.doResolve(InetSocketAddressResolver.java:31)
at com.redis.kafka.connect.shaded.io.netty.resolver.AbstractAddressResolver.resolve(AbstractAddressResolver.java:106)
at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap.doResolveAndConnect0(Bootstrap.java:206)
at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap.access$000(Bootstrap.java:46)
at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:180)
at com.redis.kafka.connect.shaded.io.netty.bootstrap.Bootstrap$1.operationComplete(Bootstrap.java:166)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105)
at com.redis.kafka.connect.shaded.io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetSuccess(AbstractChannel.java:990)
at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:516)
at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
at com.redis.kafka.connect.shaded.io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at com.redis.kafka.connect.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at com.redis.kafka.connect.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at com.redis.kafka.connect.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
when redis is started again the connector will then reconnect and carry on as normal
[2023-10-10 00:25:25,028] INFO Reconnected to redis:6379 (com.redis.kafka.connect.shaded.io.lettuce.core.protocol.ReconnectionHandler)
[2023-10-10 00:25:22,876] INFO Wrote 0 records (com.redis.kafka.connect.sink.RedisSinkTask)
However, if I disconnect redis from the docker network I'm using for connectivity between the redis instance and connect, restart redis, and re-connect to the network:
docker network disconnect kafka-connect-local_redis redis
docker stop redis
docker start redis
docker network connect kafka-connect-local_redis redis
then the connector carries on as if nothing is wrong until the next time a poll returns records, at which point the connector fails:
[2023-10-10 20:12:56,941] WARN Could not write 1 records (com.redis.kafka.connect.sink.RedisSinkTask)
io.lettuce.core.RedisException: java.io.IOException: Connection reset by peer
at io.lettuce.core.internal.Exceptions.bubble(Exceptions.java:83)
at io.lettuce.core.internal.Exceptions.fromSynchronization(Exceptions.java:109)
at io.lettuce.core.internal.Futures.awaitAll(Futures.java:226)
at io.lettuce.core.LettuceFutures.awaitAll(LettuceFutures.java:59)
at com.redis.spring.batch.RedisItemWriter.write(RedisItemWriter.java:74)
at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:304)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Connection reset by peer
at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:259)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
[2023-10-10 20:12:56,947] ERROR WorkerSinkTask{id=debug_redis_sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.io.IOException: Connection reset by peer (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.lettuce.core.RedisException: java.io.IOException: Connection reset by peer
at io.lettuce.core.internal.Exceptions.bubble(Exceptions.java:83)
at io.lettuce.core.internal.Futures.awaitOrCancel(Futures.java:250)
at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75)
at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
at com.sun.proxy.$Proxy52.mset(Unknown Source)
at com.redis.kafka.connect.sink.RedisSinkTask.put(RedisSinkTask.java:335)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:588)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:256)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.IOException: Connection reset by peer
at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:259)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
... 1 more
My hope would be that the connector behaves the same in both scenarios and attempts to reconnect before failing. I've had a look at the sink connector code and to be honest I'm not quite sure where a fix would belong for this (whether it should be a fix to this repo, lettuce, or spring-batch-redis.
Any help appreciated, cheers. 😃