Skip to content

Commit 8a64cfd

Browse files
authored
Add boolean parameter to the ISM rollover action that prevents rollover on empty indices (#1545) (#1547)
* Add boolean parameter to the ISM rollover action that prevents rollover of indices with zero documents * Add version checks --------- (cherry picked from commit fe5da1d) Signed-off-by: Kshitij Tandon <[email protected]>
1 parent 198dd24 commit 8a64cfd

File tree

13 files changed

+641
-29
lines changed

13 files changed

+641
-29
lines changed

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.indexmanagement.indexstatemanagement.action
77

8+
import org.opensearch.Version
89
import org.opensearch.common.unit.TimeValue
910
import org.opensearch.core.common.io.stream.StreamOutput
1011
import org.opensearch.core.common.unit.ByteSizeValue
@@ -21,6 +22,7 @@ class RolloverAction(
2122
val minAge: TimeValue?,
2223
val minPrimaryShardSize: ByteSizeValue?,
2324
val copyAlias: Boolean = false,
25+
val preventEmptyRollover: Boolean = false,
2426
index: Int,
2527
) : Action(name, index) {
2628
init {
@@ -48,6 +50,7 @@ class RolloverAction(
4850
if (minAge != null) builder.field(MIN_INDEX_AGE_FIELD, minAge.stringRep)
4951
if (minPrimaryShardSize != null) builder.field(MIN_PRIMARY_SHARD_SIZE_FIELD, minPrimaryShardSize.stringRep)
5052
builder.field(COPY_ALIAS_FIELD, copyAlias)
53+
if (preventEmptyRollover) builder.field(PREVENT_EMPTY_ROLLOVER_FIELD, preventEmptyRollover)
5154
builder.endObject()
5255
}
5356

@@ -57,6 +60,9 @@ class RolloverAction(
5760
out.writeOptionalTimeValue(minAge)
5861
out.writeOptionalWriteable(minPrimaryShardSize)
5962
out.writeBoolean(copyAlias)
63+
if (out.version.onOrAfter(Version.V_3_4_0)) {
64+
out.writeBoolean(preventEmptyRollover)
65+
}
6066
out.writeInt(actionIndex)
6167
}
6268

@@ -67,5 +73,6 @@ class RolloverAction(
6773
const val MIN_INDEX_AGE_FIELD = "min_index_age"
6874
const val MIN_PRIMARY_SHARD_SIZE_FIELD = "min_primary_shard_size"
6975
const val COPY_ALIAS_FIELD = "copy_alias"
76+
const val PREVENT_EMPTY_ROLLOVER_FIELD = "prevent_empty_rollover"
7077
}
7178
}

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
package org.opensearch.indexmanagement.indexstatemanagement.action
77

8+
import org.opensearch.Version
89
import org.opensearch.common.unit.TimeValue
910
import org.opensearch.core.common.io.stream.StreamInput
1011
import org.opensearch.core.common.unit.ByteSizeValue
@@ -20,9 +21,14 @@ class RolloverActionParser : ActionParser() {
2021
val minAge = sin.readOptionalTimeValue()
2122
val minPrimaryShardSize = sin.readOptionalWriteable(::ByteSizeValue)
2223
val copyAlias = sin.readBoolean()
24+
val preventEmptyRollover = if (sin.version.onOrAfter(Version.V_3_4_0)) {
25+
sin.readBoolean()
26+
} else {
27+
false
28+
}
2329
val index = sin.readInt()
2430

25-
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
31+
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, preventEmptyRollover, index)
2632
}
2733

2834
override fun fromXContent(xcp: XContentParser, index: Int): Action {
@@ -31,6 +37,7 @@ class RolloverActionParser : ActionParser() {
3137
var minAge: TimeValue? = null
3238
var minPrimaryShardSize: ByteSizeValue? = null
3339
var copyAlias = false
40+
var preventEmptyRollover = false
3441

3542
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
3643
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
@@ -49,11 +56,14 @@ class RolloverActionParser : ActionParser() {
4956
.MIN_PRIMARY_SHARD_SIZE_FIELD,
5057
)
5158
RolloverAction.COPY_ALIAS_FIELD -> copyAlias = xcp.booleanValue()
59+
60+
RolloverAction.PREVENT_EMPTY_ROLLOVER_FIELD -> preventEmptyRollover = xcp.booleanValue()
61+
5262
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in RolloverAction.")
5363
}
5464
}
5565

56-
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
66+
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, preventEmptyRollover, index)
5767
}
5868

5969
override fun getActionType(): String = RolloverAction.name

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,25 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
7575
// If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early
7676
statsResponse ?: return this
7777

78+
// Check for empty index if prevent_empty_rollover is enabled
79+
if (action.preventEmptyRollover) {
80+
val numDocs = statsResponse.primaries.docs?.count ?: 0
81+
if (numDocs == 0L) {
82+
stepStatus = StepStatus.CONDITION_NOT_MET
83+
info = mapOf(
84+
"message" to getPreventedEmptyRolloverMessage(indexName),
85+
"conditions" to mapOf(
86+
"prevent_empty_rollover" to mapOf(
87+
"condition" to "index must have at least 1 document",
88+
"current" to 0,
89+
),
90+
),
91+
)
92+
logger.info("$indexName rollover prevented: index is empty (0 documents)")
93+
return this
94+
}
95+
}
96+
7897
val indexCreationDate = clusterService.state().metadata().index(indexName).creationDate
7998
val indexAgeTimeValue =
8099
if (indexCreationDate == -1L) {
@@ -426,5 +445,8 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
426445

427446
fun getCopyAliasRolledOverIndexNotFoundMessage(index: String?) =
428447
"Successfully rolled over [index=$index] but ISM cannot find rolled over index from metadata to copy aliases to, please manually copy"
448+
449+
fun getPreventedEmptyRolloverMessage(index: String) =
450+
"Rollover prevented: index is empty (0 documents) [index=$index]"
429451
}
430452
}

src/main/resources/mappings/opendistro-ism-config.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"_meta" : {
3-
"schema_version": 24
3+
"schema_version": 25
44
},
55
"dynamic": "strict",
66
"properties": {
@@ -245,6 +245,9 @@
245245
},
246246
"copy_alias": {
247247
"type": "boolean"
248+
},
249+
"prevent_empty_rollover": {
250+
"type": "boolean"
248251
}
249252
}
250253
},

src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import javax.management.remote.JMXConnectorFactory
4141
import javax.management.remote.JMXServiceURL
4242

4343
abstract class IndexManagementRestTestCase : ODFERestTestCase() {
44-
val configSchemaVersion = 24
44+
val configSchemaVersion = 25
4545
val historySchemaVersion = 7
4646

4747
// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ fun randomRolloverActionConfig(
133133
minDocs = minDocs,
134134
minAge = minAge,
135135
minPrimaryShardSize = minPrimaryShardSize,
136+
preventEmptyRollover = false,
136137
index = 0,
137138
)
138139

0 commit comments

Comments
 (0)