[#3773] Replace joinunwrap usage from PooledStreamingEventProcessor#4552

Open
laura-devriendt-lemon wants to merge 26 commits into main (AxonIQ/AxonFramework:main) from enhancement/3773/replace-joinunwrap-pooled-processor (AxonIQ/AxonFramework:enhancement/3773/replace-joinunwrap-pooled-processor)

@laura-devriendt-lemon
Contributor

Summary

Removes all FutureUtils.joinAndUnwrap blocking calls from the PooledStreamingEventProcessor stack as part of [#3773](#3773). Every blocking point in ClaimTask, SplitTask, MergeTask, WorkPackage, Coordinator, and PooledStreamingEventProcessor is replaced with proper async CompletableFuture composition — no coordinator or caller thread is blocked waiting on token store operations.

This work was delivered in four phases:

Phase 0 — Bug fix: Coordinator#abortWorkPackage

thenRun was silently discarding the CompletableFuture returned by executeWithResult, causing the abort flow to complete before releaseClaim and onSegmentReleased had finished. Replaced with thenCompose to correctly chain the futures.
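
The difference between the two operators can be reproduced with plain `CompletableFuture`s. The sketch below is not the framework's code: `releaseClaim`, the `gate` future, and the class name are hypothetical stand-ins for the abort flow, used only to show that `thenRun` drops the future returned inside its lambda while `thenCompose` waits for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class AbortChainingSketch {

    // Hypothetical async release step: finishes only once `gate` completes.
    static CompletableFuture<Void> releaseClaim(CompletableFuture<Void> gate, AtomicBoolean released) {
        return gate.thenRun(() -> released.set(true));
    }

    // Buggy shape: the future returned by releaseClaim is discarded,
    // so the resulting future completes without waiting for the release.
    static CompletableFuture<Void> abortWithThenRun(CompletableFuture<Void> gate, AtomicBoolean released) {
        return CompletableFuture.<Void>completedFuture(null)
                                .thenRun(() -> releaseClaim(gate, released));
    }

    // Fixed shape: thenCompose links the inner future into the chain,
    // so the resulting future completes only after the release has finished.
    static CompletableFuture<Void> abortWithThenCompose(CompletableFuture<Void> gate, AtomicBoolean released) {
        return CompletableFuture.<Void>completedFuture(null)
                                .thenCompose(ignored -> releaseClaim(gate, released));
    }

    public static void main(String[] args) {
        CompletableFuture<Void> gate = new CompletableFuture<>();
        AtomicBoolean released = new AtomicBoolean();
        System.out.println("thenRun done early: " + abortWithThenRun(gate, released).isDone());
        System.out.println("thenCompose done early: " + abortWithThenCompose(gate, released).isDone());
    }
}
```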

Phase 1 — ClaimTask, SplitTask, MergeTask fully async

All three task types are now pure async chains:

  • ClaimTask: two sequential UoW calls chained with thenCompose; fetch failure returns false via exceptionally
  • SplitTask: splitAndRelease returns CompletableFuture<Boolean>; try/finally replaced with whenComplete to guarantee deadline cleanup
  • MergeTask: parallel token fetches with thenCombine; nested future flattened; whenComplete for deadline cleanup
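
As a rough illustration of two of the patterns listed above, the sketch below shows `exceptionally` turning a failed fetch into a normal `false` result, and `whenComplete` standing in for `try/finally`. The `deadlines` map, the method names, and the signatures are assumptions made for the example, not the actual ClaimTask or SplitTask code. Note that `exceptionally` placed at the end of the chain maps any failure in the chain to `false`, which may be broader than what the real task does.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class TaskChainSketch {

    // Hypothetical bookkeeping map, loosely modelled on the "deadline cleanup" mentioned above.
    static final Map<Integer, Long> deadlines = new ConcurrentHashMap<>();

    // Two sequential async steps chained with thenCompose; a failure becomes a normal `false`.
    static CompletableFuture<Boolean> claim(CompletableFuture<String> fetched,
                                            Function<String, CompletableFuture<Boolean>> initialize) {
        return fetched.thenCompose(initialize)
                      .exceptionally(error -> false);
    }

    // whenComplete runs on success and on failure, like a finally block,
    // and leaves the original outcome (value or exception) untouched.
    static CompletableFuture<Boolean> splitWithCleanup(Integer segmentId,
                                                       CompletableFuture<Boolean> splitOperation) {
        deadlines.put(segmentId, System.currentTimeMillis() + 5_000);
        return splitOperation.whenComplete((result, error) -> deadlines.remove(segmentId));
    }
}
```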

Phase 2 — WorkPackage and Coordinator fully async

  • WorkPackage: storeToken, extendClaimIfThresholdIsMet, and processEvents return CompletableFuture<Void>
  • Coordinator: start(), claimNewSegments(), createWorkPackage(), ensureOpenStream(), and initializeTokenStore() are all now async; initialization uses an async retry loop replacing executeUntilTrue; a stale-task generation guard prevents races in run()
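
One way an async retry loop of this kind can be written is sketched below. It is an assumption-laden illustration, not the Coordinator's implementation: the class, the method name `retryUntilTrue`, and the attempt counter are hypothetical, and the sketch retries immediately, whereas a real loop would likely schedule a delay between attempts.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class AsyncRetrySketch {

    // Re-invokes `attempt` until it yields true or the attempts run out,
    // without blocking any thread on the intermediate results.
    static CompletableFuture<Void> retryUntilTrue(Supplier<CompletableFuture<Boolean>> attempt,
                                                  int attemptsLeft) {
        if (attemptsLeft <= 0) {
            return CompletableFuture.failedFuture(new IllegalStateException("All attempts exhausted"));
        }
        CompletableFuture<Boolean> current;
        try {
            current = attempt.get();
        } catch (RuntimeException e) {
            // Treat a synchronous throw the same way as an asynchronous failure.
            current = CompletableFuture.failedFuture(e);
        }
        return current.exceptionally(error -> false)
                      .thenCompose(success -> Boolean.TRUE.equals(success)
                              ? CompletableFuture.<Void>completedFuture(null)
                              : retryUntilTrue(attempt, attemptsLeft - 1));
    }
}
```

Because each retry is scheduled from within `thenCompose`, the caller receives a single future that completes once an attempt succeeds, or completes exceptionally when the budget is exhausted.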

Phase 3 — PooledStreamingEventProcessor fully async

Moves token store identifier resolution from lazy on-demand (blocking via joinAndUnwrap) to the async start() chain. calculateIdentifier() is removed; AtomicReference<String> replaced with a volatile field; getTokenStoreIdentifier() throws IllegalStateException when called before start() has completed, making the lifecycle contract explicit.

…WorkPackage

thenRun discarded the CompletableFuture returned by executeWithResult, causing
the shutdown future to complete before releaseClaim and onSegmentReleased finished.
Replacing with thenCompose chains the futures correctly so the abort flow waits
for the token release and segment change listener before completing.

Refs #3773
Removes all joinAndUnwrap calls from ClaimTask, SplitTask and MergeTask.
Each task's internal logic is now a proper async chain — no coordinator
threads are blocked waiting on token store operations.

Key changes:
- ClaimTask: two sequential UoW calls chained with thenCompose; fetchToken
  failure caught via exceptionally returning false (preserving original
  try/catch behavior)
- SplitTask: splitAndRelease(Segment) changed from boolean to
  CompletableFuture<Boolean>; callers updated from thenApply to thenCompose
  to avoid unwaited nested futures; try/finally replaced with whenComplete
  to guarantee deadline cleanup on both success and failure
- MergeTask: two sequential segment fetches chained with thenCompose;
  thenCombine(...).thenCompose(f -> f) keeps parallel token fetching while
  flattening the nested future; mergeSegments now returns
  CompletableFuture<Boolean> with whenComplete for deadline cleanup
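
The `thenCombine(...).thenCompose(f -> f)` idiom mentioned for MergeTask can be shown in isolation. In this sketch the token type (`Long`), `storeMerged`, and the class name are hypothetical; only the flattening pattern itself is taken from the commit message.

```java
import java.util.concurrent.CompletableFuture;

final class MergeFlattenSketch {

    // Hypothetical async follow-up step that needs both fetched values.
    static CompletableFuture<Boolean> storeMerged(Long lower, Long higher) {
        return CompletableFuture.supplyAsync(() -> lower <= higher);
    }

    static CompletableFuture<Boolean> mergeTokens(CompletableFuture<Long> lowerToken,
                                                  CompletableFuture<Long> higherToken) {
        // thenCombine lets both fetches proceed independently, but because the
        // combiner itself returns a future the result is a nested
        // CompletableFuture<CompletableFuture<Boolean>>.
        CompletableFuture<CompletableFuture<Boolean>> nested =
                lowerToken.thenCombine(higherToken, (lower, higher) -> storeMerged(lower, higher));
        // thenCompose with the identity function flattens the nesting, so
        // callers wait for the inner future as well.
        return nested.thenCompose(inner -> inner);
    }
}
```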

Adds ClaimTaskTest covering all four scenarios: segment already active,
segment not in available list, successful claim, and failed claim (returns
false as normal completion, not exceptional).

Extends MergeTaskTest with a missing branch: merge initiated from the
higher-ID segment, verifying the lower-ID segment token is always placed
first in MergedTrackingToken regardless of which segment triggered the merge.

Refs #3773
Replace all blocking joinAndUnwrap call sites in WorkPackage (4 sites)
and Coordinator (8 sites) with proper async chains using thenCompose,
thenAccept, and CompletableFuture composition.

Key changes:
- WorkPackage: storeToken, extendClaimIfThresholdIsMet, processEvents,
  and scheduleWorker now return CompletableFuture<Void> instead of blocking
- Coordinator: start() returns CompletableFuture<Void>; initializeTokenStore
  becomes async with retry logic; claimNewSegments, createWorkPackage,
  ensureOpenStream, and extendClaim handling all converted to async chains
- CoordinatorTest: comprehensive tests for all new async behaviour including
  stale-task generation guard, extend-claim failure handling, claim-token
  error propagation, onSegmentReleased lifecycle, start() state transitions,
  and initializeTokenStoreWithRetry retry/exhaustion paths
…semantics

Add processor lifecycle documentation to reflect that start() is fully
async and only completes after the token store identifier is resolved and
the coordinator has started.

- index.adoc: add general lifecycle section covering start(), shutdown(),
  isRunning(), and isError() for all EventProcessor implementations
- streaming.adoc: add PSEP-specific lifecycle section before Configuring,
  documenting the two-step start() sequence and the IllegalStateException
  thrown by getTokenStoreIdentifier() when called before start() completes
- streaming.adoc: update "Retrieving the token store identifier" section
  to reflect eager resolution during start()
…ugfix

[#3773]: phase 0 Replace thenRun + joinAndUnwrap with thenCompose in Coordinator#abortWorkPackage
Move mergedWith and MergedTrackingToken.merged inside executeWithResult
so synchronous exceptions are captured by UnitOfWork.safe() and
whenComplete always removes releasesDeadlines entries even on failure.

Fix thenThrow → failedFuture in MergeTaskTest and ClaimTaskTest for
CompletableFuture-returning mocks to correctly simulate async failures.
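
Why the stubbing style matters can be seen without a mocking library. In the sketch below, `TokenFetcher` and `claim` are hypothetical; a stub that throws plays the role of a `thenThrow` mock, and a stub that returns `CompletableFuture.failedFuture(...)` plays the role of the corrected mock. Only the second one exercises the async error path.

```java
import java.util.concurrent.CompletableFuture;

final class AsyncFailureSketch {

    // Hypothetical collaborator with a future-returning method.
    interface TokenFetcher {
        CompletableFuture<String> fetchToken();
    }

    // Code under test: handles failures that arrive through the future.
    static CompletableFuture<Boolean> claim(TokenFetcher fetcher) {
        return fetcher.fetchToken()
                      .thenApply(token -> true)
                      .exceptionally(error -> false);
    }

    public static void main(String[] args) {
        // A failed future reaches `exceptionally`, so the claim resolves to false.
        TokenFetcher asyncFailure = () -> CompletableFuture.failedFuture(new IllegalStateException("async"));
        System.out.println(claim(asyncFailure).join());

        // A synchronous throw never produces a future, so `exceptionally` is bypassed.
        TokenFetcher syncFailure = () -> { throw new IllegalStateException("sync"); };
        try {
            claim(syncFailure);
        } catch (IllegalStateException e) {
            System.out.println("thrown before any future existed: " + e.getMessage());
        }
    }
}
```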

Add WhenMergeSegmentsFails test verifying releasesDeadlines are cleared
when mergeSegments throws due to incompatible segments.

Refs #3773
[#3773]: phase 1 Make ClaimTask, SplitTask and MergeTask fully async
…ge-coordinator

[#3773]: phase 2 remove joinAndUnwrap from workPackage and coordinator
…e 3)

Resolve the token store identifier eagerly during start() instead of lazily
blocking on the first call to getTokenStoreIdentifier().

Changes:
- start() now chains tokenStore.retrieveStorageIdentifier() before
  coordinator.start(), forming a fully async pipeline with no blocking
- calculateIdentifier() helper removed along with its joinAndUnwrap call
- AtomicReference<String> replaced with a volatile String field; a single
  write in start() and read-only access afterwards makes AtomicReference
  unnecessary
- getTokenStoreIdentifier() is now a direct field lookup; throws
  IllegalStateException when called before start() has completed, making
  the lifecycle contract explicit instead of silently blocking
- Updated existing getTokenStoreIdentifier test to call startEventProcessor()
  first; added test for the pre-start IllegalStateException
- Resolve identifier lazily in getTokenStoreIdentifier() instead of throwing
  IllegalStateException, using a dedicated resolveTokenStoreIdentifier() method
  with FutureUtils.joinAndUnwrap
- Update StreamingEventProcessor interface Javadoc to accurately describe
  both lazy and eager resolution strategies
- Update PSEP override Javadoc to reflect new lazy fallback behaviour
- Add tests for lazy resolution (happy path and failure propagation)
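
The combination of eager resolution in `start()` with a lazy fallback in the getter might look roughly like the sketch below. This is a simplified assumption, not the PooledStreamingEventProcessor code: the class and member names are hypothetical, and a plain `join()` stands in for `FutureUtils.joinAndUnwrap`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

final class IdentifierHolderSketch {

    // Hypothetical stand-in for the async identifier lookup on the token store.
    private final Supplier<CompletableFuture<String>> retrieveIdentifier;

    // volatile so a value written by the start() chain is visible to other threads.
    private volatile String identifier;

    IdentifierHolderSketch(Supplier<CompletableFuture<String>> retrieveIdentifier) {
        this.retrieveIdentifier = retrieveIdentifier;
    }

    // Eager path: resolve the identifier as part of the async start chain.
    CompletableFuture<Void> start() {
        return retrieveIdentifier.get().thenAccept(id -> identifier = id);
    }

    // Lazy fallback: block only if nothing has been resolved yet.
    String getIdentifier() {
        String current = identifier;
        if (current == null) {
            current = retrieveIdentifier.get().join();
            identifier = current;
        }
        return current;
    }
}
```

Two threads calling `getIdentifier()` before `start()` may both perform the lookup in this sketch; that is harmless only if the lookup is idempotent, which is assumed here.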
… lifecycle snippets

Update the three code snippets flagged in review to inject AxonConfiguration
and retrieve the processor via getComponent(Class, name) before using it,
matching the real-world usage pattern shown in the framework examples.
[#3773] Document PooledStreamingEventProcessor lifecycle and start() …
…tifier

[#3773]: phase 3 Remove joinAndUnwrap from PooledStreamingEventProcessor
…rocessor

# Conflicts resolved in:
#	messaging/src/main/java/org/axonframework/messaging/eventhandling/processing/streaming/pooled/PooledStreamingEventProcessor.java
#	messaging/src/main/java/org/axonframework/messaging/eventhandling/processing/streaming/pooled/WorkPackage.java
#	messaging/src/test/java/org/axonframework/messaging/eventhandling/processing/streaming/pooled/WorkPackageTest.java
@laura-devriendt-lemon laura-devriendt-lemon marked this pull request as ready for review May 8, 2026 07:34
@laura-devriendt-lemon laura-devriendt-lemon requested a review from a team as a code owner May 8, 2026 07:34
@laura-devriendt-lemon laura-devriendt-lemon requested review from jangalinski and zambrovski and removed request for a team May 8, 2026 07:34
* fails to retrieve the identifier
*/
@Override
public String getTokenStoreIdentifier() {

I cannot conceive of a scenario where users would programmatically find the token store identifier through this method. This is, in essence, an @Internal method for Framework usage. And even then we tend to use the TokenStore instead.

At any rate, it seems you originally threw an exception if the Processor wasn't started yet. I don't think we have to throw at all if it's not started. Again, don't know when somebody would want it through the processors before a processor is started, but it isn't technically wrong to do so.

@laura-devriendt-lemon laura-devriendt-lemon self-assigned this May 8, 2026
@laura-devriendt-lemon laura-devriendt-lemon added Type: Enhancement Use to signal an issue enhances an already existing feature of the project. Priority 1: Must Highest priority. A release cannot be made if this issue isn’t resolved. labels May 8, 2026
@laura-devriendt-lemon laura-devriendt-lemon added this to the Release 5.2.0 milestone May 8, 2026
@sonarqubecloud

sonarqubecloud Bot commented May 8, 2026

@smcvb smcvb changed the title Enhancement/3773/replace joinunwrap pooled processor [#3773] Replace joinunwrap usage from PooledStreamingEventProcessor May 8, 2026

@smcvb smcvb left a comment


Really happy you picked up such a daunting task right off the bat. Resolving #3773 isn't a small feat. However, I have quite a lot of concerns I'd like to see addressed before we can merge this in. Nothing conceptually major, just a lot of small remarks combined that (IMO) merit a request for changes.


The processor supports all core streaming operations including <<replaying-events,replaying events>>, <<parallel_processing,parallelism>>, and tracking progress with <<tracking_tokens,tokens>>.

[[lifecycle]]

I'd rather have a lifecycle section after the configuration section, not before. Lifecycles don't trigger if they aren't configured. Putting it on top now makes it seem "super important", while we're dealing with a technicality 9 out of 10 developers won't care about.

1. Resolves the <<token_store,token store>> identifier from the configured `TokenStore`.
2. Starts the internal `Coordinator`, which opens the event stream and begins claiming segments.

Because `start()` is fully asynchronous, callers should await the returned future before interacting with any processor state that depends on a running processor:

I'd add a note that users typically don't interact with start() at all. Axon Framework invokes it as part of the lifecycle management referred to here. Thus, a notice telling users that they do not have to invoke start() in most cases is important. Otherwise readers might think "wait, do I need to start manually?". I'd like to ensure that misconception doesn't occur.

private final WorkPackage.EventFilter workPackageEventFilter;

private final AtomicReference<@Nullable String> tokenStoreIdentifier = new AtomicReference<>();
private volatile @Nullable String tokenStoreIdentifier;

Why switch from an AtomicReference to a volatile field?


[IMPORTANT]
====
`getTokenStoreIdentifier()` requires the processor to have been started.

I believe this notice isn't correct anymore, as it seems your implementation no longer throws anything. Furthermore, it's perfectly fine to just let this succeed, even if the processor didn't start yet.

testSubject.scheduleEvent(eventTwo);

// then - second event is only reachable if scheduleWorker reschedules after the first batch
await().atMost(TIMEOUT).untilAsserted(

How does this validate the fact a worker was rescheduled?

* {@link UnableToClaimTokenException} during token claiming.
*/
@Nested
class ClaimNewSegments {

Lovely addition here. This is a complex bit of code and having those tests definitely doesn't hurt!

CompletableFuture<Void> startFuture = testSubject.start();

// then - fetchSegments is called twice (initial attempt + 1 retry) and start completes normally
assertWithin(1, TimeUnit.SECONDS, () -> {

I'd prefer we stop using our custom assertWithin method in favor of Awaitility. We added assertWithin before Awaitility was a thing, but Awaitility does a great job, and does more than we could with our own assertion utils.


This comment accounts for all new tests you have added that use assertWithin, by the way.

}

@Test
void completesExceptionallyWithIllegalStateExceptionWhenAllAttemptsExhausted() {

Although valuable, this is going to be a slow test. Do we really want this to run every time?

workPackages.put(SEGMENT_ID, workPackage);

// when
coordinator.start();

This may not have completed yet when you're moving to the then-phase. We should await start to be successful. Use of awaitility seems appropriate to me here.


This pointer holds for all places where you're invoking Coordinator#start right now. As you've made it async, we need to ensure it completes as expected everywhere where it is mandatory to test a subsequent step.


Labels

Priority 1: Must Highest priority. A release cannot be made if this issue isn’t resolved. Type: Enhancement Use to signal an issue enhances an already existing feature of the project.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Replace usages of FutureUtils#joinAndUnwrap in PooledStreamingEventProcessor

2 participants
