Skip to content

Commit

Permalink
feat: synchronize executions when attempting to remove an entry (#2012)
Browse files Browse the repository at this point in the history
### 📝 Description
when attempting to remove an execution from the concurrent map
(executions) we need to make sure that the executions map is locked,
that way we will guarantee the atomic calculation of of synchronous
execution exhausted state

Co-authored-by: Samuel Vazquez <samvazquez@expediagroup.com>
  • Loading branch information
samuelAndalon and Samuel Vazquez committed Jul 12, 2024
1 parent 62bcf34 commit c6c2782
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ tasks {
limit {
counter = "INSTRUCTION"
value = "COVEREDRATIO"
minimum = "0.94".toBigDecimal()
minimum = "0.93".toBigDecimal()
}
limit {
counter = "BRANCH"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,16 @@ fun <V> CompletableFuture<V>.dispatchIfNeeded(
val dataLoaderRegistry = environment.dataLoaderRegistry as? KotlinDataLoaderRegistry ?: throw MissingKotlinDataLoaderRegistryException()

if (dataLoaderRegistry.dataLoadersInvokedOnDispatch()) {
val cantContinueExecution = when {
when {
environment.graphQlContext.hasKey(SyncExecutionExhaustedState::class) -> {
environment
.graphQlContext.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
.allSyncExecutionsExhausted()
.ifAllSyncExecutionsExhausted {
dataLoaderRegistry.dispatchAll()
}
}
else -> throw MissingInstrumentationStateException()
}

if (cantContinueExecution) {
dataLoaderRegistry.dispatchAll()
}
}
return this
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,12 @@ class SyncExecutionExhaustedState(
override fun onCompleted(result: ExecutionResult?, t: Throwable?) {
if ((result != null && result.errors.size > 0) || t != null) {
if (executions.containsKey(parameters.executionInput.executionId)) {
executions.remove(parameters.executionInput.executionId)
totalExecutions.set(totalExecutions.get() - 1)
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
synchronized(executions) {
executions.remove(parameters.executionInput.executionId)
totalExecutions.set(totalExecutions.get() - 1)
}
ifAllSyncExecutionsExhausted { executionIds ->
onSyncExecutionExhausted(executionIds)
}
}
}
Expand Down Expand Up @@ -131,9 +132,8 @@ class SyncExecutionExhaustedState(
executionState
}

val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
ifAllSyncExecutionsExhausted { executionIds ->
onSyncExecutionExhausted(executionIds)
}
}

Expand All @@ -143,9 +143,8 @@ class SyncExecutionExhaustedState(
executionState
}

val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
ifAllSyncExecutionsExhausted { executionIds ->
onSyncExecutionExhausted(executionIds)
}
}

Expand All @@ -155,17 +154,18 @@ class SyncExecutionExhaustedState(
}

/**
* Provide the information about when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution
* execute a given [predicate] when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution.
* A Synchronous Execution is considered Exhausted when all [DataFetcher]s of all paths were executed up until
* a scalar leaf or a [DataFetcher] that returns a [CompletableFuture]
*/
fun allSyncExecutionsExhausted(): Boolean = synchronized(executions) {
val operationsToExecute = totalExecutions.get()
when {
executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled() -> false
else -> {
executions.values.all(ExecutionBatchState::isSyncExecutionExhausted)
fun ifAllSyncExecutionsExhausted(predicate: (List<ExecutionId>) -> Unit) =
synchronized(executions) {
val operationsToExecute = totalExecutions.get()
if (executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled())
return@synchronized

if (executions.values.all(ExecutionBatchState::isSyncExecutionExhausted)) {
predicate(executions.keys().toList())
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -614,9 +614,9 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
fun `Instrumentation should not consider executions that thrown exceptions`() {
val executions = listOf(
ExecutionInput.newExecutionInput("query test1 { astronaut(id: 1) { id name } }").operationName("test1").build(),
ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("test2").build(),
ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("test3").build(),
ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build()
ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(),
ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(),
ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("test4").build()
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
Expand All @@ -631,7 +631,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
val missionStatistics = kotlinDataLoaderRegistry.dataLoadersMap["MissionDataLoader"]?.statistics

assertEquals(1, astronautStatistics?.batchInvokeCount)
assertEquals(2, astronautStatistics?.batchLoadCount)
assertEquals(1, astronautStatistics?.batchLoadCount)

assertEquals(1, missionStatistics?.batchInvokeCount)
assertEquals(1, missionStatistics?.batchLoadCount)
Expand Down

0 comments on commit c6c2782

Please sign in to comment.