Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds min rollover age as a transition condition #215

Merged
merged 2 commits into from
Dec 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/links.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
id: lychee
uses: lycheeverse/lychee-action@master
with:
args: --accept=200,403,429 "**/*.html" "**/*.md" "**/*.txt" "**/*.json"
args: --accept=200,403,429 **/*.html **/*.md **/*.txt **/*.json
env:
GITHUB_TOKEN: ${{secrets.GITHUB_TOKEN}}
- name: Fail if there were link errors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,12 @@ data class Conditions(
val indexAge: TimeValue? = null,
val docCount: Long? = null,
val size: ByteSizeValue? = null,
val cron: CronSchedule? = null
val cron: CronSchedule? = null,
val rolloverAge: TimeValue? = null
) : ToXContentObject, Writeable {

init {
val conditionsList = listOf(indexAge, docCount, size, cron)
val conditionsList = listOf(indexAge, docCount, size, cron, rolloverAge)
require(conditionsList.filterNotNull().size == 1) { "Cannot provide more than one Transition condition" }

// Validate doc count condition
Expand All @@ -119,6 +120,7 @@ data class Conditions(
if (docCount != null) builder.field(MIN_DOC_COUNT_FIELD, docCount)
if (size != null) builder.field(MIN_SIZE_FIELD, size.stringRep)
if (cron != null) builder.field(CRON_FIELD, cron)
if (rolloverAge != null) builder.field(MIN_ROLLOVER_AGE_FIELD, rolloverAge.stringRep)
return builder.endObject()
}

Expand All @@ -127,7 +129,8 @@ data class Conditions(
indexAge = sin.readOptionalTimeValue(),
docCount = sin.readOptionalLong(),
size = sin.readOptionalWriteable(::ByteSizeValue),
cron = sin.readOptionalWriteable(::CronSchedule)
cron = sin.readOptionalWriteable(::CronSchedule),
rolloverAge = sin.readOptionalTimeValue()
)

@Throws(IOException::class)
Expand All @@ -136,13 +139,15 @@ data class Conditions(
out.writeOptionalLong(docCount)
out.writeOptionalWriteable(size)
out.writeOptionalWriteable(cron)
out.writeOptionalTimeValue(rolloverAge)
}

companion object {
const val MIN_INDEX_AGE_FIELD = "min_index_age"
const val MIN_DOC_COUNT_FIELD = "min_doc_count"
const val MIN_SIZE_FIELD = "min_size"
const val CRON_FIELD = "cron"
const val MIN_ROLLOVER_AGE_FIELD = "min_rollover_age"

@JvmStatic
@Throws(IOException::class)
Expand All @@ -151,6 +156,7 @@ data class Conditions(
var docCount: Long? = null
var size: ByteSizeValue? = null
var cron: CronSchedule? = null
var rolloverAge: TimeValue? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -162,11 +168,12 @@ data class Conditions(
MIN_DOC_COUNT_FIELD -> docCount = xcp.longValue()
MIN_SIZE_FIELD -> size = ByteSizeValue.parseBytesSizeValue(xcp.text(), MIN_SIZE_FIELD)
CRON_FIELD -> cron = ScheduleParser.parse(xcp) as? CronSchedule
MIN_ROLLOVER_AGE_FIELD -> rolloverAge = TimeValue.parseTimeValue(xcp.text(), MIN_ROLLOVER_AGE_FIELD)
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Conditions.")
}
}

return Conditions(indexAge, docCount, size, cron)
return Conditions(indexAge, docCount, size, cron, rolloverAge)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex
import org.opensearch.indexmanagement.indexstatemanagement.util.managedIndexMetadataID
import org.opensearch.indexmanagement.opensearchapi.contentParser
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import java.time.Instant

private val log = LogManager.getLogger("Index Management Helper")

Expand Down Expand Up @@ -222,3 +223,11 @@ fun XContentBuilder.buildMetadata(name: String, metadata: ToXContentFragment, pa
this.endObject()
return this
}

// Get the oldest rollover time or null if index was never rolled over
fun IndexMetadata.getOldestRolloverTime(): Instant? {
return this.rolloverInfos.values()
.map { it.value.time }
.minOrNull() // oldest should be min as its epoch time
?.let { Instant.ofEpochMilli(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.opensearch.common.unit.ByteSizeValue
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.action.TransitionsActionConfig
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
import org.opensearch.indexmanagement.indexstatemanagement.opensearchapi.getOldestRolloverTime
import org.opensearch.indexmanagement.indexstatemanagement.step.Step
import org.opensearch.indexmanagement.indexstatemanagement.util.evaluateConditions
import org.opensearch.indexmanagement.indexstatemanagement.util.hasStatsConditions
Expand Down Expand Up @@ -67,7 +68,7 @@ class AttemptTransitionStep(

override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught", "ReturnCount", "ComplexMethod")
@Suppress("TooGenericExceptionCaught", "ReturnCount", "ComplexMethod", "LongMethod")
override suspend fun execute(): AttemptTransitionStep {
try {
if (config.transitions.isEmpty()) {
Expand All @@ -77,14 +78,28 @@ class AttemptTransitionStep(
return this
}

val indexCreationDate = clusterService.state().metadata().index(indexName).creationDate
val indexMetaData = clusterService.state().metadata().index(indexName)
val indexCreationDate = indexMetaData.creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
logger.warn("$indexName had an indexCreationDate=-1L, cannot use for comparison")
}
val stepStartTime = getStepStartTime()
var numDocs: Long? = null
var indexSize: ByteSizeValue? = null
val rolloverDate: Instant? = indexMetaData.getOldestRolloverTime()

if (config.transitions.any { it.conditions?.rolloverAge !== null }) {
// if we have a transition with rollover age condition, then we must have a rollover date
// otherwise fail this transition
if (rolloverDate == null) {
val message = getFailedRolloverDateMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message)
return this
}
}

if (config.transitions.any { it.hasStatsConditions() }) {
val statsRequest = IndicesStatsRequest()
Expand All @@ -106,7 +121,9 @@ class AttemptTransitionStep(
}

// Find the first transition that evaluates to true and get the state to transition to, otherwise return null if none are true
stateName = config.transitions.find { it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime) }?.stateName
stateName = config.transitions.find {
it.evaluateConditions(indexCreationDateInstant, numDocs, indexSize, stepStartTime, rolloverDate)
}?.stateName
val message: String
val stateName = stateName // shadowed on purpose to prevent var from changing
if (stateName != null) {
Expand Down Expand Up @@ -152,6 +169,8 @@ class AttemptTransitionStep(
companion object {
fun getFailedMessage(index: String) = "Failed to transition index [index=$index]"
fun getFailedStatsMessage(index: String) = "Failed to get stats information for the index [index=$index]"
fun getFailedRolloverDateMessage(index: String) =
"Failed to transition index as min_rollover_age condition was used, but the index has never been rolled over [index=$index]"
fun getEvaluatingMessage(index: String) = "Evaluating transition conditions [index=$index]"
fun getSuccessMessage(index: String, state: String) = "Transitioning to $state [index=$index]"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ fun Transition.evaluateConditions(
indexCreationDate: Instant,
numDocs: Long?,
indexSize: ByteSizeValue?,
transitionStartTime: Instant
transitionStartTime: Instant,
rolloverDate: Instant?,
): Boolean {
// If there are no conditions, treat as always true
if (this.conditions == null) return true
Expand All @@ -258,6 +259,12 @@ fun Transition.evaluateConditions(
return this.conditions.cron.getNextExecutionTime(transitionStartTime) <= Instant.now()
}

if (this.conditions.rolloverAge != null) {
val rolloverDateMilli = rolloverDate?.toEpochMilli() ?: return false
val elapsedTime = Instant.now().toEpochMilli() - rolloverDateMilli
return this.conditions.rolloverAge.millis <= elapsedTime
}

// We should never reach this
return false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,11 +611,11 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
return metadata
}

protected fun rolloverIndex(index: String) {
protected fun rolloverIndex(alias: String) {
val response = client().performRequest(
Request(
"POST",
"/$index/_rollover"
"/$alias/_rollover"
)
)
assertEquals(response.statusLine.statusCode, RestStatus.OK.status)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fun randomTransition(
*/
fun randomConditions(
condition: Pair<String, Any>? =
OpenSearchRestTestCase.randomFrom(listOf(randomIndexAge(), randomDocCount(), randomSize(), null))
OpenSearchRestTestCase.randomFrom(listOf(randomIndexAge(), randomDocCount(), randomSize(), randomRolloverAge(), null))
): Conditions? {

if (condition == null) return null
Expand All @@ -123,6 +123,7 @@ fun randomConditions(
Conditions.MIN_DOC_COUNT_FIELD -> Conditions(docCount = value as Long)
Conditions.MIN_SIZE_FIELD -> Conditions(size = value as ByteSizeValue)
// Conditions.CRON_FIELD -> Conditions(cron = value as CronSchedule) // TODO: Uncomment after issues are fixed
Conditions.MIN_ROLLOVER_AGE_FIELD -> Conditions(rolloverAge = value as TimeValue)
else -> throw IllegalArgumentException("Invalid field: [$type] given for random Conditions.")
}
}
Expand Down Expand Up @@ -242,6 +243,8 @@ fun randomSize(size: ByteSizeValue = randomByteSizeValue()) = Conditions.MIN_SIZ
fun randomCronSchedule(cron: CronSchedule = CronSchedule("0 * * * *", ZoneId.of("UTC"))) =
Conditions.CRON_FIELD to cron

fun randomRolloverAge(rolloverAge: TimeValue = randomTimeValueObject()) = Conditions.MIN_ROLLOVER_AGE_FIELD to rolloverAge

fun randomTimeValueObject(): TimeValue = TimeValue.parseTimeValue(OpenSearchRestTestCase.randomPositiveTimeValue(), "")

fun randomByteSizeValue(): ByteSizeValue =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package org.opensearch.indexmanagement.indexstatemanagement.action

import org.opensearch.common.unit.TimeValue
import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions
import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
Expand Down Expand Up @@ -71,4 +72,80 @@ class TransitionActionIT : IndexStateManagementRestTestCase() {
// Should have evaluated to true
waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondStateName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }
}

fun `test rollover age transition for index with no rollover fails`() {
val indexName = "${testIndexName}_rollover_age_no_rollover"
val policyID = "${testIndexName}_rollover_age_no_rollover_policy"
val secondStateName = "second"
val states = listOf(
State("first", listOf(), listOf(Transition(secondStateName, Conditions(rolloverAge = TimeValue.timeValueSeconds(30))))),
State(secondStateName, listOf(), listOf())
)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)
createIndex(indexName, policyID)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Initializing the policy/metadata
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Evaluating transition conditions for first time
updateManagedIndexConfigStartTime(managedIndexConfig)

// Should fail because it attempted to use the rollover age and the index has not been rolled over
waitFor { assertEquals(AttemptTransitionStep.getFailedRolloverDateMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }
}

fun `test rollover age transition for index`() {
val indexName = "${testIndexName}_rollover_age-01"
val policyID = "${testIndexName}_rollover_age_policy"
val alias = "foo-alias"
val secondStateName = "second"
val states = listOf(
State("first", listOf(), listOf(Transition(secondStateName, Conditions(rolloverAge = TimeValue.timeValueMillis(1))))),
State(secondStateName, listOf(), listOf())
)

val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)
createIndex(indexName, policyID, alias)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Initializing the policy/metadata
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Rollover the index
rolloverIndex(alias)

// Evaluating transition conditions for first time
updateManagedIndexConfigStartTime(managedIndexConfig)

// Should have evaluated to true
waitFor { assertEquals(AttemptTransitionStep.getSuccessMessage(indexName, secondStateName), getExplainManagedIndexMetaData(indexName).info?.get("message")) }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.indexstatemanagement.opensearchapi

import org.opensearch.Version
import org.opensearch.action.admin.indices.rollover.RolloverInfo
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.test.OpenSearchTestCase

class ExtensionsTests : OpenSearchTestCase() {

fun `test getting oldest rollover time`() {
val noRolloverMetadata = IndexMetadata
.Builder("foo-index")
.settings(settings(Version.CURRENT))
.numberOfShards(1)
.numberOfReplicas(1)
.build()

assertNull(noRolloverMetadata.getOldestRolloverTime())
val oldest = RolloverInfo("bar-alias", emptyList(), 17L)

val metadata = IndexMetadata
.Builder(noRolloverMetadata)
.putRolloverInfo(RolloverInfo("foo-alias", emptyList(), 42L))
.putRolloverInfo(oldest)
.putRolloverInfo(RolloverInfo("baz-alias", emptyList(), 134345L))
.build()

assertEquals("Did not get the oldest rollover time", oldest.time, metadata.getOldestRolloverTime()?.toEpochMilli())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import kotlinx.coroutines.runBlocking
import org.opensearch.action.ActionListener
import org.opensearch.action.admin.indices.rollover.RolloverInfo
import org.opensearch.action.admin.indices.stats.CommonStats
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.client.AdminClient
Expand All @@ -42,6 +43,7 @@ import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.collect.ImmutableOpenMap
import org.opensearch.index.shard.DocsStats
import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions
import org.opensearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -56,7 +58,10 @@ import java.time.Instant

class AttemptTransitionStepTests : OpenSearchTestCase() {

private val indexMetadata: IndexMetadata = mock()
@Suppress("UNCHECKED_CAST")
private val indexMetadata: IndexMetadata = mock {
on { rolloverInfos } doReturn ImmutableOpenMap.builder<String, RolloverInfo>().build()
}
private val metadata: Metadata = mock { on { index(any<String>()) } doReturn indexMetadata }
private val clusterState: ClusterState = mock { on { metadata() } doReturn metadata }
private val clusterService: ClusterService = mock { on { state() } doReturn clusterState }
Expand Down
Loading