forked from alessandromaggio/pythonping
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnetwork.py
More file actions
60 lines (51 loc) · 2.27 KB
/
Copy pathnetwork.py
File metadata and controls
60 lines (51 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import socket
import select
import time
class Socket:
    """Thin wrapper around a raw IPv4 socket used to send and receive packets."""

    BUFFER_SIZE = 1024  # Maximum number of bytes read by a single receive() call
    DONT_FRAGMENT = (socket.SOL_IP, 10, 1)  # Option value for raw socket

    def __init__(self, destination, protocol, source=None, options=()):
        """Creates a network socket to exchange messages

        :param destination: Destination IP address (or a host name to resolve)
        :type destination: str
        :param protocol: Name of the protocol to use
        :type protocol: str
        :param source: Source IP to use - implemented in future releases
        :type source: Union[None, str]
        :param options: Options to set on the socket, passed to ``setsockopt``
        :type options: tuple
        :raises NotImplementedError: If ``source`` is specified
        :raises PermissionError: If the raw socket cannot be created because of
            missing privileges"""
        # Define the attribute before anything can fail, so that __del__ always
        # finds it even when initialization aborts half-way.
        self.socket = None
        self.destination = socket.gethostbyname(destination)
        self.protocol = socket.getprotobyname(protocol)
        if source is not None:
            raise NotImplementedError('PythonPing currently does not support specification of source IP')
        try:
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_RAW, self.protocol)
        except PermissionError as err:
            # Report the privilege hint where the failure actually happens,
            # keeping the exception type and errno callers may rely on.
            raise PermissionError(
                err.errno,
                'Could not create a raw socket. Make sure you have the root privilege.',
            ) from err
        if options:
            self.socket.setsockopt(*options)

    def send(self, packet):
        """Sends a raw packet on the stream

        :param packet: The raw packet to send
        :type packet: bytes"""
        # The port number is a placeholder; the code always passes 0.
        self.socket.sendto(packet, (self.destination, 0))

    def receive(self, timeout=2):
        """Listen for an incoming packet until timeout

        Waits for at most ``timeout`` seconds and returns the first packet that
        arrives. When nothing arrives in time, or when ``timeout`` is not
        positive, an empty packet and an empty source are returned.

        :param timeout: Time after which stop listening
        :type timeout: Union[int, float]
        :return: The packet, the remote socket, and the time left before timeout
        :rtype: (bytes, tuple, float)"""
        time_left = timeout
        if time_left > 0:
            start_select = time.perf_counter()
            data_ready = select.select([self.socket], [], [], time_left)
            # Charge the caller only for the time actually spent waiting.
            time_left -= time.perf_counter() - start_select
            if data_ready[0]:
                packet, source = self.socket.recvfrom(self.BUFFER_SIZE)
                return packet, source, time_left
        # Timeout (or no time budget at all): always hand back a 3-tuple so
        # callers can unpack the result unconditionally.
        return b'', '', time_left

    def __del__(self):
        """Closes the underlying socket, if one was ever created."""
        # Exceptions raised from a finalizer cannot reach the caller, so this is
        # a silent best-effort cleanup; getattr covers instances whose __init__
        # never ran.
        sock = getattr(self, 'socket', None)
        if sock is not None:
            sock.close()