Skip to content

Commit 2c75593

Browse files
committed
Add boolean parameter to the ISM rollover action that prevents rollover of indices with zero documents
Signed-off-by: Kshitij Tandon <[email protected]>
1 parent 0063142 commit 2c75593

File tree

13 files changed

+514
-29
lines changed

13 files changed

+514
-29
lines changed

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverAction.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class RolloverAction(
2121
val minAge: TimeValue?,
2222
val minPrimaryShardSize: ByteSizeValue?,
2323
val copyAlias: Boolean = false,
24+
val preventEmptyRollover: Boolean = false,
2425
index: Int,
2526
) : Action(name, index) {
2627
init {
@@ -48,6 +49,7 @@ class RolloverAction(
4849
if (minAge != null) builder.field(MIN_INDEX_AGE_FIELD, minAge.stringRep)
4950
if (minPrimaryShardSize != null) builder.field(MIN_PRIMARY_SHARD_SIZE_FIELD, minPrimaryShardSize.stringRep)
5051
builder.field(COPY_ALIAS_FIELD, copyAlias)
52+
if (preventEmptyRollover) builder.field(PREVENT_EMPTY_ROLLOVER_FIELD, preventEmptyRollover)
5153
builder.endObject()
5254
}
5355

@@ -57,6 +59,7 @@ class RolloverAction(
5759
out.writeOptionalTimeValue(minAge)
5860
out.writeOptionalWriteable(minPrimaryShardSize)
5961
out.writeBoolean(copyAlias)
62+
out.writeBoolean(preventEmptyRollover)
6063
out.writeInt(actionIndex)
6164
}
6265

@@ -67,5 +70,6 @@ class RolloverAction(
6770
const val MIN_INDEX_AGE_FIELD = "min_index_age"
6871
const val MIN_PRIMARY_SHARD_SIZE_FIELD = "min_primary_shard_size"
6972
const val COPY_ALIAS_FIELD = "copy_alias"
73+
const val PREVENT_EMPTY_ROLLOVER_FIELD = "prevent_empty_rollover"
7074
}
7175
}

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionParser.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ class RolloverActionParser : ActionParser() {
2020
val minAge = sin.readOptionalTimeValue()
2121
val minPrimaryShardSize = sin.readOptionalWriteable(::ByteSizeValue)
2222
val copyAlias = sin.readBoolean()
23+
val preventEmptyRollover = sin.readBoolean()
2324
val index = sin.readInt()
2425

25-
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
26+
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, preventEmptyRollover, index)
2627
}
2728

2829
override fun fromXContent(xcp: XContentParser, index: Int): Action {
@@ -31,6 +32,7 @@ class RolloverActionParser : ActionParser() {
3132
var minAge: TimeValue? = null
3233
var minPrimaryShardSize: ByteSizeValue? = null
3334
var copyAlias = false
35+
var preventEmptyRollover = false
3436

3537
ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
3638
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
@@ -54,11 +56,13 @@ class RolloverActionParser : ActionParser() {
5456

5557
RolloverAction.COPY_ALIAS_FIELD -> copyAlias = xcp.booleanValue()
5658

59+
RolloverAction.PREVENT_EMPTY_ROLLOVER_FIELD -> preventEmptyRollover = xcp.booleanValue()
60+
5761
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in RolloverAction.")
5862
}
5963
}
6064

61-
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, index)
65+
return RolloverAction(minSize, minDocs, minAge, minPrimaryShardSize, copyAlias, preventEmptyRollover, index)
6266
}
6367

6468
override fun getActionType(): String = RolloverAction.name

src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollover/AttemptRolloverStep.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,25 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
7575
// If statsResponse is null we already updated failed info from getIndexStatsOrUpdateInfo and can return early
7676
statsResponse ?: return this
7777

78+
// Check for empty index if prevent_empty_rollover is enabled
79+
if (action.preventEmptyRollover) {
80+
val numDocs = statsResponse.primaries.docs?.count ?: 0
81+
if (numDocs == 0L) {
82+
stepStatus = StepStatus.CONDITION_NOT_MET
83+
info = mapOf(
84+
"message" to getPreventedEmptyRolloverMessage(indexName),
85+
"conditions" to mapOf(
86+
"prevent_empty_rollover" to mapOf(
87+
"condition" to "index must have at least 1 document",
88+
"current" to 0,
89+
),
90+
),
91+
)
92+
logger.info("$indexName rollover prevented: index is empty (0 documents)")
93+
return this
94+
}
95+
}
96+
7897
val indexCreationDate = clusterService.state().metadata().index(indexName).creationDate
7998
val indexAgeTimeValue =
8099
if (indexCreationDate == -1L) {
@@ -426,5 +445,8 @@ class AttemptRolloverStep(private val action: RolloverAction) : Step(name) {
426445

427446
fun getCopyAliasRolledOverIndexNotFoundMessage(index: String?) =
428447
"Successfully rolled over [index=$index] but ISM cannot find rolled over index from metadata to copy aliases to, please manually copy"
448+
449+
fun getPreventedEmptyRolloverMessage(index: String) =
450+
"Rollover prevented: index is empty (0 documents) [index=$index]"
429451
}
430452
}

src/main/resources/mappings/opendistro-ism-config.json

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"_meta" : {
3-
"schema_version": 24
3+
"schema_version": 25
44
},
55
"dynamic": "strict",
66
"properties": {
@@ -245,6 +245,9 @@
245245
},
246246
"copy_alias": {
247247
"type": "boolean"
248+
},
249+
"prevent_empty_rollover": {
250+
"type": "boolean"
248251
}
249252
}
250253
},

src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import javax.management.remote.JMXConnectorFactory
4141
import javax.management.remote.JMXServiceURL
4242

4343
abstract class IndexManagementRestTestCase : ODFERestTestCase() {
44-
val configSchemaVersion = 24
44+
val configSchemaVersion = 25
4545
val historySchemaVersion = 7
4646

4747
// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as

src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/TestHelpers.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ fun randomRolloverActionConfig(
137137
minDocs = minDocs,
138138
minAge = minAge,
139139
minPrimaryShardSize = minPrimaryShardSize,
140+
preventEmptyRollover = false,
140141
index = 0,
141142
)
142143

0 commit comments

Comments
 (0)