[#3773] Replace joinunwrap usage from PooledStreamingEventProcessor#4552
[#3773] Replace joinunwrap usage from PooledStreamingEventProcessor#4552laura-devriendt-lemon wants to merge 26 commits intomainAxonIQ/AxonFramework:mainfrom enhancement/3773/replace-joinunwrap-pooled-processorAxonIQ/AxonFramework:enhancement/3773/replace-joinunwrap-pooled-processorCopy head branch name to clipboard
joinunwrap usage from PooledStreamingEventProcessor#4552Conversation
…WorkPackage thenRun discarded the CompletableFuture returned by executeWithResult, causing the shutdown future to complete before releaseClaim and onSegmentReleased finished. Replacing with thenCompose chains the futures correctly so the abort flow waits for the token release and segment change listener before completing. Refs #3773
Removes all joinAndUnwrap calls from ClaimTask, SplitTask and MergeTask. Each task's internal logic is now a proper async chain — no coordinator threads are blocked waiting on token store operations. Key changes: - ClaimTask: two sequential UoW calls chained with thenCompose; fetchToken failure caught via exceptionally returning false (preserving original try/catch behavior) - SplitTask: splitAndRelease(Segment) changed from boolean to CompletableFuture<Boolean>; callers updated from thenApply to thenCompose to avoid unwaited nested futures; try/finally replaced with whenComplete to guarantee deadline cleanup on both success and failure - MergeTask: two sequential segment fetches chained with thenCompose; thenCombine(...).thenCompose(f -> f) keeps parallel token fetching while flattening the nested future; mergeSegments now returns CompletableFuture<Boolean> with whenComplete for deadline cleanup Adds ClaimTaskTest covering all four scenarios: segment already active, segment not in available list, successful claim, and failed claim (returns false as normal completion, not exceptional). Extends MergeTaskTest with a missing branch: merge initiated from the higher-ID segment, verifying the lower-ID segment token is always placed first in MergedTrackingToken regardless of which segment triggered the merge. Refs #3773
Replace all blocking joinAndUnwrap call sites in WorkPackage (4 sites) and Coordinator (8 sites) with proper async chains using thenCompose, thenAccept, and CompletableFuture composition. Key changes: - WorkPackage: storeToken, extendClaimIfThresholdIsMet, processEvents, and scheduleWorker now return CompletableFuture<Void> instead of blocking - Coordinator: start() returns CompletableFuture<Void>; initializeTokenStore becomes async with retry logic; claimNewSegments, createWorkPackage, ensureOpenStream, and extendClaim handling all converted to async chains - CoordinatorTest: comprehensive tests for all new async behaviour including stale-task generation guard, extend-claim failure handling, claim-token error propagation, onSegmentReleased lifecycle, start() state transitions, and initializeTokenStoreWithRetry retry/exhaustion paths
…semantics Add processor lifecycle documentation to reflect that start() is fully async and only completes after the token store identifier is resolved and the coordinator has started. - index.adoc: add general lifecycle section covering start(), shutdown(), isRunning(), and isError() for all EventProcessor implementations - streaming.adoc: add PSEP-specific lifecycle section before Configuring, documenting the two-step start() sequence and the IllegalStateException thrown by getTokenStoreIdentifier() when called before start() completes - streaming.adoc: update "Retrieving the token store identifier" section to reflect eager resolution during start()
…ugfix [#3773]: phase 0 Replace thenRun + joinAndUnwrap with thenCompose in Coordinator#abortWorkPackage
Move mergedWith and MergedTrackingToken.merged inside executeWithResult so synchronous exceptions are captured by UnitOfWork.safe() and whenComplete always removes releasesDeadlines entries even on failure. Fix thenThrow → failedFuture in MergeTaskTest and ClaimTaskTest for CompletableFuture-returning mocks to correctly simulate async failures. Add WhenMergeSegmentsFails test verifying releasesDeadlines are cleared when mergeSegments throws due to incompatible segments. Refs #3773
[#3773]: phase 1 Make ClaimTask, SplitTask and MergeTask fully async
…ge-coordinator [#3773]: phase 2 remove joinAndUnwrap from workPackage and coordinator
…e 3) Resolve the token store identifier eagerly during start() instead of lazily blocking on the first call to getTokenStoreIdentifier(). Changes: - start() now chains tokenStore.retrieveStorageIdentifier() before coordinator.start(), forming a fully async pipeline with no blocking - calculateIdentifier() helper removed along with its joinAndUnwrap call - AtomicReference<String> replaced with a volatile String field; a single write in start() and read-only access afterwards makes AtomicReference unnecessary - getTokenStoreIdentifier() is now a direct field lookup; throws IllegalStateException when called before start() has completed, making the lifecycle contract explicit instead of silently blocking - Updated existing getTokenStoreIdentifier test to call startEventProcessor() first; added test for the pre-start IllegalStateException
- Resolve identifier lazily in getTokenStoreIdentifier() instead of throwing IllegalStateException, using a dedicated resolveTokenStoreIdentifier() method with FutureUtils.joinAndUnwrap - Update StreamingEventProcessor interface Javadoc to accurately describe both lazy and eager resolution strategies - Update PSEP override Javadoc to reflect new lazy fallback behaviour - Add tests for lazy resolution (happy path and failure propagation)
… lifecycle snippets Update the three code snippets flagged in review to inject AxonConfiguration and retrieve the processor via getComponent(Class, name) before using it, matching the real-world usage pattern shown in the framework examples.
[#3773] Document PooledStreamingEventProcessor lifecycle and start() …
…tifier [#3773]: phase 3 Remove joinAndUnwrap from PooledStreamingEventProcessor
…rocessor # Conflicts resolved in: # messaging/src/main/java/org/axonframework/messaging/eventhandling/processing/streaming/pooled/PooledStreamingEventProcessor.java # messaging/src/main/java/org/axonframework/messaging/eventhandling/processing/streaming/pooled/WorkPackage.java # messaging/src/test/java/org/axonframework/messaging/eventhandling/processing/streaming/pooled/WorkPackageTest.java
| * fails to retrieve the identifier | ||
| */ | ||
| @Override | ||
| public String getTokenStoreIdentifier() { |
There was a problem hiding this comment.
There was a problem hiding this comment.
I cannot conceive of a scenario where users would programmatically find the token store identifier through this method. This is, in essence, an @Internal method for Framework usage. And even then we tend to use the TokenStore instead.
At any rate, it seems you originally threw an exception if the Processor wasn't started yet. I don't think we have to throw at all if it's not started. Again, don't know when somebody would want it through the processors before a processor is started, but it isn't technically wrong to do so.
|
joinunwrap usage from PooledStreamingEventProcessor
smcvb
left a comment
There was a problem hiding this comment.
Really happy you picked up such a daunting task right of the bat. Resolving #3773 isn't a small feat. However, I have quite a lot of concerns I'd like to see addressed before we can merge this in. Nothing conceptually major, just a lot of small remarks combined that (IMO) merit a request for changes.
|
|
||
| The processor supports all core streaming operations including <<replaying-events,replaying events>>, <<parallel_processing,parallelism>>, and tracking progress with <<tracking_tokens,tokens>>. | ||
|
|
||
| [[lifecycle]] |
There was a problem hiding this comment.
I'd rather have a lifecycle section after the configuration section, not before. Lifecycles don't trigger if they aren't configured. Putting it on top now makes it seem "super important", while we're dealing with a technicality 9 out of 10 developers won't care about.
| 1. Resolves the <<token_store,token store>> identifier from the configured `TokenStore`. | ||
| 2. Starts the internal `Coordinator`, which opens the event stream and begins claiming segments. | ||
|
|
||
| Because `start()` is fully asynchronous, callers should await the returned future before interacting with any processor state that depends on a running processor: |
There was a problem hiding this comment.
I'd add a note that users typically don't interact with start() at all. Axon Framework invokes it as part of the referred to life cycle management. Thus, some notice telling users that they Do not have to invoke start() in most cases is important. Otherwise readers might think "wait, do I need to start manually?". I'd like to ensure that misconception doesn't occur.
| * fails to retrieve the identifier | ||
| */ | ||
| @Override | ||
| public String getTokenStoreIdentifier() { |
There was a problem hiding this comment.
I cannot conceive of a scenario where users would programmatically find the token store identifier through this method. This is, in essence, an @Internal method for Framework usage. And even then we tend to use the TokenStore instead.
At any rate, it seems you originally threw an exception if the Processor wasn't started yet. I don't think we have to throw at all if it's not started. Again, don't know when somebody would want it through the processors before a processor is started, but it isn't technically wrong to do so.
| private final WorkPackage.EventFilter workPackageEventFilter; | ||
|
|
||
| private final AtomicReference<@Nullable String> tokenStoreIdentifier = new AtomicReference<>(); | ||
| private volatile @Nullable String tokenStoreIdentifier; |
There was a problem hiding this comment.
Why switch from an AtomicReference to a volatile field?
|
|
||
| [IMPORTANT] | ||
| ==== | ||
| `getTokenStoreIdentifier()` requires the processor to have been started. |
There was a problem hiding this comment.
I believe this notice isn't correct anymore, as it seems your implementation no longer throws anything. Furthermore, it's perfectly fine to just let this succeed, even if the processor didn't start yet.
| testSubject.scheduleEvent(eventTwo); | ||
|
|
||
| // then - second event is only reachable if scheduleWorker reschedules after the first batch | ||
| await().atMost(TIMEOUT).untilAsserted( |
There was a problem hiding this comment.
How does this validate the fact a worker was rescheduled?
| * {@link UnableToClaimTokenException} during token claiming. | ||
| */ | ||
| @Nested | ||
| class ClaimNewSegments { |
There was a problem hiding this comment.
Lovely addition here. This is a complex bit of code and having those tests definitely doesn't hurt!
| CompletableFuture<Void> startFuture = testSubject.start(); | ||
|
|
||
| // then - fetchSegments is called twice (initial attempt + 1 retry) and start completes normally | ||
| assertWithin(1, TimeUnit.SECONDS, () -> { |
There was a problem hiding this comment.
I'd prefer we stop using our custom assertWithin method in favor of Awaitility. We added assertWithin before Awaitility was a thing, but it does a great job, and more, than we could with our own assertion utils.
There was a problem hiding this comment.
This comment accounts for all new tests you have added that use assertWithin, by the way.
| } | ||
|
|
||
| @Test | ||
| void completesExceptionallyWithIllegalStateExceptionWhenAllAttemptsExhausted() { |
There was a problem hiding this comment.
Although valuable, this is going to be a slow test. Do we really want this to run every time?
| workPackages.put(SEGMENT_ID, workPackage); | ||
|
|
||
| // when | ||
| coordinator.start(); |
There was a problem hiding this comment.
This may not have completed yet when you're moving to the then-phase. We should await start to be successful. Use of awaitility seems appropriate to me here.
There was a problem hiding this comment.
This pointer holds for all places where you're invoking Coordinator#start right now. As you've made it async, we need to ensure it completes as expected everywhere where it is mandatory to test a subsequent step.

Summary
Removes all
FutureUtils.joinAndUnwrapblocking calls from thePooledStreamingEventProcessorstack as part of [#3773](#3773). Every blocking point inClaimTask,SplitTask,MergeTask,WorkPackage,Coordinator, andPooledStreamingEventProcessoris replaced with proper asyncCompletableFuturecomposition — no coordinator or caller thread is blocked waiting on token store operations.This work was delivered in four phases:
Phase 0 — Bug fix:
Coordinator#abortWorkPackagethenRunwas silently discarding theCompletableFuturereturned byexecuteWithResult, causing the abort flow to complete beforereleaseClaimandonSegmentReleasedhad finished. Replaced withthenComposeto correctly chain the futures.Phase 1 —
ClaimTask,SplitTask,MergeTaskfully asyncAll three task types are now pure async chains:
ClaimTask: two sequential UoW calls chained withthenCompose; fetch failure returnsfalseviaexceptionallySplitTask:splitAndReleasereturnsCompletableFuture<Boolean>;try/finallyreplaced withwhenCompleteto guarantee deadline cleanupMergeTask: parallel token fetches withthenCombine; nested future flattened;whenCompletefor deadline cleanupPhase 2 —
WorkPackageandCoordinatorfully asyncWorkPackage:storeToken,extendClaimIfThresholdIsMet, andprocessEventsreturnCompletableFuture<Void>Coordinator:start(),claimNewSegments(),createWorkPackage(),ensureOpenStream(), andinitializeTokenStore()are all now async; initialization uses an async retry loop replacingexecuteUntilTrue; a stale-task generation guard prevents races inrun()Phase 3 —
PooledStreamingEventProcessorfully asyncMoves token store identifier resolution from lazy on-demand (blocking via
joinAndUnwrap) to the asyncstart()chain.calculateIdentifier()is removed;AtomicReference<String>replaced with avolatilefield;getTokenStoreIdentifier()throwsIllegalStateExceptionwhen called beforestart()has completed, making the lifecycle contract explicit.