Skip to content

Commit

Permalink
Adds min rollover age as a transition condition (opensearch-project#215)
Browse files Browse the repository at this point in the history
* Adds min rollover age as a transition condition

Signed-off-by: Drew Baugher <46505179+dbbaughe@users.noreply.github.com>

* Fixes link checker

Signed-off-by: Drew Baugher <46505179+dbbaughe@users.noreply.github.com>
Signed-off-by: Robert Downs <downsrob@amazon.com>
  • Loading branch information
dbbaughe authored and downsrob committed Mar 9, 2022
1 parent 76931d9 commit 127a744
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/links.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
id: lychee
uses: lycheeverse/lychee-action@master
with:
args: --accept=200,403,429 "**/*.html" "**/*.md" "**/*.txt" "**/*.json"
args: --accept=200,403,429 **/*.html **/*.md **/*.txt **/*.json
env:
GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
- name: Fail if there were link errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,12 @@ data class Conditions(
val indexAge: TimeValue? = null,
val docCount: Long? = null,
val size: ByteSizeValue? = null,
val cron: CronSchedule? = null
val cron: CronSchedule? = null,
val rolloverAge: TimeValue? = null
) : ToXContentObject, Writeable {

init {
val conditionsList = listOf(indexAge, docCount, size, cron)
val conditionsList = listOf(indexAge, docCount, size, cron, rolloverAge)
require(conditionsList.filterNotNull().size == 1) { "Cannot provide more than one Transition condition" }

// Validate doc count condition
Expand All @@ -98,6 +99,7 @@ data class Conditions(
if (docCount != null) builder.field(MIN_DOC_COUNT_FIELD, docCount)
if (size != null) builder.field(MIN_SIZE_FIELD, size.stringRep)
if (cron != null) builder.field(CRON_FIELD, cron)
if (rolloverAge != null) builder.field(MIN_ROLLOVER_AGE_FIELD, rolloverAge.stringRep)
return builder.endObject()
}

Expand All @@ -106,7 +108,8 @@ data class Conditions(
indexAge = sin.readOptionalTimeValue(),
docCount = sin.readOptionalLong(),
size = sin.readOptionalWriteable(::ByteSizeValue),
cron = sin.readOptionalWriteable(::CronSchedule)
cron = sin.readOptionalWriteable(::CronSchedule),
rolloverAge = sin.readOptionalTimeValue()
)

@Throws(IOException::class)
Expand All @@ -115,13 +118,15 @@ data class Conditions(
out.writeOptionalLong(docCount)
out.writeOptionalWriteable(size)
out.writeOptionalWriteable(cron)
out.writeOptionalTimeValue(rolloverAge)
}

companion object {
const val MIN_INDEX_AGE_FIELD = "min_index_age"
const val MIN_DOC_COUNT_FIELD = "min_doc_count"
const val MIN_SIZE_FIELD = "min_size"
const val CRON_FIELD = "cron"
const val MIN_ROLLOVER_AGE_FIELD = "min_rollover_age"

@JvmStatic
@Throws(IOException::class)
Expand All @@ -130,6 +135,7 @@ data class Conditions(
var docCount: Long? = null
var size: ByteSizeValue? = null
var cron: CronSchedule? = null
var rolloverAge: TimeValue? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -141,11 +147,12 @@ data class Conditions(
MIN_DOC_COUNT_FIELD -> docCount = xcp.longValue()
MIN_SIZE_FIELD -> size = ByteSizeValue.parseBytesSizeValue(xcp.text(), MIN_SIZE_FIELD)
CRON_FIELD -> cron = ScheduleParser.parse(xcp) as? CronSchedule
MIN_ROLLOVER_AGE_FIELD -> rolloverAge = TimeValue.parseTimeValue(xcp.text(), MIN_ROLLOVER_AGE_FIELD)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Conditions.")
}
}

return Conditions(indexAge, docCount, size, cron)
return Conditions(indexAge, docCount, size, cron, rolloverAge)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataID
import org.opensearch.indexmanagement.opensearchapi.contentParser
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import java.time.Instant

private val log = LogManager.getLogger("Index Management Helper")

Expand Down Expand Up @@ -201,3 +202,11 @@ fun XContentBuilder.buildMetadata(name: String, metadata: ToXContentFragment, pa
this.endObject()
return this
}

// Get the oldest rollover time or null if index was never rolled over
fun IndexMetadata.getOldestRolloverTime(): Instant? {
return this.rolloverInfos.values()
.map { it.value.time }
.minOrNull() // oldest should be min as its epoch time
?.let { Instant.ofEpochMilli(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import org.opensearch.common.unit.ByteSizeValue
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.action.TransitionsActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getOldestRolloverTime
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.indexstatemanagement.util.evaluateConditions
import org.opensearch.indexmanagement.indexstatemanagement.util.hasStatsConditions
Expand Down Expand Up @@ -46,7 +47,7 @@ class AttemptTransitionStep(

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught", "ReturnCount", "ComplexMethod")
@Suppress("TooGenericExceptionCaught", "ReturnCount", "ComplexMethod", "LongMethod")
override suspend fun execute(): AttemptTransitionStep {
try {
if (config.transitions.isEmpty()) {
Expand All @@ -56,14 +57,28 @@ class AttemptTransitionStep(
return this
}

val indexCreationDate = clusterService.state().metadata().index(indexName).creationDate
val indexMetaData = clusterService.state().metadata().index(indexName)
val indexCreationDate = indexMetaData.creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison")
}
val stepStartTime = getStepStartTime()
var numDocs: Long? = null
var indexSize: ByteSizeValue? = null
val rolloverDate: Instant? = indexMetaData.getOldestRolloverTime()

if (config.transitions.any { it.conditions?.rolloverAge !== null }) {
// if we have a transition with rollover age condition, then we must have a rollover date
// otherwise fail this transition
if (rolloverDate == null) {
val message = getFailedRolloverDateMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
return this
}
}

if (config.transitions.any { it.hasStatsConditions() }) {
val statsRequest = IndicesStatsRequest()
Expand All @@ -85,7 +100,9 @@ class AttemptTransitionStep(
}

// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
stateName = config.transitions.find { it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime) }?.stateName
stateName = config.transitions.find {
it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime, rolloverDate)
}?.stateName
val message: String
val stateName = stateName // shadowed on purpose to prevent var from changing
if (stateName != null) {
Expand Down Expand Up @@ -131,6 +148,8 @@ class AttemptTransitionStep(
companion object {
fun getFailedMessage(index: String) = "Failed to transition index [index=$index]"
fun getFailedStatsMessage(index: String) = "Failed to get stats information for the index [index=$index]"
fun getFailedRolloverDateMessage(index: String) =
"Failed to transition index as min_rollover_age condition was used, but the index has never been rolled over [index=$index]"
fun getEvaluatingMessage(index: String) = "Evaluating transition conditions [index=$index]"
fun getSuccessMessage(index: String, state: String) = "Transitioning to $state [index=$index]"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ fun Transition.evaluateConditions(
indexCreationDate: Instant,
numDocs: Long?,
indexSize: ByteSizeValue?,
transitionStartTime: Instant
transitionStartTime: Instant,
rolloverDate: Instant?,
): Boolean {
// If there are no conditions, treat as always true
if (this.conditions == null) return true
Expand All @@ -238,6 +239,12 @@ fun Transition.evaluateConditions(
return this.conditions.cron.getNextExecutionTime(transitionStartTime) <= Instant.now()
}

if (this.conditions.rolloverAge != null) {
val rolloverDateMilli = rolloverDate?.toEpochMilli() ?: return false
val elapsedTime = Instant.now().toEpochMilli() - rolloverDateMilli
return this.conditions.rolloverAge.millis <= elapsedTime
}

// We should never reach this
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,11 +590,11 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
return metadata
}

protected fun rolloverIndex(index: String) {
protected fun rolloverIndex(alias: String) {
val response = client().performRequest(
Request(
"POST",
"/$index/_rollover"
"/$alias/_rollover"
)
)
assertEquals(response.statusLine.statusCode, RestStatus.OK.status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fun randomTransition(
*/
fun randomConditions(
condition: Pair<String, Any>? =
OpenSearchRestTestCase.randomFrom(listOf(randomIndexAge(), randomDocCount(), randomSize(), null))
OpenSearchRestTestCase.randomFrom(listOf(randomIndexAge(), randomDocCount(), randomSize(), randomRolloverAge(), null))
): Conditions? {

if (condition == null) return null
Expand All @@ -102,6 +102,7 @@ fun randomConditions(
Conditions.MIN_DOC_COUNT_FIELD -> Conditions(docCount = value as Long)
Conditions.MIN_SIZE_FIELD -> Conditions(size = value as ByteSizeValue)
// Conditions.CRON_FIELD -> Conditions(cron = value as CronSchedule) // TODO: Uncomment after issues are fixed
Conditions.MIN_ROLLOVER_AGE_FIELD -> Conditions(rolloverAge = value as TimeValue)
else -> throw IllegalArgumentException("Invalid field: [$type] given for random Conditions.")
}
}
Expand Down Expand Up @@ -221,6 +222,8 @@ fun randomSize(size: ByteSizeValue = randomByteSizeValue()) = Conditions.MIN_SIZ
fun randomCronSchedule(cron: CronSchedule = CronSchedule("0 * * * *", ZoneId.of("UTC"))) =
Conditions.CRON_FIELD to cron

fun randomRolloverAge(rolloverAge: TimeValue = randomTimeValueObject()) = Conditions.MIN_ROLLOVER_AGE_FIELD to rolloverAge

fun randomTimeValueObject(): TimeValue = TimeValue.parseTimeValue(OpenSearchRestTestCase.randomPositiveTimeValue(), "")

fun randomByteSizeValue(): ByteSizeValue =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
Expand Down Expand Up @@ -65,4 +66,80 @@ class TransitionActionIT : IndexStateManagementRestTestCase() {
// Should have evaluated to true
waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondStateName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }
}

fun `test rollover age transition for index with no rollover fails`() {
val indexName = "${testIndexName}_rollover_age_no_rollover"
val policyID = "${testIndexName}_rollover_age_no_rollover_policy"
val secondStateName = "second"
val states = listOf(
State("first", listOf(), listOf(Transition(secondStateName, Conditions(rolloverAge = TimeValue.timeValueSeconds(30))))),
State(secondStateName, listOf(), listOf())
)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Initializing the policy/metadata
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Evaluating transition conditions for first time
updateManagedIndexConfigStartTime(managedIndexConfig)

// Should fail because it attempted to use the rollover age and the index has not been rolled over
waitFor { assertEquals(AttemptTransitionStep.getFailedRolloverDateMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }
}

fun `test rollover age transition for index`() {
val indexName = "${testIndexName}_rollover_age-01"
val policyID = "${testIndexName}_rollover_age_policy"
val alias = "foo-alias"
val secondStateName = "second"
val states = listOf(
State("first", listOf(), listOf(Transition(secondStateName, Conditions(rolloverAge = TimeValue.timeValueMillis(1))))),
State(secondStateName, listOf(), listOf())
)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)
createIndex(indexName, policyID, alias)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Initializing the policy/metadata
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Rollover the index
rolloverIndex(alias)

// Evaluating transition conditions for first time
updateManagedIndexConfigStartTime(managedIndexConfig)

// Should have evaluated to true
waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondStateName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.opensearchapi

import org.opensearch.Version
import org.opensearch.action.admin.indices.rollover.RolloverInfo
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.test.OpenSearchTestCase

class ExtensionsTests : OpenSearchTestCase() {

fun `test getting oldest rollover time`() {
val noRolloverMetadata = IndexMetadata
.Builder("foo-index")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build()

assertNull(noRolloverMetadata.getOldestRolloverTime())
val oldest = RolloverInfo("bar-alias", emptyList(), 17L)

val metadata = IndexMetadata
.Builder(noRolloverMetadata)
.putRolloverInfo(RolloverInfo("foo-alias", emptyList(), 42L))
.putRolloverInfo(oldest)
.putRolloverInfo(RolloverInfo("baz-alias", emptyList(), 134345L))
.build()

assertEquals("Did not get the oldest rollover time", oldest.time, metadata.getOldestRolloverTime()?.toEpochMilli())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.rollover.RolloverInfo
import org.opensearch.action.admin.indices.stats.CommonStats
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.client.AdminClient
Expand All @@ -21,6 +22,7 @@ import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.collect.ImmutableOpenMap
import org.opensearch.index.shard.DocsStats
import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -35,7 +37,10 @@ import java.time.Instant

class AttemptTransitionStepTests : OpenSearchTestCase() {

private val indexMetadata: IndexMetadata = mock()
@Suppress("UNCHECKED_CAST")
private val indexMetadata: IndexMetadata = mock {
on { rolloverInfos } doReturn ImmutableOpenMap.builder<String, RolloverInfo>().build()
}
private val metadata: Metadata = mock { on { index(any<String>()) } doReturn indexMetadata }
private val clusterState: ClusterState = mock { on { metadata() } doReturn metadata }
private val clusterService: ClusterService = mock { on { state() } doReturn clusterState }
Expand Down
Loading

0 comments on commit 127a744

Please sign in to comment.