Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit fe3b390

Browse filesBrowse files
committed
Merge branch 'release/0.12.2-RC3'
2 parents 119d578 + 9a4668f commit fe3b390
Copy full SHA for fe3b390

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Dismiss banner
Expand file treeCollapse file tree

41 files changed

+1204
-1506
lines changed
Open diff view settings
Collapse file

‎.travis.yml‎

Copy file name to clipboardExpand all lines: .travis.yml
+7-4Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616
---
1717
language: java
1818

19-
jdk:
20-
- oraclejdk8
21-
# - oraclejdk9
19+
matrix:
20+
include:
21+
- jdk: oraclejdk8
22+
- jdk: openjdk11
23+
env: SKIP_RELEASE=true
24+
- jdk: openjdk12
25+
env: SKIP_RELEASE=true
2226

2327
env:
2428
global:
@@ -37,4 +41,3 @@ cache:
3741
directories:
3842
- $HOME/.gradle/caches/
3943
- $HOME/.gradle/wrapper/
40-
Collapse file

‎README.md‎

Copy file name to clipboardExpand all lines: README.md
+6-6Lines changed: 6 additions & 6 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@ Example:
2323

2424
```groovy
2525
dependencies {
26-
implementation 'io.rsocket:rsocket-core:0.11.14'
27-
implementation 'io.rsocket:rsocket-transport-netty:0.11.14'
28-
// implementation 'io.rsocket:rsocket-core:0.11.15.BUILD-SNAPSHOT'
29-
// implementation 'io.rsocket:rsocket-transport-netty:0.11.15.BUILD-SNAPSHOT'
26+
implementation 'io.rsocket:rsocket-core:0.12.2-RC2'
27+
implementation 'io.rsocket:rsocket-transport-netty:0.12.2-RC2'
28+
// implementation 'io.rsocket:rsocket-core:0.12.2-RC3-SNAPSHOT'
29+
// implementation 'io.rsocket:rsocket-transport-netty:0.12.2-RC3-SNAPSHOT'
3030
}
3131
```
3232

@@ -91,7 +91,7 @@ or you will get a memory leak. Used correctly this will reduce latency and incre
9191
```java
9292
RSocketFactory.receive()
9393
// Enable Zero Copy
94-
.payloadDecoder(Frame::retain)
94+
.frameDecoder(PayloadDecoder.ZERO_COPY)
9595
.acceptor(new PingHandler())
9696
.transport(TcpServerTransport.create(7878))
9797
.start()
@@ -105,7 +105,7 @@ RSocketFactory.receive()
105105
Mono<RSocket> client =
106106
RSocketFactory.connect()
107107
// Enable Zero Copy
108-
.payloadDecoder(Frame::retain)
108+
.frameDecoder(PayloadDecoder.ZERO_COPY)
109109
.transport(TcpClientTransport.create(7878))
110110
.start();
111111
```
Collapse file

‎artifactory.gradle‎

Copy file name to clipboardExpand all lines: artifactory.gradle
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ if (project.hasProperty('bintrayUser') && project.hasProperty('bintrayKey')) {
3535
}
3636
}
3737
}
38+
tasks.named("artifactoryPublish").configure {
39+
onlyIf { System.getenv('SKIP_RELEASE') != "true" }
40+
}
3841
}
3942
}
4043
}
Collapse file

‎bintray.gradle‎

Copy file name to clipboardExpand all lines: bintray.gradle
+3Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ if (project.hasProperty('bintrayUser') && project.hasProperty('bintrayKey') &&
5555
}
5656
}
5757
}
58+
tasks.named("bintrayUpload").configure {
59+
onlyIf { System.getenv('SKIP_RELEASE') != "true" }
60+
}
5861
}
5962
}
6063
}
Collapse file

‎build.gradle‎

Copy file name to clipboardExpand all lines: build.gradle
+7-3Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ subprojects {
2929
apply plugin: 'io.spring.dependency-management'
3030
apply plugin: 'com.github.sherter.google-java-format'
3131

32-
ext['reactor-bom.version'] = 'Californium-SR5'
32+
ext['reactor-bom.version'] = 'Californium-SR8'
3333
ext['logback.version'] = '1.2.3'
3434
ext['findbugs.version'] = '3.0.2'
35-
ext['netty.version'] = '4.1.31.Final'
36-
ext['netty-boringssl.version'] = '2.0.18.Final'
35+
ext['netty.version'] = '4.1.36.Final'
36+
ext['netty-boringssl.version'] = '2.0.25.Final'
3737
ext['hdrhistogram.version'] = '2.1.10'
3838
ext['mockito.version'] = '2.25.1'
3939
ext['slf4j.version'] = '1.7.25'
@@ -116,6 +116,10 @@ subprojects {
116116

117117
systemProperty "io.netty.leakDetection.level", "ADVANCED"
118118
}
119+
120+
tasks.named("javadoc").configure {
121+
onlyIf { System.getenv('SKIP_RELEASE') != "true" }
122+
}
119123
}
120124

