From 67066787017ff6b840dd277240f30b27b5dd0140 Mon Sep 17 00:00:00 2001 From: alex079 <> Date: Sat, 2 Apr 2022 16:10:00 +0200 Subject: [PATCH 1/3] Use dataloader registry which returns completable future on dispatch Key points: graphql.execution.AsyncExecutionStrategy.execute - combine execution result with fields value hook completion graphql.execution.instrumentation.dataloader.FieldLevelTrackingApproach.dispatch - return completable future --- build.gradle | 2 +- .../execution/AsyncExecutionStrategy.java | 5 +- .../ChainedInstrumentation.java | 21 ++++-- ...ecutionStrategyInstrumentationContext.java | 5 +- .../InstrumentationContext.java | 3 +- .../SimpleInstrumentation.java | 4 +- .../SimpleInstrumentationContext.java | 3 +- .../DataLoaderDispatcherInstrumentation.java | 15 ++-- .../FieldLevelTrackingApproach.java | 24 ++++--- .../graphql/ChainedDataLoadersTest.groovy | 69 +++++++++++++++++++ .../AsyncExecutionStrategyTest.groovy | 8 +-- .../InstrumentationTest.groovy | 3 +- .../TestingInstrumentContext.groovy | 3 +- ...LoaderDispatcherInstrumentationTest.groovy | 4 +- 14 files changed, 130 insertions(+), 39 deletions(-) create mode 100644 src/test/groovy/graphql/ChainedDataLoadersTest.groovy diff --git a/build.gradle b/build.gradle index ad82317bd4..7c4f237d0d 100644 --- a/build.gradle +++ b/build.gradle @@ -87,7 +87,7 @@ dependencies { compileOnly 'org.jetbrains:annotations:23.0.0' implementation 'org.antlr:antlr4-runtime:' + antlrVersion implementation 'org.slf4j:slf4j-api:' + slf4jVersion - api 'com.graphql-java:java-dataloader:3.1.2' + api 'com.graphql-java:java-dataloader:3.1.3-SNAPSHOT' api 'org.reactivestreams:reactive-streams:' + reactiveStreamsVersion antlr 'org.antlr:antlr4:' + antlrVersion implementation 'com.google.guava:guava:31.0.1-jre' diff --git a/src/main/java/graphql/execution/AsyncExecutionStrategy.java b/src/main/java/graphql/execution/AsyncExecutionStrategy.java index 3800657680..fe91e46ad2 100644 --- a/src/main/java/graphql/execution/AsyncExecutionStrategy.java +++ b/src/main/java/graphql/execution/AsyncExecutionStrategy.java @@ -70,8 +70,9 @@ public CompletableFuture execute(ExecutionContext executionCont return; } List> executionResultFuture = map(completeValueInfos, FieldValueInfo::getFieldValue); - executionStrategyCtx.onFieldValuesInfo(completeValueInfos); - Async.each(executionResultFuture).whenComplete(handleResultsConsumer); + Async.each(executionResultFuture) + .thenCombine(executionStrategyCtx.onFieldValuesInfo(completeValueInfos), (result, __) -> result) + .whenComplete(handleResultsConsumer); }).exceptionally((ex) -> { // if there are any issues with combining/handling the field results, // complete the future at all costs and bubble up any thrown exception so diff --git a/src/main/java/graphql/execution/instrumentation/ChainedInstrumentation.java b/src/main/java/graphql/execution/instrumentation/ChainedInstrumentation.java index 48f22a11fa..78f425a435 100644 --- a/src/main/java/graphql/execution/instrumentation/ChainedInstrumentation.java +++ b/src/main/java/graphql/execution/instrumentation/ChainedInstrumentation.java @@ -1,6 +1,7 @@ package graphql.execution.instrumentation; import com.google.common.collect.ImmutableList; + import graphql.ExecutionInput; import graphql.ExecutionResult; import graphql.PublicApi; @@ -229,8 +230,11 @@ private static class ChainedInstrumentationContext implements Instrumentation } @Override - public void onDispatched(CompletableFuture result) { - contexts.forEach(context -> context.onDispatched(result)); + public CompletableFuture onDispatched(CompletableFuture result) { + return CompletableFuture.allOf(contexts.stream() + .map(context -> context.onDispatched(result)) + .toArray(CompletableFuture[]::new)) + .thenCompose(__ -> result); } @Override @@ -248,8 +252,11 @@ private static class ChainedExecutionStrategyInstrumentationContext implements E } @Override - public void onDispatched(CompletableFuture result) { - contexts.forEach(context -> context.onDispatched(result)); + public CompletableFuture onDispatched(CompletableFuture result) { + return CompletableFuture.allOf(contexts.stream() + .map(context -> context.onDispatched(result)) + .toArray(CompletableFuture[]::new)) + .thenCompose(__ -> result); } @Override @@ -258,8 +265,10 @@ public void onCompleted(ExecutionResult result, Throwable t) { } @Override - public void onFieldValuesInfo(List fieldValueInfoList) { - contexts.forEach(context -> context.onFieldValuesInfo(fieldValueInfoList)); + public CompletableFuture onFieldValuesInfo(List fieldValueInfoList) { + return CompletableFuture.allOf(contexts.stream() + .map(context -> context.onFieldValuesInfo(fieldValueInfoList)) + .toArray(CompletableFuture[]::new)); } } diff --git a/src/main/java/graphql/execution/instrumentation/ExecutionStrategyInstrumentationContext.java b/src/main/java/graphql/execution/instrumentation/ExecutionStrategyInstrumentationContext.java index da11626bc3..07a6b7d19d 100644 --- a/src/main/java/graphql/execution/instrumentation/ExecutionStrategyInstrumentationContext.java +++ b/src/main/java/graphql/execution/instrumentation/ExecutionStrategyInstrumentationContext.java @@ -5,12 +5,13 @@ import graphql.execution.FieldValueInfo; import java.util.List; +import java.util.concurrent.CompletableFuture; @PublicSpi public interface ExecutionStrategyInstrumentationContext extends InstrumentationContext { - default void onFieldValuesInfo(List fieldValueInfoList) { - + default CompletableFuture onFieldValuesInfo(List fieldValueInfoList) { + return CompletableFuture.completedFuture(null); } default void onFieldValuesException() { diff --git a/src/main/java/graphql/execution/instrumentation/InstrumentationContext.java b/src/main/java/graphql/execution/instrumentation/InstrumentationContext.java index 2d9626a113..5cbcd92a4c 100644 --- a/src/main/java/graphql/execution/instrumentation/InstrumentationContext.java +++ b/src/main/java/graphql/execution/instrumentation/InstrumentationContext.java @@ -19,8 +19,9 @@ public interface InstrumentationContext { * This is invoked when the instrumentation step is initially dispatched * * @param result the result of the step as a completable future + * @return instrumented or unmodified result */ - void onDispatched(CompletableFuture result); + CompletableFuture onDispatched(CompletableFuture result); /** * This is invoked when the instrumentation step is fully completed diff --git a/src/main/java/graphql/execution/instrumentation/SimpleInstrumentation.java b/src/main/java/graphql/execution/instrumentation/SimpleInstrumentation.java index ea096c4fcb..0dafcb09b6 100644 --- a/src/main/java/graphql/execution/instrumentation/SimpleInstrumentation.java +++ b/src/main/java/graphql/execution/instrumentation/SimpleInstrumentation.java @@ -45,8 +45,8 @@ public InstrumentationContext> beginValidation(Instrumenta public ExecutionStrategyInstrumentationContext beginExecutionStrategy(InstrumentationExecutionStrategyParameters parameters) { return new ExecutionStrategyInstrumentationContext() { @Override - public void onDispatched(CompletableFuture result) { - + public CompletableFuture onDispatched(CompletableFuture result) { + return result; } @Override diff --git a/src/main/java/graphql/execution/instrumentation/SimpleInstrumentationContext.java b/src/main/java/graphql/execution/instrumentation/SimpleInstrumentationContext.java index 575b143b14..3a949e3ed3 100644 --- a/src/main/java/graphql/execution/instrumentation/SimpleInstrumentationContext.java +++ b/src/main/java/graphql/execution/instrumentation/SimpleInstrumentationContext.java @@ -39,10 +39,11 @@ private SimpleInstrumentationContext(Consumer> codeToRunOnD } @Override - public void onDispatched(CompletableFuture result) { + public CompletableFuture onDispatched(CompletableFuture result) { if (codeToRunOnDispatch != null) { codeToRunOnDispatch.accept(result); } + return result; } @Override diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/DataLoaderDispatcherInstrumentation.java b/src/main/java/graphql/execution/instrumentation/dataloader/DataLoaderDispatcherInstrumentation.java index 44ffd4deeb..64d339aac0 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/DataLoaderDispatcherInstrumentation.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/DataLoaderDispatcherInstrumentation.java @@ -80,18 +80,18 @@ public DataFetcher instrumentDataFetcher(DataFetcher dataFetcher, Instrume return dataFetcher; } // - // currently only AsyncExecutionStrategy with DataLoader and hence this allows us to "dispatch" - // on every object if its not using aggressive batching for other execution strategies + // currently, only AsyncExecutionStrategy with DataLoader and hence this allows us to "dispatch" + // on every object if it's not using aggressive batching for other execution strategies // which allows them to work if used. return (DataFetcher) environment -> { Object obj = dataFetcher.get(environment); - immediatelyDispatch(state); - return obj; + return Async.toCompletableFuture(obj) + .thenCombine(immediatelyDispatch(state), (result, __) -> result); }; } - private void immediatelyDispatch(DataLoaderDispatcherInstrumentationState state) { - state.getApproach().dispatch(); + private CompletableFuture immediatelyDispatch(DataLoaderDispatcherInstrumentationState state) { + return state.getApproach().dispatch(); } @Override @@ -128,7 +128,8 @@ public ExecutionStrategyInstrumentationContext beginExecutionStrategy(Instrument if (state.hasNoDataLoaders()) { return new ExecutionStrategyInstrumentationContext() { @Override - public void onDispatched(CompletableFuture result) { + public CompletableFuture onDispatched(CompletableFuture result) { + return result; } @Override diff --git a/src/main/java/graphql/execution/instrumentation/dataloader/FieldLevelTrackingApproach.java b/src/main/java/graphql/execution/instrumentation/dataloader/FieldLevelTrackingApproach.java index 1a30d875f0..7f924a564c 100644 --- a/src/main/java/graphql/execution/instrumentation/dataloader/FieldLevelTrackingApproach.java +++ b/src/main/java/graphql/execution/instrumentation/dataloader/FieldLevelTrackingApproach.java @@ -131,8 +131,8 @@ ExecutionStrategyInstrumentationContext beginExecutionStrategy(InstrumentationEx return new ExecutionStrategyInstrumentationContext() { @Override - public void onDispatched(CompletableFuture result) { - + public CompletableFuture onDispatched(CompletableFuture result) { + return result; } @Override @@ -141,14 +141,15 @@ public void onCompleted(ExecutionResult result, Throwable t) { } @Override - public void onFieldValuesInfo(List fieldValueInfoList) { + public CompletableFuture onFieldValuesInfo(List fieldValueInfoList) { boolean dispatchNeeded; synchronized (callStack) { dispatchNeeded = handleOnFieldValuesInfo(fieldValueInfoList, callStack, curLevel); } if (dispatchNeeded) { - dispatch(); + return dispatch(); } + return CompletableFuture.completedFuture(null); } @Override @@ -197,16 +198,16 @@ public InstrumentationContext beginFieldFetch(InstrumentationFieldFetchP return new InstrumentationContext() { @Override - public void onDispatched(CompletableFuture result) { + public CompletableFuture onDispatched(CompletableFuture result) { boolean dispatchNeeded; synchronized (callStack) { callStack.increaseFetchCount(level); dispatchNeeded = dispatchIfNeeded(callStack, level); } if (dispatchNeeded) { - dispatch(); + return result.thenCombine(dispatch(), (value, __) -> value); } - + return result; } @Override @@ -241,12 +242,17 @@ private boolean levelReady(CallStack callStack, int level) { return false; } - void dispatch() { + CompletableFuture dispatch() { DataLoaderRegistry dataLoaderRegistry = getDataLoaderRegistry(); if (log.isDebugEnabled()) { log.debug("Dispatching data loaders ({})", dataLoaderRegistry.getKeys()); } - dataLoaderRegistry.dispatchAll(); + return dataLoaderRegistry.dispatch().thenApply(count -> { + if (log.isDebugEnabled()) { + log.debug("Dispatched {} keys to load", count); + } + return null; + }); } private DataLoaderRegistry getDataLoaderRegistry() { diff --git a/src/test/groovy/graphql/ChainedDataLoadersTest.groovy b/src/test/groovy/graphql/ChainedDataLoadersTest.groovy new file mode 100644 index 0000000000..aff04a802b --- /dev/null +++ b/src/test/groovy/graphql/ChainedDataLoadersTest.groovy @@ -0,0 +1,69 @@ +package graphql + +import graphql.schema.GraphQLSchema +import org.dataloader.BatchLoader +import org.dataloader.DataLoader +import org.dataloader.DataLoaderFactory +import org.dataloader.DataLoaderRegistry +import spock.lang.Specification + +import static graphql.Scalars.GraphQLInt +import static graphql.Scalars.GraphQLString +import static graphql.schema.GraphQLFieldDefinition.newFieldDefinition +import static graphql.schema.GraphQLObjectType.newObject +import static java.util.concurrent.CompletableFuture.completedFuture + +class ChainedDataLoadersTest extends Specification { + + private static final def QUERY = '{ level1field1 { level2field1 level2field2 } }' + + private static final BatchLoader LOADER = v -> completedFuture(v.collect() {++it}) + + private static final def REGISTRY = DataLoaderRegistry.newRegistry() + .register('loader', DataLoaderFactory.newDataLoader(LOADER)) + .build() + + private static def dummyQuerySubtype = newObject() + .name('Subtype') + .field(newFieldDefinition() + .name('level2field1') + .type(GraphQLString) + .dataFetcher(environment -> { + DataLoader loader = REGISTRY.getDataLoader('loader') + loader.load(1).thenCompose(loader::load) + })) + .field(newFieldDefinition() + .name('level2field2') + .type(GraphQLInt) + .dataFetcher(environment -> { + DataLoader loader = REGISTRY.getDataLoader('loader') + loader.load(5).thenCompose(loader::load) + })) + .build() + + private static def dummyQueryType = newObject() + .name('Query') + .field(newFieldDefinition() + .name('level1field1') + .type(dummyQuerySubtype) + .dataFetcher(environment -> { + DataLoader loader = REGISTRY.getDataLoader('loader') + loader.load(11).thenCompose(loader::load).thenApply {[:]} + }) + .build()) + + def 'composed data loaders must complete to finish field fetching level'() { + given: + def graphQL = GraphQL.newGraphQL(GraphQLSchema.newSchema().query(dummyQueryType).build()).build() + when: + def result = graphQL.execute(ExecutionInput.newExecutionInput(QUERY).dataLoaderRegistry(REGISTRY).build()).data + then: + result == [ + level1field1: [ + level2field1: '3', + level2field2: 7 + ] + ] + } + +} diff --git a/src/test/groovy/graphql/execution/AsyncExecutionStrategyTest.groovy b/src/test/groovy/graphql/execution/AsyncExecutionStrategyTest.groovy index 29a6c4b771..fbe8bc6bd9 100644 --- a/src/test/groovy/graphql/execution/AsyncExecutionStrategyTest.groovy +++ b/src/test/groovy/graphql/execution/AsyncExecutionStrategyTest.groovy @@ -246,17 +246,17 @@ class AsyncExecutionStrategyTest extends Specification { return new ExecutionStrategyInstrumentationContext() { @Override - void onFieldValuesInfo(List fieldValueInfoList) { + CompletableFuture onFieldValuesInfo(List fieldValueInfoList) { throw new RuntimeException("Exception raised from instrumentation") } @Override - public void onDispatched(CompletableFuture result) { - + CompletableFuture onDispatched(CompletableFuture result) { + return result; } @Override - public void onCompleted(ExecutionResult result, Throwable t) { + void onCompleted(ExecutionResult result, Throwable t) { } } diff --git a/src/test/groovy/graphql/execution/instrumentation/InstrumentationTest.groovy b/src/test/groovy/graphql/execution/instrumentation/InstrumentationTest.groovy index f8b469e45b..51ce07148e 100644 --- a/src/test/groovy/graphql/execution/instrumentation/InstrumentationTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/InstrumentationTest.groovy @@ -169,9 +169,10 @@ class InstrumentationTest extends Specification { return new ExecutionStrategyInstrumentationContext() { @Override - void onDispatched(CompletableFuture result) { + CompletableFuture onDispatched(CompletableFuture result) { System.out.println(String.format("t%s setting go signal on", Thread.currentThread().getId())) goSignal.set(true) + return result; } @Override diff --git a/src/test/groovy/graphql/execution/instrumentation/TestingInstrumentContext.groovy b/src/test/groovy/graphql/execution/instrumentation/TestingInstrumentContext.groovy index cdfaa84513..8ae0c2aebc 100644 --- a/src/test/groovy/graphql/execution/instrumentation/TestingInstrumentContext.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/TestingInstrumentContext.groovy @@ -25,10 +25,11 @@ class TestingInstrumentContext implements InstrumentationContext { } @Override - void onDispatched(CompletableFuture result) { + CompletableFuture onDispatched(CompletableFuture result) { if (useOnDispatch) { this.executionList << "onDispatched:$op" } + result } @Override diff --git a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderDispatcherInstrumentationTest.groovy b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderDispatcherInstrumentationTest.groovy index 4b8786d838..e7c732ac82 100644 --- a/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderDispatcherInstrumentationTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/dataloader/DataLoaderDispatcherInstrumentationTest.groovy @@ -99,9 +99,9 @@ class DataLoaderDispatcherInstrumentationTest extends Specification { def dispatchedCalled = false def dataLoaderRegistry = new DataLoaderRegistry() { @Override - void dispatchAll() { + CompletableFuture dispatch() { dispatchedCalled = true - super.dispatchAll() + super.dispatch() } } def dataLoader = DataLoader.newDataLoader(new BatchLoader() { From 1bceff8b82c6bf145b40055362c1254dd17a51ed Mon Sep 17 00:00:00 2001 From: alex079 <> Date: Sat, 2 Apr 2022 17:31:59 +0200 Subject: [PATCH 2/3] Use dataloader registry which returns completable future on dispatch Key points: graphql.execution.ExecutionStrategy.fetchField - combine execution result with fields value hook completion --- src/main/java/graphql/execution/ExecutionStrategy.java | 3 +-- .../execution/instrumentation/InstrumentationTest.groovy | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/graphql/execution/ExecutionStrategy.java b/src/main/java/graphql/execution/ExecutionStrategy.java index 5f5467292d..4a057b0246 100644 --- a/src/main/java/graphql/execution/ExecutionStrategy.java +++ b/src/main/java/graphql/execution/ExecutionStrategy.java @@ -286,8 +286,7 @@ protected CompletableFuture fetchField(ExecutionContext executionC fetchedValue = new CompletableFuture<>(); fetchedValue.completeExceptionally(e); } - fetchCtx.onDispatched(fetchedValue); - return fetchedValue + return fetchCtx.onDispatched(fetchedValue) .handle((result, exception) -> { fetchCtx.onCompleted(result, exception); if (exception != null) { diff --git a/src/test/groovy/graphql/execution/instrumentation/InstrumentationTest.groovy b/src/test/groovy/graphql/execution/instrumentation/InstrumentationTest.groovy index 51ce07148e..5397fb6585 100644 --- a/src/test/groovy/graphql/execution/instrumentation/InstrumentationTest.groovy +++ b/src/test/groovy/graphql/execution/instrumentation/InstrumentationTest.groovy @@ -146,7 +146,7 @@ class InstrumentationTest extends Specification { then: instrumentation.throwableList.size() == 1 - instrumentation.throwableList[0].getMessage() == "DF BANG!" + instrumentation.throwableList[0].getCause().getMessage() == "DF BANG!" } /** From 4ba5410c123224fbc7ac515f43cc54d5d9128aa9 Mon Sep 17 00:00:00 2001 From: alex079 <> Date: Wed, 20 Apr 2022 11:00:32 +0200 Subject: [PATCH 3/3] rename test class --- ...est.groovy => ComposedDataLoadersTest.groovy} | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) rename src/test/groovy/graphql/{ChainedDataLoadersTest.groovy => ComposedDataLoadersTest.groovy} (79%) diff --git a/src/test/groovy/graphql/ChainedDataLoadersTest.groovy b/src/test/groovy/graphql/ComposedDataLoadersTest.groovy similarity index 79% rename from src/test/groovy/graphql/ChainedDataLoadersTest.groovy rename to src/test/groovy/graphql/ComposedDataLoadersTest.groovy index aff04a802b..f0d4cdd901 100644 --- a/src/test/groovy/graphql/ChainedDataLoadersTest.groovy +++ b/src/test/groovy/graphql/ComposedDataLoadersTest.groovy @@ -13,7 +13,7 @@ import static graphql.schema.GraphQLFieldDefinition.newFieldDefinition import static graphql.schema.GraphQLObjectType.newObject import static java.util.concurrent.CompletableFuture.completedFuture -class ChainedDataLoadersTest extends Specification { +class ComposedDataLoadersTest extends Specification { private static final def QUERY = '{ level1field1 { level2field1 level2field2 } }' @@ -23,38 +23,38 @@ class ChainedDataLoadersTest extends Specification { .register('loader', DataLoaderFactory.newDataLoader(LOADER)) .build() - private static def dummyQuerySubtype = newObject() + private static final def DUMMY_QUERY_SUBTYPE = newObject() .name('Subtype') .field(newFieldDefinition() .name('level2field1') .type(GraphQLString) .dataFetcher(environment -> { - DataLoader loader = REGISTRY.getDataLoader('loader') + DataLoader loader = environment.getDataLoader('loader') loader.load(1).thenCompose(loader::load) })) .field(newFieldDefinition() .name('level2field2') .type(GraphQLInt) .dataFetcher(environment -> { - DataLoader loader = REGISTRY.getDataLoader('loader') + DataLoader loader = environment.getDataLoader('loader') loader.load(5).thenCompose(loader::load) })) .build() - private static def dummyQueryType = newObject() + private static final def DUMMY_QUERY_TYPE = newObject() .name('Query') .field(newFieldDefinition() .name('level1field1') - .type(dummyQuerySubtype) + .type(DUMMY_QUERY_SUBTYPE) .dataFetcher(environment -> { - DataLoader loader = REGISTRY.getDataLoader('loader') + DataLoader loader = environment.getDataLoader('loader') loader.load(11).thenCompose(loader::load).thenApply {[:]} }) .build()) def 'composed data loaders must complete to finish field fetching level'() { given: - def graphQL = GraphQL.newGraphQL(GraphQLSchema.newSchema().query(dummyQueryType).build()).build() + def graphQL = GraphQL.newGraphQL(GraphQLSchema.newSchema().query(DUMMY_QUERY_TYPE).build()).build() when: def result = graphQL.execute(ExecutionInput.newExecutionInput(QUERY).dataLoaderRegistry(REGISTRY).build()).data then: