diff --git a/build.gradle b/build.gradle index ee3c2c50b..34acf5e37 100644 --- a/build.gradle +++ b/build.gradle @@ -81,7 +81,7 @@ subprojects { dependencies { compile "io.projectreactor:reactor-core:3.1.1.RELEASE" - compile "io.netty:netty-buffer:4.1.15.Final" + compile "io.netty:netty-buffer:4.1.16.Final" compile "org.reactivestreams:reactive-streams:1.0.1" compile "org.slf4j:slf4j-api:1.7.25" compile "com.google.code.findbugs:jsr305:3.0.2" diff --git a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java index b88f4c9a3..a39214dfd 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/ByteBufPayload.java @@ -21,6 +21,7 @@ import io.netty.buffer.ByteBufUtil; import io.netty.buffer.Unpooled; import io.netty.util.AbstractReferenceCounted; +import io.netty.util.Recycler; import io.rsocket.Payload; import javax.annotation.Nullable; @@ -29,17 +30,19 @@ import java.nio.charset.Charset; public final class ByteBufPayload extends AbstractReferenceCounted implements Payload { - private final ByteBuf data; - private final ByteBuf metadata; + private static final Recycler RECYCLER = + new Recycler() { + protected ByteBufPayload newObject(Handle handle) { + return new ByteBufPayload(handle); + } + }; - private ByteBufPayload(ByteBuf data) { - this.data = data.asReadOnly(); - this.metadata = null; - } + private final Recycler.Handle handle; + private ByteBuf data; + private ByteBuf metadata; - private ByteBufPayload(ByteBuf data, @Nullable ByteBuf metadata) { - this.data = data.asReadOnly(); - this.metadata = metadata == null ? null : metadata.asReadOnly(); + private ByteBufPayload(final Recycler.Handle handle) { + this.handle = handle; } @Override @@ -49,12 +52,12 @@ public boolean hasMetadata() { @Override public ByteBuf sliceMetadata() { - return metadata == null ? Unpooled.EMPTY_BUFFER : metadata.duplicate(); + return metadata == null ? Unpooled.EMPTY_BUFFER : metadata; } @Override public ByteBuf sliceData() { - return data.duplicate(); + return data; } @Override @@ -90,9 +93,12 @@ public ByteBufPayload touch(Object hint) { @Override protected void deallocate() { data.release(); + data = null; if (metadata != null) { metadata.release(); + metadata = null; } + handle.recycle(this); } /** @@ -102,7 +108,7 @@ protected void deallocate() { * @return a payload. */ public static Payload create(String data) { - return new ByteBufPayload(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, data)); + return create(ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, data), null); } /** @@ -114,48 +120,51 @@ public static Payload create(String data) { * @return a payload. */ public static Payload create(String data, @Nullable String metadata) { - return new ByteBufPayload( + return create( ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, data), metadata == null ? null : ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, metadata) ); } public static Payload create(CharSequence data, Charset dataCharset) { - return new ByteBufPayload(ByteBufUtil.encodeString(ByteBufAllocator.DEFAULT, CharBuffer.wrap(data), dataCharset)); + return create(ByteBufUtil.encodeString(ByteBufAllocator.DEFAULT, CharBuffer.wrap(data), dataCharset), null); } public static Payload create(CharSequence data, Charset dataCharset, @Nullable CharSequence metadata, Charset metadataCharset) { - return new ByteBufPayload( + return create( ByteBufUtil.encodeString(ByteBufAllocator.DEFAULT, CharBuffer.wrap(data), dataCharset), metadata == null ? null : ByteBufUtil.encodeString(ByteBufAllocator.DEFAULT, CharBuffer.wrap(metadata), metadataCharset) ); } public static Payload create(byte[] data) { - return new ByteBufPayload(Unpooled.wrappedBuffer(data)); + return create(Unpooled.wrappedBuffer(data), null); } public static Payload create(byte[] data, @Nullable byte[] metadata) { - return new ByteBufPayload(Unpooled.wrappedBuffer(data), metadata == null ? null : Unpooled.wrappedBuffer(metadata)); + return create(Unpooled.wrappedBuffer(data), metadata == null ? null : Unpooled.wrappedBuffer(metadata)); } public static Payload create(ByteBuffer data) { - return new ByteBufPayload(Unpooled.wrappedBuffer(data)); + return create(Unpooled.wrappedBuffer(data), null); } public static Payload create(ByteBuffer data, @Nullable ByteBuffer metadata) { - return new ByteBufPayload(Unpooled.wrappedBuffer(data), metadata == null ? null : Unpooled.wrappedBuffer(metadata)); + return create(Unpooled.wrappedBuffer(data), metadata == null ? null : Unpooled.wrappedBuffer(metadata)); } public static Payload create(ByteBuf data) { - return new ByteBufPayload(data); + return create(data, null); } public static Payload create(ByteBuf data, @Nullable ByteBuf metadata) { - return new ByteBufPayload(data, metadata); + ByteBufPayload payload = RECYCLER.get(); + payload.data = data; + payload.metadata = metadata; + return payload; } public static Payload create(Payload payload) { - return new ByteBufPayload(payload.sliceData(), payload.hasMetadata() ? payload.sliceMetadata() : null); + return create(payload.sliceData(), payload.hasMetadata() ? payload.sliceMetadata() : null); } } diff --git a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java index df7fd2662..8962bc315 100644 --- a/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java +++ b/rsocket-core/src/main/java/io/rsocket/util/DefaultPayload.java @@ -35,14 +35,9 @@ public final class DefaultPayload implements Payload { private final ByteBuffer data; private final ByteBuffer metadata; - private DefaultPayload(ByteBuffer data) { - this.data = data.asReadOnlyBuffer(); - this.metadata = null; - } - private DefaultPayload(ByteBuffer data, @Nullable ByteBuffer metadata) { - this.data = data.asReadOnlyBuffer(); - this.metadata = metadata == null ? null : metadata.asReadOnlyBuffer(); + this.data = data; + this.metadata = metadata; } @Override @@ -62,12 +57,12 @@ public ByteBuf sliceData() { @Override public ByteBuffer getMetadata() { - return metadata == null ? DefaultPayload.EMPTY_BUFFER : metadata.duplicate(); + return metadata == null ? DefaultPayload.EMPTY_BUFFER : metadata; } @Override public ByteBuffer getData() { - return data.duplicate(); + return data; } @Override @@ -112,7 +107,7 @@ public boolean release(int decrement) { * @return a payload. */ public static Payload create(CharSequence data) { - return new DefaultPayload(StandardCharsets.UTF_8.encode(CharBuffer.wrap(data))); + return create(StandardCharsets.UTF_8.encode(CharBuffer.wrap(data)), null); } /** @@ -124,33 +119,33 @@ public static Payload create(CharSequence data) { * @return a payload. */ public static Payload create(CharSequence data, @Nullable CharSequence metadata) { - return new DefaultPayload( + return create( StandardCharsets.UTF_8.encode(CharBuffer.wrap(data)), metadata == null ? null : StandardCharsets.UTF_8.encode(CharBuffer.wrap(metadata)) ); } public static Payload create(CharSequence data, Charset dataCharset) { - return new DefaultPayload(dataCharset.encode(CharBuffer.wrap(data))); + return create(dataCharset.encode(CharBuffer.wrap(data)), null); } public static Payload create(CharSequence data, Charset dataCharset, @Nullable CharSequence metadata, Charset metadataCharset) { - return new DefaultPayload( + return create( dataCharset.encode(CharBuffer.wrap(data)), metadata == null ? null : metadataCharset.encode(CharBuffer.wrap(metadata)) ); } public static Payload create(byte[] data) { - return new DefaultPayload(ByteBuffer.wrap(data)); + return create(ByteBuffer.wrap(data), null); } public static Payload create(byte[] data, @Nullable byte[] metadata) { - return new DefaultPayload(ByteBuffer.wrap(data), metadata == null ? null : ByteBuffer.wrap(metadata)); + return create(ByteBuffer.wrap(data), metadata == null ? null : ByteBuffer.wrap(metadata)); } public static Payload create(ByteBuffer data) { - return new DefaultPayload(data); + return create(data, null); } public static Payload create(ByteBuffer data, @Nullable ByteBuffer metadata) { @@ -158,6 +153,6 @@ public static Payload create(ByteBuffer data, @Nullable ByteBuffer metadata) { } public static Payload create(Payload payload) { - return new DefaultPayload(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null); + return create(payload.getData(), payload.hasMetadata() ? payload.getMetadata() : null); } } diff --git a/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java b/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java index d97682850..aa30c2a62 100644 --- a/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java +++ b/rsocket-core/src/test/java/io/rsocket/util/DefaultPayloadTest.java @@ -13,7 +13,6 @@ package io.rsocket.util; -import static io.rsocket.util.DefaultPayload.create; import static org.hamcrest.MatcherAssert.*; import static org.hamcrest.Matchers.*; @@ -32,14 +31,6 @@ public void testReuse() { assertDataAndMetadata(p, DATA_VAL, METADATA_VAL); } - @Test - public void testReuseWithExternalMark() { - Payload p = DefaultPayload.create(DATA_VAL, METADATA_VAL); - assertDataAndMetadata(p, DATA_VAL, METADATA_VAL); - p.getData().position(2).mark(); - assertDataAndMetadata(p, DATA_VAL, METADATA_VAL); - } - public void assertDataAndMetadata(Payload p, String dataVal, @Nullable String metadataVal) { assertThat("Unexpected data.", p.getDataUtf8(), equalTo(dataVal)); if (metadataVal == null) { diff --git a/rsocket-test/build.gradle b/rsocket-test/build.gradle index 10705bd19..8c0243533 100644 --- a/rsocket-test/build.gradle +++ b/rsocket-test/build.gradle @@ -20,5 +20,5 @@ dependencies { compile "org.mockito:mockito-core:2.10.0" compile "org.hamcrest:hamcrest-library:1.3" compile "org.hdrhistogram:HdrHistogram:2.1.9" - compile "io.projectreactor:reactor-test:3.1.0.RELEASE" + compile "io.projectreactor:reactor-test:3.1.1.RELEASE" }