121125
plugins.withType(JavaLibraryPlugin) {
Collapse file

‎gradle.properties‎

Copy file name to clipboardExpand all lines: gradle.properties
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,4 @@
1111
# See the License for the specific language governing permissions and
1212
# limitations under the License.
1313
#
14-
version=0.12.2-RC2
14+
version=0.12.2-RC3
Collapse file

‎rsocket-core/src/main/java/io/rsocket/RSocketClient.java‎

Copy file name to clipboardExpand all lines: rsocket-core/src/main/java/io/rsocket/RSocketClient.java
+47-18Lines changed: 47 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,23 @@
1616

1717
package io.rsocket;
1818

19+
import static io.rsocket.keepalive.KeepAliveSupport.ClientKeepAliveSupport;
20+
import static io.rsocket.keepalive.KeepAliveSupport.KeepAlive;
21+
1922
import io.netty.buffer.ByteBuf;
2023
import io.netty.buffer.ByteBufAllocator;
2124
import io.netty.util.ReferenceCountUtil;
2225
import io.netty.util.collection.IntObjectHashMap;
26+
import io.rsocket.exceptions.ConnectionErrorException;
2327
import io.rsocket.exceptions.Exceptions;
2428
import io.rsocket.frame.*;
2529
import io.rsocket.frame.decoder.PayloadDecoder;
2630
import io.rsocket.internal.LimitableRequestPublisher;
2731
import io.rsocket.internal.UnboundedProcessor;
2832
import io.rsocket.internal.UnicastMonoProcessor;
33+
import io.rsocket.keepalive.KeepAliveFramesAcceptor;
34+
import io.rsocket.keepalive.KeepAliveHandler;
35+
import io.rsocket.keepalive.KeepAliveSupport;
2936
import java.nio.channels.ClosedChannelException;
3037
import java.util.Collections;
3138
import java.util.Map;
@@ -36,11 +43,7 @@
3643
import org.reactivestreams.Processor;
3744
import org.reactivestreams.Publisher;
3845
import org.reactivestreams.Subscriber;
39-
import reactor.core.publisher.BaseSubscriber;
40-
import reactor.core.publisher.Flux;
41-
import reactor.core.publisher.Mono;
42-
import reactor.core.publisher.SignalType;
43-
import reactor.core.publisher.UnicastProcessor;
46+
import reactor.core.publisher.*;
4447

4548
/** Client Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketServer} */
4649
class RSocketClient implements RSocket {
@@ -54,14 +57,18 @@ class RSocketClient implements RSocket {
5457
private final UnboundedProcessor<ByteBuf> sendProcessor;
5558
private final Lifecycle lifecycle = new Lifecycle();
5659
private final ByteBufAllocator allocator;
60+
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
5761

5862
/*client requester*/
5963
RSocketClient(
6064
ByteBufAllocator allocator,
6165
DuplexConnection connection,
6266
PayloadDecoder payloadDecoder,
6367
Consumer<Throwable> errorConsumer,
64-
StreamIdSupplier streamIdSupplier) {
68+
StreamIdSupplier streamIdSupplier,
69+
int keepAliveTickPeriod,
70+
int keepAliveAckTimeout,
71+
KeepAliveHandler keepAliveHandler) {
6572
this.allocator = allocator;
6673
this.connection = connection;
6774
this.payloadDecoder = payloadDecoder;
@@ -74,19 +81,40 @@ class RSocketClient implements RSocket {
7481
this.sendProcessor = new UnboundedProcessor<>();
7582

7683
connection.onClose().doFinally(signalType -> terminate()).subscribe(null, errorConsumer);
77-
78-
sendProcessor
79-
.doOnRequest(
80-
r -> {
81-
for (LimitableRequestPublisher lrp : senders.values()) {
82-
lrp.increaseInternalLimit(r);
83-
}
84-
})
85-
.transform(connection::send)
84+
connection
85+
.send(sendProcessor)
8686
.doFinally(this::handleSendProcessorCancel)
8787
.subscribe(null, this::handleSendProcessorError);
8888

8989
connection.receive().subscribe(this::handleIncomingFrames, errorConsumer);
90+
91+
if (keepAliveTickPeriod != 0 && keepAliveHandler != null) {
92+
KeepAliveSupport keepAliveSupport =
93+
new ClientKeepAliveSupport(allocator, keepAliveTickPeriod, keepAliveAckTimeout);
94+
this.keepAliveFramesAcceptor =
95+
keepAliveHandler.start(keepAliveSupport, sendProcessor::onNext, this::terminate);
96+
} else {
97+
keepAliveFramesAcceptor = null;
98+
}
99+
}
100+
101+
/*server requester*/
102+
RSocketClient(
103+
ByteBufAllocator allocator,
104+
DuplexConnection connection,
105+
PayloadDecoder payloadDecoder,
106+
Consumer<Throwable> errorConsumer,
107+
StreamIdSupplier streamIdSupplier) {
108+
this(allocator, connection, payloadDecoder, errorConsumer, streamIdSupplier, 0, 0, null);
109+
}
110+
111+
private void terminate(KeepAlive keepAlive) {
112+
String message =
113+
String.format("No keep-alive acks for %d ms", keepAlive.getTimeout().toMillis());
114+
ConnectionErrorException err = new ConnectionErrorException(message);
115+
lifecycle.setTerminationError(err);
116+
errorConsumer.accept(err);
117+
connection.dispose();
90118
}
91119

92120
private void handleSendProcessorError(Throwable t) {
@@ -294,7 +322,7 @@ public void accept(long n) {
294322
.transform(
295323
f -> {
296324
LimitableRequestPublisher<Payload> wrapped =
297-
LimitableRequestPublisher.wrap(f, sendProcessor.available());
325+
LimitableRequestPublisher.wrap(f);
298326
// Need to set this to one for first the frame
299327
wrapped.request(1);
300328
senders.put(streamId, wrapped);
@@ -452,8 +480,9 @@ private void handleStreamZero(FrameType type, ByteBuf frame) {
452480
case LEASE:
453481
break;
454482
case KEEPALIVE:
455-
// KeepAlive is handled by corresponding connection interceptor,
456-
// just release its frame here
483+
if (keepAliveFramesAcceptor != null) {
484+
keepAliveFramesAcceptor.receive(frame);
485+
}
457486
break;
458487
default:
459488
// Ignore unknown frames. Throwing an error will close the socket.

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.