From bcee05bd4822d736178ddd37d93bad5aa6c3437b Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 26 May 2020 10:37:06 +0300 Subject: [PATCH 001/240] ensures streams are terminated if racing terminal and new stream (#848) --- .../io/rsocket/core/RSocketRequester.java | 82 ++++++++----- .../io/rsocket/core/RSocketReconnectTest.java | 15 ++- .../io/rsocket/core/RSocketRequesterTest.java | 111 +++++++++++++++--- 3 files changed, 161 insertions(+), 47 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index f7fb161fd..d9da1017b 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -189,7 +189,7 @@ public void dispose() { @Override public boolean isDisposed() { - return onClose.isDisposed(); + return terminationError != null; } @Override @@ -222,6 +222,12 @@ private Mono handleFireAndForget(Payload payload) { new IllegalStateException("FireAndForgetMono allows only a single subscriber")); } + if (isDisposed()) { + payload.release(); + final Throwable t = terminationError; + return Mono.error(t); + } + final int streamId = streamIdSupplier.nextStreamId(receivers); final ByteBuf requestFrame = RequestFireAndForgetFrameCodec.encodeReleasingPayload( @@ -270,6 +276,13 @@ private Mono handleRequestResponse(final Payload payload) { @Override void hookOnFirstRequest(long n) { + if (isDisposed()) { + payload.release(); + final Throwable t = terminationError; + receiver.onError(t); + return; + } + int streamId = streamIdSupplier.nextStreamId(receivers); this.streamId = streamId; @@ -335,6 +348,13 @@ private Flux handleRequestStream(final Payload payload) { @Override void hookOnFirstRequest(long n) { + if (isDisposed()) { + payload.release(); + final Throwable t = terminationError; + receiver.onError(t); + return; + } + int streamId = streamIdSupplier.nextStreamId(receivers); this.streamId = streamId; @@ -477,6 +497,14 @@ protected void hookFinally(SignalType type) { @Override void hookOnFirstRequest(long n) { + if (isDisposed()) { + initialPayload.release(); + final Throwable t = terminationError; + upstreamSubscriber.cancel(); + receiver.onError(t); + return; + } + final int streamId = streamIdSupplier.nextStreamId(receivers); this.streamId = streamId; @@ -712,7 +740,7 @@ private void tryTerminate(Supplier errorSupplier) { if (terminationError == null) { Throwable e = errorSupplier.get(); if (TERMINATION_ERROR.compareAndSet(this, null, e)) { - terminate(e); + serialScheduler.schedule(() -> terminate(e)); } } } @@ -720,7 +748,7 @@ private void tryTerminate(Supplier errorSupplier) { private void tryShutdown() { if (terminationError == null) { if (TERMINATION_ERROR.compareAndSet(this, null, CLOSED_CHANNEL_EXCEPTION)) { - terminate(CLOSED_CHANNEL_EXCEPTION); + serialScheduler.schedule(() -> terminate(CLOSED_CHANNEL_EXCEPTION)); } } } @@ -729,34 +757,30 @@ private void terminate(Throwable e) { connection.dispose(); leaseHandler.dispose(); - synchronized (receivers) { - receivers - .values() - .forEach( - receiver -> { - try { - receiver.onError(e); - } catch (Throwable t) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Dropped exception", t); - } + receivers + .values() + .forEach( + receiver -> { + try { + receiver.onError(e); + } catch (Throwable t) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Dropped exception", t); } - }); - } - synchronized (senders) { - senders - .values() - .forEach( - sender -> { - try { - sender.cancel(); - } catch (Throwable t) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Dropped exception", t); - } + } + }); + senders + .values() + .forEach( + sender -> { + try { + sender.cancel(); + } catch (Throwable t) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Dropped exception", t); } - }); - } + } + }); senders.clear(); receivers.clear(); sendProcessor.dispose(); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketReconnectTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketReconnectTest.java index 3233187c7..9ecdd13ba 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketReconnectTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketReconnectTest.java @@ -25,12 +25,15 @@ import java.util.Iterator; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; import reactor.core.Exceptions; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import reactor.util.retry.Retry; public class RSocketReconnectTest { @@ -38,7 +41,15 @@ public class RSocketReconnectTest { private Queue retries = new ConcurrentLinkedQueue<>(); @Test - public void shouldBeASharedReconnectableInstanceOfRSocketMono() { + public void shouldBeASharedReconnectableInstanceOfRSocketMono() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + Schedulers.onScheduleHook( + "test", + r -> + () -> { + r.run(); + latch.countDown(); + }); TestClientTransport[] testClientTransport = new TestClientTransport[] {new TestClientTransport()}; Mono rSocketMono = @@ -52,8 +63,10 @@ public void shouldBeASharedReconnectableInstanceOfRSocketMono() { Assertions.assertThat(rSocket1).isEqualTo(rSocket2); testClientTransport[0].testConnection().dispose(); + Assertions.assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); testClientTransport[0] = new TestClientTransport(); + System.out.println("here"); RSocket rSocket3 = rSocketMono.block(); RSocket rSocket4 = rSocketMono.block(); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index 1ba75f75a..5c06d6602 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -62,11 +62,13 @@ import io.rsocket.util.ByteBufPayload; import io.rsocket.util.DefaultPayload; import io.rsocket.util.EmptyPayload; +import java.nio.channels.ClosedChannelException; import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.function.BiFunction; import java.util.function.Function; @@ -90,6 +92,7 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.UnicastProcessor; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; import reactor.test.util.RaceTestUtils; @@ -941,7 +944,7 @@ private static Stream requestNInteractions() { } @ParameterizedTest - @MethodSource("streamIdRacingCases") + @MethodSource("streamRacingCases") public void ensuresCorrectOrderOfStreamIdIssuingInCaseOfRacing( BiFunction> interaction1, BiFunction> interaction2) { @@ -956,44 +959,118 @@ public void ensuresCorrectOrderOfStreamIdIssuingInCaseOfRacing( Assertions.assertThat(rule.connection.getSent()) .extracting(FrameHeaderCodec::streamId) .containsExactly(i, i + 2); + rule.connection.getSent().forEach(bb -> bb.release()); rule.connection.getSent().clear(); } } - public static Stream streamIdRacingCases() { + public static Stream streamRacingCases() { return Stream.of( Arguments.of( (BiFunction>) (r, p) -> r.socket.fireAndForget(p), (BiFunction>) - (r, p) -> r.socket.requestResponse(p)), + (r, p) -> r.socket.requestResponse(p), + REQUEST_FNF, + REQUEST_RESPONSE), Arguments.of( (BiFunction>) (r, p) -> r.socket.requestResponse(p), (BiFunction>) - (r, p) -> r.socket.requestStream(p)), + (r, p) -> r.socket.requestStream(p), + REQUEST_RESPONSE, + REQUEST_STREAM), Arguments.of( (BiFunction>) (r, p) -> r.socket.requestStream(p), (BiFunction>) - (r, p) -> r.socket.requestChannel(Flux.just(p))), + (r, p) -> { + AtomicBoolean subscribed = new AtomicBoolean(); + Flux just = Flux.just(p).doOnSubscribe((__) -> subscribed.set(true)); + return r.socket + .requestChannel(just) + .doFinally( + __ -> { + if (!subscribed.get()) { + p.release(); + } + }); + }, + REQUEST_STREAM, + REQUEST_CHANNEL), Arguments.of( (BiFunction>) - (r, p) -> r.socket.requestChannel(Flux.just(p)), + (r, p) -> { + AtomicBoolean subscribed = new AtomicBoolean(); + Flux just = Flux.just(p).doOnSubscribe((__) -> subscribed.set(true)); + return r.socket + .requestChannel(just) + .doFinally( + __ -> { + if (!subscribed.get()) { + p.release(); + } + }); + }, (BiFunction>) - (r, p) -> r.socket.fireAndForget(p))); + (r, p) -> r.socket.fireAndForget(p), + REQUEST_CHANNEL, + REQUEST_FNF)); } - public int sendRequestResponse(Publisher response) { - Subscriber sub = TestSubscriber.create(); - response.subscribe(sub); - int streamId = rule.getStreamIdForRequestType(REQUEST_RESPONSE); - rule.connection.addToReceivedBuffer( - PayloadFrameCodec.encodeNextCompleteReleasingPayload( - rule.alloc(), streamId, EmptyPayload.INSTANCE)); - verify(sub).onNext(any(Payload.class)); - verify(sub).onComplete(); - return streamId; + @ParameterizedTest + @MethodSource("streamRacingCases") + @SuppressWarnings({"rawtypes", "unchecked"}) + public void shouldTerminateAllStreamsIfThereRacingBetweenDisposeAndRequests( + BiFunction> interaction1, + BiFunction> interaction2, + FrameType interactionType1, + FrameType interactionType2) { + for (int i = 1; i < 10000; i++) { + Payload payload1 = ByteBufPayload.create("test"); + Payload payload2 = ByteBufPayload.create("test"); + AssertSubscriber assertSubscriber1 = AssertSubscriber.create(); + AssertSubscriber assertSubscriber2 = AssertSubscriber.create(); + Publisher publisher1 = interaction1.apply(rule, payload1); + Publisher publisher2 = interaction2.apply(rule, payload2); + RaceTestUtils.race( + () -> rule.socket.dispose(), + () -> + RaceTestUtils.race( + () -> publisher1.subscribe(assertSubscriber1), + () -> publisher2.subscribe(assertSubscriber2), + Schedulers.parallel()), + Schedulers.parallel()); + + assertSubscriber1.await().assertTerminated(); + if (interactionType1 != REQUEST_FNF) { + assertSubscriber1.assertError(ClosedChannelException.class); + } else { + try { + assertSubscriber1.assertError(ClosedChannelException.class); + } catch (Throwable t) { + // fnf call may be completed + assertSubscriber1.assertComplete(); + } + } + assertSubscriber2.await().assertTerminated(); + if (interactionType2 != REQUEST_FNF) { + assertSubscriber2.assertError(ClosedChannelException.class); + } else { + try { + assertSubscriber2.assertError(ClosedChannelException.class); + } catch (Throwable t) { + // fnf call may be completed + assertSubscriber2.assertComplete(); + } + } + + Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release); + rule.connection.getSent().clear(); + + Assertions.assertThat(payload1.refCnt()).isZero(); + Assertions.assertThat(payload2.refCnt()).isZero(); + } } public static class ClientSocketRule extends AbstractSocketRule { From 3f17a45358b38d3ce5af3f3b320e444feae4ffd7 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 26 May 2020 13:04:59 +0300 Subject: [PATCH 002/240] fixes ReconnectMono behaviour when racing invalidate and subscribe #847 Co-authored-by: Rossen Stoyanchev --- .../java/io/rsocket/core/ReconnectMono.java | 70 ++++- .../io/rsocket/core/ReconnectMonoTests.java | 278 +++++++++++++++++- 2 files changed, 323 insertions(+), 25 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java b/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java index 81f6625f0..ee24a2f40 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java @@ -112,12 +112,25 @@ public void subscribe(CoreSubscriber actual) { final ReconnectInner inner = new ReconnectInner<>(actual, this); actual.onSubscribe(inner); - final int state = this.add(inner); + for (; ; ) { + final int state = this.add(inner); + + T value = this.value; + + if (state == READY_STATE) { + if (value != null) { + inner.complete(value); + return; + } + // value == null means racing between invalidate and this subscriber + // thus, we have to loop again + continue; + } else if (state == TERMINATED_STATE) { + inner.onError(this.t); + return; + } - if (state == READY_STATE) { - inner.complete(this.value); - } else if (state == TERMINATED_STATE) { - inner.onError(this.t); + return; } } @@ -150,7 +163,14 @@ public T block(@Nullable Duration timeout) { try { ReconnectInner[] subscribers = this.subscribers; if (subscribers == READY) { - return this.value; + final T value = this.value; + if (value != null) { + return value; + } else { + // value == null means racing between invalidate and this block + // thus, we have to update the state again and see what happened + subscribers = this.subscribers; + } } if (subscribers == TERMINATED) { @@ -175,7 +195,14 @@ public T block(@Nullable Duration timeout) { ReconnectInner[] inners = this.subscribers; if (inners == READY) { - return this.value; + final T value = this.value; + if (value != null) { + return value; + } else { + // value == null means racing between invalidate and this block + // thus, we have to update the state again and see what happened + inners = this.subscribers; + } } if (inners == TERMINATED) { RuntimeException re = Exceptions.propagate(this.t); @@ -282,13 +309,31 @@ public void invalidate() { final ReconnectInner[] subscribers = this.subscribers; - if (subscribers == READY && SUBSCRIBERS.compareAndSet(this, READY, EMPTY_UNSUBSCRIBED)) { + if (subscribers == READY) { + // guarded section to ensure we expire value exactly once if there is racing + if (WIP.getAndIncrement(this) != 0) { + return; + } + final T value = this.value; this.value = null; - if (value != null) { this.onValueExpired.accept(value); } + + int m = 1; + for (; ; ) { + if (isDisposed()) { + return; + } + + m = WIP.addAndGet(this, -m); + if (m == 0) { + break; + } + } + + SUBSCRIBERS.compareAndSet(this, READY, EMPTY_UNSUBSCRIBED); } } @@ -355,6 +400,11 @@ void remove(ReconnectInner ps) { } } + /** + * Subscriber that subscribes to the source {@link Mono} to receive its value.
+ * Note that the source is not expected to complete empty, and if this happens, execution will + * terminate with an {@code IllegalStateException}. + */ static final class ReconnectMainSubscriber implements CoreSubscriber { final ReconnectMono parent; @@ -389,7 +439,7 @@ public void onComplete() { } if (value == null) { - p.terminate(new IllegalStateException("Unexpected Completion of the Upstream")); + p.terminate(new IllegalStateException("Source completed empty")); } else { p.complete(); } diff --git a/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java b/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java index 968a1a793..c00072593 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java +++ b/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java @@ -58,7 +58,7 @@ public class ReconnectMonoTests { public void shouldExpireValueOnRacingDisposeAndNext() { Hooks.onErrorDropped(t -> {}); Hooks.onNextDropped(System.out::println); - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 10000; i++) { final int index = i; final CoreSubscriber[] monoSubscribers = new CoreSubscriber[1]; Subscription mockSubscription = Mockito.mock(Subscription.class); @@ -104,7 +104,7 @@ public void subscribe(CoreSubscriber actual) { @Test public void shouldNotifyAllTheSubscribersUnderRacingBetweenSubscribeAndComplete() { Hooks.onErrorDropped(t -> {}); - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 10000; i++) { final TestPublisher cold = TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW); @@ -141,9 +141,204 @@ public void shouldNotifyAllTheSubscribersUnderRacingBetweenSubscribeAndComplete( } } + @Test + public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidate() { + Hooks.onErrorDropped(t -> {}); + for (int i = 0; i < 10000; i++) { + final int index = i; + final TestPublisher cold = + TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW); + + final ReconnectMono reconnectMono = + cold.mono().as(source -> new ReconnectMono<>(source, onExpire(), onValue())); + + final MonoProcessor processor = reconnectMono.subscribeWith(MonoProcessor.create()); + final MonoProcessor racerProcessor = MonoProcessor.create(); + + Assertions.assertThat(expired).isEmpty(); + Assertions.assertThat(received).isEmpty(); + + reconnectMono.mainSubscriber.onNext("value_to_expire" + i); + reconnectMono.mainSubscriber.onComplete(); + + RaceTestUtils.race( + reconnectMono::invalidate, + () -> { + reconnectMono.subscribe(racerProcessor); + if (!racerProcessor.isTerminated()) { + reconnectMono.mainSubscriber.onNext("value_to_not_expire" + index); + reconnectMono.mainSubscriber.onComplete(); + } + }, + Schedulers.parallel()); + + Assertions.assertThat(processor.isTerminated()).isTrue(); + + Assertions.assertThat(processor.peek()).isEqualTo("value_to_expire" + i); + StepVerifier.create(racerProcessor) + .expectNextMatches( + (v) -> { + if (reconnectMono.subscribers == ReconnectMono.READY) { + return v.equals("value_to_not_expire" + index); + } else { + return v.equals("value_to_expire" + index); + } + }) + .expectComplete() + .verify(Duration.ofMillis(100)); + + Assertions.assertThat(expired).hasSize(1).containsOnly("value_to_expire" + i); + if (reconnectMono.subscribers == ReconnectMono.READY) { + Assertions.assertThat(received) + .hasSize(2) + .containsExactly( + Tuples.of("value_to_expire" + i, reconnectMono), + Tuples.of("value_to_not_expire" + i, reconnectMono)); + } else { + Assertions.assertThat(received) + .hasSize(1) + .containsOnly(Tuples.of("value_to_expire" + i, reconnectMono)); + } + + expired.clear(); + received.clear(); + } + } + + @Test + public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidates() { + Hooks.onErrorDropped(t -> {}); + for (int i = 0; i < 10000; i++) { + final int index = i; + final TestPublisher cold = + TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW); + + final ReconnectMono reconnectMono = + cold.mono().as(source -> new ReconnectMono<>(source, onExpire(), onValue())); + + final MonoProcessor processor = reconnectMono.subscribeWith(MonoProcessor.create()); + final MonoProcessor racerProcessor = MonoProcessor.create(); + + Assertions.assertThat(expired).isEmpty(); + Assertions.assertThat(received).isEmpty(); + + reconnectMono.mainSubscriber.onNext("value_to_expire" + i); + reconnectMono.mainSubscriber.onComplete(); + + RaceTestUtils.race( + () -> + RaceTestUtils.race( + reconnectMono::invalidate, reconnectMono::invalidate, Schedulers.parallel()), + () -> { + reconnectMono.subscribe(racerProcessor); + if (!racerProcessor.isTerminated()) { + reconnectMono.mainSubscriber.onNext("value_to_possibly_expire" + index); + reconnectMono.mainSubscriber.onComplete(); + } + }, + Schedulers.parallel()); + + Assertions.assertThat(processor.isTerminated()).isTrue(); + + Assertions.assertThat(processor.peek()).isEqualTo("value_to_expire" + i); + StepVerifier.create(racerProcessor) + .expectNextMatches( + (v) -> + v.equals("value_to_possibly_expire" + index) + || v.equals("value_to_expire" + index)) + .expectComplete() + .verify(Duration.ofMillis(100)); + + if (expired.size() == 2) { + Assertions.assertThat(expired) + .hasSize(2) + .containsExactly("value_to_expire" + i, "value_to_possibly_expire" + i); + } else { + Assertions.assertThat(expired).hasSize(1).containsOnly("value_to_expire" + i); + } + if (received.size() == 2) { + Assertions.assertThat(received) + .hasSize(2) + .containsExactly( + Tuples.of("value_to_expire" + i, reconnectMono), + Tuples.of("value_to_possibly_expire" + i, reconnectMono)); + } else { + Assertions.assertThat(received) + .hasSize(1) + .containsOnly(Tuples.of("value_to_expire" + i, reconnectMono)); + } + + expired.clear(); + received.clear(); + } + } + + @Test + public void shouldNotExpireNewlyResolvedValueIfBlockIsRacingWithInvalidate() { + Hooks.onErrorDropped(t -> {}); + for (int i = 0; i < 10000; i++) { + final int index = i; + final TestPublisher cold = + TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW); + + final ReconnectMono reconnectMono = + cold.mono().as(source -> new ReconnectMono<>(source, onExpire(), onValue())); + + final MonoProcessor processor = reconnectMono.subscribeWith(MonoProcessor.create()); + + Assertions.assertThat(expired).isEmpty(); + Assertions.assertThat(received).isEmpty(); + + reconnectMono.mainSubscriber.onNext("value_to_expire" + i); + reconnectMono.mainSubscriber.onComplete(); + + RaceTestUtils.race( + () -> + Assertions.assertThat(reconnectMono.block()) + .matches( + (v) -> + v.equals("value_to_not_expire" + index) + || v.equals("value_to_expire" + index)), + () -> + RaceTestUtils.race( + reconnectMono::invalidate, + () -> { + for (; ; ) { + if (reconnectMono.subscribers != ReconnectMono.READY) { + reconnectMono.mainSubscriber.onNext("value_to_not_expire" + index); + reconnectMono.mainSubscriber.onComplete(); + break; + } + } + }, + Schedulers.parallel()), + Schedulers.parallel()); + + Assertions.assertThat(processor.isTerminated()).isTrue(); + + Assertions.assertThat(processor.peek()).isEqualTo("value_to_expire" + i); + + Assertions.assertThat(expired).hasSize(1).containsOnly("value_to_expire" + i); + if (reconnectMono.subscribers == ReconnectMono.READY) { + Assertions.assertThat(received) + .hasSize(2) + .containsExactly( + Tuples.of("value_to_expire" + i, reconnectMono), + Tuples.of("value_to_not_expire" + i, reconnectMono)); + } else { + Assertions.assertThat(received) + .hasSize(1) + .containsOnly(Tuples.of("value_to_expire" + i, reconnectMono)); + } + + expired.clear(); + received.clear(); + } + } + @Test public void shouldEstablishValueOnceInCaseOfRacingBetweenSubscribers() { - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 10000; i++) { final TestPublisher cold = TestPublisher.createCold(); cold.next("value" + i); @@ -187,7 +382,7 @@ public void shouldEstablishValueOnceInCaseOfRacingBetweenSubscribers() { @Test public void shouldEstablishValueOnceInCaseOfRacingBetweenSubscribeAndBlock() { Duration timeout = Duration.ofMillis(100); - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 10000; i++) { final TestPublisher cold = TestPublisher.createCold(); cold.next("value" + i); @@ -231,7 +426,7 @@ public void shouldEstablishValueOnceInCaseOfRacingBetweenSubscribeAndBlock() { @Test public void shouldEstablishValueOnceInCaseOfRacingBetweenBlocks() { Duration timeout = Duration.ofMillis(100); - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 10000; i++) { final TestPublisher cold = TestPublisher.createCold(); cold.next("value" + i); @@ -274,7 +469,7 @@ public void shouldEstablishValueOnceInCaseOfRacingBetweenBlocks() { @Test public void shouldExpireValueOnRacingDisposeAndNoValueComplete() { Hooks.onErrorDropped(t -> {}); - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 10000; i++) { final TestPublisher cold = TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW); @@ -299,7 +494,7 @@ public void shouldExpireValueOnRacingDisposeAndNoValueComplete() { } else { Assertions.assertThat(error) .isInstanceOf(IllegalStateException.class) - .hasMessage("Unexpected Completion of the Upstream"); + .hasMessage("Source completed empty"); } Assertions.assertThat(expired).isEmpty(); @@ -312,7 +507,7 @@ public void shouldExpireValueOnRacingDisposeAndNoValueComplete() { @Test public void shouldExpireValueOnRacingDisposeAndComplete() { Hooks.onErrorDropped(t -> {}); - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 10000; i++) { final TestPublisher cold = TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW); @@ -352,7 +547,7 @@ public void shouldExpireValueOnRacingDisposeAndComplete() { public void shouldExpireValueOnRacingDisposeAndError() { Hooks.onErrorDropped(t -> {}); RuntimeException runtimeException = new RuntimeException("test"); - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 10000; i++) { final TestPublisher cold = TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW); @@ -398,7 +593,7 @@ public void shouldExpireValueOnRacingDisposeAndError() { public void shouldExpireValueOnRacingDisposeAndErrorWithNoBackoff() { Hooks.onErrorDropped(t -> {}); RuntimeException runtimeException = new RuntimeException("test"); - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 10000; i++) { final TestPublisher cold = TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW); @@ -710,13 +905,10 @@ public void shouldNotifyAllTheSubscribers() { } @Test - public void shouldExpireValueExactlyOnce() { - for (int i = 0; i < 1000; i++) { + public void shouldExpireValueExactlyOnceOnRacingBetweenInvalidates() { + for (int i = 0; i < 10000; i++) { final TestPublisher cold = TestPublisher.createCold(); cold.next("value"); - // given - final int minBackoff = 1; - final int maxBackoff = 5; final int timeout = 10; final ReconnectMono reconnectMono = @@ -730,6 +922,7 @@ public void shouldExpireValueExactlyOnce() { Assertions.assertThat(expired).isEmpty(); Assertions.assertThat(received).hasSize(1).containsOnly(Tuples.of("value", reconnectMono)); + RaceTestUtils.race(reconnectMono::invalidate, reconnectMono::invalidate); Assertions.assertThat(expired).hasSize(1).containsOnly("value"); @@ -753,6 +946,45 @@ public void shouldExpireValueExactlyOnce() { } } + @Test + public void shouldExpireValueExactlyOnceOnRacingBetweenInvalidateAndDispose() { + for (int i = 0; i < 10000; i++) { + final TestPublisher cold = TestPublisher.createCold(); + cold.next("value"); + final int timeout = 10; + + final ReconnectMono reconnectMono = + cold.mono().as(source -> new ReconnectMono<>(source, onExpire(), onValue())); + + StepVerifier.create(reconnectMono.subscribeOn(Schedulers.elastic())) + .expectSubscription() + .expectNext("value") + .expectComplete() + .verify(Duration.ofSeconds(timeout)); + + Assertions.assertThat(expired).isEmpty(); + Assertions.assertThat(received).hasSize(1).containsOnly(Tuples.of("value", reconnectMono)); + + RaceTestUtils.race(reconnectMono::invalidate, reconnectMono::dispose); + + Assertions.assertThat(expired).hasSize(1).containsOnly("value"); + Assertions.assertThat(received).hasSize(1).containsOnly(Tuples.of("value", reconnectMono)); + + StepVerifier.create(reconnectMono.subscribeOn(Schedulers.elastic())) + .expectSubscription() + .expectError(CancellationException.class) + .verify(Duration.ofSeconds(timeout)); + + Assertions.assertThat(expired).hasSize(1).containsOnly("value"); + Assertions.assertThat(received).hasSize(1).containsOnly(Tuples.of("value", reconnectMono)); + + Assertions.assertThat(cold.subscribeCount()).isEqualTo(1); + + expired.clear(); + received.clear(); + } + } + @Test public void shouldTimeoutRetryWithVirtualTime() { // given @@ -780,6 +1012,22 @@ public void shouldTimeoutRetryWithVirtualTime() { Assertions.assertThat(expired).isEmpty(); } + @Test + public void ensuresThatMainSubscriberAllowsOnlyTerminationWithValue() { + final int timeout = 10; + final ReconnectMono reconnectMono = + new ReconnectMono<>(Mono.empty(), onExpire(), onValue()); + + StepVerifier.create(reconnectMono.subscribeOn(Schedulers.elastic())) + .expectSubscription() + .expectErrorSatisfies( + t -> + Assertions.assertThat(t) + .hasMessage("Source completed empty") + .isInstanceOf(IllegalStateException.class)) + .verify(Duration.ofSeconds(timeout)); + } + @Test public void monoRetryNoBackoff() { Mono mono = From 814c2792b206374e840ac75260d113317d76ff1e Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Fri, 29 May 2020 11:38:47 +0100 Subject: [PATCH 003/240] Forward compatibility with Reactor Netty 1.0 (#851) --- .../netty/server/CloseableChannel.java | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/CloseableChannel.java b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/CloseableChannel.java index c0340c7a2..c4a257f76 100644 --- a/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/CloseableChannel.java +++ b/rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/CloseableChannel.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2018 the original author or authors. + * Copyright 2015-2020 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package io.rsocket.transport.netty.server; import io.rsocket.Closeable; +import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.Objects; import reactor.core.publisher.Mono; @@ -28,6 +29,17 @@ */ public final class CloseableChannel implements Closeable { + /** For forward compatibility: remove when RSocket compiles against Reactor 1.0. */ + private static final Method channelAddressMethod; + + static { + try { + channelAddressMethod = DisposableChannel.class.getMethod("address"); + } catch (NoSuchMethodException ex) { + throw new IllegalStateException("Expected address method", ex); + } + } + private final DisposableChannel channel; /** @@ -47,7 +59,15 @@ public final class CloseableChannel implements Closeable { * @see DisposableChannel#address() */ public InetSocketAddress address() { - return channel.address(); + try { + return channel.address(); + } catch (NoSuchMethodError e) { + try { + return (InetSocketAddress) channelAddressMethod.invoke(this.channel); + } catch (Exception ex) { + throw new IllegalStateException("Unable to obtain address", ex); + } + } } @Override From ea6e59b21b407ee76c6731808e51cc79267a5d64 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 1 Jun 2020 12:34:45 +0300 Subject: [PATCH 004/240] updates gitter badge --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f8110a31e..8a45aa351 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # RSocket -[![Join the chat at https://gitter.im/RSocket/reactivesocket-java](https://badges.gitter.im/RSocket/reactivesocket-java.svg)](https://gitter.im/ReactiveSocket/reactivesocket-java) +[![Join the chat at https://gitter.im/RSocket/RSocket-Java](https://badges.gitter.im/rsocket/rsocket-java.svg)](https://gitter.im/rsocket/rsocket-java) RSocket is a binary protocol for use on byte stream transports such as TCP, WebSockets, and Aeron. From 951bf0396f193ca6c1cb49bd27a54522ddd4966b Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Thu, 28 May 2020 14:58:11 +0300 Subject: [PATCH 005/240] introduces common operator for delayed value resolving Signed-off-by: Oleh Dokuka --- .../java/io/rsocket/core/ReconnectMono.java | 412 ++------ .../io/rsocket/core/ResolvingOperator.java | 598 +++++++++++ .../io/rsocket/core/ReconnectMonoTests.java | 107 +- .../rsocket/core/ResolvingOperatorTests.java | 970 ++++++++++++++++++ 4 files changed, 1702 insertions(+), 385 deletions(-) create mode 100644 rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java create mode 100644 rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java diff --git a/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java b/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java index ee24a2f40..44e4ffa81 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ReconnectMono.java @@ -25,11 +25,9 @@ import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; import reactor.core.Disposable; -import reactor.core.Exceptions; import reactor.core.Scannable; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; -import reactor.core.publisher.Operators.MonoSubscriber; import reactor.util.annotation.Nullable; import reactor.util.context.Context; @@ -38,39 +36,7 @@ final class ReconnectMono extends Mono implements Invalidatable, Disposabl final Mono source; final BiConsumer onValueReceived; final Consumer onValueExpired; - final ReconnectMainSubscriber mainSubscriber; - - volatile int wip; - - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(ReconnectMono.class, "wip"); - - volatile ReconnectInner[] subscribers; - - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater SUBSCRIBERS = - AtomicReferenceFieldUpdater.newUpdater( - ReconnectMono.class, ReconnectInner[].class, "subscribers"); - - @SuppressWarnings("rawtypes") - static final ReconnectInner[] EMPTY_UNSUBSCRIBED = new ReconnectInner[0]; - - @SuppressWarnings("rawtypes") - static final ReconnectInner[] EMPTY_SUBSCRIBED = new ReconnectInner[0]; - - @SuppressWarnings("rawtypes") - static final ReconnectInner[] READY = new ReconnectInner[0]; - - @SuppressWarnings("rawtypes") - static final ReconnectInner[] TERMINATED = new ReconnectInner[0]; - - static final int ADDED_STATE = 0; - static final int READY_STATE = 1; - static final int TERMINATED_STATE = 2; - - T value; - Throwable t; + final ResolvingInner resolvingInner; ReconnectMono( Mono source, @@ -79,9 +45,7 @@ final class ReconnectMono extends Mono implements Invalidatable, Disposabl this.source = source; this.onValueExpired = onValueExpired; this.onValueReceived = onValueReceived; - this.mainSubscriber = new ReconnectMainSubscriber<>(this); - - SUBSCRIBERS.lazySet(this, EMPTY_UNSUBSCRIBED); + this.resolvingInner = new ResolvingInner<>(this); } @Override @@ -91,47 +55,35 @@ public Object scanUnsafe(Attr key) { final boolean isDisposed = isDisposed(); if (key == Attr.TERMINATED) return isDisposed; - if (key == Attr.ERROR) return t; + if (key == Attr.ERROR) return this.resolvingInner.t; return null; } + @Override + public void invalidate() { + this.resolvingInner.invalidate(); + } + @Override public void dispose() { - this.terminate(new CancellationException("ReconnectMono has already been disposed")); + this.resolvingInner.terminate( + new CancellationException("ReconnectMono has already been disposed")); } @Override public boolean isDisposed() { - return this.subscribers == TERMINATED; + return this.resolvingInner.isDisposed(); } @Override @SuppressWarnings("uncheked") public void subscribe(CoreSubscriber actual) { - final ReconnectInner inner = new ReconnectInner<>(actual, this); + final ResolvingOperator.MonoDeferredResolutionOperator inner = + new ResolvingOperator.MonoDeferredResolutionOperator<>(this.resolvingInner, actual); actual.onSubscribe(inner); - for (; ; ) { - final int state = this.add(inner); - - T value = this.value; - - if (state == READY_STATE) { - if (value != null) { - inner.complete(value); - return; - } - // value == null means racing between invalidate and this subscriber - // thus, we have to loop again - continue; - } else if (state == TERMINATED_STATE) { - inner.onError(this.t); - return; - } - - return; - } + this.resolvingInner.observe(inner); } /** @@ -160,244 +112,7 @@ public T block() { @Nullable @SuppressWarnings("uncheked") public T block(@Nullable Duration timeout) { - try { - ReconnectInner[] subscribers = this.subscribers; - if (subscribers == READY) { - final T value = this.value; - if (value != null) { - return value; - } else { - // value == null means racing between invalidate and this block - // thus, we have to update the state again and see what happened - subscribers = this.subscribers; - } - } - - if (subscribers == TERMINATED) { - RuntimeException re = Exceptions.propagate(this.t); - re = Exceptions.addSuppressed(re, new Exception("ReconnectMono terminated with an error")); - throw re; - } - - // connect once - if (subscribers == EMPTY_UNSUBSCRIBED - && SUBSCRIBERS.compareAndSet(this, EMPTY_UNSUBSCRIBED, EMPTY_SUBSCRIBED)) { - this.source.subscribe(this.mainSubscriber); - } - - long delay; - if (null == timeout) { - delay = 0L; - } else { - delay = System.nanoTime() + timeout.toNanos(); - } - for (; ; ) { - ReconnectInner[] inners = this.subscribers; - - if (inners == READY) { - final T value = this.value; - if (value != null) { - return value; - } else { - // value == null means racing between invalidate and this block - // thus, we have to update the state again and see what happened - inners = this.subscribers; - } - } - if (inners == TERMINATED) { - RuntimeException re = Exceptions.propagate(this.t); - re = - Exceptions.addSuppressed(re, new Exception("ReconnectMono terminated with an error")); - throw re; - } - if (timeout != null && delay < System.nanoTime()) { - throw new IllegalStateException("Timeout on Mono blocking read"); - } - - Thread.sleep(1); - } - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - - throw new IllegalStateException("Thread Interruption on Mono blocking read"); - } - } - - @SuppressWarnings("unchecked") - void terminate(Throwable t) { - if (isDisposed()) { - return; - } - - // writes happens before volatile write - this.t = t; - - final ReconnectInner[] subscribers = SUBSCRIBERS.getAndSet(this, TERMINATED); - if (subscribers == TERMINATED) { - Operators.onErrorDropped(t, Context.empty()); - return; - } - - this.mainSubscriber.dispose(); - - this.doFinally(); - - for (CoreSubscriber consumer : subscribers) { - consumer.onError(t); - } - } - - void complete() { - ReconnectInner[] subscribers = this.subscribers; - if (subscribers == TERMINATED) { - return; - } - - final T value = this.value; - - for (; ; ) { - // ensures TERMINATE is going to be replaced with READY - if (SUBSCRIBERS.compareAndSet(this, subscribers, READY)) { - break; - } - - subscribers = this.subscribers; - - if (subscribers == TERMINATED) { - this.doFinally(); - return; - } - } - - this.onValueReceived.accept(value, this); - - for (ReconnectInner consumer : subscribers) { - consumer.complete(value); - } - } - - void doFinally() { - if (WIP.getAndIncrement(this) != 0) { - return; - } - - int m = 1; - T value; - - for (; ; ) { - value = this.value; - - if (value != null && isDisposed()) { - this.value = null; - this.onValueExpired.accept(value); - return; - } - - m = WIP.addAndGet(this, -m); - if (m == 0) { - return; - } - } - } - - // Check RSocket is not good - @Override - public void invalidate() { - if (this.subscribers == TERMINATED) { - return; - } - - final ReconnectInner[] subscribers = this.subscribers; - - if (subscribers == READY) { - // guarded section to ensure we expire value exactly once if there is racing - if (WIP.getAndIncrement(this) != 0) { - return; - } - - final T value = this.value; - this.value = null; - if (value != null) { - this.onValueExpired.accept(value); - } - - int m = 1; - for (; ; ) { - if (isDisposed()) { - return; - } - - m = WIP.addAndGet(this, -m); - if (m == 0) { - break; - } - } - - SUBSCRIBERS.compareAndSet(this, READY, EMPTY_UNSUBSCRIBED); - } - } - - int add(ReconnectInner ps) { - for (; ; ) { - ReconnectInner[] a = this.subscribers; - - if (a == TERMINATED) { - return TERMINATED_STATE; - } - - if (a == READY) { - return READY_STATE; - } - - int n = a.length; - @SuppressWarnings("unchecked") - ReconnectInner[] b = new ReconnectInner[n + 1]; - System.arraycopy(a, 0, b, 0, n); - b[n] = ps; - - if (SUBSCRIBERS.compareAndSet(this, a, b)) { - if (a == EMPTY_UNSUBSCRIBED) { - this.source.subscribe(this.mainSubscriber); - } - return ADDED_STATE; - } - } - } - - @SuppressWarnings("unchecked") - void remove(ReconnectInner ps) { - for (; ; ) { - ReconnectInner[] a = this.subscribers; - int n = a.length; - if (n == 0) { - return; - } - - int j = -1; - for (int i = 0; i < n; i++) { - if (a[i] == ps) { - j = i; - break; - } - } - - if (j < 0) { - return; - } - - ReconnectInner[] b; - - if (n == 1) { - b = EMPTY_SUBSCRIBED; - } else { - b = new ReconnectInner[n - 1]; - System.arraycopy(a, 0, b, 0, j); - System.arraycopy(a, j + 1, b, j, n - j - 1); - } - if (SUBSCRIBERS.compareAndSet(this, a, b)) { - return; - } - } + return this.resolvingInner.block(timeout); } /** @@ -407,7 +122,7 @@ void remove(ReconnectInner ps) { */ static final class ReconnectMainSubscriber implements CoreSubscriber { - final ReconnectMono parent; + final ResolvingInner parent; volatile Subscription s; @@ -416,7 +131,15 @@ static final class ReconnectMainSubscriber implements CoreSubscriber { AtomicReferenceFieldUpdater.newUpdater( ReconnectMainSubscriber.class, Subscription.class, "s"); - ReconnectMainSubscriber(ReconnectMono parent) { + volatile int wip; + + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(ReconnectMainSubscriber.class, "wip"); + + T value; + + ReconnectMainSubscriber(ResolvingInner parent) { this.parent = parent; } @@ -430,93 +153,114 @@ public void onSubscribe(Subscription s) { @Override public void onComplete() { final Subscription s = this.s; - final ReconnectMono p = this.parent; - final T value = p.value; + final T value = this.value; if (s == Operators.cancelledSubscription() || !S.compareAndSet(this, s, null)) { - p.doFinally(); + this.doFinally(); return; } + final ResolvingInner p = this.parent; if (value == null) { p.terminate(new IllegalStateException("Source completed empty")); } else { - p.complete(); + p.complete(value); } } @Override public void onError(Throwable t) { final Subscription s = this.s; - final ReconnectMono p = this.parent; if (s == Operators.cancelledSubscription() || S.getAndSet(this, Operators.cancelledSubscription()) == Operators.cancelledSubscription()) { - p.doFinally(); + this.doFinally(); Operators.onErrorDropped(t, Context.empty()); return; } + this.doFinally(); // terminate upstream which means retryBackoff has exhausted - p.terminate(t); + this.parent.terminate(t); } @Override public void onNext(T value) { if (this.s == Operators.cancelledSubscription()) { - this.parent.onValueExpired.accept(value); + this.parent.doOnValueExpired(value); return; } - final ReconnectMono p = this.parent; - - p.value = value; + this.value = value; // volatile write and check on racing - p.doFinally(); + this.doFinally(); } void dispose() { - Operators.terminate(S, this); + if (Operators.terminate(S, this)) { + this.doFinally(); + } + } + + final void doFinally() { + if (WIP.getAndIncrement(this) != 0) { + return; + } + + int m = 1; + T value; + + for (; ; ) { + value = this.value; + if (value != null && this.s == Operators.cancelledSubscription()) { + this.value = null; + this.parent.doOnValueExpired(value); + return; + } + + m = WIP.addAndGet(this, -m); + if (m == 0) { + return; + } + } } } - static final class ReconnectInner extends MonoSubscriber { + static final class ResolvingInner extends ResolvingOperator implements Scannable { + final ReconnectMono parent; + final ReconnectMainSubscriber mainSubscriber; - ReconnectInner(CoreSubscriber actual, ReconnectMono parent) { - super(actual); + ResolvingInner(ReconnectMono parent) { this.parent = parent; + this.mainSubscriber = new ReconnectMainSubscriber<>(this); } @Override - public void cancel() { - if (!isCancelled()) { - super.cancel(); - this.parent.remove(this); - } + protected void doOnValueExpired(T value) { + this.parent.onValueExpired.accept(value); } @Override - public void onComplete() { - if (!isCancelled()) { - this.actual.onComplete(); - } + protected void doOnValueResolved(T value) { + this.parent.onValueReceived.accept(value, this.parent); } @Override - public void onError(Throwable t) { - if (isCancelled()) { - Operators.onErrorDropped(t, currentContext()); - } else { - this.actual.onError(t); - } + protected void doOnDispose() { + this.mainSubscriber.dispose(); + } + + @Override + protected void doSubscribe() { + this.parent.source.subscribe(this.mainSubscriber); } @Override public Object scanUnsafe(Attr key) { if (key == Attr.PARENT) return this.parent; - return super.scanUnsafe(key); + return null; } } } diff --git a/rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java b/rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java new file mode 100644 index 000000000..c431b3f3f --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java @@ -0,0 +1,598 @@ +package io.rsocket.core; + +import java.time.Duration; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.BiConsumer; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Disposable; +import reactor.core.Exceptions; +import reactor.core.Scannable; +import reactor.core.publisher.Operators; +import reactor.util.annotation.Nullable; +import reactor.util.context.Context; + +class ResolvingOperator implements Disposable { + + static final CancellationException ON_DISPOSE = new CancellationException("Disposed"); + + volatile int wip; + + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater WIP = + AtomicIntegerFieldUpdater.newUpdater(ResolvingOperator.class, "wip"); + + volatile BiConsumer[] subscribers; + + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater SUBSCRIBERS = + AtomicReferenceFieldUpdater.newUpdater( + ResolvingOperator.class, BiConsumer[].class, "subscribers"); + + @SuppressWarnings("unchecked") + static final BiConsumer[] EMPTY_UNSUBSCRIBED = new BiConsumer[0]; + + @SuppressWarnings("unchecked") + static final BiConsumer[] EMPTY_SUBSCRIBED = new BiConsumer[0]; + + @SuppressWarnings("unchecked") + static final BiConsumer[] READY = new BiConsumer[0]; + + @SuppressWarnings("unchecked") + static final BiConsumer[] TERMINATED = new BiConsumer[0]; + + static final int ADDED_STATE = 0; + static final int READY_STATE = 1; + static final int TERMINATED_STATE = 2; + + T value; + Throwable t; + + public ResolvingOperator() { + + SUBSCRIBERS.lazySet(this, EMPTY_UNSUBSCRIBED); + } + + @Override + public final void dispose() { + this.terminate(ON_DISPOSE); + } + + @Override + public final boolean isDisposed() { + return this.subscribers == TERMINATED; + } + + public final boolean isPending() { + BiConsumer[] state = this.subscribers; + return state != READY && state != TERMINATED; + } + + @Nullable + public final T valueIfResolved() { + if (this.subscribers == READY) { + T value = this.value; + if (value != null) { + return value; + } + } + + return null; + } + + final void observe(BiConsumer actual) { + for (; ; ) { + final int state = this.add(actual); + + T value = this.value; + + if (state == READY_STATE) { + if (value != null) { + actual.accept(value, null); + return; + } + // value == null means racing between invalidate and this subscriber + // thus, we have to loop again + continue; + } else if (state == TERMINATED_STATE) { + actual.accept(null, this.t); + return; + } + + return; + } + } + + /** + * Block the calling thread for the specified time, waiting for the completion of this {@code + * ReconnectMono}. If the {@link ResolvingOperator} is completed with an error a RuntimeException + * that wraps the error is thrown. + * + * @param timeout the timeout value as a {@link Duration} + * @return the value of this {@link ResolvingOperator} or {@code null} if the timeout is reached + * and the {@link ResolvingOperator} has not completed + * @throws RuntimeException if terminated with error + * @throws IllegalStateException if timed out or {@link Thread} was interrupted with {@link + * InterruptedException} + */ + @Nullable + @SuppressWarnings({"uncheked", "BusyWait"}) + public T block(@Nullable Duration timeout) { + try { + BiConsumer[] subscribers = this.subscribers; + if (subscribers == READY) { + final T value = this.value; + if (value != null) { + return value; + } else { + // value == null means racing between invalidate and this block + // thus, we have to update the state again and see what happened + subscribers = this.subscribers; + } + } + + if (subscribers == TERMINATED) { + RuntimeException re = Exceptions.propagate(this.t); + re = Exceptions.addSuppressed(re, new Exception("Terminated with an error")); + throw re; + } + + // connect once + if (subscribers == EMPTY_UNSUBSCRIBED + && SUBSCRIBERS.compareAndSet(this, EMPTY_UNSUBSCRIBED, EMPTY_SUBSCRIBED)) { + this.doSubscribe(); + } + + long delay; + if (null == timeout) { + delay = 0L; + } else { + delay = System.nanoTime() + timeout.toNanos(); + } + for (; ; ) { + BiConsumer[] inners = this.subscribers; + + if (inners == READY) { + final T value = this.value; + if (value != null) { + return value; + } else { + // value == null means racing between invalidate and this block + // thus, we have to update the state again and see what happened + inners = this.subscribers; + } + } + if (inners == TERMINATED) { + RuntimeException re = Exceptions.propagate(this.t); + re = Exceptions.addSuppressed(re, new Exception("Terminated with an error")); + throw re; + } + if (timeout != null && delay < System.nanoTime()) { + throw new IllegalStateException("Timeout on Mono blocking read"); + } + + Thread.sleep(1); + } + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + + throw new IllegalStateException("Thread Interruption on Mono blocking read"); + } + } + + @SuppressWarnings("unchecked") + final void terminate(Throwable t) { + if (isDisposed()) { + return; + } + + // writes happens before volatile write + this.t = t; + + final BiConsumer[] subscribers = SUBSCRIBERS.getAndSet(this, TERMINATED); + if (subscribers == TERMINATED) { + Operators.onErrorDropped(t, Context.empty()); + return; + } + + this.doOnDispose(); + + this.doFinally(); + + for (BiConsumer consumer : subscribers) { + consumer.accept(null, t); + } + } + + final void complete(T value) { + BiConsumer[] subscribers = this.subscribers; + if (subscribers == TERMINATED) { + this.doOnValueExpired(value); + return; + } + + this.value = value; + + for (; ; ) { + // ensures TERMINATE is going to be replaced with READY + if (SUBSCRIBERS.compareAndSet(this, subscribers, READY)) { + break; + } + + subscribers = this.subscribers; + + if (subscribers == TERMINATED) { + this.doFinally(); + return; + } + } + + this.doOnValueResolved(value); + + for (BiConsumer consumer : subscribers) { + consumer.accept(value, null); + } + } + + protected void doOnValueResolved(T value) { + // no ops + } + + final void doFinally() { + if (WIP.getAndIncrement(this) != 0) { + return; + } + + int m = 1; + T value; + + for (; ; ) { + value = this.value; + if (value != null && isDisposed()) { + this.value = null; + this.doOnValueExpired(value); + return; + } + + m = WIP.addAndGet(this, -m); + if (m == 0) { + return; + } + } + } + + final void invalidate() { + if (this.subscribers == TERMINATED) { + return; + } + + final BiConsumer[] subscribers = this.subscribers; + + if (subscribers == READY) { + // guarded section to ensure we expire value exactly once if there is racing + if (WIP.getAndIncrement(this) != 0) { + return; + } + + final T value = this.value; + if (value != null) { + this.value = null; + this.doOnValueExpired(value); + } + + int m = 1; + for (; ; ) { + if (isDisposed()) { + return; + } + + m = WIP.addAndGet(this, -m); + if (m == 0) { + break; + } + } + + SUBSCRIBERS.compareAndSet(this, READY, EMPTY_UNSUBSCRIBED); + } + } + + protected void doOnValueExpired(T value) { + // no ops + } + + protected void doOnDispose() { + // no ops + } + + final int add(BiConsumer ps) { + for (; ; ) { + BiConsumer[] a = this.subscribers; + + if (a == TERMINATED) { + return TERMINATED_STATE; + } + + if (a == READY) { + return READY_STATE; + } + + int n = a.length; + @SuppressWarnings("unchecked") + BiConsumer[] b = new BiConsumer[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = ps; + + if (SUBSCRIBERS.compareAndSet(this, a, b)) { + if (a == EMPTY_UNSUBSCRIBED) { + this.doSubscribe(); + } + return ADDED_STATE; + } + } + } + + protected void doSubscribe() { + // no ops + } + + @SuppressWarnings("unchecked") + final void remove(BiConsumer ps) { + for (; ; ) { + BiConsumer[] a = this.subscribers; + int n = a.length; + if (n == 0) { + return; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i] == ps) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + + BiConsumer[] b; + + if (n == 1) { + b = EMPTY_SUBSCRIBED; + } else { + b = new BiConsumer[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + if (SUBSCRIBERS.compareAndSet(this, a, b)) { + return; + } + } + } + + abstract static class DeferredResolution + implements CoreSubscriber, Subscription, Scannable, BiConsumer { + + final ResolvingOperator parent; + final CoreSubscriber actual; + + volatile long requested; + + @SuppressWarnings("rawtypes") + static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(DeferredResolution.class, "requested"); + + static final long STATE_SUBSCRIBED = -1; + static final long STATE_CANCELLED = Long.MIN_VALUE; + + Subscription s; + boolean done; + + DeferredResolution(ResolvingOperator parent, CoreSubscriber actual) { + this.parent = parent; + this.actual = actual; + } + + @Override + public final Context currentContext() { + return this.actual.currentContext(); + } + + @Nullable + @Override + public Object scanUnsafe(Attr key) { + long state = this.requested; + + if (key == Attr.PARENT) { + return this.s; + } + if (key == Attr.ACTUAL) { + return this.parent; + } + if (key == Attr.TERMINATED) { + return this.done; + } + if (key == Attr.CANCELLED) { + return state == STATE_CANCELLED; + } + + return null; + } + + @Override + public final void onSubscribe(Subscription s) { + final long state = this.requested; + Subscription a = this.s; + if (state == STATE_CANCELLED) { + s.cancel(); + return; + } + if (a != null) { + s.cancel(); + return; + } + + long r; + long accumulated = 0; + for (; ; ) { + r = this.requested; + + if (r == STATE_CANCELLED || r == STATE_SUBSCRIBED) { + s.cancel(); + return; + } + + this.s = s; + + long toRequest = r - accumulated; + if (toRequest > 0) { // if there is something, + s.request(toRequest); // then we do a request on the given subscription + } + accumulated = r; + + if (REQUESTED.compareAndSet(this, r, STATE_SUBSCRIBED)) { + return; + } + } + } + + @Override + public final void onNext(T payload) { + this.actual.onNext(payload); + } + + @Override + public final void onError(Throwable t) { + if (this.done) { + Operators.onErrorDropped(t, this.actual.currentContext()); + return; + } + + this.done = true; + this.actual.onError(t); + } + + @Override + public final void onComplete() { + if (this.done) { + return; + } + + this.done = true; + this.actual.onComplete(); + } + + @Override + public void request(long n) { + if (Operators.validate(n)) { + long r = this.requested; // volatile read beforehand + + if (r > STATE_SUBSCRIBED) { // works only in case onSubscribe has not happened + long u; + for (; ; ) { // normal CAS loop with overflow protection + if (r == Long.MAX_VALUE) { + // if r == Long.MAX_VALUE then we dont care and we can loose this + // request just in case of racing + return; + } + u = Operators.addCap(r, n); + if (REQUESTED.compareAndSet(this, r, u)) { + // Means increment happened before onSubscribe + return; + } else { + // Means increment happened after onSubscribe + + // update new state to see what exactly happened (onSubscribe |cancel | requestN) + r = this.requested; + + // check state (expect -1 | -2 to exit, otherwise repeat) + if (r < 0) { + break; + } + } + } + } + + if (r == STATE_CANCELLED) { // if canceled, just exit + return; + } + + // if onSubscribe -> subscription exists (and we sure of that because volatile read + // after volatile write) so we can execute requestN on the subscription + this.s.request(n); + } + } + + public boolean isCancelled() { + return this.requested == STATE_CANCELLED; + } + + public void cancel() { + long state = REQUESTED.getAndSet(this, STATE_CANCELLED); + if (state == STATE_CANCELLED) { + return; + } + + if (state == STATE_SUBSCRIBED) { + this.s.cancel(); + } else { + this.parent.remove(this); + } + } + } + + static class MonoDeferredResolutionOperator extends Operators.MonoSubscriber + implements BiConsumer { + + final ResolvingOperator parent; + + MonoDeferredResolutionOperator(ResolvingOperator parent, CoreSubscriber actual) { + super(actual); + this.parent = parent; + } + + @Override + public void accept(T t, Throwable throwable) { + if (throwable != null) { + onError(throwable); + return; + } + + complete(t); + } + + @Override + public void cancel() { + if (!isCancelled()) { + super.cancel(); + this.parent.remove(this); + } + } + + @Override + public void onComplete() { + if (!isCancelled()) { + this.actual.onComplete(); + } + } + + @Override + public void onError(Throwable t) { + if (isCancelled()) { + Operators.onErrorDropped(t, currentContext()); + } else { + this.actual.onError(t); + } + } + + @Override + public Object scanUnsafe(Attr key) { + if (key == Attr.PARENT) return this.parent; + return super.scanUnsafe(key); + } + } +} diff --git a/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java b/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java index c00072593..8d96222df 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java +++ b/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java @@ -83,6 +83,8 @@ public void subscribe(CoreSubscriber actual) { RaceTestUtils.race(() -> monoSubscribers[0].onNext("value" + index), reconnectMono::dispose); + monoSubscribers[0].onComplete(); + Assertions.assertThat(processor.isTerminated()).isTrue(); Mockito.verify(mockSubscription).cancel(); @@ -126,11 +128,14 @@ public void shouldNotifyAllTheSubscribersUnderRacingBetweenSubscribeAndComplete( Assertions.assertThat(processor.peek()).isEqualTo("value" + i); Assertions.assertThat(racerProcessor.peek()).isEqualTo("value" + i); - Assertions.assertThat(reconnectMono.subscribers).isEqualTo(ReconnectMono.READY); + Assertions.assertThat(reconnectMono.resolvingInner.subscribers) + .isEqualTo(ResolvingOperator.READY); Assertions.assertThat( - reconnectMono.add(new ReconnectMono.ReconnectInner<>(processor, reconnectMono))) - .isEqualTo(ReconnectMono.READY_STATE); + reconnectMono.resolvingInner.add( + new ResolvingOperator.MonoDeferredResolutionOperator<>( + reconnectMono.resolvingInner, processor))) + .isEqualTo(ResolvingOperator.READY_STATE); Assertions.assertThat(expired).isEmpty(); Assertions.assertThat(received) @@ -158,16 +163,16 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidate() Assertions.assertThat(expired).isEmpty(); Assertions.assertThat(received).isEmpty(); - reconnectMono.mainSubscriber.onNext("value_to_expire" + i); - reconnectMono.mainSubscriber.onComplete(); + reconnectMono.resolvingInner.mainSubscriber.onNext("value_to_expire" + i); + reconnectMono.resolvingInner.mainSubscriber.onComplete(); RaceTestUtils.race( reconnectMono::invalidate, () -> { reconnectMono.subscribe(racerProcessor); if (!racerProcessor.isTerminated()) { - reconnectMono.mainSubscriber.onNext("value_to_not_expire" + index); - reconnectMono.mainSubscriber.onComplete(); + reconnectMono.resolvingInner.mainSubscriber.onNext("value_to_not_expire" + index); + reconnectMono.resolvingInner.mainSubscriber.onComplete(); } }, Schedulers.parallel()); @@ -178,7 +183,7 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidate() StepVerifier.create(racerProcessor) .expectNextMatches( (v) -> { - if (reconnectMono.subscribers == ReconnectMono.READY) { + if (reconnectMono.resolvingInner.subscribers == ResolvingOperator.READY) { return v.equals("value_to_not_expire" + index); } else { return v.equals("value_to_expire" + index); @@ -188,7 +193,7 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidate() .verify(Duration.ofMillis(100)); Assertions.assertThat(expired).hasSize(1).containsOnly("value_to_expire" + i); - if (reconnectMono.subscribers == ReconnectMono.READY) { + if (reconnectMono.resolvingInner.subscribers == ResolvingOperator.READY) { Assertions.assertThat(received) .hasSize(2) .containsExactly( @@ -222,8 +227,8 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidates( Assertions.assertThat(expired).isEmpty(); Assertions.assertThat(received).isEmpty(); - reconnectMono.mainSubscriber.onNext("value_to_expire" + i); - reconnectMono.mainSubscriber.onComplete(); + reconnectMono.resolvingInner.mainSubscriber.onNext("value_to_expire" + i); + reconnectMono.resolvingInner.mainSubscriber.onComplete(); RaceTestUtils.race( () -> @@ -232,8 +237,9 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidates( () -> { reconnectMono.subscribe(racerProcessor); if (!racerProcessor.isTerminated()) { - reconnectMono.mainSubscriber.onNext("value_to_possibly_expire" + index); - reconnectMono.mainSubscriber.onComplete(); + reconnectMono.resolvingInner.mainSubscriber.onNext( + "value_to_possibly_expire" + index); + reconnectMono.resolvingInner.mainSubscriber.onComplete(); } }, Schedulers.parallel()); @@ -289,8 +295,8 @@ public void shouldNotExpireNewlyResolvedValueIfBlockIsRacingWithInvalidate() { Assertions.assertThat(expired).isEmpty(); Assertions.assertThat(received).isEmpty(); - reconnectMono.mainSubscriber.onNext("value_to_expire" + i); - reconnectMono.mainSubscriber.onComplete(); + reconnectMono.resolvingInner.mainSubscriber.onNext("value_to_expire" + i); + reconnectMono.resolvingInner.mainSubscriber.onComplete(); RaceTestUtils.race( () -> @@ -304,9 +310,10 @@ public void shouldNotExpireNewlyResolvedValueIfBlockIsRacingWithInvalidate() { reconnectMono::invalidate, () -> { for (; ; ) { - if (reconnectMono.subscribers != ReconnectMono.READY) { - reconnectMono.mainSubscriber.onNext("value_to_not_expire" + index); - reconnectMono.mainSubscriber.onComplete(); + if (reconnectMono.resolvingInner.subscribers != ResolvingOperator.READY) { + reconnectMono.resolvingInner.mainSubscriber.onNext( + "value_to_not_expire" + index); + reconnectMono.resolvingInner.mainSubscriber.onComplete(); break; } } @@ -319,7 +326,7 @@ public void shouldNotExpireNewlyResolvedValueIfBlockIsRacingWithInvalidate() { Assertions.assertThat(processor.peek()).isEqualTo("value_to_expire" + i); Assertions.assertThat(expired).hasSize(1).containsOnly("value_to_expire" + i); - if (reconnectMono.subscribers == ReconnectMono.READY) { + if (reconnectMono.resolvingInner.subscribers == ResolvingOperator.READY) { Assertions.assertThat(received) .hasSize(2) .containsExactly( @@ -362,13 +369,16 @@ public void shouldEstablishValueOnceInCaseOfRacingBetweenSubscribers() { Assertions.assertThat(processor.peek()).isEqualTo("value" + i); Assertions.assertThat(racerProcessor.peek()).isEqualTo("value" + i); - Assertions.assertThat(reconnectMono.subscribers).isEqualTo(ReconnectMono.READY); + Assertions.assertThat(reconnectMono.resolvingInner.subscribers) + .isEqualTo(ResolvingOperator.READY); Assertions.assertThat(cold.subscribeCount()).isOne(); Assertions.assertThat( - reconnectMono.add(new ReconnectMono.ReconnectInner<>(processor, reconnectMono))) - .isEqualTo(ReconnectMono.READY_STATE); + reconnectMono.resolvingInner.add( + new ResolvingOperator.MonoDeferredResolutionOperator<>( + reconnectMono.resolvingInner, processor))) + .isEqualTo(ResolvingOperator.READY_STATE); Assertions.assertThat(expired).isEmpty(); Assertions.assertThat(received) @@ -406,13 +416,16 @@ public void shouldEstablishValueOnceInCaseOfRacingBetweenSubscribeAndBlock() { Assertions.assertThat(processor.peek()).isEqualTo("value" + i); Assertions.assertThat(values).containsExactly("value" + i); - Assertions.assertThat(reconnectMono.subscribers).isEqualTo(ReconnectMono.READY); + Assertions.assertThat(reconnectMono.resolvingInner.subscribers) + .isEqualTo(ResolvingOperator.READY); Assertions.assertThat(cold.subscribeCount()).isOne(); Assertions.assertThat( - reconnectMono.add(new ReconnectMono.ReconnectInner<>(processor, reconnectMono))) - .isEqualTo(ReconnectMono.READY_STATE); + reconnectMono.resolvingInner.add( + new ResolvingOperator.MonoDeferredResolutionOperator<>( + reconnectMono.resolvingInner, processor))) + .isEqualTo(ResolvingOperator.READY_STATE); Assertions.assertThat(expired).isEmpty(); Assertions.assertThat(received) @@ -448,14 +461,16 @@ public void shouldEstablishValueOnceInCaseOfRacingBetweenBlocks() { Assertions.assertThat(values2).containsExactly("value" + i); Assertions.assertThat(values1).containsExactly("value" + i); - Assertions.assertThat(reconnectMono.subscribers).isEqualTo(ReconnectMono.READY); + Assertions.assertThat(reconnectMono.resolvingInner.subscribers) + .isEqualTo(ResolvingOperator.READY); Assertions.assertThat(cold.subscribeCount()).isOne(); Assertions.assertThat( - reconnectMono.add( - new ReconnectMono.ReconnectInner<>(MonoProcessor.create(), reconnectMono))) - .isEqualTo(ReconnectMono.READY_STATE); + reconnectMono.resolvingInner.add( + new ResolvingOperator.MonoDeferredResolutionOperator<>( + reconnectMono.resolvingInner, MonoProcessor.create()))) + .isEqualTo(ResolvingOperator.READY_STATE); Assertions.assertThat(expired).isEmpty(); Assertions.assertThat(received) @@ -664,7 +679,7 @@ public void shouldThrowOnBlockingIfHasAlreadyTerminated() { Assertions.assertThatThrownBy(() -> reconnectMono.block(Duration.ofMillis(100))) .isInstanceOf(RuntimeException.class) .hasMessage("test") - .hasSuppressedException(new Exception("ReconnectMono terminated with an error")); + .hasSuppressedException(new Exception("Terminated with an error")); } @Test @@ -697,9 +712,12 @@ public void shouldBeScannable() { .parents() .map(s -> s.getClass()) .collect(Collectors.toList())) - .hasSize(3) + .hasSize(4) .containsExactly( - ReconnectMono.ReconnectInner.class, ReconnectMono.class, publisher.mono().getClass()); + ResolvingOperator.MonoDeferredResolutionOperator.class, + ReconnectMono.ResolvingInner.class, + ReconnectMono.class, + publisher.mono().getClass()); reconnectMono.dispose(); @@ -783,10 +801,6 @@ public void shouldNotEmitUntilCompletion() { public void shouldBePossibleToRemoveThemSelvesFromTheList_CancellationTest() { final TestPublisher publisher = TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW); - // given - final int minBackoff = 1; - final int maxBackoff = 5; - final int timeout = 10; final ReconnectMono reconnectMono = publisher.mono().as(source -> new ReconnectMono<>(source, onExpire(), onValue())); @@ -807,7 +821,8 @@ public void shouldBePossibleToRemoveThemSelvesFromTheList_CancellationTest() { processor.cancel(); - Assertions.assertThat(reconnectMono.subscribers).isEqualTo(ReconnectMono.EMPTY_SUBSCRIBED); + Assertions.assertThat(reconnectMono.resolvingInner.subscribers) + .isEqualTo(ResolvingOperator.EMPTY_SUBSCRIBED); publisher.complete(); @@ -821,8 +836,6 @@ public void shouldBePossibleToRemoveThemSelvesFromTheList_CancellationTest() { public void shouldExpireValueOnDispose() { final TestPublisher publisher = TestPublisher.create(); // given - final int minBackoff = 1; - final int maxBackoff = 5; final int timeout = 10; final ReconnectMono reconnectMono = @@ -853,10 +866,6 @@ public void shouldExpireValueOnDispose() { @Test public void shouldNotifyAllTheSubscribers() { final TestPublisher publisher = TestPublisher.create(); - // given - final int minBackoff = 1; - final int maxBackoff = 5; - final int timeout = 10; final ReconnectMono reconnectMono = publisher.mono().as(source -> new ReconnectMono<>(source, onExpire(), onValue())); @@ -871,7 +880,7 @@ public void shouldNotifyAllTheSubscribers() { reconnectMono.subscribe(sub3); reconnectMono.subscribe(sub4); - Assertions.assertThat(reconnectMono.subscribers).hasSize(4); + Assertions.assertThat(reconnectMono.resolvingInner.subscribers).hasSize(4); final ArrayList> processors = new ArrayList<>(200); @@ -883,11 +892,11 @@ public void shouldNotifyAllTheSubscribers() { RaceTestUtils.race(() -> reconnectMono.subscribe(subA), () -> reconnectMono.subscribe(subB)); } - Assertions.assertThat(reconnectMono.subscribers).hasSize(204); + Assertions.assertThat(reconnectMono.resolvingInner.subscribers).hasSize(204); sub1.dispose(); - Assertions.assertThat(reconnectMono.subscribers).hasSize(203); + Assertions.assertThat(reconnectMono.resolvingInner.subscribers).hasSize(203); publisher.next("value"); @@ -951,7 +960,7 @@ public void shouldExpireValueExactlyOnceOnRacingBetweenInvalidateAndDispose() { for (int i = 0; i < 10000; i++) { final TestPublisher cold = TestPublisher.createCold(); cold.next("value"); - final int timeout = 10; + final int timeout = 10000; final ReconnectMono reconnectMono = cold.mono().as(source -> new ReconnectMono<>(source, onExpire(), onValue())); @@ -1109,8 +1118,4 @@ private final void assertRetries(Class... exceptions) { index++; } } - - static boolean isRetryExhausted(Throwable e, Class cause) { - return Exceptions.isRetryExhausted(e) && cause.isInstance(e.getCause()); - } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java b/rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java new file mode 100644 index 000000000..29748abbe --- /dev/null +++ b/rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java @@ -0,0 +1,970 @@ +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.core; + +import io.rsocket.internal.subscriber.AssertSubscriber; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Queue; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Stream; +import org.assertj.core.api.Assertions; +import org.assertj.core.api.Condition; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.Mockito; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.MonoProcessor; +import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; +import reactor.test.util.RaceTestUtils; +import reactor.util.retry.Retry; + +public class ResolvingOperatorTests { + + private Queue retries = new ConcurrentLinkedQueue<>(); + + @Test + public void shouldExpireValueOnRacingDisposeAndComplete() { + for (int i = 0; i < 10000; i++) { + final int index = i; + + MonoProcessor processor = MonoProcessor.create(); + BiConsumer consumer = + (v, t) -> { + if (t != null) { + processor.onError(t); + return; + } + + processor.onNext(v); + }; + + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingResolution() + .thenAddObserver(consumer) + .assertPendingSubscribers(1) + .assertPendingResolution() + .then(self -> RaceTestUtils.race(() -> self.complete("value" + index), self::dispose)) + .assertDisposeCalled(1) + .assertExpiredExactly("value" + index) + .ifResolvedAssertEqual("value" + index) + .assertIsDisposed(); + + if (processor.isError()) { + Assertions.assertThat(processor.getError()) + .isInstanceOf(CancellationException.class) + .hasMessage("Disposed"); + + } else { + Assertions.assertThat(processor.peek()).isEqualTo("value" + i); + } + } + } + + @Test + public void shouldNotifyAllTheSubscribersUnderRacingBetweenSubscribeAndComplete() { + for (int i = 0; i < 10000; i++) { + final String valueToSend = "value" + i; + + MonoProcessor processor = MonoProcessor.create(); + BiConsumer consumer = + (v, t) -> { + if (t != null) { + processor.onError(t); + return; + } + + processor.onNext(v); + }; + + MonoProcessor processor2 = MonoProcessor.create(); + BiConsumer consumer2 = + (v, t) -> { + if (t != null) { + processor2.onError(t); + return; + } + + processor2.onNext(v); + }; + + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingSubscribers(0) + .assertPendingResolution() + .then( + self -> { + RaceTestUtils.race(() -> self.complete(valueToSend), () -> self.observe(consumer)); + + StepVerifier.create(processor) + .expectNext(valueToSend) + .expectComplete() + .verify(Duration.ofMillis(10)); + }) + .assertDisposeCalled(0) + .assertReceivedExactly(valueToSend) + .assertNothingExpired() + .thenAddObserver(consumer2) + .assertPendingSubscribers(0); + + StepVerifier.create(processor2) + .expectNext(valueToSend) + .expectComplete() + .verify(Duration.ofMillis(10)); + } + } + + @Test + public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidate() { + for (int i = 0; i < 10000; i++) { + final String valueToSend = "value" + i; + final String valueToSend2 = "value2" + i; + + MonoProcessor processor = MonoProcessor.create(); + BiConsumer consumer = + (v, t) -> { + if (t != null) { + processor.onError(t); + return; + } + + processor.onNext(v); + }; + + MonoProcessor processor2 = MonoProcessor.create(); + BiConsumer consumer2 = + (v, t) -> { + if (t != null) { + processor2.onError(t); + return; + } + + processor2.onNext(v); + }; + + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingSubscribers(0) + .assertPendingResolution() + .thenAddObserver(consumer) + .then( + self -> { + self.complete(valueToSend); + + StepVerifier.create(processor) + .expectNext(valueToSend) + .expectComplete() + .verify(Duration.ofMillis(10)); + }) + .assertReceivedExactly(valueToSend) + .then( + self -> + RaceTestUtils.race( + self::invalidate, + () -> { + self.observe(consumer2); + if (!processor2.isTerminated()) { + self.complete(valueToSend2); + } + }, + Schedulers.parallel())) + .then( + self -> { + if (self.isPending()) { + self.assertReceivedExactly(valueToSend); + } else { + self.assertReceivedExactly(valueToSend, valueToSend2); + } + }) + .assertExpiredExactly(valueToSend) + .assertPendingSubscribers(0) + .assertDisposeCalled(0) + .then( + self -> + StepVerifier.create(processor2) + .expectNextMatches( + (v) -> { + if (self.subscribers == ResolvingOperator.READY) { + return v.equals(valueToSend2); + } else { + return v.equals(valueToSend); + } + }) + .expectComplete() + .verify(Duration.ofMillis(100))); + } + } + + @Test + public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidates() { + for (int i = 0; i < 10000; i++) { + final String valueToSend = "value" + i; + final String valueToSend2 = "value_to_possibly_expire" + i; + + MonoProcessor processor = MonoProcessor.create(); + BiConsumer consumer = + (v, t) -> { + if (t != null) { + processor.onError(t); + return; + } + + processor.onNext(v); + }; + + MonoProcessor processor2 = MonoProcessor.create(); + BiConsumer consumer2 = + (v, t) -> { + if (t != null) { + processor2.onError(t); + return; + } + + processor2.onNext(v); + }; + + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingSubscribers(0) + .assertPendingResolution() + .thenAddObserver(consumer) + .then( + self -> { + self.complete(valueToSend); + + StepVerifier.create(processor) + .expectNext(valueToSend) + .expectComplete() + .verify(Duration.ofMillis(10)); + }) + .assertReceivedExactly(valueToSend) + .then( + self -> + RaceTestUtils.race( + () -> + RaceTestUtils.race( + self::invalidate, self::invalidate, Schedulers.parallel()), + () -> { + self.observe(consumer2); + if (!processor2.isTerminated()) { + self.complete(valueToSend2); + } + }, + Schedulers.parallel())) + .then( + self -> { + if (!self.isPending()) { + self.assertReceivedExactly(valueToSend, valueToSend2); + } else { + if (self.received.size() > 1) { + self.assertReceivedExactly(valueToSend, valueToSend2); + } else { + self.assertReceivedExactly(valueToSend); + } + } + + Assertions.assertThat(self.expired) + .haveAtMost( + 2, + new Condition<>( + new Predicate() { + int time = 0; + + @Override + public boolean test(Object s) { + if (time++ == 0) { + return valueToSend.equals(s); + } else { + return valueToSend2.equals(s); + } + } + }, + "should matches one of the given values")); + }) + .assertPendingSubscribers(0) + .assertDisposeCalled(0) + .then( + new Consumer>() { + @Override + public void accept(ResolvingTest self) { + StepVerifier.create(processor2) + .expectNextMatches( + (v) -> { + if (self.subscribers == ResolvingOperator.READY) { + return v.equals(valueToSend2); + } else { + return v.equals(valueToSend) || v.equals(valueToSend2); + } + }) + .expectComplete() + .verify(Duration.ofMillis(100)); + } + }); + } + } + + @Test + public void shouldNotExpireNewlyResolvedValueIfBlockIsRacingWithInvalidate() { + for (int i = 0; i < 10000; i++) { + final String valueToSend = "value" + i; + final String valueToSend2 = "value2" + i; + + MonoProcessor processor = MonoProcessor.create(); + BiConsumer consumer = + (v, t) -> { + if (t != null) { + processor.onError(t); + return; + } + + processor.onNext(v); + }; + + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingSubscribers(0) + .assertPendingResolution() + .thenAddObserver(consumer) + .then( + self -> { + self.complete(valueToSend); + + StepVerifier.create(processor) + .expectNext(valueToSend) + .expectComplete() + .verify(Duration.ofMillis(10)); + }) + .assertReceivedExactly(valueToSend) + .then( + self -> + RaceTestUtils.race( + () -> + Assertions.assertThat(self.block(null)) + .matches((v) -> v.equals(valueToSend) || v.equals(valueToSend2)), + () -> + RaceTestUtils.race( + self::invalidate, + () -> { + for (; ; ) { + if (self.subscribers != ResolvingOperator.READY) { + self.complete(valueToSend2); + break; + } + } + }, + Schedulers.parallel()), + Schedulers.parallel())) + .then( + self -> { + if (self.isPending()) { + self.assertReceivedExactly(valueToSend); + } else { + self.assertReceivedExactly(valueToSend, valueToSend2); + } + }) + .assertExpiredExactly(valueToSend) + .assertPendingSubscribers(0) + .assertDisposeCalled(0); + } + } + + @Test + public void shouldEstablishValueOnceInCaseOfRacingBetweenSubscribers() { + for (int i = 0; i < 10000; i++) { + final String valueToSend = "value" + i; + + MonoProcessor processor = MonoProcessor.create(); + BiConsumer consumer = + (v, t) -> { + if (t != null) { + processor.onError(t); + return; + } + + processor.onNext(v); + }; + + MonoProcessor processor2 = MonoProcessor.create(); + BiConsumer consumer2 = + (v, t) -> { + if (t != null) { + processor2.onError(t); + return; + } + + processor2.onNext(v); + }; + + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingSubscribers(0) + .assertPendingResolution() + .then( + self -> + RaceTestUtils.race(() -> self.observe(consumer), () -> self.observe(consumer2))) + .assertSubscribeCalled(1) + .assertPendingSubscribers(2) + .then(self -> self.complete(valueToSend)) + .assertPendingSubscribers(0) + .assertReceivedExactly(valueToSend) + .assertNothingExpired() + .assertDisposeCalled(0) + .then( + self -> { + Assertions.assertThat(processor.isTerminated()).isTrue(); + Assertions.assertThat(processor2.isTerminated()).isTrue(); + + Assertions.assertThat(processor.peek()).isEqualTo(valueToSend); + Assertions.assertThat(processor2.peek()).isEqualTo(valueToSend); + + Assertions.assertThat(self.subscribers).isEqualTo(ResolvingOperator.READY); + + Assertions.assertThat(self.add(consumer)).isEqualTo(ResolvingOperator.READY_STATE); + }); + } + } + + @Test + public void shouldEstablishValueOnceInCaseOfRacingBetweenSubscribeAndBlock() { + for (int i = 0; i < 10000; i++) { + final String valueToSend = "value" + i; + + MonoProcessor processor = MonoProcessor.create(); + + MonoProcessor processor2 = MonoProcessor.create(); + BiConsumer consumer2 = + (v, t) -> { + if (t != null) { + processor2.onError(t); + return; + } + + processor2.onNext(v); + }; + + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingSubscribers(0) + .assertPendingResolution() + .whenSubscribe(self -> self.complete(valueToSend)) + .then( + self -> + RaceTestUtils.race( + () -> processor.onNext(self.block(null)), () -> self.observe(consumer2))) + .assertSubscribeCalled(1) + .assertPendingSubscribers(0) + .assertReceivedExactly(valueToSend) + .assertNothingExpired() + .assertDisposeCalled(0) + .then( + self -> { + Assertions.assertThat(processor.isTerminated()).isTrue(); + Assertions.assertThat(processor2.isTerminated()).isTrue(); + + Assertions.assertThat(processor.peek()).isEqualTo(valueToSend); + Assertions.assertThat(processor2.peek()).isEqualTo(valueToSend); + + Assertions.assertThat(self.subscribers).isEqualTo(ResolvingOperator.READY); + + Assertions.assertThat(self.add(consumer2)).isEqualTo(ResolvingOperator.READY_STATE); + }); + } + } + + @Test + public void shouldEstablishValueOnceInCaseOfRacingBetweenBlocks() { + Duration timeout = Duration.ofMillis(100); + for (int i = 0; i < 10000; i++) { + final String valueToSend = "value" + i; + + MonoProcessor processor = MonoProcessor.create(); + MonoProcessor processor2 = MonoProcessor.create(); + + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingSubscribers(0) + .assertPendingResolution() + .whenSubscribe(self -> self.complete(valueToSend)) + .then( + self -> + RaceTestUtils.race( + () -> processor.onNext(self.block(timeout)), + () -> processor2.onNext(self.block(timeout)))) + .assertSubscribeCalled(1) + .assertPendingSubscribers(0) + .assertReceivedExactly(valueToSend) + .assertNothingExpired() + .assertDisposeCalled(0) + .then( + self -> { + Assertions.assertThat(processor.isTerminated()).isTrue(); + Assertions.assertThat(processor2.isTerminated()).isTrue(); + + Assertions.assertThat(processor.peek()).isEqualTo(valueToSend); + Assertions.assertThat(processor2.peek()).isEqualTo(valueToSend); + + Assertions.assertThat(self.subscribers).isEqualTo(ResolvingOperator.READY); + + Assertions.assertThat(self.add((v, t) -> {})) + .isEqualTo(ResolvingOperator.READY_STATE); + }); + } + } + + @Test + public void shouldExpireValueOnRacingDisposeAndError() { + Hooks.onErrorDropped(t -> {}); + RuntimeException runtimeException = new RuntimeException("test"); + for (int i = 0; i < 10000; i++) { + MonoProcessor processor = MonoProcessor.create(); + BiConsumer consumer = + (v, t) -> { + if (t != null) { + processor.onError(t); + return; + } + + processor.onNext(v); + }; + MonoProcessor processor2 = MonoProcessor.create(); + BiConsumer consumer2 = + (v, t) -> { + if (t != null) { + processor2.onError(t); + return; + } + + processor2.onNext(v); + }; + + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingSubscribers(0) + .assertPendingResolution() + .thenAddObserver(consumer) + .assertSubscribeCalled(1) + .assertPendingSubscribers(1) + .then(self -> RaceTestUtils.race(() -> self.terminate(runtimeException), self::dispose)) + .assertPendingSubscribers(0) + .assertNothingExpired() + .assertDisposeCalled(1) + .then( + self -> { + Assertions.assertThat(self.subscribers).isEqualTo(ResolvingOperator.TERMINATED); + + Assertions.assertThat(self.add((v, t) -> {})) + .isEqualTo(ResolvingOperator.TERMINATED_STATE); + }) + .thenAddObserver(consumer2); + + StepVerifier.create(processor) + .expectErrorSatisfies( + t -> { + if (t instanceof CancellationException) { + Assertions.assertThat(t) + .isInstanceOf(CancellationException.class) + .hasMessage("Disposed"); + } else { + Assertions.assertThat(t).isInstanceOf(RuntimeException.class).hasMessage("test"); + } + }) + .verify(Duration.ofMillis(10)); + + StepVerifier.create(processor2) + .expectErrorSatisfies( + t -> { + if (t instanceof CancellationException) { + Assertions.assertThat(t) + .isInstanceOf(CancellationException.class) + .hasMessage("Disposed"); + } else { + Assertions.assertThat(t).isInstanceOf(RuntimeException.class).hasMessage("test"); + } + }) + .verify(Duration.ofMillis(10)); + + // no way to guarantee equality because of racing + // Assertions.assertThat(processor.getError()) + // .isEqualTo(processor2.getError()); + } + } + + @Test + public void shouldThrowOnBlocking() { + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingSubscribers(0) + .assertPendingResolution() + .then( + self -> + Assertions.assertThatThrownBy(() -> self.block(Duration.ofMillis(100))) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Timeout on Mono blocking read")) + .assertPendingSubscribers(0) + .assertNothingExpired() + .assertNothingReceived() + .assertDisposeCalled(0); + } + + @Test + public void shouldThrowOnBlockingIfHasAlreadyTerminated() { + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingSubscribers(0) + .assertPendingResolution() + .whenSubscribe(self -> self.terminate(new RuntimeException("test"))) + .then( + self -> + Assertions.assertThatThrownBy(() -> self.block(Duration.ofMillis(100))) + .isInstanceOf(RuntimeException.class) + .hasMessage("test") + .hasSuppressedException(new Exception("Terminated with an error"))) + .assertPendingSubscribers(0) + .assertNothingExpired() + .assertNothingReceived() + .assertDisposeCalled(1); + } + + static Stream, Publisher>> innerCases() { + return Stream.of( + (self) -> { + final MonoProcessor processor = MonoProcessor.create(); + final ResolvingOperator.DeferredResolution operator = + new ResolvingOperator.DeferredResolution(self, processor) { + @Override + public void accept(String v, Throwable t) { + if (t != null) { + onError(t); + return; + } + + onNext(v); + } + }; + return processor.doOnSubscribe(s -> self.observe(operator)).doOnCancel(operator::cancel); + }, + (self) -> { + final MonoProcessor processor = MonoProcessor.create(); + final ResolvingOperator.MonoDeferredResolutionOperator operator = + new ResolvingOperator.MonoDeferredResolutionOperator<>(self, processor); + processor.onSubscribe(operator); + return processor.doOnSubscribe(s -> self.observe(operator)).doOnCancel(operator::cancel); + }); + } + + @ParameterizedTest + @MethodSource("innerCases") + public void shouldBePossibleToRemoveThemSelvesFromTheList_CancellationTest( + Function, Publisher> caseProducer) { + ResolvingTest.create() + .then( + self -> { + Publisher resolvingInner = caseProducer.apply(self); + StepVerifier.create(resolvingInner) + .expectSubscription() + .then(() -> self.assertSubscribeCalled(1).assertPendingSubscribers(1)) + .thenCancel() + .verify(Duration.ofMillis(100)); + }) + .assertPendingSubscribers(0) + .assertNothingExpired() + .then(self -> self.complete("test")) + .assertReceivedExactly("test"); + } + + @ParameterizedTest + @MethodSource("innerCases") + public void shouldExpireValueOnDispose( + Function, Publisher> caseProducer) { + ResolvingTest.create() + .then( + self -> { + Publisher resolvingInner = caseProducer.apply(self); + + StepVerifier.create(resolvingInner) + .expectSubscription() + .then(() -> self.complete("test")) + .expectNext("test") + .expectComplete() + .verify(Duration.ofMillis(100)); + }) + .assertPendingSubscribers(0) + .assertNothingExpired() + .assertReceivedExactly("test") + .then(ResolvingOperator::dispose) + .assertExpiredExactly("test") + .assertDisposeCalled(1); + } + + @ParameterizedTest + @MethodSource("innerCases") + public void shouldNotifyAllTheSubscribers( + Function, Publisher> caseProducer) { + + final MonoProcessor sub1 = MonoProcessor.create(); + final MonoProcessor sub2 = MonoProcessor.create(); + final MonoProcessor sub3 = MonoProcessor.create(); + final MonoProcessor sub4 = MonoProcessor.create(); + + final ArrayList> processors = new ArrayList<>(200); + + ResolvingTest.create() + .assertDisposeCalled(0) + .assertPendingSubscribers(0) + .assertNothingExpired() + .assertNothingReceived() + .assertPendingResolution() + .then( + self -> { + caseProducer.apply(self).subscribe(sub1); + caseProducer.apply(self).subscribe(sub2); + caseProducer.apply(self).subscribe(sub3); + caseProducer.apply(self).subscribe(sub4); + }) + .assertSubscribeCalled(1) + .assertPendingSubscribers(4) + .then( + self -> { + for (int i = 0; i < 100; i++) { + final MonoProcessor subA = MonoProcessor.create(); + final MonoProcessor subB = MonoProcessor.create(); + processors.add(subA); + processors.add(subB); + RaceTestUtils.race( + () -> caseProducer.apply(self).subscribe(subA), + () -> caseProducer.apply(self).subscribe(subB)); + } + }) + .assertSubscribeCalled(1) + .assertPendingSubscribers(204) + .then(self -> sub1.dispose()) + .assertPendingSubscribers(203) + .then( + self -> { + String valueToSend = "value"; + self.complete(valueToSend); + + Assertions.assertThatThrownBy(sub1::peek).isInstanceOf(CancellationException.class); + Assertions.assertThat(sub2.peek()).isEqualTo(valueToSend); + Assertions.assertThat(sub3.peek()).isEqualTo(valueToSend); + Assertions.assertThat(sub4.peek()).isEqualTo(valueToSend); + + for (MonoProcessor sub : processors) { + Assertions.assertThat(sub.peek()).isEqualTo(valueToSend); + Assertions.assertThat(sub.isTerminated()).isTrue(); + } + }) + .assertPendingSubscribers(0) + .assertNothingExpired() + .assertReceivedExactly("value"); + } + + @Test + public void shouldBeSerialIfRacyMonoInner() { + for (int i = 0; i < 10000; i++) { + long[] requested = new long[] {0}; + Subscription mockSubscription = Mockito.mock(Subscription.class); + Mockito.doAnswer( + a -> { + long argument = a.getArgument(0); + return requested[0] += argument; + }) + .when(mockSubscription) + .request(Mockito.anyLong()); + ResolvingOperator.DeferredResolution resolution = + new ResolvingOperator.DeferredResolution( + ResolvingTest.create(), AssertSubscriber.create(0)) { + + @Override + public void accept(Object o, Object o2) {} + }; + + resolution.request(5); + + RaceTestUtils.race( + () -> resolution.onSubscribe(mockSubscription), + () -> { + resolution.request(10); + resolution.request(10); + resolution.request(10); + }); + + resolution.request(15); + + Assertions.assertThat(requested[0]).isEqualTo(50L); + } + } + + @Test + public void shouldExpireValueExactlyOnceOnRacingBetweenInvalidates() { + for (int i = 0; i < 10000; i++) { + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingResolution() + .then(self -> self.complete("test")) + .assertReceivedExactly("test") + .then(self -> RaceTestUtils.race(self::invalidate, self::invalidate)) + .assertExpiredExactly("test"); + } + } + + @Test + public void shouldExpireValueExactlyOnceOnRacingBetweenInvalidateAndDispose() { + for (int i = 0; i < 10000; i++) { + ResolvingTest.create() + .assertNothingExpired() + .assertNothingReceived() + .assertPendingResolution() + .then(self -> self.complete("test")) + .assertReceivedExactly("test") + .then(self -> RaceTestUtils.race(self::invalidate, self::dispose)) + .assertExpiredExactly("test"); + } + } + + static class ResolvingTest extends ResolvingOperator { + + final AtomicInteger subscribeCalls = new AtomicInteger(); + final AtomicInteger onDisposeCalls = new AtomicInteger(); + + final Queue received = new ConcurrentLinkedQueue<>(); + final Queue expired = new ConcurrentLinkedQueue<>(); + + Consumer> whenSubscribeConsumer = (self) -> {}; + + static ResolvingTest create() { + return new ResolvingTest<>(); + } + + public ResolvingTest assertPendingSubscribers(int cnt) { + Assertions.assertThat(this.subscribers.length).isEqualTo(cnt); + + return this; + } + + public ResolvingTest whenSubscribe(Consumer> consumer) { + this.whenSubscribeConsumer = consumer; + return this; + } + + public ResolvingTest then(Consumer> consumer) { + consumer.accept(this); + + return this; + } + + public ResolvingTest thenAddObserver(BiConsumer consumer) { + this.observe(consumer); + return this; + } + + public ResolvingTest assertPendingResolution() { + Assertions.assertThat(this.isPending()).isTrue(); + + return this; + } + + public ResolvingTest assertIsDisposed() { + Assertions.assertThat(this.isDisposed()).isTrue(); + + return this; + } + + public ResolvingTest assertSubscribeCalled(int times) { + Assertions.assertThat(subscribeCalls).hasValue(times); + + return this; + } + + public ResolvingTest assertDisposeCalled(int times) { + Assertions.assertThat(onDisposeCalls).hasValue(times); + return this; + } + + public ResolvingTest assertNothingExpired() { + return assertExpiredExactly(); + } + + public ResolvingTest assertExpiredExactly(T... values) { + Assertions.assertThat(expired).hasSize(values.length).containsExactly(values); + + return this; + } + + public ResolvingTest assertNothingReceived() { + return assertReceivedExactly(); + } + + public ResolvingTest assertReceivedExactly(T... values) { + Assertions.assertThat(received).hasSize(values.length).containsExactly(values); + + return this; + } + + public ResolvingTest ifResolvedAssertEqual(T value) { + if (received.size() > 0) { + Assertions.assertThat(received).hasSize(1).containsExactly(value); + } + + return this; + } + + @Override + protected void doOnValueResolved(T value) { + received.offer(value); + } + + @Override + protected void doOnValueExpired(T value) { + expired.offer(value); + } + + @Override + protected void doSubscribe() { + whenSubscribeConsumer.accept(this); + subscribeCalls.incrementAndGet(); + } + + @Override + protected void doOnDispose() { + onDisposeCalls.incrementAndGet(); + } + } +} From 2fa21208752892aa55cfebdbfce171063c444bd6 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Thu, 28 May 2020 15:08:49 +0300 Subject: [PATCH 006/240] provides RSocketClient interface Signed-off-by: Oleh Dokuka --- .../main/java/io/rsocket/RSocketClient.java | 72 +++ .../io/rsocket/core/DefaultRSocketClient.java | 526 +++++++++++++++++ .../io/rsocket/core/RSocketConnector.java | 258 +++++---- .../core/DefaultRSocketClientTests.java | 533 ++++++++++++++++++ .../tcp/client/RSocketClientExample.java | 54 ++ 5 files changed, 1337 insertions(+), 106 deletions(-) create mode 100644 rsocket-core/src/main/java/io/rsocket/RSocketClient.java create mode 100644 rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java create mode 100644 rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java create mode 100644 rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/client/RSocketClientExample.java diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java new file mode 100644 index 000000000..95c3d4096 --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -0,0 +1,72 @@ +package io.rsocket; + +import org.reactivestreams.Publisher; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +/** + * A client-side interface to simplify interactions with the {@link + * io.rsocket.core.RSocketConnector}. This interface represents logical communication over {@link + * RSocket}, hiding the complexity of {@code Mono} resolution. Also, in opposite to {@link + * RSocket} , {@link RSocketClient} supports multi-subscription on the same {@link Publisher} from + * the interaction in the way of accepting input as {@link Publisher} like {@link Mono} or {@link + * Flux} Despite, {@link RSocket} interface, {@link RSocketClient} does not coupled with a single + * connection, hence disposal of the {@link #source()} {@link RSocket} will affect the {@link + * RSocketClient} it selves. In such a case, a new request will cause automatic reconnection if + * necessary. + * + * @since 1.0.1 + */ +public interface RSocketClient extends Disposable { + + /** + * Provides access to the source {@link RSocket} used by this {@link RSocketClient} + * + * @return returns a {@link Mono} which returns the source {@link RSocket} + */ + Mono source(); + + /** + * Fire and Forget interaction model of {@link RSocketClient}. + * + * @param payloadMono Request payload as {@link Mono}. + * @return {@code Publisher} that completes when the passed {@code payload} is successfully + * handled, otherwise errors. + */ + Mono fireAndForget(Mono payloadMono); + + /** + * Request-Response interaction model of {@link RSocketClient}. + * + * @param payloadMono Request payload as {@link Mono}. + * @return {@code Publisher} containing at most a single {@code Payload} representing the + * response. + */ + Mono requestResponse(Mono payloadMono); + + /** + * Request-Stream interaction model of {@link RSocketClient}. + * + * @param payloadMono Request payload as {@link Mono}. + * @return {@code Publisher} containing the stream of {@code Payload}s representing the response. + */ + Flux requestStream(Mono payloadMono); + + /** + * Request-Channel interaction model of {@link RSocketClient}. + * + * @param payloads Stream of request payloads. + * @return Stream of response payloads. + */ + Flux requestChannel(Publisher payloads); + + /** + * Metadata-Push interaction model of {@link RSocketClient}. + * + * @param payloadMono Request payload as {@link Mono}. + * @return {@code Publisher} that completes when the passed {@code payload} is successfully + * handled, otherwise errors. + */ + Mono metadataPush(Mono payloadMono); +} diff --git a/rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java b/rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java new file mode 100644 index 000000000..24fa8f84c --- /dev/null +++ b/rsocket-core/src/main/java/io/rsocket/core/DefaultRSocketClient.java @@ -0,0 +1,526 @@ +package io.rsocket.core; + +import io.netty.util.IllegalReferenceCountException; +import io.netty.util.ReferenceCounted; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketClient; +import io.rsocket.frame.FrameType; +import java.util.AbstractMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; +import java.util.stream.Stream; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import reactor.core.CorePublisher; +import reactor.core.CoreSubscriber; +import reactor.core.Scannable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoOperator; +import reactor.core.publisher.Operators; +import reactor.util.annotation.Nullable; +import reactor.util.context.Context; + +/** + * Default implementation of {@link RSocketClient} + * + * @since 1.0.1 + */ +class DefaultRSocketClient extends ResolvingOperator + implements CoreSubscriber, CorePublisher, RSocketClient { + static final Consumer DISCARD_ELEMENTS_CONSUMER = + referenceCounted -> { + if (referenceCounted.refCnt() > 0) { + try { + referenceCounted.release(); + } catch (IllegalReferenceCountException e) { + // ignored + } + } + }; + + static final Object ON_DISCARD_KEY; + + static { + Context discardAwareContext = Operators.enableOnDiscard(null, DISCARD_ELEMENTS_CONSUMER); + ON_DISCARD_KEY = discardAwareContext.stream().findFirst().get().getKey(); + } + + final Mono source; + + volatile Subscription s; + + static final AtomicReferenceFieldUpdater S = + AtomicReferenceFieldUpdater.newUpdater(DefaultRSocketClient.class, Subscription.class, "s"); + + DefaultRSocketClient(Mono source) { + this.source = source; + } + + @Override + public Mono source() { + return Mono.fromDirect(this); + } + + @Override + public Mono fireAndForget(Mono payloadMono) { + return new RSocketClientMonoOperator<>(this, FrameType.REQUEST_FNF, payloadMono); + } + + @Override + public Mono requestResponse(Mono payloadMono) { + return new RSocketClientMonoOperator<>(this, FrameType.REQUEST_RESPONSE, payloadMono); + } + + @Override + public Flux requestStream(Mono payloadMono) { + return new RSocketClientFluxOperator<>(this, FrameType.REQUEST_STREAM, payloadMono); + } + + @Override + public Flux requestChannel(Publisher payloads) { + return new RSocketClientFluxOperator<>(this, FrameType.REQUEST_CHANNEL, payloads); + } + + @Override + public Mono metadataPush(Mono payloadMono) { + return new RSocketClientMonoOperator<>(this, FrameType.METADATA_PUSH, payloadMono); + } + + @Override + @SuppressWarnings("uncheked") + public void subscribe(CoreSubscriber actual) { + final ResolvingOperator.MonoDeferredResolutionOperator inner = + new ResolvingOperator.MonoDeferredResolutionOperator<>(this, actual); + actual.onSubscribe(inner); + + this.observe(inner); + } + + @Override + public void subscribe(Subscriber s) { + subscribe(Operators.toCoreSubscriber(s)); + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.setOnce(S, this, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onComplete() { + final Subscription s = this.s; + final RSocket value = this.value; + + if (s == Operators.cancelledSubscription() || !S.compareAndSet(this, s, null)) { + this.doFinally(); + return; + } + + if (value == null) { + this.terminate(new IllegalStateException("Source completed empty")); + } else { + this.complete(value); + } + } + + @Override + public void onError(Throwable t) { + final Subscription s = this.s; + + if (s == Operators.cancelledSubscription() + || S.getAndSet(this, Operators.cancelledSubscription()) + == Operators.cancelledSubscription()) { + this.doFinally(); + Operators.onErrorDropped(t, Context.empty()); + return; + } + + this.doFinally(); + // terminate upstream which means retryBackoff has exhausted + this.terminate(t); + } + + @Override + public void onNext(RSocket value) { + if (this.s == Operators.cancelledSubscription()) { + this.doOnValueExpired(value); + return; + } + + this.value = value; + // volatile write and check on racing + this.doFinally(); + } + + @Override + protected void doSubscribe() { + this.source.subscribe(this); + } + + @Override + protected void doOnValueResolved(RSocket value) { + value.onClose().subscribe(null, t -> this.invalidate(), this::invalidate); + } + + @Override + protected void doOnValueExpired(RSocket value) { + value.dispose(); + } + + @Override + protected void doOnDispose() { + Operators.terminate(S, this); + } + + static final class FlatMapMain implements CoreSubscriber, Context, Scannable { + + final DefaultRSocketClient parent; + final CoreSubscriber actual; + + final FlattingInner second; + + Subscription s; + + boolean done; + + FlatMapMain( + DefaultRSocketClient parent, CoreSubscriber actual, FrameType requestType) { + this.parent = parent; + this.actual = actual; + this.second = new FlattingInner<>(parent, this, actual, requestType); + } + + @Override + public Context currentContext() { + return this; + } + + @Override + public Stream inners() { + return Stream.of(this.second); + } + + @Override + @Nullable + public Object scanUnsafe(Attr key) { + if (key == Attr.PARENT) return this.s; + if (key == Attr.CANCELLED) return this.second.isCancelled(); + if (key == Attr.TERMINATED) return this.done; + + return null; + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + this.actual.onSubscribe(this.second); + } + } + + @Override + public void onNext(Payload payload) { + if (this.done) { + if (payload.refCnt() > 0) { + try { + payload.release(); + } catch (IllegalReferenceCountException e) { + // ignored + } + } + return; + } + this.done = true; + + final FlattingInner inner = this.second; + + if (inner.isCancelled()) { + if (payload.refCnt() > 0) { + try { + payload.release(); + } catch (IllegalReferenceCountException e) { + // ignored + } + } + return; + } + + inner.payload = payload; + + if (inner.isCancelled()) { + if (FlattingInner.PAYLOAD.compareAndSet(inner, payload, null)) { + if (payload.refCnt() > 0) { + try { + payload.release(); + } catch (IllegalReferenceCountException e) { + // ignored + } + } + } + return; + } + + this.parent.observe(inner); + } + + @Override + public void onError(Throwable t) { + if (this.done) { + Operators.onErrorDropped(t, this.actual.currentContext()); + return; + } + this.done = true; + + this.actual.onError(t); + } + + @Override + public void onComplete() { + if (this.done) { + return; + } + this.done = true; + + this.actual.onComplete(); + } + + void request(long n) { + this.s.request(n); + } + + void cancel() { + this.s.cancel(); + } + + @Override + @SuppressWarnings("unchecked") + public K get(Object key) { + if (key == ON_DISCARD_KEY) { + return (K) DISCARD_ELEMENTS_CONSUMER; + } + return this.actual.currentContext().get(key); + } + + @Override + public boolean hasKey(Object key) { + if (key == ON_DISCARD_KEY) { + return true; + } + return this.actual.currentContext().hasKey(key); + } + + @Override + public Context put(Object key, Object value) { + return this.actual + .currentContext() + .put(ON_DISCARD_KEY, DISCARD_ELEMENTS_CONSUMER) + .put(key, value); + } + + @Override + public Context delete(Object key) { + return this.actual + .currentContext() + .put(ON_DISCARD_KEY, DISCARD_ELEMENTS_CONSUMER) + .delete(key); + } + + @Override + public int size() { + return this.actual.currentContext().size() + 1; + } + + @Override + public Stream> stream() { + return Stream.concat( + Stream.of( + new AbstractMap.SimpleImmutableEntry<>(ON_DISCARD_KEY, DISCARD_ELEMENTS_CONSUMER)), + this.actual.currentContext().stream()); + } + } + + static final class FlattingInner extends DeferredResolution { + + final FlatMapMain main; + final FrameType interactionType; + + volatile Payload payload; + + @SuppressWarnings("rawtypes") + static final AtomicReferenceFieldUpdater PAYLOAD = + AtomicReferenceFieldUpdater.newUpdater(FlattingInner.class, Payload.class, "payload"); + + FlattingInner( + DefaultRSocketClient parent, + FlatMapMain main, + CoreSubscriber actual, + FrameType interactionType) { + super(parent, actual); + + this.main = main; + this.interactionType = interactionType; + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public void accept(RSocket rSocket, Throwable t) { + if (this.isCancelled()) { + return; + } + + Payload payload = PAYLOAD.getAndSet(this, null); + + // means cancelled + if (payload == null) { + return; + } + + if (t != null) { + if (payload.refCnt() > 0) { + try { + payload.release(); + } catch (IllegalReferenceCountException e) { + // ignored + } + } + onError(t); + return; + } + + CorePublisher source; + switch (this.interactionType) { + case REQUEST_FNF: + source = rSocket.fireAndForget(payload); + break; + case REQUEST_RESPONSE: + source = rSocket.requestResponse(payload); + break; + case REQUEST_STREAM: + source = rSocket.requestStream(payload); + break; + case METADATA_PUSH: + source = rSocket.metadataPush(payload); + break; + default: + this.onError(new IllegalStateException("Should never happen")); + return; + } + + source.subscribe((CoreSubscriber) this); + } + + @Override + public void request(long n) { + this.main.request(n); + super.request(n); + } + + public void cancel() { + long state = REQUESTED.getAndSet(this, STATE_CANCELLED); + if (state == STATE_CANCELLED) { + return; + } + + this.main.cancel(); + + if (state == STATE_SUBSCRIBED) { + this.s.cancel(); + } else { + this.parent.remove(this); + Payload payload = PAYLOAD.getAndSet(this, null); + if (payload != null) { + payload.release(); + } + } + } + } + + static final class RequestChannelInner extends DeferredResolution { + + final FrameType interactionType; + final Publisher upstream; + + RequestChannelInner( + DefaultRSocketClient parent, + Publisher upstream, + CoreSubscriber actual, + FrameType interactionType) { + super(parent, actual); + + this.upstream = upstream; + this.interactionType = interactionType; + } + + @Override + public void accept(RSocket rSocket, Throwable t) { + if (this.isCancelled()) { + return; + } + + if (t != null) { + onError(t); + return; + } + + Flux source; + if (this.interactionType == FrameType.REQUEST_CHANNEL) { + source = rSocket.requestChannel(this.upstream); + } else { + this.onError(new IllegalStateException("Should never happen")); + return; + } + + source.subscribe(this); + } + } + + static class RSocketClientMonoOperator extends MonoOperator { + + final DefaultRSocketClient parent; + final FrameType requestType; + + public RSocketClientMonoOperator( + DefaultRSocketClient parent, FrameType requestType, Mono source) { + super(source); + this.parent = parent; + this.requestType = requestType; + } + + @Override + public void subscribe(CoreSubscriber actual) { + this.source.subscribe(new FlatMapMain(this.parent, actual, this.requestType)); + } + } + + static class RSocketClientFluxOperator> extends Flux { + + final DefaultRSocketClient parent; + final FrameType requestType; + final ST source; + + public RSocketClientFluxOperator( + DefaultRSocketClient parent, FrameType requestType, ST source) { + this.parent = parent; + this.requestType = requestType; + this.source = source; + } + + @Override + public void subscribe(CoreSubscriber actual) { + if (requestType == FrameType.REQUEST_CHANNEL) { + RequestChannelInner inner = + new RequestChannelInner(this.parent, source, actual, requestType); + actual.onSubscribe(inner); + this.parent.observe(inner); + } else { + this.source.subscribe(new FlatMapMain<>(this.parent, actual, this.requestType)); + } + } + } +} diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index aa1e0a553..37857be30 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -21,6 +21,7 @@ import io.rsocket.DuplexConnection; import io.rsocket.Payload; import io.rsocket.RSocket; +import io.rsocket.RSocketClient; import io.rsocket.SocketAcceptor; import io.rsocket.fragmentation.FragmentationDuplexConnection; import io.rsocket.fragmentation.ReassemblyDuplexConnection; @@ -117,6 +118,15 @@ public static Mono connectWith(ClientTransport transport) { return RSocketConnector.create().connect(() -> transport); } + /** + * @param transport + * @return + * @since 1.0.1 + */ + public static RSocketClient createRSocketClient(ClientTransport transport) { + return RSocketConnector.create().toRSocketClient(transport); + } + /** * Provide a {@code Payload} with data and/or metadata for the initial {@code SETUP} frame. Data * and metadata should be formatted according to the MIME types specified via {@link @@ -470,112 +480,7 @@ public Mono connect(ClientTransport transport) { * @return a {@code Mono} with the connected RSocket */ public Mono connect(Supplier transportSupplier) { - - Mono connectionMono = - Mono.fromSupplier(transportSupplier) - .flatMap(ClientTransport::connect) - .map( - connection -> - mtu > 0 - ? new FragmentationDuplexConnection(connection, mtu, "client") - : new ReassemblyDuplexConnection(connection)); - return connectionMono - .flatMap( - connection -> { - ByteBuf resumeToken; - KeepAliveHandler keepAliveHandler; - DuplexConnection wrappedConnection; - - if (resume != null) { - resumeToken = resume.getTokenSupplier().get(); - ClientRSocketSession session = - new ClientRSocketSession( - connection, - resume.getSessionDuration(), - resume.getRetry(), - resume.getStoreFactory(CLIENT_TAG).apply(resumeToken), - resume.getStreamTimeout(), - resume.isCleanupStoreOnKeepAlive()) - .continueWith(connectionMono) - .resumeToken(resumeToken); - keepAliveHandler = - new KeepAliveHandler.ResumableKeepAliveHandler(session.resumableConnection()); - wrappedConnection = session.resumableConnection(); - } else { - resumeToken = Unpooled.EMPTY_BUFFER; - keepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler(connection); - wrappedConnection = connection; - } - - ClientServerInputMultiplexer multiplexer = - new ClientServerInputMultiplexer(wrappedConnection, interceptors, true); - - boolean leaseEnabled = leasesSupplier != null; - Leases leases = leaseEnabled ? leasesSupplier.get() : null; - RequesterLeaseHandler requesterLeaseHandler = - leaseEnabled - ? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver()) - : RequesterLeaseHandler.None; - - RSocket rSocketRequester = - new RSocketRequester( - multiplexer.asClientConnection(), - payloadDecoder, - StreamIdSupplier.clientSupplier(), - mtu, - (int) keepAliveInterval.toMillis(), - (int) keepAliveMaxLifeTime.toMillis(), - keepAliveHandler, - requesterLeaseHandler, - Schedulers.single(Schedulers.parallel())); - - RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester); - - ByteBuf setupFrame = - SetupFrameCodec.encode( - wrappedConnection.alloc(), - leaseEnabled, - (int) keepAliveInterval.toMillis(), - (int) keepAliveMaxLifeTime.toMillis(), - resumeToken, - metadataMimeType, - dataMimeType, - setupPayload); - - SocketAcceptor acceptor = - this.acceptor != null ? this.acceptor : SocketAcceptor.with(new RSocket() {}); - - ConnectionSetupPayload setup = new DefaultConnectionSetupPayload(setupFrame); - - return interceptors - .initSocketAcceptor(acceptor) - .accept(setup, wrappedRSocketRequester) - .flatMap( - rSocketHandler -> { - RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler); - - ResponderLeaseHandler responderLeaseHandler = - leaseEnabled - ? new ResponderLeaseHandler.Impl<>( - CLIENT_TAG, - wrappedConnection.alloc(), - leases.sender(), - leases.stats()) - : ResponderLeaseHandler.None; - - RSocket rSocketResponder = - new RSocketResponder( - multiplexer.asServerConnection(), - wrappedRSocketHandler, - payloadDecoder, - responderLeaseHandler, - mtu); - - return wrappedConnection - .sendOne(setupFrame) - .thenReturn(wrappedRSocketRequester); - }); - }) + return this.connect0(transportSupplier) .as( source -> { if (retrySpec != null) { @@ -586,4 +491,145 @@ public Mono connect(Supplier transportSupplier) { } }); } + + /** + * The final step to connect with the transport to use as input and the resulting {@link + * RSocketClient} as output. + * + *

Please note, {@link RSocketClient} does not represent a single or wired connection and will + * do that depends on the demand (pending requests). Therefore, in order to ensure that connection + * will be established in a case of error, {@link #reconnect(Retry) reconnect} may be enabled. + * + *

The following transports are available (via additional RSocket Java modules): + * + *

    + *
  • {@link io.rsocket.transport.netty.client.TcpClientTransport TcpClientTransport} via + * {@code rsocket-transport-netty}. + *
  • {@link io.rsocket.transport.netty.client.WebsocketClientTransport + * WebsocketClientTransport} via {@code rsocket-transport-netty}. + *
  • {@link io.rsocket.transport.local.LocalClientTransport LocalClientTransport} via {@code + * rsocket-transport-local} + *
+ * + * @param transport the transport of choice to connect with + * @return a {@code RSocketClient} with not established connection. Note, connection will be + * established on the first request + * @since 1.0.1 + */ + public RSocketClient toRSocketClient(ClientTransport transport) { + Mono source = connect0(() -> transport); + + if (retrySpec != null) { + source = source.retryWhen(retrySpec); + } + + return new DefaultRSocketClient(source); + } + + private Mono connect0(Supplier transportSupplier) { + Mono connectionMono = + Mono.fromSupplier(transportSupplier) + .flatMap(ClientTransport::connect) + .map( + connection -> + mtu > 0 + ? new FragmentationDuplexConnection(connection, mtu, "client") + : new ReassemblyDuplexConnection(connection)); + return connectionMono.flatMap( + connection -> { + ByteBuf resumeToken; + KeepAliveHandler keepAliveHandler; + DuplexConnection wrappedConnection; + + if (resume != null) { + resumeToken = resume.getTokenSupplier().get(); + ClientRSocketSession session = + new ClientRSocketSession( + connection, + resume.getSessionDuration(), + resume.getRetry(), + resume.getStoreFactory(CLIENT_TAG).apply(resumeToken), + resume.getStreamTimeout(), + resume.isCleanupStoreOnKeepAlive()) + .continueWith(connectionMono) + .resumeToken(resumeToken); + keepAliveHandler = + new KeepAliveHandler.ResumableKeepAliveHandler(session.resumableConnection()); + wrappedConnection = session.resumableConnection(); + } else { + resumeToken = Unpooled.EMPTY_BUFFER; + keepAliveHandler = new KeepAliveHandler.DefaultKeepAliveHandler(connection); + wrappedConnection = connection; + } + + ClientServerInputMultiplexer multiplexer = + new ClientServerInputMultiplexer(wrappedConnection, interceptors, true); + + boolean leaseEnabled = leasesSupplier != null; + Leases leases = leaseEnabled ? leasesSupplier.get() : null; + RequesterLeaseHandler requesterLeaseHandler = + leaseEnabled + ? new RequesterLeaseHandler.Impl(CLIENT_TAG, leases.receiver()) + : RequesterLeaseHandler.None; + + RSocket rSocketRequester = + new RSocketRequester( + multiplexer.asClientConnection(), + payloadDecoder, + StreamIdSupplier.clientSupplier(), + mtu, + (int) keepAliveInterval.toMillis(), + (int) keepAliveMaxLifeTime.toMillis(), + keepAliveHandler, + requesterLeaseHandler, + Schedulers.single(Schedulers.parallel())); + + RSocket wrappedRSocketRequester = interceptors.initRequester(rSocketRequester); + + ByteBuf setupFrame = + SetupFrameCodec.encode( + wrappedConnection.alloc(), + leaseEnabled, + (int) keepAliveInterval.toMillis(), + (int) keepAliveMaxLifeTime.toMillis(), + resumeToken, + metadataMimeType, + dataMimeType, + setupPayload); + + SocketAcceptor acceptor = + this.acceptor != null ? this.acceptor : SocketAcceptor.with(new RSocket() {}); + + ConnectionSetupPayload setup = new DefaultConnectionSetupPayload(setupFrame); + + return interceptors + .initSocketAcceptor(acceptor) + .accept(setup, wrappedRSocketRequester) + .flatMap( + rSocketHandler -> { + RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler); + + ResponderLeaseHandler responderLeaseHandler = + leaseEnabled + ? new ResponderLeaseHandler.Impl<>( + CLIENT_TAG, + wrappedConnection.alloc(), + leases.sender(), + leases.stats()) + : ResponderLeaseHandler.None; + + RSocket rSocketResponder = + new RSocketResponder( + multiplexer.asServerConnection(), + wrappedRSocketHandler, + payloadDecoder, + responderLeaseHandler, + mtu); + + return wrappedConnection + .sendOne(setupFrame) + .thenReturn(wrappedRSocketRequester); + }); + }); + } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java b/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java new file mode 100644 index 000000000..ec52713e1 --- /dev/null +++ b/rsocket-core/src/test/java/io/rsocket/core/DefaultRSocketClientTests.java @@ -0,0 +1,533 @@ +package io.rsocket.core; +/* + * Copyright 2015-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import static io.rsocket.frame.FrameHeaderCodec.frameType; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.hasSize; + +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.ReferenceCounted; +import io.rsocket.Payload; +import io.rsocket.RSocket; +import io.rsocket.RSocketClient; +import io.rsocket.TestScheduler; +import io.rsocket.frame.ErrorFrameCodec; +import io.rsocket.frame.FrameHeaderCodec; +import io.rsocket.frame.FrameType; +import io.rsocket.frame.PayloadFrameCodec; +import io.rsocket.frame.decoder.PayloadDecoder; +import io.rsocket.internal.subscriber.AssertSubscriber; +import io.rsocket.lease.RequesterLeaseHandler; +import io.rsocket.util.ByteBufPayload; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CancellationException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.assertj.core.api.Assertions; +import org.assertj.core.api.Assumptions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.runners.model.Statement; +import org.reactivestreams.Publisher; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; +import reactor.core.publisher.SignalType; +import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; +import reactor.test.util.RaceTestUtils; +import reactor.util.context.Context; +import reactor.util.retry.Retry; + +public class DefaultRSocketClientTests { + + ClientSocketRule rule; + + @BeforeEach + public void setUp() throws Throwable { + Hooks.onNextDropped(ReferenceCountUtil::safeRelease); + Hooks.onErrorDropped((t) -> {}); + rule = new ClientSocketRule(); + rule.apply( + new Statement() { + @Override + public void evaluate() {} + }, + null) + .evaluate(); + } + + @AfterEach + public void tearDown() { + Hooks.resetOnErrorDropped(); + Hooks.resetOnNextDropped(); + } + + static Stream interactions() { + return Stream.of( + Arguments.of( + (BiFunction, Publisher>) + (client, payload) -> client.fireAndForget(Mono.fromDirect(payload)), + FrameType.REQUEST_FNF), + Arguments.of( + (BiFunction, Publisher>) + (client, payload) -> client.requestResponse(Mono.fromDirect(payload)), + FrameType.REQUEST_RESPONSE), + Arguments.of( + (BiFunction, Publisher>) + (client, payload) -> client.requestStream(Mono.fromDirect(payload)), + FrameType.REQUEST_STREAM), + Arguments.of( + (BiFunction, Publisher>) + RSocketClient::requestChannel, + FrameType.REQUEST_CHANNEL), + Arguments.of( + (BiFunction, Publisher>) + (client, payload) -> client.metadataPush(Mono.fromDirect(payload)), + FrameType.METADATA_PUSH)); + } + + @ParameterizedTest + @MethodSource("interactions") + public void shouldSentFrameOnResolution( + BiFunction, Publisher> request, FrameType requestType) { + Payload payload = ByteBufPayload.create("test", "testMetadata"); + TestPublisher testPublisher = + TestPublisher.createNoncompliant(TestPublisher.Violation.DEFER_CANCELLATION); + + Publisher publisher = request.apply(rule.client, testPublisher); + + StepVerifier.create(publisher) + .expectSubscription() + .then(() -> Assertions.assertThat(rule.connection.getSent()).isEmpty()) + .then( + () -> { + if (requestType != FrameType.REQUEST_CHANNEL) { + testPublisher.next(payload); + } + }) + .then(() -> rule.delayer.run()) + .then( + () -> { + if (requestType == FrameType.REQUEST_CHANNEL) { + testPublisher.next(payload); + } + }) + .then(testPublisher::complete) + .then( + () -> + Assertions.assertThat(rule.connection.getSent()) + .hasSize(1) + .first() + .matches(bb -> FrameHeaderCodec.frameType(bb).equals(requestType)) + .matches(ReferenceCounted::release)) + .then( + () -> { + if (requestType != FrameType.REQUEST_FNF && requestType != FrameType.METADATA_PUSH) { + rule.connection.addToReceivedBuffer( + PayloadFrameCodec.encodeComplete(rule.allocator, 1)); + } + }) + .expectComplete() + .verify(Duration.ofMillis(1000)); + + rule.allocator.assertHasNoLeaks(); + } + + @ParameterizedTest + @MethodSource("interactions") + @SuppressWarnings({"unchecked", "rawtypes"}) + public void shouldHaveNoLeaksOnPayloadInCaseOfRacingOfOnNextAndCancel( + BiFunction, Publisher> request, FrameType requestType) + throws Throwable { + Assumptions.assumeThat(requestType).isNotEqualTo(FrameType.REQUEST_CHANNEL); + + for (int i = 0; i < 10000; i++) { + ClientSocketRule rule = new ClientSocketRule(); + rule.apply( + new Statement() { + @Override + public void evaluate() {} + }, + null) + .evaluate(); + Payload payload = ByteBufPayload.create("test", "testMetadata"); + TestPublisher testPublisher = + TestPublisher.createNoncompliant(TestPublisher.Violation.DEFER_CANCELLATION); + AssertSubscriber assertSubscriber = AssertSubscriber.create(0); + + Publisher publisher = request.apply(rule.client, testPublisher); + publisher.subscribe(assertSubscriber); + + testPublisher.assertWasNotRequested(); + + assertSubscriber.request(1); + + testPublisher.assertWasRequested(); + testPublisher.assertMaxRequested(1); + testPublisher.assertMinRequested(1); + + RaceTestUtils.race( + () -> { + testPublisher.next(payload); + rule.delayer.run(); + }, + assertSubscriber::cancel); + + Collection sent = rule.connection.getSent(); + if (sent.size() == 1) { + Assertions.assertThat(sent) + .allMatch(bb -> FrameHeaderCodec.frameType(bb).equals(requestType)) + .allMatch(ReferenceCounted::release); + } else if (sent.size() == 2) { + Assertions.assertThat(sent) + .first() + .matches(bb -> FrameHeaderCodec.frameType(bb).equals(requestType)) + .matches(ReferenceCounted::release); + Assertions.assertThat(sent) + .element(1) + .matches(bb -> FrameHeaderCodec.frameType(bb).equals(FrameType.CANCEL)) + .matches(ReferenceCounted::release); + } else { + Assertions.assertThat(sent).isEmpty(); + } + + rule.allocator.assertHasNoLeaks(); + } + } + + @ParameterizedTest + @MethodSource("interactions") + @SuppressWarnings({"unchecked", "rawtypes"}) + public void shouldHaveNoLeaksOnPayloadInCaseOfRacingOfRequestAndCancel( + BiFunction, Publisher> request, FrameType requestType) + throws Throwable { + Assumptions.assumeThat(requestType).isNotEqualTo(FrameType.REQUEST_CHANNEL); + + for (int i = 0; i < 10000; i++) { + ClientSocketRule rule = new ClientSocketRule(); + rule.apply( + new Statement() { + @Override + public void evaluate() {} + }, + null) + .evaluate(); + ByteBuf dataBuffer = rule.allocator.buffer(); + dataBuffer.writeCharSequence("test", CharsetUtil.UTF_8); + + ByteBuf metadataBuffer = rule.allocator.buffer(); + metadataBuffer.writeCharSequence("testMetadata", CharsetUtil.UTF_8); + + Payload payload = ByteBufPayload.create(dataBuffer, metadataBuffer); + AssertSubscriber assertSubscriber = AssertSubscriber.create(0); + + Publisher publisher = request.apply(rule.client, Mono.just(payload)); + publisher.subscribe(assertSubscriber); + + RaceTestUtils.race( + () -> { + assertSubscriber.request(1); + rule.delayer.run(); + }, + assertSubscriber::cancel); + + Collection sent = rule.connection.getSent(); + if (sent.size() == 1) { + Assertions.assertThat(sent) + .allMatch(bb -> FrameHeaderCodec.frameType(bb).equals(requestType)) + .allMatch(ReferenceCounted::release); + } else if (sent.size() == 2) { + Assertions.assertThat(sent) + .first() + .matches(bb -> FrameHeaderCodec.frameType(bb).equals(requestType)) + .matches(ReferenceCounted::release); + Assertions.assertThat(sent) + .element(1) + .matches(bb -> FrameHeaderCodec.frameType(bb).equals(FrameType.CANCEL)) + .matches(ReferenceCounted::release); + } else { + Assertions.assertThat(sent).isEmpty(); + } + + rule.allocator.assertHasNoLeaks(); + } + } + + @ParameterizedTest + @MethodSource("interactions") + @SuppressWarnings({"unchecked", "rawtypes"}) + public void shouldPropagateDownstreamContext( + BiFunction, Publisher> request, FrameType requestType) { + Assumptions.assumeThat(requestType).isNotEqualTo(FrameType.REQUEST_CHANNEL); + + ByteBuf dataBuffer = rule.allocator.buffer(); + dataBuffer.writeCharSequence("test", CharsetUtil.UTF_8); + + ByteBuf metadataBuffer = rule.allocator.buffer(); + metadataBuffer.writeCharSequence("testMetadata", CharsetUtil.UTF_8); + + Payload payload = ByteBufPayload.create(dataBuffer, metadataBuffer); + AssertSubscriber assertSubscriber = new AssertSubscriber(Context.of("test", "test")); + + Context[] receivedContext = new Context[1]; + Publisher publisher = + request.apply( + rule.client, + Mono.just(payload) + .mergeWith( + Mono.subscriberContext() + .doOnNext(c -> receivedContext[0] = c) + .then(Mono.empty()))); + publisher.subscribe(assertSubscriber); + + rule.delayer.run(); + + Collection sent = rule.connection.getSent(); + if (sent.size() == 1) { + Assertions.assertThat(sent) + .allMatch(bb -> FrameHeaderCodec.frameType(bb).equals(requestType)) + .allMatch(ReferenceCounted::release); + } else if (sent.size() == 2) { + Assertions.assertThat(sent) + .first() + .matches(bb -> FrameHeaderCodec.frameType(bb).equals(requestType)) + .matches(ReferenceCounted::release); + Assertions.assertThat(sent) + .element(1) + .matches(bb -> FrameHeaderCodec.frameType(bb).equals(FrameType.CANCEL)) + .matches(ReferenceCounted::release); + } else { + Assertions.assertThat(sent).isEmpty(); + } + + Assertions.assertThat(receivedContext) + .hasSize(1) + .allSatisfy( + c -> + Assertions.assertThat( + c.stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))) + .containsKeys("test", DefaultRSocketClient.ON_DISCARD_KEY)); + + rule.allocator.assertHasNoLeaks(); + } + + @ParameterizedTest + @MethodSource("interactions") + @SuppressWarnings({"unchecked", "rawtypes"}) + public void shouldSupportMultiSubscriptionOnTheSameInteractionPublisher( + BiFunction, Publisher> request, FrameType requestType) { + AtomicBoolean once1 = new AtomicBoolean(); + AtomicBoolean once2 = new AtomicBoolean(); + Mono source = + Mono.fromCallable( + () -> { + if (!once1.getAndSet(true)) { + throw new IllegalStateException("test"); + } + return ByteBufPayload.create("test", "testMetadata"); + }) + .doFinally( + st -> { + rule.delayer.run(); + if (requestType != FrameType.METADATA_PUSH + && requestType != FrameType.REQUEST_FNF) { + if (st != SignalType.ON_ERROR) { + if (!once2.getAndSet(true)) { + rule.connection.addToReceivedBuffer( + ErrorFrameCodec.encode( + rule.allocator, 1, new IllegalStateException("test"))); + } else { + rule.connection.addToReceivedBuffer( + PayloadFrameCodec.encodeComplete(rule.allocator, 3)); + } + } + } + }); + AssertSubscriber assertSubscriber = AssertSubscriber.create(0); + + Publisher publisher = request.apply(rule.client, source); + if (publisher instanceof Mono) { + ((Mono) publisher) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100))) + .subscribe(assertSubscriber); + } else { + ((Flux) publisher) + .retryWhen(Retry.backoff(3, Duration.ofMillis(100))) + .subscribe(assertSubscriber); + } + + assertSubscriber.request(1); + + if (requestType == FrameType.REQUEST_CHANNEL) { + rule.delayer.run(); + } + + assertSubscriber.await(Duration.ofSeconds(10)).assertComplete(); + + Collection sent = rule.connection.getSent(); + Assertions.assertThat(sent) + .allMatch(bb -> FrameHeaderCodec.frameType(bb).equals(requestType)) + .allMatch(ReferenceCounted::release); + + rule.allocator.assertHasNoLeaks(); + } + + @Test + public void shouldBeAbleToResolveOriginalSource() { + AssertSubscriber assertSubscriber = AssertSubscriber.create(0); + rule.client.source().subscribe(assertSubscriber); + + assertSubscriber.assertNotTerminated(); + + rule.delayer.run(); + + assertSubscriber.request(1); + + assertSubscriber.assertTerminated().assertValueCount(1); + + AssertSubscriber assertSubscriber1 = AssertSubscriber.create(); + + rule.client.source().subscribe(assertSubscriber1); + + assertSubscriber1.assertTerminated().assertValueCount(1); + + Assertions.assertThat(assertSubscriber1.values()).isEqualTo(assertSubscriber.values()); + } + + @Test + public void shouldDisposeOriginalSource() { + AssertSubscriber assertSubscriber = AssertSubscriber.create(); + rule.client.source().subscribe(assertSubscriber); + rule.delayer.run(); + assertSubscriber.assertTerminated().assertValueCount(1); + + rule.client.dispose(); + + Assertions.assertThat(rule.client.isDisposed()).isTrue(); + + AssertSubscriber assertSubscriber1 = AssertSubscriber.create(); + + rule.client.source().subscribe(assertSubscriber1); + + assertSubscriber1 + .assertTerminated() + .assertError(CancellationException.class) + .assertErrorMessage("Disposed"); + + Assertions.assertThat(rule.socket.isDisposed()).isTrue(); + } + + @Test + public void shouldDisposeOriginalSourceIfRacing() throws Throwable { + for (int i = 0; i < 10000; i++) { + ClientSocketRule rule = new ClientSocketRule(); + rule.apply( + new Statement() { + @Override + public void evaluate() {} + }, + null) + .evaluate(); + + AssertSubscriber assertSubscriber = AssertSubscriber.create(); + rule.client.source().subscribe(assertSubscriber); + + RaceTestUtils.race(rule.delayer, () -> rule.client.dispose()); + + assertSubscriber.assertTerminated(); + + Assertions.assertThat(rule.client.isDisposed()).isTrue(); + Assertions.assertThat(rule.socket.isDisposed()).isTrue(); + + AssertSubscriber assertSubscriber1 = AssertSubscriber.create(); + + rule.client.source().subscribe(assertSubscriber1); + + assertSubscriber1 + .assertTerminated() + .assertError(CancellationException.class) + .assertErrorMessage("Disposed"); + } + } + + public static class ClientSocketRule extends AbstractSocketRule { + + protected RSocketClient client; + protected Runnable delayer; + protected MonoProcessor producer; + + @Override + protected void init() { + super.init(); + delayer = () -> producer.onNext(socket); + producer = MonoProcessor.create(); + client = + new DefaultRSocketClient( + producer + .doOnCancel(() -> socket.dispose()) + .doOnDiscard(Disposable.class, Disposable::dispose)); + } + + @Override + protected RSocketRequester newRSocket() { + return new RSocketRequester( + connection, + PayloadDecoder.ZERO_COPY, + StreamIdSupplier.clientSupplier(), + 0, + 0, + 0, + null, + RequesterLeaseHandler.None, + TestScheduler.INSTANCE); + } + + public int getStreamIdForRequestType(FrameType expectedFrameType) { + assertThat("Unexpected frames sent.", connection.getSent(), hasSize(greaterThanOrEqualTo(1))); + List framesFound = new ArrayList<>(); + for (ByteBuf frame : connection.getSent()) { + FrameType frameType = frameType(frame); + if (frameType == expectedFrameType) { + return FrameHeaderCodec.streamId(frame); + } + framesFound.add(frameType); + } + throw new AssertionError( + "No frames sent with frame type: " + + expectedFrameType + + ", frames found: " + + framesFound); + } + } +} diff --git a/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/client/RSocketClientExample.java b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/client/RSocketClientExample.java new file mode 100644 index 000000000..e1bf459b9 --- /dev/null +++ b/rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/client/RSocketClientExample.java @@ -0,0 +1,54 @@ +package io.rsocket.examples.transport.tcp.client; + +import io.rsocket.Payload; +import io.rsocket.RSocketClient; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketConnector; +import io.rsocket.core.RSocketServer; +import io.rsocket.transport.netty.client.TcpClientTransport; +import io.rsocket.transport.netty.server.TcpServerTransport; +import io.rsocket.util.DefaultPayload; +import java.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; + +public class RSocketClientExample { + static final Logger logger = LoggerFactory.getLogger(RSocketClientExample.class); + + public static void main(String[] args) { + + RSocketServer.create( + SocketAcceptor.forRequestResponse( + p -> { + String data = p.getDataUtf8(); + logger.info("Received request data {}", data); + + Payload responsePayload = DefaultPayload.create("Echo: " + data); + p.release(); + + return Mono.just(responsePayload); + })) + .bind(TcpServerTransport.create("localhost", 7000)) + .delaySubscription(Duration.ofSeconds(5)) + .doOnNext(cc -> logger.info("Server started on the address : {}", cc.address())) + .subscribe(); + + RSocketClient rSocketClient = + RSocketConnector.create() + .reconnect(Retry.backoff(50, Duration.ofMillis(500))) + .toRSocketClient(TcpClientTransport.create("localhost", 7000)); + + rSocketClient + .requestResponse(Mono.just(DefaultPayload.create("Test Request"))) + .doOnSubscribe(s -> logger.info("Executing Request")) + .doOnNext( + d -> { + logger.info("Received response data {}", d.getDataUtf8()); + d.release(); + }) + .repeat(10) + .blockLast(); + } +} From 021e17d4525489cb78fa8b078ffa4e8666f21d8a Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 1 Jun 2020 20:16:26 +0300 Subject: [PATCH 007/240] fixes ConnectionSetupPayload refcnt management (#854) Signed-off-by: Oleh Dokuka --- .../io/rsocket/core/RSocketConnector.java | 5 +- .../io/rsocket/core/RSocketConnectorTest.java | 53 +++++++++++++++++++ 2 files changed, 56 insertions(+), 2 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index 37857be30..621e1df1d 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -627,9 +627,10 @@ private Mono connect0(Supplier transportSupplier) { mtu); return wrappedConnection - .sendOne(setupFrame) + .sendOne(setupFrame.retain()) .thenReturn(wrappedRSocketRequester); - }); + }) + .doFinally(signalType -> setup.release()); }); } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java index de85a635d..00da3c24b 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketConnectorTest.java @@ -1,6 +1,9 @@ package io.rsocket.core; +import io.netty.buffer.ByteBuf; +import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCounted; +import io.rsocket.ConnectionSetupPayload; import io.rsocket.Payload; import io.rsocket.RSocket; import io.rsocket.test.util.TestClientTransport; @@ -9,10 +12,60 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoProcessor; import reactor.test.StepVerifier; public class RSocketConnectorTest { + @Test + public void ensuresThatSetupPayloadCanBeRetained() { + MonoProcessor retainedSetupPayload = MonoProcessor.create(); + TestClientTransport transport = new TestClientTransport(); + + ByteBuf data = transport.alloc().buffer(); + + data.writeCharSequence("data", CharsetUtil.UTF_8); + + RSocketConnector.create() + .setupPayload(ByteBufPayload.create(data)) + .acceptor( + (setup, sendingSocket) -> { + retainedSetupPayload.onNext(setup.retain()); + return Mono.just(new RSocket() {}); + }) + .connect(transport) + .block(); + + Assertions.assertThat(transport.testConnection().getSent()) + .hasSize(1) + .first() + .matches( + bb -> { + DefaultConnectionSetupPayload payload = new DefaultConnectionSetupPayload(bb); + return !payload.hasMetadata() && payload.getDataUtf8().equals("data"); + }) + .matches(buf -> buf.refCnt() == 2) + .matches( + buf -> { + buf.release(); + return buf.refCnt() == 1; + }); + + retainedSetupPayload + .as(StepVerifier::create) + .expectNextMatches( + setup -> { + String dataUtf8 = setup.getDataUtf8(); + return "data".equals(dataUtf8) && setup.release(); + }) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + Assertions.assertThat(retainedSetupPayload.peek().refCnt()).isZero(); + + transport.alloc().assertHasNoLeaks(); + } + @Test public void ensuresThatMonoFromRSocketConnectorCanBeUsedForMultipleSubscriptions() { Payload setupPayload = ByteBufPayload.create("TestData", "TestMetadata"); From 3b7058c474064c8f70b77c61312557036e115e49 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 1 Jun 2020 22:31:48 +0300 Subject: [PATCH 008/240] ensures metadatapush is terminated if it's racing with terminal (#848) Signed-off-by: Oleh Dokuka --- .../io/rsocket/core/RSocketRequester.java | 6 +++++ .../io/rsocket/core/RSocketRequesterTest.java | 26 ++++++++++++------- 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java index d9da1017b..15d04463a 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java @@ -581,6 +581,12 @@ private Mono handleMetadataPush(Payload payload) { new IllegalStateException("MetadataPushMono allows only a single subscriber")); } + if (isDisposed()) { + payload.release(); + final Throwable t = terminationError; + return Mono.error(t); + } + ByteBuf metadataPushFrame = MetadataPushFrameCodec.encodeReleasingPayload(allocator, payload); diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index 5c06d6602..c60cb8b64 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -18,11 +18,7 @@ import static io.rsocket.core.PayloadValidationUtils.INVALID_PAYLOAD_ERROR_MESSAGE; import static io.rsocket.frame.FrameHeaderCodec.frameType; -import static io.rsocket.frame.FrameType.CANCEL; -import static io.rsocket.frame.FrameType.REQUEST_CHANNEL; -import static io.rsocket.frame.FrameType.REQUEST_FNF; -import static io.rsocket.frame.FrameType.REQUEST_RESPONSE; -import static io.rsocket.frame.FrameType.REQUEST_STREAM; +import static io.rsocket.frame.FrameType.*; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -74,6 +70,7 @@ import java.util.function.Function; import java.util.stream.Stream; import org.assertj.core.api.Assertions; +import org.assertj.core.api.Assumptions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; @@ -947,9 +944,13 @@ private static Stream requestNInteractions() { @MethodSource("streamRacingCases") public void ensuresCorrectOrderOfStreamIdIssuingInCaseOfRacing( BiFunction> interaction1, - BiFunction> interaction2) { + BiFunction> interaction2, + FrameType interactionType1, + FrameType interactionType2) { + Assumptions.assumeThat(interactionType1).isNotEqualTo(METADATA_PUSH); + Assumptions.assumeThat(interactionType2).isNotEqualTo(METADATA_PUSH); for (int i = 1; i < 10000; i += 4) { - Payload payload = DefaultPayload.create("test"); + Payload payload = DefaultPayload.create("test", "test"); Publisher publisher1 = interaction1.apply(rule, payload); Publisher publisher2 = interaction2.apply(rule, payload); RaceTestUtils.race( @@ -1015,6 +1016,13 @@ public static Stream streamRacingCases() { (BiFunction>) (r, p) -> r.socket.fireAndForget(p), REQUEST_CHANNEL, + REQUEST_FNF), + Arguments.of( + (BiFunction>) + (r, p) -> r.socket.metadataPush(p), + (BiFunction>) + (r, p) -> r.socket.fireAndForget(p), + METADATA_PUSH, REQUEST_FNF)); } @@ -1027,8 +1035,8 @@ public void shouldTerminateAllStreamsIfThereRacingBetweenDisposeAndRequests( FrameType interactionType1, FrameType interactionType2) { for (int i = 1; i < 10000; i++) { - Payload payload1 = ByteBufPayload.create("test"); - Payload payload2 = ByteBufPayload.create("test"); + Payload payload1 = ByteBufPayload.create("test", "test"); + Payload payload2 = ByteBufPayload.create("test", "test"); AssertSubscriber assertSubscriber1 = AssertSubscriber.create(); AssertSubscriber assertSubscriber2 = AssertSubscriber.create(); Publisher publisher1 = interaction1.apply(rule, payload1); From 07cf959cdcc4c696f6febf318230b889a40b1088 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 2 Jun 2020 12:33:39 +0100 Subject: [PATCH 009/240] RSocketClient and RSocketConnector Javadoc update (#856) Signed-off-by: Rossen Stoyanchev --- .../main/java/io/rsocket/RSocketClient.java | 84 +++++++++++-------- .../io/rsocket/core/RSocketConnector.java | 83 +++++++++--------- 2 files changed, 87 insertions(+), 80 deletions(-) diff --git a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java index 95c3d4096..098cdfe9c 100644 --- a/rsocket-core/src/main/java/io/rsocket/RSocketClient.java +++ b/rsocket-core/src/main/java/io/rsocket/RSocketClient.java @@ -4,69 +4,79 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; /** - * A client-side interface to simplify interactions with the {@link - * io.rsocket.core.RSocketConnector}. This interface represents logical communication over {@link - * RSocket}, hiding the complexity of {@code Mono} resolution. Also, in opposite to {@link - * RSocket} , {@link RSocketClient} supports multi-subscription on the same {@link Publisher} from - * the interaction in the way of accepting input as {@link Publisher} like {@link Mono} or {@link - * Flux} Despite, {@link RSocket} interface, {@link RSocketClient} does not coupled with a single - * connection, hence disposal of the {@link #source()} {@link RSocket} will affect the {@link - * RSocketClient} it selves. In such a case, a new request will cause automatic reconnection if - * necessary. + * Contract to perform RSocket requests from client to server, transparently connecting and ensuring + * a single, shared connection to make requests with. + * + *

{@code RSocketClient} contains a {@code Mono} {@link #source() source}. It uses it to + * obtain a live, shared {@link RSocket} connection on the first request and on subsequent requests + * if the connection is lost. This eliminates the need to obtain a connection first, and makes it + * easy to pass a single {@code RSocketClient} to use from multiple places. + * + *

Request methods of {@code RSocketClient} allow multiple subscriptions with each subscription + * performing a new request. Therefore request methods accept {@code Mono} rather than + * {@code Payload} as on {@link RSocket}. By contrast, {@link RSocket} request methods cannot be + * subscribed to more than once. + * + *

Use {@link io.rsocket.core.RSocketConnector RSocketConnector} to create a client: + * + *

{@code
+ * RSocketClient client =
+ *         RSocketConnector.create()
+ *                 .metadataMimeType("message/x.rsocket.composite-metadata.v0")
+ *                 .dataMimeType("application/cbor")
+ *                 .toRSocketClient(TcpClientTransport.create("localhost", 7000));
+ * }
+ * + *

Use the {@link io.rsocket.core.RSocketConnector#reconnect(Retry) RSocketConnector#reconnect} + * method to configure the retry logic to use whenever a shared {@code RSocket} connection needs to + * be obtained: + * + *

{@code
+ * RSocketClient client =
+ *         RSocketConnector.create()
+ *                 .metadataMimeType("message/x.rsocket.composite-metadata.v0")
+ *                 .dataMimeType("application/cbor")
+ *                 .reconnect(Retry.fixedDelay(3, Duration.ofSeconds(1)))
+ *                 .toRSocketClient(TcpClientTransport.create("localhost", 7000));
+ * }
* * @since 1.0.1 */ public interface RSocketClient extends Disposable { - /** - * Provides access to the source {@link RSocket} used by this {@link RSocketClient} - * - * @return returns a {@link Mono} which returns the source {@link RSocket} - */ + /** Return the underlying source used to obtain a shared {@link RSocket} connection. */ Mono source(); /** - * Fire and Forget interaction model of {@link RSocketClient}. - * - * @param payloadMono Request payload as {@link Mono}. - * @return {@code Publisher} that completes when the passed {@code payload} is successfully - * handled, otherwise errors. + * Perform a Fire-and-Forget interaction via {@link RSocket#fireAndForget(Payload)}. Allows + * multiple subscriptions and performs a request per subscriber. */ Mono fireAndForget(Mono payloadMono); /** - * Request-Response interaction model of {@link RSocketClient}. - * - * @param payloadMono Request payload as {@link Mono}. - * @return {@code Publisher} containing at most a single {@code Payload} representing the - * response. + * Perform a Request-Response interaction via {@link RSocket#requestResponse(Payload)}. Allows + * multiple subscriptions and performs a request per subscriber. */ Mono requestResponse(Mono payloadMono); /** - * Request-Stream interaction model of {@link RSocketClient}. - * - * @param payloadMono Request payload as {@link Mono}. - * @return {@code Publisher} containing the stream of {@code Payload}s representing the response. + * Perform a Request-Stream interaction via {@link RSocket#requestStream(Payload)}. Allows + * multiple subscriptions and performs a request per subscriber. */ Flux requestStream(Mono payloadMono); /** - * Request-Channel interaction model of {@link RSocketClient}. - * - * @param payloads Stream of request payloads. - * @return Stream of response payloads. + * Perform a Request-Channel interaction via {@link RSocket#requestChannel(Publisher)}. Allows + * multiple subscriptions and performs a request per subscriber. */ Flux requestChannel(Publisher payloads); /** - * Metadata-Push interaction model of {@link RSocketClient}. - * - * @param payloadMono Request payload as {@link Mono}. - * @return {@code Publisher} that completes when the passed {@code payload} is successfully - * handled, otherwise errors. + * Perform a Metadata Push via {@link RSocket#metadataPush(Payload)}. Allows multiple + * subscriptions and performs a request per subscriber. */ Mono metadataPush(Mono payloadMono); } diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index 621e1df1d..d54d267d4 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -53,23 +53,23 @@ /** * The main class to use to establish a connection to an RSocket server. * - *

To connect over TCP using default settings: + *

For using TCP using default settings: * *

{@code
  * import io.rsocket.transport.netty.client.TcpClientTransport;
  *
- * Mono rocketMono =
- *         RSocketConnector.connectWith(TcpClientTransport.create("localhost", 7000));
+ * RSocketClient client =
+ *         RSocketConnector.createRSocketClient(TcpClientTransport.create("localhost", 7000));
  * }
* *

To customize connection settings before connecting: * *

{@code
- * Mono rocketMono =
+ * RSocketClient client =
  *         RSocketConnector.create()
  *                 .metadataMimeType("message/x.rsocket.composite-metadata.v0")
  *                 .dataMimeType("application/cbor")
- *                 .connect(TcpClientTransport.create("localhost", 7000));
+ *                 .toRSocketClient(TcpClientTransport.create("localhost", 7000));
  * }
*/ public class RSocketConnector { @@ -448,11 +448,42 @@ public RSocketConnector payloadDecoder(PayloadDecoder decoder) { } /** - * The final step to connect with the transport to use as input and the resulting {@code - * Mono} as output. Each subscriber to the returned {@code Mono} starts a new connection - * if neither {@link #reconnect(Retry) reconnect} nor {@link #resume(Resume)} are enabled. + * Create {@link RSocketClient} that will use {@link #connect(ClientTransport)} as its source to + * obtain a live, shared {@code RSocket} when the first request is made, and also on subsequent + * requests after the connection is lost. * - *

The following transports are available (via additional RSocket Java modules): + *

The following transports are available through additional RSocket Java modules: + * + *

    + *
  • {@link io.rsocket.transport.netty.client.TcpClientTransport TcpClientTransport} via + * {@code rsocket-transport-netty}. + *
  • {@link io.rsocket.transport.netty.client.WebsocketClientTransport + * WebsocketClientTransport} via {@code rsocket-transport-netty}. + *
  • {@link io.rsocket.transport.local.LocalClientTransport LocalClientTransport} via {@code + * rsocket-transport-local} + *
+ * + * @param transport the transport of choice to connect with + * @return a {@code RSocketClient} with not established connection. Note, connection will be + * established on the first request + * @since 1.0.1 + */ + public RSocketClient toRSocketClient(ClientTransport transport) { + Mono source = connect0(() -> transport); + + if (retrySpec != null) { + source = source.retryWhen(retrySpec); + } + + return new DefaultRSocketClient(source); + } + + /** + * Connect with the given transport and obtain a live {@link RSocket} to use for making requests. + * Each subscriber to the returned {@code Mono} receives a new connection, if neither {@link + * #reconnect(Retry) reconnect} nor {@link #resume(Resume)} are enabled. + * + *

The following transports are available through additional RSocket Java modules: * *