-
Notifications
You must be signed in to change notification settings - Fork 354
Closed
Description
Given the code below, the client hangs with the following output:
18:31:15.813 [main] INFO r.F.C.1 - onSubscribe(FluxConcatArray.ConcatArraySubscriber)
18:31:15.827 [main] INFO r.F.C.1 - request(unbounded)
public static void main(String[] args) {
CloseableChannel serverCloseable = RSocketFactory.receive()
.acceptor((setup, sendingSocket) ->
Mono.just(new AbstractRSocket() {
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads).log().next()
.flatMapMany(in -> Flux.range(1, 3).map(i -> {
String data = in.getDataUtf8();
return DefaultPayload.create(data + "-" + i);
}));
}
}))
.transport(TcpServerTransport.create("localhost", 7000))
.start()
.block();
RSocket rsocket = RSocketFactory.connect()
.dataMimeType(MimeTypeUtils.TEXT_PLAIN_VALUE)
.transport(TcpClientTransport.create("localhost", 7000))
.start()
.block();
Flux<Payload> payloadFlux = Flux.just(DefaultPayload.create("foo"), DefaultPayload.create("bar"));
rsocket.requestChannel(payloadFlux).log().blockLast();
rsocket.dispose();
serverCloseable.dispose();
}I think the scenario is invalid because the server cancels the input stream and then continues to produce output, but still the client probably shouldn't hang.
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels