diff --git a/.serena/cache/java/document_symbols_cache_v23-06-25.pkl b/.serena/cache/java/document_symbols_cache_v23-06-25.pkl deleted file mode 100644 index a0c1ae55d..000000000 Binary files a/.serena/cache/java/document_symbols_cache_v23-06-25.pkl and /dev/null differ diff --git a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java index f43aec9cc..0be198cbf 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventConsumer.java +++ b/server-common/src/main/java/io/a2a/server/events/EventConsumer.java @@ -10,8 +10,11 @@ import mutiny.zero.BackpressureStrategy; import mutiny.zero.TubeConfiguration; import mutiny.zero.ZeroPublisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class EventConsumer { + private static final Logger LOGGER = LoggerFactory.getLogger(EventConsumer.class); private final EventQueue queue; private Throwable error; @@ -21,6 +24,7 @@ public class EventConsumer { public EventConsumer(EventQueue queue) { this.queue = queue; + LOGGER.debug("EventConsumer created with queue {}", System.identityHashCode(queue)); } public Event consumeOne() throws A2AServerException, EventQueueClosedException { @@ -107,4 +111,11 @@ public EnhancedRunnable.DoneCallback createAgentRunnableDoneCallback() { } }; } + + public void close() { + // Close the queue to stop the polling loop in consumeAll() + // This will cause EventQueueClosedException and exit the while(true) loop + LOGGER.debug("EventConsumer closing queue {}", System.identityHashCode(queue)); + queue.close(); + } } diff --git a/server-common/src/main/java/io/a2a/server/events/EventQueue.java b/server-common/src/main/java/io/a2a/server/events/EventQueue.java index 3e7ede598..91ade646e 100644 --- a/server-common/src/main/java/io/a2a/server/events/EventQueue.java +++ b/server-common/src/main/java/io/a2a/server/events/EventQueue.java @@ -111,11 +111,14 @@ public Event dequeueEvent(int waitMilliSeconds) throws EventQueueClosedException return event; } try { + LOGGER.trace("Polling queue {} (wait={}ms)", System.identityHashCode(this), waitMilliSeconds); Event event = queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS); if (event != null) { // Call toString() since for errors we don't really want the full stacktrace LOGGER.debug("Dequeued event (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event); semaphore.release(); + } else { + LOGGER.trace("Dequeue timeout (null) from queue {}", System.identityHashCode(this)); } return event; } catch (InterruptedException e) { diff --git a/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java b/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java index 41dfac222..d696a937f 100644 --- a/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java +++ b/server-common/src/main/java/io/a2a/server/events/InMemoryQueueManager.java @@ -4,9 +4,13 @@ import java.util.concurrent.ConcurrentMap; import jakarta.enterprise.context.ApplicationScoped; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @ApplicationScoped public class InMemoryQueueManager implements QueueManager { + private static final Logger LOGGER = LoggerFactory.getLogger(InMemoryQueueManager.class); + private final ConcurrentMap queues = new ConcurrentHashMap<>(); private final EventQueueFactory factory; @@ -43,6 +47,9 @@ public void close(String taskId) { if (existing == null) { throw new NoTaskQueueException(); } + // Close the queue to stop EventConsumer polling loop + LOGGER.debug("Closing queue {} for task {}", System.identityHashCode(existing), taskId); + existing.close(); } @Override @@ -57,7 +64,14 @@ public EventQueue createOrTap(String taskId) { // Make sure an existing queue has not been added in the meantime existing = queues.putIfAbsent(taskId, newQueue); } - return existing == null ? newQueue : existing.tap(); + EventQueue result = existing == null ? newQueue : existing.tap(); + if (existing == null) { + LOGGER.debug("Created new queue {} for task {}", System.identityHashCode(result), taskId); + } else { + LOGGER.debug("Tapped existing queue {} -> child {} for task {}", + System.identityHashCode(existing), System.identityHashCode(result), taskId); + } + return result; } @Override diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 7bf2b8af8..8fa55030c 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -28,6 +28,7 @@ import io.a2a.server.events.EnhancedRunnable; import io.a2a.server.events.EventConsumer; import io.a2a.server.events.EventQueue; +import io.a2a.server.events.NoTaskQueueException; import io.a2a.server.events.QueueManager; import io.a2a.server.events.TaskQueueExistsException; import io.a2a.server.tasks.PushNotificationConfigStore; @@ -201,7 +202,7 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // any errors thrown by the producerRunnable are not picked up by the consumer producerRunnable.addDoneCallback(consumer.createAgentRunnableDoneCallback()); etai = resultAggregator.consumeAndBreakOnInterrupt(consumer, blocking, pushNotificationCallback); - + if (etai == null) { LOGGER.debug("No result, throwing InternalError"); throw new InternalError("No result"); @@ -218,10 +219,10 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte pushNotificationCallback.run(); } finally { if (interruptedOrNonBlocking) { - CompletableFuture cleanupTask = CompletableFuture.runAsync(() -> cleanupProducer(taskId), executor); + CompletableFuture cleanupTask = CompletableFuture.runAsync(() -> cleanupProducer(taskId, false), executor); trackBackgroundTask(cleanupTask); } else { - cleanupProducer(taskId); + cleanupProducer(taskId, false); } } @@ -342,7 +343,7 @@ private void startBackgroundConsumption() { } }); } finally { - CompletableFuture cleanupTask = CompletableFuture.runAsync(() -> cleanupProducer(taskId.get()), executor); + CompletableFuture cleanupTask = CompletableFuture.runAsync(() -> cleanupProducer(taskId.get(), true), executor); trackBackgroundTask(cleanupTask); } } @@ -473,8 +474,17 @@ public void run() { .whenComplete((v, err) -> { if (err != null) { runnable.setError(err); + // Close queue on error + queue.close(); + } else { + // Only close queue if task is in a final state + Task task = taskStore.get(taskId); + if (task != null && task.getStatus().state().isFinal()) { + queue.close(); + } else { + LOGGER.debug("Task {} not in final state or not yet created, keeping queue open", taskId); + } } - queue.close(); runnable.invokeDoneCallbacks(); }); runningAgents.put(taskId, cf); @@ -499,11 +509,48 @@ private void trackBackgroundTask(CompletableFuture task) { }); } - private void cleanupProducer(String taskId) { + private void cleanupProducer(String taskId, boolean isStreaming) { + LOGGER.debug("Starting cleanup for task {} (streaming={})", taskId, isStreaming); // TODO the Python implementation waits for the producerRunnable - runningAgents.get(taskId) - .whenComplete((v, t) -> { - queueManager.close(taskId); + CompletableFuture agentFuture = runningAgents.get(taskId); + if (agentFuture == null) { + LOGGER.debug("No running agent found for task {}", taskId); + return; + } + agentFuture.whenComplete((v, t) -> { + LOGGER.debug("Agent completed for task {}", taskId); + + boolean closeQueue = false; + if (isStreaming) { + // For streaming calls, always close queue when agent completes + // Each streaming call is independent and doesn't support multi-message reuse + LOGGER.debug("Streaming call, closing queue for task {}", taskId); + closeQueue = true; + } else { + // For non-streaming calls, only close queue if task is in final state + // For non-final states, queue must stay open for potential future messages to same taskId + // so we can handle the "fire and forget' case used e.g. in the TCK + Task task = taskStore.get(taskId); + if (task != null && task.getStatus().state().isFinal()) { + LOGGER.debug("Task in final state, closing queue for task {}", taskId); + closeQueue = true; + } else { + LOGGER.debug("Task not in final state, keeping queue open for task {}", taskId); + } + } + + if (closeQueue) { + try { + queueManager.close(taskId); + } catch (NoTaskQueueException e) { + // This can happen if the queue was already closed and removed, which is not an error in this cleanup context. + LOGGER.debug("Queue for task {} was already closed or does not exist.", taskId); + } catch (Exception e) { + LOGGER.debug("Error closing queue for task {}: {}", taskId, e.getMessage()); + } + } + + // Always remove from running agents runningAgents.remove(taskId); }); } @@ -545,5 +592,10 @@ private void sendPushNotification(String taskId, ResultAggregator resultAggregat } } + private boolean isTaskFinalInTaskStore(String taskId) { + Task task = taskStore.get(taskId); + return task != null && task.getStatus().state().isFinal(); + } + private record MessageSendSetup(TaskManager taskManager, Task task, RequestContext requestContext) {} } diff --git a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java index a30d60b01..41bb8a1c4 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java +++ b/server-common/src/main/java/io/a2a/server/tasks/ResultAggregator.java @@ -5,6 +5,7 @@ import static io.a2a.server.util.async.AsyncUtils.processor; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -40,7 +41,12 @@ public Flow.Publisher consumeAndEmit(EventConsumer consumer) { Flow.Publisher all = consumer.consumeAll(); return processor(createTubeConfig(), all, ((errorConsumer, event) -> { - callTaskManagerProcess(event); + try { + callTaskManagerProcess(event); + } catch (A2AServerException e) { + errorConsumer.accept(e); + return false; + } return true; })); } @@ -60,7 +66,12 @@ public EventKind consumeAll(EventConsumer consumer) throws JSONRPCError { return false; } } - callTaskManagerProcess(event); + try { + callTaskManagerProcess(event); + } catch (A2AServerException e) { + error.set(e); + return false; + } return true; }, error::set); @@ -84,24 +95,45 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, Flow.Publisher all = consumer.consumeAll(); AtomicReference message = new AtomicReference<>(); AtomicBoolean interrupted = new AtomicBoolean(false); + AtomicBoolean shouldCloseConsumer = new AtomicBoolean(false); AtomicReference errorRef = new AtomicReference<>(); - consumer( + CompletableFuture completionFuture = new CompletableFuture<>(); + + // CRITICAL: The subscription itself must run on a background thread to avoid blocking + // the Vert.x worker thread. EventConsumer.consumeAll() starts a polling loop that + // blocks in dequeueEvent(), so we must subscribe from a background thread. + CompletableFuture.runAsync(() -> { + consumer( createTubeConfig(), all, - (event -> { + (event) -> { + // Handle Throwable events if (event instanceof Throwable t) { errorRef.set(t); + completionFuture.completeExceptionally(t); return false; } + + // Handle Message events if (event instanceof Message msg) { - this.message = msg; + ResultAggregator.this.message = msg; message.set(msg); + completionFuture.complete(null); return false; } - callTaskManagerProcess(event); + // Process event through TaskManager + try { + callTaskManagerProcess(event); + } catch (A2AServerException e) { + errorRef.set(e); + completionFuture.completeExceptionally(e); + return false; + } + // Determine interrupt behavior boolean shouldInterrupt = false; + boolean continueInBackground = false; boolean isAuthRequired = (event instanceof Task task && task.getStatus().state() == TaskState.AUTH_REQUIRED) || (event instanceof TaskStatusUpdateEvent tsue && tsue.getStatus().state() == TaskState.AUTH_REQUIRED); @@ -114,21 +146,80 @@ public EventTypeAndInterrupt consumeAndBreakOnInterrupt(EventConsumer consumer, // new request is expected in order for the agent to make progress, // so the agent should exit. shouldInterrupt = true; + continueInBackground = true; } // For non-blocking calls, interrupt as soon as a task is available. else if (!blocking) { shouldInterrupt = true; + continueInBackground = true; + } + // For blocking calls, interrupt when we get any task-related event (EventKind except Message) + // Cancel subscription to free resources (client can resubscribe if needed) + else { + // Note: The initial version of this change only triggered this block with + // else if (blocking && (event instanceof EventKind && !(event instanceof Message))) + // However, a review comment indicated this might not be necessary, and everything passes + // with this simpler catch-all else block. Keeping this comment here for a while (added + // 9 Oct 2025) just in case. + shouldInterrupt = true; + continueInBackground = false; } if (shouldInterrupt) { - // Continue consuming the rest of the events in the background. - CompletableFuture.runAsync(() -> continueConsuming(all, eventCallback)); + // Complete the future to unblock the main thread interrupted.set(true); - return false; + completionFuture.complete(null); + + if (continueInBackground) { + // Continue consuming in background - keep requesting events + return true; + } else { + // Blocking call - cancel subscription AND close consumer to stop polling loop + // We need to close the consumer after the consumer() call completes + shouldCloseConsumer.set(true); + return false; + } } + + // Continue processing return true; - }), - errorRef::set); + }, + throwable -> { + // Handle onError and onComplete + if (throwable != null) { + errorRef.set(throwable); + completionFuture.completeExceptionally(throwable); + } else { + // onComplete + completionFuture.complete(null); + } + } + ); + }); + + // Wait for completion or interruption + try { + completionFuture.join(); + } catch (CompletionException e) { + // CompletionException wraps the actual exception + Throwable cause = e.getCause(); + if (cause != null) { + Utils.rethrow(cause); + } else { + throw e; + } + } + + // Close consumer if blocking interrupt occurred + if (shouldCloseConsumer.get()) { + // Close the EventConsumer's queue to stop the infinite polling loop + // This will cause EventQueueClosedException and exit the while(true) loop + consumer.close(); + } + + // Background consumption continues automatically via the subscription + // No need to create a new publisher - returning true in nextFunction + // keeps the subscription alive Throwable error = errorRef.get(); if (error != null) { @@ -139,26 +230,8 @@ else if (!blocking) { message.get() != null ? message.get() : taskManager.getTask(), interrupted.get()); } - private void continueConsuming(Flow.Publisher all, Runnable eventCallback) { - consumer(createTubeConfig(), - all, - event -> { - callTaskManagerProcess(event); - if (eventCallback != null) { - eventCallback.run(); - } - return true; - }, - t -> {}); - } - - private void callTaskManagerProcess(Event event) { - try { - taskManager.process(event); - } catch (A2AServerException e) { - // TODO Decide what to do in case of failure - e.printStackTrace(); - } + private void callTaskManagerProcess(Event event) throws A2AServerException { + taskManager.process(event); } public record EventTypeAndInterrupt(EventKind eventType, boolean interrupted) { diff --git a/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java b/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java index 067aa2832..523b9346f 100644 --- a/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java +++ b/server-common/src/main/java/io/a2a/server/tasks/TaskManager.java @@ -94,7 +94,7 @@ Task saveTaskEvent(TaskArtifactUpdateEvent event) throws A2AServerException { public Event process(Event event) throws A2AServerException { if (event instanceof Task task) { - saveTask(task); + saveTaskEvent(task); } else if (event instanceof TaskStatusUpdateEvent taskStatusUpdateEvent) { saveTaskEvent(taskStatusUpdateEvent); } else if (event instanceof TaskArtifactUpdateEvent taskArtifactUpdateEvent) { diff --git a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java index 9951b7ba7..2fc55ea26 100644 --- a/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java +++ b/server-common/src/test/java/io/a2a/server/requesthandlers/DefaultRequestHandlerTest.java @@ -60,6 +60,65 @@ void setUp() { serverCallContext = new ServerCallContext(UnauthenticatedUser.INSTANCE, Map.of(), Set.of()); } + /** + * Test that multiple blocking messages to the same task work correctly + * when agent doesn't emit final events (fire-and-forget pattern). + * This replicates TCK test: test_message_send_continue_task + */ + @Test + @Timeout(10) + void testBlockingMessageContinueTask() throws Exception { + String taskId = "continue-task-1"; + String contextId = "continue-ctx-1"; + + // Configure agent to NOT complete tasks (like TCK fire-and-forget agent) + agentExecutor.setExecuteCallback((context, queue) -> { + Task task = context.getTask(); + if (task == null) { + // First message: create SUBMITTED task + task = new Task.Builder() + .id(context.getTaskId()) + .contextId(context.getContextId()) + .status(new TaskStatus(TaskState.SUBMITTED)) + .build(); + queue.enqueueEvent(task); + } + // Don't complete - just return (fire-and-forget) + }); + + // First blocking message - should return SUBMITTED task + Message message1 = new Message.Builder() + .messageId("msg-1") + .role(Message.Role.USER) + .parts(new TextPart("first message")) + .taskId(taskId) + .contextId(contextId) + .build(); + + MessageSendParams params1 = new MessageSendParams(message1, null, null); + Object result1 = requestHandler.onMessageSend(params1, serverCallContext); + + assertTrue(result1 instanceof Task); + Task task1 = (Task) result1; + assertTrue(task1.getId().equals(taskId)); + assertTrue(task1.getStatus().state() == TaskState.SUBMITTED); + + // Second blocking message to SAME taskId - should not hang + Message message2 = new Message.Builder() + .messageId("msg-2") + .role(Message.Role.USER) + .parts(new TextPart("second message")) + .taskId(taskId) + .contextId(contextId) + .build(); + + MessageSendParams params2 = new MessageSendParams(message2, null, null); + Object result2 = requestHandler.onMessageSend(params2, serverCallContext); + + // Should complete successfully (not timeout) + assertTrue(result2 instanceof Task); + } + /** * Test that background cleanup tasks are properly tracked and cleared. * Backported from Python test: test_background_cleanup_task_is_tracked_and_cleared diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java index 0abba1f77..4e12f13ba 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AbstractA2AServerTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assumptions.assumeTrue; @@ -15,6 +16,7 @@ import io.a2a.client.config.ClientConfig; import io.a2a.client.ClientEvent; import io.a2a.client.MessageEvent; +import io.a2a.client.TaskEvent; import io.a2a.client.TaskUpdateEvent; import jakarta.ws.rs.core.MediaType; @@ -27,6 +29,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +48,7 @@ import io.a2a.spec.Artifact; import io.a2a.spec.DeleteTaskPushNotificationConfigParams; import io.a2a.spec.Event; +import io.a2a.spec.EventKind; import io.a2a.spec.GetTaskPushNotificationConfigParams; import io.a2a.spec.InvalidParamsError; import io.a2a.spec.InvalidRequestError; @@ -52,6 +56,7 @@ import io.a2a.spec.JSONRPCErrorResponse; import io.a2a.spec.ListTaskPushNotificationConfigParams; import io.a2a.spec.Message; +import io.a2a.spec.MessageSendConfiguration; import io.a2a.spec.MessageSendParams; import io.a2a.spec.MethodNotFoundError; import io.a2a.spec.Part; @@ -117,6 +122,7 @@ public abstract class AbstractA2AServerTest { protected final int serverPort; private Client client; private Client nonStreamingClient; + private Client pollingClient; protected AbstractA2AServerTest(int serverPort) { this.serverPort = serverPort; @@ -754,6 +760,157 @@ public void testDeletePushNotificationConfigSetWithoutConfigId() throws Exceptio } } + @Test + @Timeout(value = 1, unit = TimeUnit.MINUTES) + public void testNonBlockingWithMultipleMessages() throws Exception { + // 1. Send first non-blocking message to create task in WORKING state + Message message1 = new Message.Builder(MESSAGE) + .taskId("multi-event-test") + .contextId("test-context") + .parts(new TextPart("First request")) + .build(); + + AtomicReference taskIdRef = new AtomicReference<>(); + CountDownLatch firstTaskLatch = new CountDownLatch(1); + + BiConsumer firstMessageConsumer = (event, agentCard) -> { + System.out.println("First message consumer received: " + event.getClass().getSimpleName()); + if (event instanceof TaskEvent te) { + System.out.println(" Task state: " + te.getTask().getStatus().state()); + taskIdRef.set(te.getTask().getId()); + firstTaskLatch.countDown(); + } else if (event instanceof TaskUpdateEvent tue && tue.getUpdateEvent() instanceof TaskStatusUpdateEvent status) { + System.out.println(" Task status: " + status.getStatus().state()); + taskIdRef.set(status.getTaskId()); + firstTaskLatch.countDown(); + } + }; + + // Non-blocking message creates task in WORKING state and returns immediately + // Queue stays open because task is not in final state + getPollingClient().sendMessage(message1, List.of(firstMessageConsumer), null); + + assertTrue(firstTaskLatch.await(10, TimeUnit.SECONDS)); + String taskId = taskIdRef.get(); + assertNotNull(taskId); + assertEquals("multi-event-test", taskId); + + // 2. Resubscribe to task (queue should still be open) + CountDownLatch resubEventLatch = new CountDownLatch(2); // artifact-2 + completion + List resubReceivedEvents = new CopyOnWriteArrayList<>(); + AtomicBoolean resubUnexpectedEvent = new AtomicBoolean(false); + AtomicReference resubErrorRef = new AtomicReference<>(); + + BiConsumer resubConsumer = (event, agentCard) -> { + System.out.println("Resubscription received event: " + event.getClass().getSimpleName() + + (event instanceof TaskUpdateEvent tue ? " - " + tue.getUpdateEvent().getClass().getSimpleName() : "")); + if (event instanceof TaskUpdateEvent tue) { + resubReceivedEvents.add(tue.getUpdateEvent()); + resubEventLatch.countDown(); + System.out.println("Resub event latch count: " + resubEventLatch.getCount()); + } else { + resubUnexpectedEvent.set(true); + } + }; + + Consumer resubErrorHandler = error -> { + if (!isStreamClosedError(error)) { + resubErrorRef.set(error); + } + }; + + // Wait for subscription to be active + CountDownLatch subscriptionLatch = new CountDownLatch(1); + awaitStreamingSubscription() + .whenComplete((unused, throwable) -> subscriptionLatch.countDown()); + + getClient().resubscribe(new TaskIdParams(taskId), + List.of(resubConsumer), + resubErrorHandler); + + assertTrue(subscriptionLatch.await(15, TimeUnit.SECONDS)); + + // 3. Send second streaming message to same taskId + Message message2 = new Message.Builder(MESSAGE) + .taskId("multi-event-test") // Same taskId + .contextId("test-context") + .parts(new TextPart("Second request")) + .build(); + + CountDownLatch streamEventLatch = new CountDownLatch(2); // artifact-2 + completion + List streamReceivedEvents = new CopyOnWriteArrayList<>(); + AtomicBoolean streamUnexpectedEvent = new AtomicBoolean(false); + + BiConsumer streamConsumer = (event, agentCard) -> { + System.out.println("Streaming consumer received event: " + event.getClass().getSimpleName() + + (event instanceof TaskUpdateEvent tue ? " - " + tue.getUpdateEvent().getClass().getSimpleName() : "")); + if (event instanceof TaskUpdateEvent tue) { + streamReceivedEvents.add(tue.getUpdateEvent()); + streamEventLatch.countDown(); + System.out.println("Stream event latch count: " + streamEventLatch.getCount()); + } else { + streamUnexpectedEvent.set(true); + } + }; + + // Streaming message adds artifact-2 and completes task + getClient().sendMessage(message2, List.of(streamConsumer), null); + + // 4. Verify both consumers received artifact-2 and completion + assertTrue(resubEventLatch.await(10, TimeUnit.SECONDS)); + assertTrue(streamEventLatch.await(10, TimeUnit.SECONDS)); + + assertFalse(resubUnexpectedEvent.get()); + assertFalse(streamUnexpectedEvent.get()); + assertNull(resubErrorRef.get()); + + // Both should have received 2 events: artifact-2 and completion + assertEquals(2, resubReceivedEvents.size()); + assertEquals(2, streamReceivedEvents.size()); + + // Verify resubscription events + long resubArtifactCount = resubReceivedEvents.stream() + .filter(e -> e instanceof TaskArtifactUpdateEvent) + .count(); + assertEquals(1, resubArtifactCount); + + long resubCompletionCount = resubReceivedEvents.stream() + .filter(e -> e instanceof TaskStatusUpdateEvent) + .filter(e -> ((TaskStatusUpdateEvent) e).isFinal()) + .count(); + assertEquals(1, resubCompletionCount); + + // Verify streaming events + long streamArtifactCount = streamReceivedEvents.stream() + .filter(e -> e instanceof TaskArtifactUpdateEvent) + .count(); + assertEquals(1, streamArtifactCount); + + long streamCompletionCount = streamReceivedEvents.stream() + .filter(e -> e instanceof TaskStatusUpdateEvent) + .filter(e -> ((TaskStatusUpdateEvent) e).isFinal()) + .count(); + assertEquals(1, streamCompletionCount); + + // Verify artifact-2 details from resubscription + TaskArtifactUpdateEvent resubArtifact = (TaskArtifactUpdateEvent) resubReceivedEvents.stream() + .filter(e -> e instanceof TaskArtifactUpdateEvent) + .findFirst() + .orElseThrow(); + assertEquals("artifact-2", resubArtifact.getArtifact().artifactId()); + assertEquals("Second message artifact", + ((TextPart) resubArtifact.getArtifact().parts().get(0)).getText()); + + // Verify artifact-2 details from streaming + TaskArtifactUpdateEvent streamArtifact = (TaskArtifactUpdateEvent) streamReceivedEvents.stream() + .filter(e -> e instanceof TaskArtifactUpdateEvent) + .findFirst() + .orElseThrow(); + assertEquals("artifact-2", streamArtifact.getArtifact().artifactId()); + assertEquals("Second message artifact", + ((TextPart) streamArtifact.getArtifact().parts().get(0)).getText()); + } + @Test public void testMalformedJSONRPCRequest() { // skip this test for non-JSONRPC transports @@ -1268,6 +1425,16 @@ protected Client getNonStreamingClient() throws A2AClientException { return nonStreamingClient; } + /** + * Get a client configured for polling (non-blocking) operations. + */ + protected Client getPollingClient() throws A2AClientException { + if (pollingClient == null) { + pollingClient = createPollingClient(); + } + return pollingClient; + } + /** * Create a client with the specified streaming configuration. */ @@ -1317,4 +1484,23 @@ private ClientConfig createClientConfig(boolean streaming) { .build(); } + /** + * Create a client configured for polling (non-blocking) operations. + */ + private Client createPollingClient() throws A2AClientException { + AgentCard agentCard = createTestAgentCard(); + ClientConfig clientConfig = new ClientConfig.Builder() + .setStreaming(false) // Non-streaming + .setPolling(true) // Polling mode (translates to blocking=false on server) + .build(); + + ClientBuilder clientBuilder = Client + .builder(agentCard) + .clientConfig(clientConfig); + + configureTransport(clientBuilder); + + return clientBuilder.build(); + } + } \ No newline at end of file diff --git a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java index 9576da4a2..f472a37ab 100644 --- a/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java +++ b/tests/server-common/src/test/java/io/a2a/server/apps/common/AgentExecutorProducer.java @@ -3,11 +3,14 @@ import jakarta.enterprise.context.ApplicationScoped; import jakarta.enterprise.inject.Produces; +import java.util.List; + import io.a2a.server.agentexecution.AgentExecutor; import io.a2a.server.agentexecution.RequestContext; import io.a2a.server.events.EventQueue; import io.a2a.server.tasks.TaskUpdater; import io.a2a.spec.JSONRPCError; +import io.a2a.spec.TextPart; import io.a2a.spec.UnsupportedOperationError; import io.quarkus.arc.profile.IfBuildProfile; @@ -20,6 +23,26 @@ public AgentExecutor agentExecutor() { return new AgentExecutor() { @Override public void execute(RequestContext context, EventQueue eventQueue) throws JSONRPCError { + TaskUpdater updater = new TaskUpdater(context, eventQueue); + String taskId = context.getTaskId(); + + // Special handling for multi-event test + if ("multi-event-test".equals(taskId)) { + // First call: context.getTask() == null (new task) + if (context.getTask() == null) { + updater.startWork(); + // Return immediately - queue stays open because task is in WORKING state + return; + } else { + // Second call: context.getTask() != null (existing task) + updater.addArtifact( + List.of(new TextPart("Second message artifact", null)), + "artifact-2", "Second Artifact", null); + updater.complete(); + return; + } + } + if (context.getTaskId().equals("task-not-supported-123")) { eventQueue.enqueueEvent(new UnsupportedOperationError()); } diff --git a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java index 7da45eb14..0ad43501f 100644 --- a/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java +++ b/transport/jsonrpc/src/test/java/io/a2a/transport/jsonrpc/handler/JSONRPCHandlerTest.java @@ -72,6 +72,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.mockito.MockedConstruction; import org.mockito.Mockito;