diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 902d8487e..64aa3ccbf 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -519,7 +519,6 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) { case CANCEL: { LimitableRequestPublisher sender = senders.remove(streamId); - receivers.remove(streamId); if (sender != null) { sender.cancel(); } diff --git a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java index 2224ba393..788256c9e 100644 --- a/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java +++ b/rsocket-core/src/test/java/io/rsocket/RSocketClientTest.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; +import org.assertj.core.api.Assertions; import org.junit.Rule; import org.junit.Test; import org.reactivestreams.Publisher; @@ -44,6 +45,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.UnicastProcessor; public class RSocketClientTest { @@ -194,6 +196,26 @@ public void testChannelRequestCancellation() { .blockFirst(); } + @Test + public void testChannelRequestServerSideCancellation() { + MonoProcessor cancelled = MonoProcessor.create(); + UnicastProcessor request = UnicastProcessor.create(); + request.onNext(EmptyPayload.INSTANCE); + rule.socket.requestChannel(request).subscribe(cancelled); + int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL); + rule.connection.addToReceivedBuffer( + CancelFrameFlyweight.encode(ByteBufAllocator.DEFAULT, streamId)); + rule.connection.addToReceivedBuffer( + PayloadFrameFlyweight.encodeComplete(ByteBufAllocator.DEFAULT, streamId)); + Flux.first( + cancelled, + Flux.error(new IllegalStateException("Channel request not cancelled")) + .delaySubscription(Duration.ofSeconds(1))) + .blockFirst(); + + Assertions.assertThat(request.isDisposed()).isTrue(); + } + public int sendRequestResponse(Publisher response) { Subscriber sub = TestSubscriber.create(); response.subscribe(sub);