From 5f8f0f31b66c6a868cae180b32a1c7a1244c4b8f Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 5 Mar 2019 13:12:34 +0200 Subject: [PATCH] fixes issue with requestChannel never completes if server cancels (#592) Signed-off-by: Oleh Dokuka --- .../main/java/io/rsocket/RSocketClient.java | 1 - .../java/io/rsocket/RSocketClientTest.java | 22 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 902d8487e..64aa3ccbf 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -519,7 +519,6 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) { case CANCEL: { LimitableRequestPublisher sender = senders.remove(streamId); - receivers.remove(streamId); if (sender != null) { sender.cancel(); } diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java index 2224ba393..788256c9e 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import org.assertj.core.api.Assertions; import org.junit.Rule; import org.junit.Test; import org.reactivestreams.Publisher; @@ -44,6 +45,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.UnicastProcessor; public class RSocketClientTest { @@ -194,6 +196,26 @@ public void testChannelRequestCancellation() { .blockFirst(); } + @Test + public void testChannelRequestServerSideCancellation() { + MonoProcessor cancelled = MonoProcessor.create(); + UnicastProcessor request = UnicastProcessor.create(); + request.onNext(EmptyPayload.INSTANCE); + rule.socket.requestChannel(request).subscribe(cancelled); + int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL); + rule.connection.addToReceivedBuffer( + CancelFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId)); + rule.connection.addToReceivedBuffer( + PayloadFrameFlyweight.encodeComplete(ByteBufAllocator.DEFAULT, streamId)); + Flux.first( + cancelled, + Flux.error(new IllegalStateException("Channel request not cancelled")) + .delaySubscription(Duration.ofSeconds(1))) + .blockFirst(); + + Assertions.assertThat(request.isDisposed()).isTrue(); + } + public int sendRequestResponse(Publisher response) { Subscriber sub = TestSubscriber.create(); response.subscribe(sub);