Hello!

I have a question about the internal TCP connection. As I see, there are two ways to handle an MQTT packet: asynchronous (default) and sequential (using handleMessage function).

I'm using this client library with the EMQX broker.

In the first case, when I don't override the handleMessage function and subscribe to incoming messages, everything works fine.
However, this doesn't work for me as I need to handle messages one at a time.
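To illustrate what I mean by the two styles, here is a hypothetical sketch of the dispatch difference (my own toy code, not the actual MQTT.js API): the "async" style hands out every packet immediately, while the "sequential" style only dispatches the next packet after the consumer calls its done callback.

```javascript
// Hypothetical sketch (not the MQTT.js API): contrast the two dispatch styles.
const packets = ['p1', 'p2', 'p3'];
const order = [];

// "Async" style: every packet is handed to the consumer in one pass.
for (const p of packets) order.push(`event:${p}`);

// "Sequential" style: the next packet is dispatched only after done() is called.
function dispatchSequential(queue, handler, onDrained) {
  const next = () => {
    const p = queue.shift();
    if (p === undefined) return onDrained();
    handler(p, next); // handler must call its done callback to continue
  };
  next();
}

dispatchSequential(packets.slice(), (p, done) => {
  order.push(`handled:${p}`);
  done();
}, () => order.push('drained'));

console.log(order.join(','));
```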

So I overrode the handleMessage function and put all the logic there. But now, when I generate more than 1k req/s, I get a TCP socket overload warning from the broker; after a while I get a TCP timeout error and my client just closes.

As far as I understand this repository, there is no difference between the TCP connection implementation in these two cases (in both, one connection is kept open until there are no parsed MQTT packets left in the internal queue). Here is a sample from the connect() function in src/lib/client.ts:

const work = () => {
  this.log('work :: getting next packet in queue');
  const packet = packets.shift();

  if (packet) {
    this.log('work :: packet pulled from queue');
    handlePacket(this, packet, nextTickWork);
  } else {
    this.log('work :: no packets in queue');
    const done = completeParse;
    completeParse = null;
    this.log('work :: done flag is %s', !!done);
    if (done) done();
  }
};

const nextTickWork = () => {
  if (packets.length) {
    nextTick(work);
  } else {
    const done = completeParse;
    completeParse = null;
    done();
  }
};

writable._write = (buf, enc, done) => {
  completeParse = done;
  this.log('writable stream :: parsing buffer');
  parser.parse(buf);
  work();
};
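To check my reading of that loop, here is a self-contained model of it (my own stubs: handlePacket just records the packet, and the nextTick deferral is modeled as a direct call to keep the trace synchronous). The point it demonstrates is that the stream's done callback (completeParse) only fires after every parsed packet has been handled — which is the hook backpressure relies on.

```javascript
// Self-contained model of the work/nextTickWork loop quoted above.
const log = [];
let packets = [];
let completeParse = null;

// Stub: the real handlePacket dispatches to handleMessage, etc.
const handlePacket = (packet, cb) => {
  log.push(`handled:${packet}`);
  cb();
};

const work = () => {
  const packet = packets.shift();
  if (packet) {
    handlePacket(packet, nextTickWork);
  } else {
    const done = completeParse;
    completeParse = null;
    if (done) done();
  }
};

const nextTickWork = () => {
  if (packets.length) {
    work(); // real code defers with nextTick(work)
  } else {
    const done = completeParse;
    completeParse = null;
    done();
  }
};

// One simulated _write call: note that done (the stream's callback) is only
// invoked once the whole packet queue has drained.
const write = (parsed, done) => {
  completeParse = done;
  packets.push(...parsed);
  work();
};

write(['a', 'b', 'c'], () => log.push('write-done'));
console.log(log.join(','));
```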

Is this the correct understanding, or am I missing something? Thank you in advance for your help.

Replies: 3 comments · 2 replies


I have a question about the internal TCP connection. As I see, there are two ways to handle an MQTT packet: asynchronous (default) and sequential (using handleMessage function).

Essentially, handleMessage creates backpressure on the stream and the other path (the event) doesn't. The error you get could be due to the stream reaching its highWaterMark because your handlePacket logic is processing data too slowly; that pauses the read stream, and after a while the broker closes the connection (because of keepalive).

NodeJS reference: https://nodejs.org/en/learn/modules/backpressuring-in-streams

Answer selected by cssArchitect

Thanks for the quick reply @robertsLando!

Yes, you are right about the slow data processing. Your pointer to highWaterMark clarified things for me: I believe the timeout is reached because the socket stops accepting new data, as you said earlier.

Anyway, it is now clear that because of this behaviour I need to go back to the previous implementation without handleMessage.

Thank you again for your answer!


And just out of curiosity, @robertsLando

I've checked the Node.js documentation about stream buffering and found that in the writable.write method, even when the highWaterMark is reached, the data is still buffered to an internal queue.

> Once write() returns false, do not write more chunks until the 'drain' event is emitted. While calling write() on a stream that is not draining is allowed, Node.js will buffer all written chunks until maximum memory usage occurs, at which point it will abort unconditionally.

But maybe there is something similar for read streams, though I haven't observed high memory consumption. This seems to undermine part of the rationale for stopping read consumption (since the data is actually read into an internal buffer) and for the subsequent timeout.

@robertsLando

> This loses part of the reason for stopping read consumption (since the data is actually read into the internal buffer) and the subsequent timeout call.

We don't stop the read; the internal stream mechanism does that when the highWaterMark is reached.

@cssArchitect

Got it, thanks.
