Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit d476291

Browse filesBrowse files
mostroverkhovrobertroeser
authored andcommitted
Example of resumable file transfer (rsocket#631)
* resumable file transfer example Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * formatter Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com> * remove code Signed-off-by: Maksym Ostroverkhov <m.ostroverkhov@gmail.com>
1 parent b89aa74 commit d476291
Copy full SHA for d476291

File tree

Expand file treeCollapse file tree

5 files changed

+316
-1
lines changed
Open diff view settings
Filter options
Expand file treeCollapse file tree

5 files changed

+316
-1
lines changed
Open diff view settings
Collapse file

‎rsocket-core/src/main/java/io/rsocket/resume/ClientResume.java‎

Copy file name to clipboardExpand all lines: rsocket-core/src/main/java/io/rsocket/resume/ClientResume.java
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.netty.buffer.ByteBuf;
2020
import java.time.Duration;
2121

22-
class ClientResume {
22+
public class ClientResume {
2323
private final Duration sessionDuration;
2424
private final ByteBuf resumeToken;
2525

Collapse file
+134Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package io.rsocket.examples.transport.tcp.resume;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.Unpooled;
5+
import io.rsocket.Payload;
6+
import java.io.*;
7+
import org.reactivestreams.Subscriber;
8+
import org.reactivestreams.Subscription;
9+
import reactor.core.publisher.Flux;
10+
import reactor.core.publisher.SynchronousSink;
11+
12+
class Files {
13+
14+
public static Flux<ByteBuf> fileSource(String fileName, int chunkSizeBytes) {
15+
return Flux.generate(
16+
() -> new FileState(fileName, chunkSizeBytes), FileState::consumeNext, FileState::dispose);
17+
}
18+
19+
public static Subscriber<Payload> fileSink(String fileName, int windowSize) {
20+
return new Subscriber<Payload>() {
21+
Subscription s;
22+
int requests = windowSize;
23+
OutputStream outputStream;
24+
int receivedBytes;
25+
int receivedCount;
26+
27+
@Override
28+
public void onSubscribe(Subscription s) {
29+
this.s = s;
30+
this.s.request(requests);
31+
}
32+
33+
@Override
34+
public void onNext(Payload payload) {
35+
ByteBuf data = payload.data();
36+
receivedBytes += data.readableBytes();
37+
receivedCount += 1;
38+
System.out.println(
39+
"Received file chunk: " + receivedCount + ". Total size: " + receivedBytes);
40+
if (outputStream == null) {
41+
outputStream = open(fileName);
42+
}
43+
write(outputStream, data);
44+
payload.release();
45+
46+
requests--;
47+
if (requests == windowSize / 2) {
48+
requests += windowSize;
49+
s.request(windowSize);
50+
}
51+
}
52+
53+
private void write(OutputStream outputStream, ByteBuf byteBuf) {
54+
try {
55+
byteBuf.readBytes(outputStream, byteBuf.readableBytes());
56+
} catch (IOException e) {
57+
throw new RuntimeException(e);
58+
}
59+
}
60+
61+
@Override
62+
public void onError(Throwable t) {
63+
close(outputStream);
64+
}
65+
66+
@Override
67+
public void onComplete() {
68+
close(outputStream);
69+
}
70+
71+
private OutputStream open(String filename) {
72+
try {
73+
/*do not buffer for demo purposes*/
74+
return new FileOutputStream(filename);
75+
} catch (FileNotFoundException e) {
76+
throw new RuntimeException(e);
77+
}
78+
}
79+
80+
private void close(OutputStream stream) {
81+
if (stream != null) {
82+
try {
83+
stream.close();
84+
} catch (IOException e) {
85+
}
86+
}
87+
}
88+
};
89+
}
90+
91+
private static class FileState {
92+
private final String fileName;
93+
private final int chunkSizeBytes;
94+
private BufferedInputStream inputStream;
95+
private byte[] chunkBytes;
96+
97+
public FileState(String fileName, int chunkSizeBytes) {
98+
this.fileName = fileName;
99+
this.chunkSizeBytes = chunkSizeBytes;
100+
}
101+
102+
public FileState consumeNext(SynchronousSink<ByteBuf> sink) {
103+
if (inputStream == null) {
104+
InputStream in = getClass().getClassLoader().getResourceAsStream(fileName);
105+
if (in == null) {
106+
sink.error(new FileNotFoundException(fileName));
107+
return this;
108+
}
109+
this.inputStream = new BufferedInputStream(in);
110+
this.chunkBytes = new byte[chunkSizeBytes];
111+
}
112+
try {
113+
int consumedBytes = inputStream.read(chunkBytes);
114+
if (consumedBytes == -1) {
115+
sink.complete();
116+
} else {
117+
sink.next(Unpooled.copiedBuffer(chunkBytes, 0, consumedBytes));
118+
}
119+
} catch (IOException e) {
120+
sink.error(e);
121+
}
122+
return this;
123+
}
124+
125+
public void dispose() {
126+
if (inputStream != null) {
127+
try {
128+
inputStream.close();
129+
} catch (IOException e) {
130+
}
131+
}
132+
}
133+
}
134+
}
Collapse file
+120Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package io.rsocket.examples.transport.tcp.resume;
2+
3+
import io.rsocket.AbstractRSocket;
4+
import io.rsocket.Payload;
5+
import io.rsocket.RSocket;
6+
import io.rsocket.RSocketFactory;
7+
import io.rsocket.resume.ClientResume;
8+
import io.rsocket.resume.PeriodicResumeStrategy;
9+
import io.rsocket.resume.ResumeStrategy;
10+
import io.rsocket.transport.netty.client.TcpClientTransport;
11+
import io.rsocket.transport.netty.server.CloseableChannel;
12+
import io.rsocket.transport.netty.server.TcpServerTransport;
13+
import io.rsocket.util.DefaultPayload;
14+
import java.time.Duration;
15+
import org.reactivestreams.Publisher;
16+
import reactor.core.publisher.Flux;
17+
import reactor.core.publisher.Mono;
18+
19+
public class ResumeFileTransfer {
20+
21+
public static void main(String[] args) {
22+
RequestCodec requestCodec = new RequestCodec();
23+
24+
CloseableChannel server =
25+
RSocketFactory.receive()
26+
.resume()
27+
.resumeSessionDuration(Duration.ofMinutes(5))
28+
.acceptor((setup, rSocket) -> Mono.just(new FileServer(requestCodec)))
29+
.transport(TcpServerTransport.create("localhost", 8000))
30+
.start()
31+
.block();
32+
33+
RSocket client =
34+
RSocketFactory.connect()
35+
.resume()
36+
.resumeStrategy(
37+
() -> new VerboseResumeStrategy(new PeriodicResumeStrategy(Duration.ofSeconds(1))))
38+
.resumeSessionDuration(Duration.ofMinutes(5))
39+
.transport(TcpClientTransport.create("localhost", 8001))
40+
.start()
41+
.block();
42+
43+
client
44+
.requestStream(requestCodec.encode(new Request(16, "lorem.txt")))
45+
.doFinally(s -> server.dispose())
46+
.subscribe(Files.fileSink("rsocket-examples/out/lorem_output.txt", 256));
47+
48+
server.onClose().block();
49+
}
50+
51+
private static class FileServer extends AbstractRSocket {
52+
private final RequestCodec requestCodec;
53+
54+
public FileServer(RequestCodec requestCodec) {
55+
this.requestCodec = requestCodec;
56+
}
57+
58+
@Override
59+
public Flux<Payload> requestStream(Payload payload) {
60+
Request request = requestCodec.decode(payload);
61+
payload.release();
62+
String fileName = request.getFileName();
63+
int chunkSize = request.getChunkSize();
64+
65+
Flux<Long> ticks = Flux.interval(Duration.ofMillis(500)).onBackpressureDrop();
66+
67+
return Files.fileSource(fileName, chunkSize)
68+
.map(DefaultPayload::create)
69+
.zipWith(ticks, (p, tick) -> p);
70+
}
71+
}
72+
73+
private static class VerboseResumeStrategy implements ResumeStrategy {
74+
private final ResumeStrategy resumeStrategy;
75+
76+
public VerboseResumeStrategy(ResumeStrategy resumeStrategy) {
77+
this.resumeStrategy = resumeStrategy;
78+
}
79+
80+
@Override
81+
public Publisher<?> apply(ClientResume clientResume, Throwable throwable) {
82+
return Flux.from(resumeStrategy.apply(clientResume, throwable))
83+
.doOnNext(v -> System.out.println("Disconnected. Trying to resume connection..."));
84+
}
85+
}
86+
87+
private static class RequestCodec {
88+
89+
public Payload encode(Request request) {
90+
String encoded = request.getChunkSize() + ":" + request.getFileName();
91+
return DefaultPayload.create(encoded);
92+
}
93+
94+
public Request decode(Payload payload) {
95+
String encoded = payload.getDataUtf8();
96+
String[] chunkSizeAndFileName = encoded.split(":");
97+
int chunkSize = Integer.parseInt(chunkSizeAndFileName[0]);
98+
String fileName = chunkSizeAndFileName[1];
99+
return new Request(chunkSize, fileName);
100+
}
101+
}
102+
103+
private static class Request {
104+
private final int chunkSize;
105+
private final String fileName;
106+
107+
public Request(int chunkSize, String fileName) {
108+
this.chunkSize = chunkSize;
109+
this.fileName = fileName;
110+
}
111+
112+
public int getChunkSize() {
113+
return chunkSize;
114+
}
115+
116+
public String getFileName() {
117+
return fileName;
118+
}
119+
}
120+
}
Collapse file
+29Lines changed: 29 additions & 0 deletions
  • Display the source diff
  • Display the rich diff
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
1. Start socat. It is used for emulation of transport disconnects
2+
3+
`socat -d TCP-LISTEN:8001,fork,reuseaddr TCP:localhost:8000`
4+
5+
2. start `ResumeFileTransfer.main`
6+
7+
3. terminate/start socat periodically for session resumption
8+
9+
`ResumeFileTransfer` output is as follows
10+
11+
```
12+
Received file chunk: 7. Total size: 112
13+
Received file chunk: 8. Total size: 128
14+
Received file chunk: 9. Total size: 144
15+
Received file chunk: 10. Total size: 160
16+
Disconnected. Trying to resume connection...
17+
Disconnected. Trying to resume connection...
18+
Disconnected. Trying to resume connection...
19+
Disconnected. Trying to resume connection...
20+
Disconnected. Trying to resume connection...
21+
Received file chunk: 11. Total size: 176
22+
Received file chunk: 12. Total size: 192
23+
Received file chunk: 13. Total size: 208
24+
Received file chunk: 14. Total size: 224
25+
Received file chunk: 15. Total size: 240
26+
Received file chunk: 16. Total size: 256
27+
```
28+
29+
It transfers file from `resources/lorem.txt` to `build/out/lorem_output.txt` in chunks of 16 bytes every 500 millis
Collapse file
+32Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
Alteration literature to or an sympathize mr imprudence. Of is ferrars subject as enjoyed or tedious cottage.
2+
Procuring as in resembled by in agreeable. Next long no gave mr eyes. Admiration advantages no he celebrated so pianoforte unreserved.
3+
Not its herself forming charmed amiable. Him why feebly expect future now.
4+
5+
Situation admitting promotion at or to perceived be. Mr acuteness we as estimable enjoyment up.
6+
An held late as felt know. Learn do allow solid to grave. Middleton suspicion age her attention.
7+
Chiefly several bed its wishing. Is so moments on chamber pressed to. Doubtful yet way properly answered humanity its desirous.
8+
Minuter believe service arrived civilly add all. Acuteness allowance an at eagerness favourite in extensive exquisite ye.
9+
10+
Unpleasant nor diminution excellence apartments imprudence the met new. Draw part them he an to he roof only.
11+
Music leave say doors him. Tore bred form if sigh case as do. Staying he no looking if do opinion.
12+
Sentiments way understood end partiality and his.
13+
14+
Ladyship it daughter securing procured or am moreover mr. Put sir she exercise vicinity cheerful wondered.
15+
Continual say suspicion provision you neglected sir curiosity unwilling. Simplicity end themselves increasing led day sympathize yet.
16+
General windows effects not are drawing man garrets. Common indeed garden you his ladies out yet. Preference imprudence contrasted to remarkably in on.
17+
Taken now you him trees tears any. Her object giving end sister except oppose.
18+
19+
No comfort do written conduct at prevent manners on. Celebrated contrasted discretion him sympathize her collecting occasional.
20+
Do answered bachelor occasion in of offended no concerns. Supply worthy warmth branch of no ye. Voice tried known to as my to.
21+
Though wished merits or be. Alone visit use these smart rooms ham. No waiting in on enjoyed placing it inquiry.
22+
23+
So insisted received is occasion advanced honoured. Among ready to which up. Attacks smiling and may out assured moments man nothing outward.
24+
Thrown any behind afford either the set depend one temper. Instrument melancholy in acceptance collecting frequently be if.
25+
Zealously now pronounce existence add you instantly say offending. Merry their far had widen was. Concerns no in expenses raillery formerly.
26+
27+
As am hastily invited settled at limited civilly fortune me. Really spring in extent an by. Judge but built gay party world.
28+
Of so am he remember although required. Bachelor unpacked be advanced at. Confined in declared marianne is vicinity.
29+
30+
In alteration insipidity impression by travelling reasonable up motionless. Of regard warmth by unable sudden garden ladies.
31+
No kept hung am size spot no. Likewise led and dissuade rejoiced welcomed husbands boy. Do listening on he suspected resembled.
32+
Water would still if to. Position boy required law moderate was may.

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.