Skip to content

Commit 4fd0a68

Browse files
Fix StopReplicationIT.test delete follower index when leader index is unavailable when running with security (#1588) (#1589)
* Fix StopReplicationIT.test delete follower index when leader index is unavailable when running with security * Fix StopReplicationIT.test delete follower index when leader index is unavailable when running with security --------- (cherry picked from commit 8800659) Signed-off-by: Craig Perkins <[email protected]> Co-authored-by: Craig Perkins <[email protected]>
1 parent 076eb5d commit 4fd0a68

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

src/main/kotlin/org/opensearch/replication/task/index/IndexReplicationTask.kt

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ import org.opensearch.commons.replication.action.StopIndexReplicationRequest
110110
import org.opensearch.replication.ReplicationPlugin
111111
import kotlin.streams.toList
112112
import org.opensearch.cluster.DiffableUtils
113+
import org.opensearch.common.util.concurrent.ThreadContext
113114

114115
open class IndexReplicationTask(id: Long, type: String, action: String, description: String,
115116
parentTask: TaskId,
@@ -349,7 +350,7 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
349350
// Shard & Index Tasks has Close listeners
350351
//All ops are idempotent .
351352

352-
//Only called once if task starts in MONITORING STATE
353+
//Only called once if task starts in MONITORING STATE
353354
if (!shouldCallEvalMonitoring) {
354355
return MonitoringState
355356
}
@@ -698,13 +699,11 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
698699
}
699700

700701
private suspend fun stopReplication(state: FailedState): IndexReplicationState {
702+
var storedContext: ThreadContext.StoredContext? = null
701703
try {
704+
storedContext = client.threadPool().threadContext.stashContext()
702705
log.info("Going to initiate stop of index $followerIndexName due to deletion of corresponding leader index ${leaderIndex.name}")
703-
val stopReplicationResponse = client.suspendExecute(
704-
replicationMetadata,
705-
INTERNAL_STOP_REPLICATION_ACTION_TYPE, StopIndexReplicationRequest(followerIndexName),
706-
defaultContext = true
707-
)
706+
val stopReplicationResponse = client.execute(INTERNAL_STOP_REPLICATION_ACTION_TYPE, StopIndexReplicationRequest(followerIndexName)).actionGet()
708707
if (!stopReplicationResponse.isAcknowledged) {
709708
throw ReplicationException(
710709
"Failed to gracefully stop replication after deletion of leader index. " +
@@ -717,6 +716,8 @@ open class IndexReplicationTask(id: Long, type: String, action: String, descript
717716
log.error("Encountered exception while stopping $followerIndexName", e)
718717
return FailedState(state.failedShards,
719718
"Stop failed with \"${e.message}\". Original failure for initiating stop - ${state.errorMsg}")
719+
} finally {
720+
storedContext?.close()
720721
}
721722
return CompletedState
722723
}

0 commit comments

Comments
 (0)