Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.filter.FailureAwareClient;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.subscribers.TestSubscriber;
import org.junit.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -35,23 +35,11 @@

public class FailureReactiveSocketTest {

private final Payload dummyPayload = new Payload() {
@Override
public ByteBuffer getData() {
return null;
}

@Override
public ByteBuffer getMetadata() {
return null;
}
};

@Test
public void testError() throws InterruptedException {
testReactiveSocket((latch, socket) -> {
assertEquals(1.0, socket.availability(), 0.0);
Publisher<Payload> payloadPublisher = socket.requestResponse(dummyPayload);
Publisher<Payload> payloadPublisher = socket.requestResponse(PayloadImpl.EMPTY);

TestSubscriber<Payload> subscriber = new TestSubscriber<>();
payloadPublisher.subscribe(subscriber);
Expand Down Expand Up @@ -79,7 +67,7 @@ public void testError() throws InterruptedException {
public void testWidowReset() throws InterruptedException {
testReactiveSocket((latch, socket) -> {
assertEquals(1.0, socket.availability(), 0.0);
Publisher<Payload> payloadPublisher = socket.requestResponse(dummyPayload);
Publisher<Payload> payloadPublisher = socket.requestResponse(PayloadImpl.EMPTY);

TestSubscriber<Payload> subscriber = new TestSubscriber<>();
payloadPublisher.subscribe(subscriber);
Expand Down Expand Up @@ -110,7 +98,7 @@ private void testReactiveSocket(BiConsumer<CountDownLatch, ReactiveSocket> f) th
AtomicInteger count = new AtomicInteger(0);
TestingReactiveSocket socket = new TestingReactiveSocket(input -> {
if (count.getAndIncrement() < 1) {
return dummyPayload;
return PayloadImpl.EMPTY;
} else {
throw new RuntimeException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.util.PayloadImpl;
import org.junit.Assert;
import org.junit.Test;
import org.reactivestreams.Publisher;
Expand All @@ -27,26 +28,13 @@

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;

public class LoadBalancerTest {

private Payload dummy = new Payload() {
@Override
public ByteBuffer getData() {
return null;
}

@Override
public ByteBuffer getMetadata() {
return null;
}
};

@Test(timeout = 10_000L)
public void testNeverSelectFailingFactories() throws InterruptedException {
InetSocketAddress local0 = InetSocketAddress.createUnresolved("localhost", 7000);
Expand Down Expand Up @@ -105,7 +93,7 @@ private void testBalancer(List<ReactiveSocketClient> factories) throws Interrupt
private void makeAcall(ReactiveSocket balancer) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);

balancer.requestResponse(dummy).subscribe(new Subscriber<Payload>() {
balancer.requestResponse(PayloadImpl.EMPTY).subscribe(new Subscriber<Payload>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.filter.ReactiveSockets;
import io.reactivesocket.exceptions.TimeoutException;
import io.reactivesocket.util.PayloadImpl;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.nio.ByteBuffer;
import java.time.Duration;

import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -36,17 +36,7 @@ public void testTimeoutSocket() {
TestingReactiveSocket socket = new TestingReactiveSocket((subscriber, payload) -> {return false;});
ReactiveSocket timeout = ReactiveSockets.timeout(Duration.ofMillis(50)).apply(socket);

timeout.requestResponse(new Payload() {
@Override
public ByteBuffer getData() {
return null;
}

@Override
public ByteBuffer getMetadata() {
return null;
}
}).subscribe(new Subscriber<Payload>() {
timeout.requestResponse(PayloadImpl.EMPTY).subscribe(new Subscriber<Payload>() {
@Override
public void onSubscribe(Subscription s) {
s.request(1);
Expand Down
4 changes: 2 additions & 2 deletions 4 reactivesocket-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ jmh {
}

dependencies {
compile 'io.projectreactor:reactor-core:3.0.5.RELEASE'
compile 'org.agrona:Agrona:0.5.4'
compile 'io.projectreactor:reactor-core:3.0.6.RELEASE'
compile 'io.netty:netty-buffer:4.1.8.Final'

jmh 'org.openjdk.jmh:jmh-core:1.15'
jmh 'org.openjdk.jmh:jmh-generator-annprocess:1.15'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.reactivesocket;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.client.SetupProvider;
Expand All @@ -24,6 +26,7 @@
import io.reactivesocket.perfutil.TestDuplexConnection;
import io.reactivesocket.server.ReactiveSocketServer;
import io.reactivesocket.transport.TransportServer;
import io.reactivesocket.util.PayloadImpl;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
Expand Down Expand Up @@ -88,37 +91,8 @@ public static class Input {
public Blackhole bh;

static final ByteBuffer HELLO = ByteBuffer.wrap("HELLO".getBytes(StandardCharsets.UTF_8));
static final ByteBuffer HELLO_WORLD = ByteBuffer.wrap("HELLO_WORLD".getBytes(StandardCharsets.UTF_8));
static final ByteBuffer EMPTY = ByteBuffer.allocate(0);

static final Payload HELLO_PAYLOAD = new Payload() {

@Override
public ByteBuffer getMetadata() {
return EMPTY;
}

@Override
public ByteBuffer getData() {
HELLO.position(0);
return HELLO;
}
};

static final Payload HELLO_WORLD_PAYLOAD = new Payload() {

@Override
public ByteBuffer getMetadata() {
return EMPTY;
}

@Override
public ByteBuffer getData() {
HELLO_WORLD.position(0);
return HELLO_WORLD;
}
};

static final Payload HELLO_PAYLOAD = new PayloadImpl(HELLO);

static final DirectProcessor<Frame> clientReceive = DirectProcessor.create();
static final DirectProcessor<Frame> serverReceive = DirectProcessor.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,13 @@ public TestDuplexConnection(DirectProcessor<Frame> send, DirectProcessor<Frame>
public Mono<Void> send(Publisher<Frame> frame) {
return Flux
.from(frame)
.doOnNext(f -> send.onNext(f))
.doOnError(t -> {throw new RuntimeException(t); })
.doOnNext(f -> {
try {
send.onNext(f);
} finally {
f.release();
}
})
.then();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@

package io.reactivesocket;

import io.netty.buffer.Unpooled;
import io.netty.util.collection.IntObjectHashMap;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.exceptions.Exceptions;
import io.reactivesocket.frame.ByteBufferUtil;
import io.reactivesocket.internal.KnownErrorFilter;
import io.reactivesocket.internal.LimitableRequestPublisher;
import io.reactivesocket.lease.Lease;
import io.reactivesocket.lease.LeaseImpl;
import org.agrona.collections.Int2ObjectHashMap;
import io.reactivesocket.util.PayloadImpl;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.Disposable;
Expand All @@ -33,6 +34,7 @@
import reactor.core.publisher.UnicastProcessor;

import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
Expand All @@ -50,8 +52,8 @@ public class ClientReactiveSocket implements ReactiveSocket {
private final StreamIdSupplier streamIdSupplier;
private final KeepAliveProvider keepAliveProvider;
private final MonoProcessor<Void> started;
private final Int2ObjectHashMap<LimitableRequestPublisher> senders;
private final Int2ObjectHashMap<Subscriber<? super Frame>> receivers;
private final IntObjectHashMap<LimitableRequestPublisher> senders;
private final IntObjectHashMap<Subscriber<Payload>> receivers;

private Disposable keepAliveSendSub;
private volatile Consumer<Lease> leaseConsumer; // Provided on start()
Expand All @@ -63,8 +65,8 @@ public ClientReactiveSocket(DuplexConnection connection, Consumer<Throwable> err
this.streamIdSupplier = streamIdSupplier;
this.keepAliveProvider = keepAliveProvider;
this.started = MonoProcessor.create();
this.senders = new Int2ObjectHashMap<>(256, 0.9f);
this.receivers = new Int2ObjectHashMap<>(256, 0.9f);
this.senders = new IntObjectHashMap<>(256, 0.9f);
this.receivers = new IntObjectHashMap<>(256, 0.9f);

connection
.onClose()
Expand Down Expand Up @@ -124,7 +126,7 @@ public ClientReactiveSocket start(Consumer<Lease> leaseConsumer) {
this.leaseConsumer = leaseConsumer;

keepAliveSendSub = connection.send(keepAliveProvider.ticks()
.map(i -> Frame.Keepalive.from(Frame.NULL_BYTEBUFFER, true)))
.map(i -> Frame.Keepalive.from(Unpooled.EMPTY_BUFFER, true)))
.subscribe(null, errorConsumer);

connection
Expand Down Expand Up @@ -308,12 +310,16 @@ private synchronized void cleanUpSubscriber(Subscriber<?> subscriber) {
}

private void handleIncomingFrames(Frame frame) {
int streamId = frame.getStreamId();
FrameType type = frame.getType();
if (streamId == 0) {
handleStreamZero(type, frame);
} else {
handleFrame(streamId, type, frame);
try {
int streamId = frame.getStreamId();
FrameType type = frame.getType();
if (streamId == 0) {
handleStreamZero(type, frame);
} else {
handleFrame(streamId, type, frame);
}
} finally {
frame.release();
}
}

Expand All @@ -323,6 +329,7 @@ private void handleStreamZero(FrameType type, Frame frame) {
throw Exceptions.from(frame);
case LEASE: {
if (leaseConsumer != null) {

leaseConsumer.accept(new LeaseImpl(frame));
}
break;
Expand All @@ -342,7 +349,7 @@ private void handleStreamZero(FrameType type, Frame frame) {

@SuppressWarnings("unchecked")
private void handleFrame(int streamId, FrameType type, Frame frame) {
Subscriber<? super Frame> receiver;
Subscriber<Payload> receiver;
synchronized (this) {
receiver = receivers.get(streamId);
}
Expand All @@ -355,7 +362,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
removeReceiver(streamId);
break;
case NEXT_COMPLETE:
receiver.onNext(frame);
receiver.onNext(new PayloadImpl(frame));
receiver.onComplete();
break;
case CANCEL: {
Expand All @@ -370,7 +377,7 @@ private void handleFrame(int streamId, FrameType type, Frame frame) {
break;
}
case NEXT:
receiver.onNext(frame);
receiver.onNext(new PayloadImpl(frame));
break;
case REQUEST_N: {
LimitableRequestPublisher sender;
Expand Down Expand Up @@ -401,7 +408,7 @@ private void handleMissingResponseProcessor(int streamId, FrameType type, Frame
if (type == FrameType.ERROR) {
// message for stream that has never existed, we have a problem with
// the overall connection and must tear down
String errorMessage = ByteBufferUtil.toUtf8String(frame.getData());
String errorMessage = StandardCharsets.UTF_8.decode(frame.getData()).toString();

throw new IllegalStateException("Client received error for non-existent stream: "
+ streamId + " Message: " + errorMessage);
Expand Down
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.