diff --git a/changelog/unreleased/solr-18063.yml b/changelog/unreleased/solr-18063.yml new file mode 100644 index 000000000000..914ddf2bfb6c --- /dev/null +++ b/changelog/unreleased/solr-18063.yml @@ -0,0 +1,9 @@ +# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc +title: SOLR-18063 - NPE when resubmitting to DLQ +type: fixed +authors: + - name: Andrzej Bialecki + nick: ab +links: + - name: SOLR-18063 + url: https://issues.apache.org/jira/browse/SOLR-18063 diff --git a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java index bf1ba691f263..dd7aabef25ad 100644 --- a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java +++ b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java @@ -527,15 +527,24 @@ protected void processResult( log.trace("result=failed-resubmit"); } final int attempt = item.getAttempt(); - if (attempt > this.maxAttempts) { - log.info( - "Sending message to dead letter queue because of max attempts limit with current value = {}", - attempt); - kafkaMirroringSink.submitToDlq(item); - metrics.counter(MetricRegistry.name(type.name(), "failed-dlq")).inc(); - } else { - kafkaMirroringSink.submit(item); - metrics.counter(MetricRegistry.name(type.name(), "failed-resubmit")).inc(); + final boolean dlq = attempt > this.maxAttempts; + try { + if (dlq) { + log.info( + "Sending message to dead letter queue because of max attempts limit with current value = {}", + attempt); + kafkaMirroringSink.submitToDlq(item); + metrics.counter(MetricRegistry.name(type.name(), "failed-dlq")).inc(); + } else { + kafkaMirroringSink.submit(item); + metrics.counter(MetricRegistry.name(type.name(), "failed-resubmit")).inc(); + } + } catch (Exception e) { + log.error( + "Failed to {}, msg={}", + dlq ? "send message to dead-letter queue" : "resubmit message for retry", + item, + e); } break; case HANDLED: @@ -556,6 +565,17 @@ protected void processResult( "Unexpected response while processing request. We never expect {}.", result.status()); metrics.counter(MetricRegistry.name(type.name(), "failed-retry")).inc(); break; + case FAILED_NO_RETRY: + if (log.isDebugEnabled()) { + log.debug("Failed no-retry: sending message to dead-letter queue"); + } + try { + kafkaMirroringSink.submitToDlq(item); + } catch (Exception e) { + log.error("Failed to send message to dead-letter queue, msg={}", item, e); + } + metrics.counter(MetricRegistry.name(type.name(), "failed-no-retry")).inc(); + break; default: if (log.isTraceEnabled()) { log.trace("result=no matching case"); diff --git a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java index f0ac2b182a27..29f83052fe11 100644 --- a/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java +++ b/solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessor.java @@ -31,6 +31,8 @@ import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.SolrInputField; +import org.apache.solr.common.cloud.ClusterState; +import org.apache.solr.common.cloud.DocCollection; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; @@ -108,7 +110,7 @@ private Result> handleSolrRequest( try { prepareIfUpdateRequest(request); logRequest(request); - result = processMirroredSolrRequest(request, mirroredSolrRequest.getType()); + result = processMirroredSolrRequest(mirroredSolrRequest); } catch (Exception e) { result = handleException(mirroredSolrRequest, e); } @@ -124,7 +126,7 @@ private Result> handleException( logIf4xxException(solrException); if (!isRetryable(e)) { log.error("Non retryable exception processing Solr update", e); - return new Result<>(ResultStatus.FAILED_NO_RETRY, e); + return new Result<>(ResultStatus.FAILED_NO_RETRY, e, mirroredSolrRequest); } else { logFailure(mirroredSolrRequest, e, solrException); mirroredSolrRequest.setAttempt(mirroredSolrRequest.getAttempt() + 1); @@ -188,13 +190,32 @@ private void logFailure( /** Process the SolrRequest. If not, this method throws an exception. */ private Result> processMirroredSolrRequest( - SolrRequest request, MirroredSolrRequest.Type type) throws Exception { + MirroredSolrRequest mirroredSolrRequest) throws Exception { + final SolrRequest request = mirroredSolrRequest.getSolrRequest(); + final MirroredSolrRequest.Type type = mirroredSolrRequest.getType(); if (log.isDebugEnabled()) { log.debug( "Sending request to Solr at ZK address={} with params {}", ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(), request.getParams()); } + // short-circuit requests to nonexistent or not updatable collections + if (type == MirroredSolrRequest.Type.UPDATE && request.getCollection() != null) { + ClusterState clusterState = clientSupplier.get().getClusterState(); + DocCollection docCollection = clusterState.getCollectionOrNull(request.getCollection()); + if (docCollection == null + || docCollection.isReadOnly() + || docCollection.getActiveSlices().isEmpty()) { + if (log.isInfoEnabled()) { + log.warn( + "Skipping update request to nonexistent / not updatable collection {}", + request.getCollection()); + } + metrics.counter(MetricRegistry.name(type.name(), "invalid-collection")).inc(); + return new Result<>(ResultStatus.FAILED_NO_RETRY, mirroredSolrRequest); + } + } + Result> result; SolrResponseBase response; Timer.Context ctx = metrics.timer(MetricRegistry.name(type.name(), "outputTime")).time(); @@ -222,7 +243,7 @@ private Result> processMirroredSolrRequest( request.getParams(), status); } - result = new Result<>(ResultStatus.HANDLED); + result = new Result<>(ResultStatus.HANDLED, mirroredSolrRequest); return result; } diff --git a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java index 328a629a9fe0..9f8e904a72b7 100644 --- a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java +++ b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumerTest.java @@ -297,7 +297,7 @@ public void testCreateKafkaCrossDcConsumer() { public void testHandleValidMirroredSolrRequest() { KafkaConsumer> mockConsumer = mock(KafkaConsumer.class); KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer); - doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)) + doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, null)) .when(messageProcessorMock) .handleItem(any()); SolrInputDocument doc = new SolrInputDocument(); @@ -336,7 +336,7 @@ public void testHandleValidMirroredSolrRequest() { public void testHandleValidAdminRequest() throws Exception { KafkaConsumer> mockConsumer = mock(KafkaConsumer.class); KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer); - doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)) + doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, null)) .when(messageProcessorMock) .handleItem(any()); CollectionAdminRequest.Create create = @@ -417,7 +417,7 @@ private void doTestCollapseUpdates( CrossDcConf.COLLAPSE_UPDATES, collapseUpdates.name(), CrossDcConf.MAX_COLLAPSE_RECORDS, String.valueOf(maxCollapseRecords)); KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer); - doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)) + doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, null)) .when(messageProcessorMock) .handleItem(any()); List>> records = new ArrayList<>(); @@ -452,7 +452,7 @@ private void doTestCollapseUpdates( public void testHandleInvalidMirroredSolrRequest() { KafkaConsumer> mockConsumer = mock(KafkaConsumer.class); SolrMessageProcessor mockSolrMessageProcessor = mock(SolrMessageProcessor.class); - doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED)) + doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, null)) .when(mockSolrMessageProcessor) .handleItem(any()); KafkaCrossDcConsumer spyConsumer = diff --git a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java index 2a1b552ab260..6110cfbbbad6 100644 --- a/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java +++ b/solr/cross-dc-manager/src/test/org/apache/solr/crossdc/manager/messageprocessor/SolrMessageProcessorTest.java @@ -18,7 +18,7 @@ import static org.apache.solr.SolrTestCaseJ4.assumeWorkingMockito; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -76,6 +76,7 @@ public void handleItemWithFailedResultNoRetry() throws SolrServerException, IOEx solrMessageProcessor.handleItem(mirroredSolrRequest); assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status()); + assertNotNull(result.getItem()); } /** Should handle MirroredSolrRequest and return a failed result with resubmit */ @@ -92,6 +93,7 @@ public void handleItemWithFailedResultResubmit() throws SolrServerException, IOE solrMessageProcessor.handleItem(mirroredSolrRequest); assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status()); + assertNotNull(result.getItem()); assertEquals(mirroredSolrRequest, result.getItem()); } @@ -111,7 +113,7 @@ public void handleItemWithSuccessfulResult() throws SolrServerException, IOExcep solrMessageProcessor.handleItem(mirroredSolrRequest); assertEquals(IQueueHandler.ResultStatus.HANDLED, result.status()); - assertNull(result.getItem()); + assertNotNull(result.getItem()); } /** Should connect to Solr if not connected and process the request */ diff --git a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java index f4fa5f7ae829..145630deee7a 100644 --- a/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java +++ b/solr/modules/cross-dc/src/java/org/apache/solr/crossdc/common/IQueueHandler.java @@ -39,16 +39,8 @@ class Result { private final Throwable _throwable; private final T _item; - public Result(final ResultStatus status) { - _status = status; - _throwable = null; - _item = null; - } - - public Result(final ResultStatus status, final Throwable throwable) { - _status = status; - _throwable = throwable; - _item = null; + public Result(final ResultStatus status, final T newItem) { + this(status, null, newItem); } public Result(final ResultStatus status, final Throwable throwable, final T newItem) {