Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit 73af98f

Browse filesBrowse files
authored
Merge pull request #128 from arcivanov/verbose_trace
Perform a non-blocking read-side check before and after send
2 parents 425acf5 + 149701c commit 73af98f
Copy full SHA for 73af98f

File tree

Expand file treeCollapse file tree

4 files changed

+77
-5
lines changed
Filter options
Expand file treeCollapse file tree

4 files changed

+77
-5
lines changed

‎fluent/sender.py

Copy file name to clipboardExpand all lines: fluent/sender.py
+17Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,17 +166,34 @@ def _send_internal(self, bytes_):
166166

167167
return False
168168

169+
def _check_recv_side(self):
170+
try:
171+
self.socket.settimeout(0.0)
172+
try:
173+
recvd = self.socket.recv(4096, socket.MSG_DONTWAIT)
174+
except socket.error as recv_e:
175+
if recv_e.errno != errno.EWOULDBLOCK:
176+
raise
177+
return
178+
179+
if recvd == b'':
180+
raise socket.error(errno.EPIPE, "Broken pipe")
181+
finally:
182+
self.socket.settimeout(self.timeout)
183+
169184
def _send_data(self, bytes_):
170185
# reconnect if possible
171186
self._reconnect()
172187
# send message
173188
bytes_to_send = len(bytes_)
174189
bytes_sent = 0
190+
self._check_recv_side()
175191
while bytes_sent < bytes_to_send:
176192
sent = self.socket.send(bytes_[bytes_sent:])
177193
if sent == 0:
178194
raise socket.error(errno.EPIPE, "Broken pipe")
179195
bytes_sent += sent
196+
self._check_recv_side()
180197

181198
def _reconnect(self):
182199
if not self.socket:

‎setup.py

Copy file name to clipboardExpand all lines: setup.py
+1-1Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
setup(
1414
name='fluent-logger',
15-
version='0.9.0',
15+
version='0.9.9',
1616
description=desc,
1717
long_description=open(README).read(),
1818
package_dir={'fluent': 'fluent'},

‎tests/test_asynchandler.py

Copy file name to clipboardExpand all lines: tests/test_asynchandler.py
-1Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import logging
44
import sys
5-
import time
65
import unittest
76

87
import fluent.asynchandler

‎tests/test_sender.py

Copy file name to clipboardExpand all lines: tests/test_sender.py
+59-3Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -218,20 +218,76 @@ def test_broken_conn(self):
218218
self.assertTrue(sender.socket)
219219

220220
class FakeSocket:
221+
def __init__(self):
222+
self.to = 123
223+
self.send_side_effects = [3, 0, 9]
224+
self.send_idx = 0
225+
self.recv_side_effects = [socket.error(errno.EWOULDBLOCK, "Blah"),
226+
b"this data is going to be ignored",
227+
b"",
228+
socket.error(errno.EWOULDBLOCK, "Blah"),
229+
socket.error(errno.EWOULDBLOCK, "Blah"),
230+
socket.error(errno.EACCES, "This error will never happen"),
231+
]
232+
self.recv_idx = 0
233+
221234
def send(self, bytes_):
222-
return 0
235+
try:
236+
v = self.send_side_effects[self.send_idx]
237+
if isinstance(v, Exception):
238+
raise v
239+
if isinstance(v, type) and issubclass(v, Exception):
240+
raise v()
241+
return v
242+
finally:
243+
self.send_idx += 1
223244

224245
def shutdown(self, mode):
225246
pass
226247

227248
def close(self):
228249
pass
229250

251+
def settimeout(self, to):
252+
self.to = to
253+
254+
def gettimeout(self):
255+
return self.to
256+
257+
def recv(self, bufsize, flags):
258+
try:
259+
v = self.recv_side_effects[self.recv_idx]
260+
if isinstance(v, Exception):
261+
raise v
262+
if isinstance(v, type) and issubclass(v, Exception):
263+
raise v()
264+
return v
265+
finally:
266+
self.recv_idx += 1
267+
230268
old_sock = self._sender.socket
231-
self._sender.socket = FakeSocket()
269+
sock = FakeSocket()
270+
232271
try:
272+
self._sender.socket = sock
273+
sender.last_error = None
274+
self.assertTrue(sender._send_internal(b"456"))
275+
self.assertFalse(sender.last_error)
276+
277+
self._sender.socket = sock
278+
sender.last_error = None
279+
self.assertFalse(sender._send_internal(b"456"))
280+
self.assertEqual(sender.last_error.errno, errno.EPIPE)
281+
282+
self._sender.socket = sock
283+
sender.last_error = None
284+
self.assertFalse(sender._send_internal(b"456"))
285+
self.assertEqual(sender.last_error.errno, errno.EPIPE)
286+
287+
self._sender.socket = sock
288+
sender.last_error = None
233289
self.assertFalse(sender._send_internal(b"456"))
234-
self.assertTrue(sender.last_error.errno, errno.EPIPE)
290+
self.assertEqual(sender.last_error.errno, errno.EACCES)
235291
finally:
236292
self._sender.socket = old_sock
237293

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.