Closed
I have been playing around with the latest code from the 0.5.x branch. I started with the following code from the examples subproject:
Flowable.fromPublisher(c.underlyingSocket.requestResponse(new PayloadImpl("Hello")))
  .doOnNext(toConsumer(x => println(s"### client ### ${ByteBufferUtil.toUtf8String(x.getData)}")))
  .blockingFirst()

This prints the response as expected. However, when I try to connect the Publisher returned from requestResponse to Akka Streams with the following code:
Source.fromPublisher(c.underlyingSocket.requestResponse(new PayloadImpl("Hello")))
  .map { x => println(s"### client ### ${ByteBufferUtil.toUtf8String(x.getData)}"); x }
  .runWith(Sink.head)

I get an exception:
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407)
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353)
at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:415)
at io.netty.buffer.PooledSlicedByteBuf.getInt(PooledSlicedByteBuf.java:188)
at io.reactivesocket.transport.tcp.MutableDirectByteBuf.getInt(MutableDirectByteBuf.java:284)
at io.reactivesocket.frame.FrameHeaderFlyweight.frameLength(FrameHeaderFlyweight.java:229)
at io.reactivesocket.frame.FrameHeaderFlyweight.dataLength(FrameHeaderFlyweight.java:251)
at io.reactivesocket.frame.FrameHeaderFlyweight.sliceFrameData(FrameHeaderFlyweight.java:202)
at io.reactivesocket.Frame.getData(Frame.java:98)
at akka.stream.alpakka.reactivesocket.scaladsl.HelloWorld$$anonfun$start$1$$anonfun$2.apply(HelloWorld.scala:113)
at akka.stream.alpakka.reactivesocket.scaladsl.HelloWorld$$anonfun$start$1$$anonfun$2.apply(HelloWorld.scala:113)
at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:43)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:410)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Is there a race condition between something a Publisher from reactivesocket expects and my call to getData?
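To make the suspicion concrete, here is a minimal sketch of the kind of lifecycle I have in mind. It does not use reactivesocket, Netty, or Akka at all: `RefCountedBuf` and `emit` are hypothetical stand-ins that I made up, and the assumption that the buffer is released as soon as `onNext` returns is only my guess, not something I have confirmed in the reactivesocket code.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

// Hypothetical stand-in for a pooled, reference-counted buffer (not Netty's ByteBuf).
final class RefCountedBuf(bytes: Array[Byte]) {
  private var refCnt = 1
  def release(): Unit = refCnt -= 1
  def readUtf8(): String = {
    if (refCnt <= 0) throw new IllegalStateException(s"refCnt: $refCnt")
    new String(bytes, UTF_8)
  }
}

object RefCountSketch {
  // Hypothetical producer: hands the buffer to the callback, then releases it
  // as soon as the callback returns (the assumed behaviour, not a verified one).
  def emit(buf: RefCountedBuf)(onNext: RefCountedBuf => Unit): Unit = {
    onNext(buf)
    buf.release()
  }

  def main(args: Array[String]): Unit = {
    // Synchronous consumer: reads inside onNext, before the release happens.
    emit(new RefCountedBuf("Hello".getBytes(UTF_8)))(b => println(b.readUtf8()))

    // Deferred consumer: only stores the reference and reads it after onNext
    // has returned, like a stage sitting behind an asynchronous boundary.
    var held: RefCountedBuf = null
    emit(new RefCountedBuf("Hello".getBytes(UTF_8)))(b => held = b)
    println(Try(held.readUtf8()).isFailure) // the read fails, the buffer is already released

    // Possible workaround under the same assumption: copy the bytes out
    // inside onNext and pass only the copy downstream.
    var copied: String = null
    emit(new RefCountedBuf("Hello".getBytes(UTF_8)))(b => copied = b.readUtf8())
    println(copied)
  }
}
```

If this model is right, the Flowable version works because doOnNext runs inside onNext, while the Akka Streams map stage runs later on an actor thread, which matches the Mailbox frames in the stack trace above.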
P.S. Currently I am publishing the latest code of the 0.5.x branch locally. Would it be possible to get a milestone release, so that it is easier to share and run these experiments?