Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions changelog/unreleased/solr-18063.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# See https://github.com/apache/solr/blob/main/dev-docs/changelog.adoc
title: SOLR-18063 - NPE when resubmitting to DLQ
type: fixed
authors:
- name: Andrzej Bialecki
nick: ab
links:
- name: SOLR-18063
url: https://issues.apache.org/jira/browse/SOLR-18063
Original file line number Diff line number Diff line change
Expand Up @@ -527,15 +527,24 @@ protected void processResult(
log.trace("result=failed-resubmit");
}
final int attempt = item.getAttempt();
if (attempt > this.maxAttempts) {
log.info(
"Sending message to dead letter queue because of max attempts limit with current value = {}",
attempt);
kafkaMirroringSink.submitToDlq(item);
metrics.counter(MetricRegistry.name(type.name(), "failed-dlq")).inc();
} else {
kafkaMirroringSink.submit(item);
metrics.counter(MetricRegistry.name(type.name(), "failed-resubmit")).inc();
final boolean dlq = attempt > this.maxAttempts;
try {
if (dlq) {
log.info(
"Sending message to dead letter queue because of max attempts limit with current value = {}",
attempt);
kafkaMirroringSink.submitToDlq(item);
metrics.counter(MetricRegistry.name(type.name(), "failed-dlq")).inc();
} else {
kafkaMirroringSink.submit(item);
metrics.counter(MetricRegistry.name(type.name(), "failed-resubmit")).inc();
}
} catch (Exception e) {
log.error(
"Failed to {}, msg={}",
dlq ? "send message to dead-letter queue" : "resubmit message for retry",
item,
e);
}
break;
case HANDLED:
Expand All @@ -556,6 +565,17 @@ protected void processResult(
"Unexpected response while processing request. We never expect {}.", result.status());
metrics.counter(MetricRegistry.name(type.name(), "failed-retry")).inc();
break;
case FAILED_NO_RETRY:
if (log.isDebugEnabled()) {
log.debug("Failed no-retry: sending message to dead-letter queue");
}
try {
kafkaMirroringSink.submitToDlq(item);
} catch (Exception e) {
log.error("Failed to send message to dead-letter queue, msg={}", item, e);
}
metrics.counter(MetricRegistry.name(type.name(), "failed-no-retry")).inc();
break;
default:
if (log.isTraceEnabled()) {
log.trace("result=no matching case");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrInputField;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
Expand Down Expand Up @@ -108,7 +110,7 @@ private Result<MirroredSolrRequest<?>> handleSolrRequest(
try {
prepareIfUpdateRequest(request);
logRequest(request);
result = processMirroredSolrRequest(request, mirroredSolrRequest.getType());
result = processMirroredSolrRequest(mirroredSolrRequest);
} catch (Exception e) {
result = handleException(mirroredSolrRequest, e);
}
Expand All @@ -124,7 +126,7 @@ private Result<MirroredSolrRequest<?>> handleException(
logIf4xxException(solrException);
if (!isRetryable(e)) {
log.error("Non retryable exception processing Solr update", e);
return new Result<>(ResultStatus.FAILED_NO_RETRY, e);
return new Result<>(ResultStatus.FAILED_NO_RETRY, e, mirroredSolrRequest);
} else {
logFailure(mirroredSolrRequest, e, solrException);
mirroredSolrRequest.setAttempt(mirroredSolrRequest.getAttempt() + 1);
Expand Down Expand Up @@ -188,13 +190,32 @@ private void logFailure(

/** Process the SolrRequest. If not, this method throws an exception. */
private Result<MirroredSolrRequest<?>> processMirroredSolrRequest(
SolrRequest<?> request, MirroredSolrRequest.Type type) throws Exception {
MirroredSolrRequest<?> mirroredSolrRequest) throws Exception {
final SolrRequest<?> request = mirroredSolrRequest.getSolrRequest();
final MirroredSolrRequest.Type type = mirroredSolrRequest.getType();
if (log.isDebugEnabled()) {
log.debug(
"Sending request to Solr at ZK address={} with params {}",
ZkStateReader.from(clientSupplier.get()).getZkClient().getZkServerAddress(),
request.getParams());
}
// short-circuit requests to nonexistent or not updatable collections
if (type == MirroredSolrRequest.Type.UPDATE && request.getCollection() != null) {
ClusterState clusterState = clientSupplier.get().getClusterState();
DocCollection docCollection = clusterState.getCollectionOrNull(request.getCollection());
if (docCollection == null
|| docCollection.isReadOnly()
|| docCollection.getActiveSlices().isEmpty()) {
if (log.isInfoEnabled()) {
log.warn(
"Skipping update request to nonexistent / not updatable collection {}",
request.getCollection());
}
metrics.counter(MetricRegistry.name(type.name(), "invalid-collection")).inc();
return new Result<>(ResultStatus.FAILED_NO_RETRY, mirroredSolrRequest);
}
}

Result<MirroredSolrRequest<?>> result;
SolrResponseBase response;
Timer.Context ctx = metrics.timer(MetricRegistry.name(type.name(), "outputTime")).time();
Expand Down Expand Up @@ -222,7 +243,7 @@ private Result<MirroredSolrRequest<?>> processMirroredSolrRequest(
request.getParams(),
status);
}
result = new Result<>(ResultStatus.HANDLED);
result = new Result<>(ResultStatus.HANDLED, mirroredSolrRequest);
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ public void testCreateKafkaCrossDcConsumer() {
public void testHandleValidMirroredSolrRequest() {
KafkaConsumer<String, MirroredSolrRequest<?>> mockConsumer = mock(KafkaConsumer.class);
KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, null))
.when(messageProcessorMock)
.handleItem(any());
SolrInputDocument doc = new SolrInputDocument();
Expand Down Expand Up @@ -336,7 +336,7 @@ public void testHandleValidMirroredSolrRequest() {
public void testHandleValidAdminRequest() throws Exception {
KafkaConsumer<String, MirroredSolrRequest<?>> mockConsumer = mock(KafkaConsumer.class);
KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, null))
.when(messageProcessorMock)
.handleItem(any());
CollectionAdminRequest.Create create =
Expand Down Expand Up @@ -417,7 +417,7 @@ private void doTestCollapseUpdates(
CrossDcConf.COLLAPSE_UPDATES, collapseUpdates.name(),
CrossDcConf.MAX_COLLAPSE_RECORDS, String.valueOf(maxCollapseRecords));
KafkaCrossDcConsumer spyConsumer = createCrossDcConsumerSpy(mockConsumer);
doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, null))
.when(messageProcessorMock)
.handleItem(any());
List<ConsumerRecord<String, MirroredSolrRequest<?>>> records = new ArrayList<>();
Expand Down Expand Up @@ -452,7 +452,7 @@ private void doTestCollapseUpdates(
public void testHandleInvalidMirroredSolrRequest() {
KafkaConsumer<String, MirroredSolrRequest<?>> mockConsumer = mock(KafkaConsumer.class);
SolrMessageProcessor mockSolrMessageProcessor = mock(SolrMessageProcessor.class);
doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED))
doReturn(new IQueueHandler.Result<>(IQueueHandler.ResultStatus.HANDLED, null))
.when(mockSolrMessageProcessor)
.handleItem(any());
KafkaCrossDcConsumer spyConsumer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import static org.apache.solr.SolrTestCaseJ4.assumeWorkingMockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
Expand Down Expand Up @@ -76,6 +76,7 @@ public void handleItemWithFailedResultNoRetry() throws SolrServerException, IOEx
solrMessageProcessor.handleItem(mirroredSolrRequest);

assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
assertNotNull(result.getItem());
}

/** Should handle MirroredSolrRequest and return a failed result with resubmit */
Expand All @@ -92,6 +93,7 @@ public void handleItemWithFailedResultResubmit() throws SolrServerException, IOE
solrMessageProcessor.handleItem(mirroredSolrRequest);

assertEquals(IQueueHandler.ResultStatus.FAILED_RESUBMIT, result.status());
assertNotNull(result.getItem());
assertEquals(mirroredSolrRequest, result.getItem());
}

Expand All @@ -111,7 +113,7 @@ public void handleItemWithSuccessfulResult() throws SolrServerException, IOExcep
solrMessageProcessor.handleItem(mirroredSolrRequest);

assertEquals(IQueueHandler.ResultStatus.HANDLED, result.status());
assertNull(result.getItem());
assertNotNull(result.getItem());
}

/** Should connect to Solr if not connected and process the request */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,8 @@ class Result<T> {
private final Throwable _throwable;
private final T _item;

public Result(final ResultStatus status) {
_status = status;
_throwable = null;
_item = null;
}

public Result(final ResultStatus status, final Throwable throwable) {
_status = status;
_throwable = throwable;
_item = null;
public Result(final ResultStatus status, final T newItem) {
this(status, null, newItem);
}

public Result(final ResultStatus status, final Throwable throwable, final T newItem) {
Expand Down