@@ -29,17 +29,21 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer
2929import org.apache.kafka.common.serialization.ByteArraySerializer
3030import org.apache.kafka.common.serialization.StringDeserializer
3131import org.apache.kafka.common.serialization.StringSerializer
32- import org.joda.time.DateTime
3332import org.slf4j.LoggerFactory
34- import java.time.Duration
33+ import kotlin.time.Clock
34+ import kotlin.time.Duration.Companion.minutes
3535import kotlin.time.Duration.Companion.seconds
36+ import kotlin.time.ExperimentalTime
37+ import kotlin.time.Instant
38+ import kotlin.time.toJavaDuration
3639
3740const val RETRY_COUNT_HEADER = " retryCount"
3841const val RETRY_AFTER = " retryableAfter"
3942const val RETRY_REASON = " retryReason"
4043
4144val logger = LoggerFactory .getLogger(FailedMessageKafkaHandler ::class .java)
4245
46+ @OptIn(ExperimentalTime ::class )
4347class FailedMessageKafkaHandler (
4448 val kafkaErrorQueue : KafkaErrorQueue = config().kafkaErrorQueue,
4549 kafka : Kafka = config().kafka
@@ -119,10 +123,9 @@ class FailedMessageKafkaHandler(
119123 }
120124 record.offset.acknowledge()
121125 record.retryCounter()
122- val retryableAfter = DateTime .parse(
123- String (record.headers().lastHeader(RETRY_AFTER ).value())
124- )
125- if (DateTime .now().isAfter(retryableAfter)) {
126+ val retryableAfter = Instant .parse(String (record.headers().lastHeader(RETRY_AFTER ).value()))
127+
128+ if (Clock .System .now() > retryableAfter) {
126129 messageFilterService.filterMessage(record)
127130 } else {
128131 logger.info(" ${record.key()} is not retryable yet." )
@@ -135,9 +138,9 @@ class FailedMessageKafkaHandler(
135138
136139 fun getNextRetryTime (record : ReceiverRecord <String , ByteArray >): String {
137140 if (record.headers().lastHeader(RETRY_AFTER ) == null ) {
138- return DateTime .now().toString()
141+ return Clock . System .now().toString()
139142 }
140- return DateTime . now().plusMinutes( 5 )
143+ return Clock . System . now().plus( 5 .minutes )
141144 .toString() // TODO create retry strategy
142145 }
143146
@@ -194,7 +197,7 @@ fun getRecords(
194197 // Collect
195198 val recordList = ArrayList <ReceiverRecord <String , ByteArray >>()
196199 for (i in 0 .. requestedRecords) {
197- val kafkaRecords: ConsumerRecords <String , ByteArray > = consumer.poll(Duration .ofSeconds( 1 ))
200+ val kafkaRecords: ConsumerRecords <String , ByteArray > = consumer.poll(1 .seconds.toJavaDuration( ))
198201 if (kafkaRecords.isEmpty) break
199202 kafkaRecords
200203 .filterNotNull()
0 commit comments