Skip to content

Commit

Permalink
feat(batching): v6 synchronize executions when attempting to remove a…
Browse files Browse the repository at this point in the history
…n entry (#2014)

### 📝 Description
cherry pick #2013

---------

Co-authored-by: Samuel Vazquez <samvazquez@expediagroup.com>
  • Loading branch information
samuelAndalon and Samuel Vazquez committed Jul 12, 2024
1 parent 9eabdde commit 2388c4a
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,25 @@ fun <V> CompletableFuture<V>.dispatchIfNeeded(
val dataLoaderRegistry = environment.dataLoaderRegistry as? KotlinDataLoaderRegistry ?: throw MissingKotlinDataLoaderRegistryException()

if (dataLoaderRegistry.dataLoadersInvokedOnDispatch()) {
val cantContinueExecution = when {
when {
environment.graphQlContext.hasKey(ExecutionLevelDispatchedState::class) -> {
environment
.graphQlContext.get<ExecutionLevelDispatchedState>(ExecutionLevelDispatchedState::class)
.allExecutionsDispatched(Level(environment.executionStepInfo.path.level))
val cantContinueExecution =
environment
.graphQlContext.get<ExecutionLevelDispatchedState>(ExecutionLevelDispatchedState::class)
.allExecutionsDispatched(Level(environment.executionStepInfo.path.level))
if (cantContinueExecution) {
dataLoaderRegistry.dispatchAll()
}
}
environment.graphQlContext.hasKey(SyncExecutionExhaustedState::class) -> {
environment
.graphQlContext.get<SyncExecutionExhaustedState>(SyncExecutionExhaustedState::class)
.allSyncExecutionsExhausted()
.ifAllSyncExecutionsExhausted {
dataLoaderRegistry.dispatchAll()
}
}
else -> throw MissingInstrumentationStateException()
}

if (cantContinueExecution) {
dataLoaderRegistry.dispatchAll()
}
}
return this
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,6 @@ class SyncExecutionExhaustedState(
private val totalExecutions: AtomicReference<Int> = AtomicReference(totalOperations)
val executions = ConcurrentHashMap<ExecutionId, ExecutionBatchState>()

/**
* Remove an [ExecutionBatchState] from the state in case operation does not qualify for starting an execution,
* for example:
* - parsing, validation errors
* - persisted query errors
* - an exception during execution was thrown
*/
private fun removeExecution(executionId: ExecutionId) {
if (executions.containsKey(executionId)) {
executions.remove(executionId)
totalExecutions.set(totalExecutions.get() - 1)
}
}

/**
* Create the [ExecutionBatchState] When a specific [ExecutionInput] starts his execution
*
Expand All @@ -84,11 +70,12 @@ class SyncExecutionExhaustedState(
override fun onCompleted(result: ExecutionResult?, t: Throwable?) {
if ((result != null && result.errors.size > 0) || t != null) {
if (executions.containsKey(parameters.executionInput.executionId)) {
executions.remove(parameters.executionInput.executionId)
totalExecutions.set(totalExecutions.get() - 1)
val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
synchronized(executions) {
executions.remove(parameters.executionInput.executionId)
totalExecutions.set(totalExecutions.get() - 1)
}
ifAllSyncExecutionsExhausted { executionIds ->
onSyncExecutionExhausted(executionIds)
}
}
}
Expand Down Expand Up @@ -147,9 +134,8 @@ class SyncExecutionExhaustedState(
executionState
}

val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
ifAllSyncExecutionsExhausted { executionIds ->
onSyncExecutionExhausted(executionIds)
}
}
override fun onCompleted(result: Any?, t: Throwable?) {
Expand All @@ -158,26 +144,26 @@ class SyncExecutionExhaustedState(
executionState
}

val allSyncExecutionsExhausted = allSyncExecutionsExhausted()
if (allSyncExecutionsExhausted) {
onSyncExecutionExhausted(executions.keys().toList())
ifAllSyncExecutionsExhausted { executionIds ->
onSyncExecutionExhausted(executionIds)
}
}
}
}

/**
* Provide the information about when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution
* execute a given [predicate] when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution.
* A Synchronous Execution is considered Exhausted when all [DataFetcher]s of all paths were executed up until
* a scalar leaf or a [DataFetcher] that returns a [CompletableFuture]
*/
fun allSyncExecutionsExhausted(): Boolean = synchronized(executions) {
val operationsToExecute = totalExecutions.get()
when {
executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled() -> false
else -> {
executions.values.all(ExecutionBatchState::isSyncExecutionExhausted)
fun ifAllSyncExecutionsExhausted(predicate: (List<ExecutionId>) -> Unit) =
synchronized(executions) {
val operationsToExecute = totalExecutions.get()
if (executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled())
return@synchronized

if (executions.values.all(ExecutionBatchState::isSyncExecutionExhausted)) {
predicate(executions.keys().toList())
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -616,9 +616,9 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
fun `Instrumentation should not consider executions that thrown exceptions`() {
val executions = listOf(
ExecutionInput.newExecutionInput("query test1 { astronaut(id: 1) { id name } }").operationName("test1").build(),
ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("test2").build(),
ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("test3").build(),
ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build()
ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(),
ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(),
ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("test4").build()
)

val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute(
Expand All @@ -633,7 +633,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest {
val missionStatistics = kotlinDataLoaderRegistry.dataLoadersMap["MissionDataLoader"]?.statistics

assertEquals(1, astronautStatistics?.batchInvokeCount)
assertEquals(2, astronautStatistics?.batchLoadCount)
assertEquals(1, astronautStatistics?.batchLoadCount)

assertEquals(1, missionStatistics?.batchInvokeCount)
assertEquals(1, missionStatistics?.batchLoadCount)
Expand Down

0 comments on commit 2388c4a

Please sign in to comment.