Skip to content

Navigation Menu

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings

Commit a8945a8

Browse filesBrowse files
committed
feat: add sender
1 parent 8df8b39 commit a8945a8
Copy full SHA for a8945a8

File tree

4 files changed

+172
-71
lines changed
Filter options

4 files changed

+172
-71
lines changed

‎build_ext.py

Copy file name to clipboardExpand all lines: build_ext.py
+1Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def build(setup_kwargs: Any) -> None:
4141
"src/zeroconf/_services/browser.py",
4242
"src/zeroconf/_services/info.py",
4343
"src/zeroconf/_services/registry.py",
44+
"src/zeroconf/_sender.py",
4445
"src/zeroconf/_transport.py",
4546
"src/zeroconf/_updates.py",
4647
"src/zeroconf/_utils/ipaddress.py",

‎src/zeroconf/_core.py

Copy file name to clipboardExpand all lines: src/zeroconf/_core.py
+7-71Lines changed: 7 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
"""
2222

2323
import asyncio
24-
import logging
2524
import sys
2625
import threading
2726
from types import TracebackType
@@ -37,6 +36,7 @@
3736
from ._history import QuestionHistory
3837
from ._logger import QuietLogger, log
3938
from ._protocol.outgoing import DNSOutgoing
39+
from ._sender import _ZeroconfSender
4040
from ._services import ServiceListener
4141
from ._services.browser import ServiceBrowser
4242
from ._services.info import (
@@ -62,7 +62,6 @@
6262
InterfacesType,
6363
IPVersion,
6464
autodetect_ip_version,
65-
can_send_to,
6665
create_sockets,
6766
)
6867
from ._utils.time import current_time_millis, millis_to_seconds
@@ -73,9 +72,6 @@
7372
_FLAGS_AA,
7473
_FLAGS_QR_QUERY,
7574
_FLAGS_QR_RESPONSE,
76-
_MAX_MSG_ABSOLUTE,
77-
_MDNS_ADDR,
78-
_MDNS_ADDR6,
7975
_MDNS_PORT,
8076
_ONE_SECOND,
8177
_REGISTER_TIME,
@@ -102,43 +98,6 @@
10298
_REGISTER_BROADCASTS = 3
10399

104100

105-
def async_send_with_transport(
106-
log_debug: bool,
107-
transport: _WrappedTransport,
108-
packet: bytes,
109-
packet_num: int,
110-
out: DNSOutgoing,
111-
addr: Optional[str],
112-
port: int,
113-
v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
114-
) -> None:
115-
ipv6_socket = transport.is_ipv6
116-
if addr is None:
117-
real_addr = _MDNS_ADDR6 if ipv6_socket else _MDNS_ADDR
118-
else:
119-
real_addr = addr
120-
if not can_send_to(ipv6_socket, real_addr):
121-
return
122-
if log_debug:
123-
log.debug(
124-
"Sending to (%s, %d) via [socket %s (%s)] (%d bytes #%d) %r as %r...",
125-
real_addr,
126-
port or _MDNS_PORT,
127-
transport.fileno,
128-
transport.sock_name,
129-
len(packet),
130-
packet_num + 1,
131-
out,
132-
packet,
133-
)
134-
# Get flowinfo and scopeid for the IPV6 socket to create a complete IPv6
135-
# address tuple: https://docs.python.org/3.6/library/socket.html#socket-families
136-
if ipv6_socket and not v6_flow_scope:
137-
_, _, sock_flowinfo, sock_scopeid = transport.sock_name
138-
v6_flow_scope = (sock_flowinfo, sock_scopeid)
139-
transport.transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope))
140-
141-
142101
class Zeroconf(QuietLogger):
143102
"""Implementation of Zeroconf Multicast DNS Service Discovery
144103
@@ -195,6 +154,7 @@ def __init__(
195154
self._notify_futures: Set[asyncio.Future] = set()
196155
self.loop: Optional[asyncio.AbstractEventLoop] = None
197156
self._loop_thread: Optional[threading.Thread] = None
157+
self.sender = _ZeroconfSender(self)
198158

199159
self.start()
200160

@@ -208,6 +168,7 @@ def start(self) -> None:
208168
self.loop = get_running_loop()
209169
if self.loop:
210170
self.engine.setup(self.loop, None)
171+
self.sender.loop = self.loop
211172
return
212173
self._start_thread()
213174

@@ -217,6 +178,7 @@ def _start_thread(self) -> None:
217178

218179
def _run_loop() -> None:
219180
self.loop = asyncio.new_event_loop()
181+
self.sender.loop = self.loop
220182
asyncio.set_event_loop(self.loop)
221183
self.engine.setup(self.loop, loop_thread_ready)
222184
self.loop.run_forever()
@@ -611,7 +573,7 @@ def send(
611573
) -> None:
612574
"""Sends an outgoing packet threadsafe."""
613575
assert self.loop is not None
614-
self.loop.call_soon_threadsafe(self.async_send, out, addr, port, v6_flow_scope, transport)
576+
self.loop.call_soon_threadsafe(self.sender.async_send, out, addr, port, v6_flow_scope, transport)
615577

616578
def async_send(
617579
self,
@@ -622,41 +584,15 @@ def async_send(
622584
transport: Optional[_WrappedTransport] = None,
623585
) -> None:
624586
"""Sends an outgoing packet."""
625-
if self.done:
626-
return
627-
628-
# If no transport is specified, we send to all the ones
629-
# with the same address family
630-
transports = [transport] if transport else self.engine.senders
631-
log_debug = log.isEnabledFor(logging.DEBUG)
632-
633-
for packet_num, packet in enumerate(out.packets()):
634-
if len(packet) > _MAX_MSG_ABSOLUTE:
635-
self.log_warning_once(
636-
"Dropping %r over-sized packet (%d bytes) %r",
637-
out,
638-
len(packet),
639-
packet,
640-
)
641-
return
642-
for send_transport in transports:
643-
async_send_with_transport(
644-
log_debug,
645-
send_transport,
646-
packet,
647-
packet_num,
648-
out,
649-
addr,
650-
port,
651-
v6_flow_scope,
652-
)
587+
self.sender.async_send(out, addr, port, v6_flow_scope, transport)
653588

654589
def _close(self) -> None:
655590
"""Set global done and remove all service listeners."""
656591
if self.done:
657592
return
658593
self.remove_all_service_listeners()
659594
self.done = True
595+
self.sender.done = True
660596

661597
def _shutdown_threads(self) -> None:
662598
"""Shutdown any threads."""

‎src/zeroconf/_sender.pxd

Copy file name to clipboard
+24Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
2+
import cython
3+
4+
from ._engine cimport AsyncEngine
5+
from ._transport cimport _WrappedTransport
6+
from ._protocol.outgoing import DNSOutgoing
7+
8+
cdef void async_send_with_transport(
9+
bint log_debug
10+
_WrappedTransport transport,
11+
bytes packet,
12+
int packet_num,
13+
DNSOutgoing out,
14+
str addr,
15+
int port,
16+
tuple v6_flow_scope
17+
)
18+
19+
cdef class _ZeroconfSender:
20+
21+
cdef public object zc
22+
cdef public object loop
23+
cdef public bint done
24+
cdef public AsyncEngine

‎src/zeroconf/_sender.py

Copy file name to clipboard
+140Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
"""Multicast DNS Service Discovery for Python, v0.14-wmcbrine
2+
Copyright 2003 Paul Scott-Murphy, 2014 William McBrine
3+
4+
This module provides a framework for the use of DNS Service Discovery
5+
using IP multicast.
6+
7+
This library is free software; you can redistribute it and/or
8+
modify it under the terms of the GNU Lesser General Public
9+
License as published by the Free Software Foundation; either
10+
version 2.1 of the License, or (at your option) any later version.
11+
12+
This library is distributed in the hope that it will be useful,
13+
but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15+
Lesser General Public License for more details.
16+
17+
You should have received a copy of the GNU Lesser General Public
18+
License along with this library; if not, write to the Free Software
19+
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301
20+
USA
21+
"""
22+
23+
import logging
24+
from typing import TYPE_CHECKING, Optional, Tuple, Union
25+
26+
from ._logger import QuietLogger, log
27+
from ._protocol.outgoing import DNSOutgoing
28+
from ._transport import _WrappedTransport
29+
from ._utils.net import (
30+
can_send_to,
31+
)
32+
from .const import (
33+
_MAX_MSG_ABSOLUTE,
34+
_MDNS_ADDR,
35+
_MDNS_ADDR6,
36+
_MDNS_PORT,
37+
)
38+
39+
if TYPE_CHECKING:
40+
from ._core import Zeroconf
41+
42+
43+
def async_send_with_transport(
44+
log_debug: bool,
45+
transport: _WrappedTransport,
46+
packet: bytes,
47+
packet_num: int,
48+
out: DNSOutgoing,
49+
addr: Optional[str],
50+
port: int,
51+
v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
52+
) -> None:
53+
ipv6_socket = transport.is_ipv6
54+
if addr is None:
55+
real_addr = _MDNS_ADDR6 if ipv6_socket else _MDNS_ADDR
56+
else:
57+
real_addr = addr
58+
if not can_send_to(ipv6_socket, real_addr):
59+
return
60+
if log_debug:
61+
log.debug(
62+
"Sending to (%s, %d) via [socket %s (%s)] (%d bytes #%d) %r as %r...",
63+
real_addr,
64+
port or _MDNS_PORT,
65+
transport.fileno,
66+
transport.sock_name,
67+
len(packet),
68+
packet_num + 1,
69+
out,
70+
packet,
71+
)
72+
# Get flowinfo and scopeid for the IPV6 socket to create a complete IPv6
73+
# address tuple: https://docs.python.org/3.6/library/socket.html#socket-families
74+
if ipv6_socket and not v6_flow_scope:
75+
_, _, sock_flowinfo, sock_scopeid = transport.sock_name
76+
v6_flow_scope = (sock_flowinfo, sock_scopeid)
77+
transport.transport.sendto(packet, (real_addr, port or _MDNS_PORT, *v6_flow_scope))
78+
79+
80+
class _ZeroconfSender:
81+
"""Send implementation for Zeroconf."""
82+
83+
__slots__ = ("zc", "loop", "done", "engine")
84+
85+
def __init__(self, zc: "Zeroconf") -> None:
86+
"""Initialize the ZeroconfSender."""
87+
self.zc = zc
88+
self.loop = zc.loop
89+
self.done = zc.done
90+
self.engine = zc.engine
91+
92+
def send(
93+
self,
94+
out: DNSOutgoing,
95+
addr: Optional[str] = None,
96+
port: int = _MDNS_PORT,
97+
v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
98+
transport: Optional[_WrappedTransport] = None,
99+
) -> None:
100+
"""Sends an outgoing packet threadsafe."""
101+
assert self.loop is not None
102+
self.loop.call_soon_threadsafe(self.async_send, out, addr, port, v6_flow_scope, transport)
103+
104+
def async_send(
105+
self,
106+
out: DNSOutgoing,
107+
addr: Optional[str] = None,
108+
port: int = _MDNS_PORT,
109+
v6_flow_scope: Union[Tuple[()], Tuple[int, int]] = (),
110+
transport: Optional[_WrappedTransport] = None,
111+
) -> None:
112+
"""Sends an outgoing packet."""
113+
if self.done:
114+
return
115+
116+
# If no transport is specified, we send to all the ones
117+
# with the same address family
118+
transports = [transport] if transport else self.engine.senders
119+
log_debug = log.isEnabledFor(logging.DEBUG)
120+
121+
for packet_num, packet in enumerate(out.packets()):
122+
if len(packet) > _MAX_MSG_ABSOLUTE:
123+
QuietLogger.log_warning_once(
124+
"Dropping %r over-sized packet (%d bytes) %r",
125+
out,
126+
len(packet),
127+
packet,
128+
)
129+
return
130+
for send_transport in transports:
131+
async_send_with_transport(
132+
log_debug,
133+
send_transport,
134+
packet,
135+
packet_num,
136+
out,
137+
addr,
138+
port,
139+
v6_flow_scope,
140+
)

0 commit comments

Comments
0 (0)
Morty Proxy This is a proxified and sanitized view of the page, visit original site.