|
| 1 | +import sys |
| 2 | +import socket |
| 3 | +import threading |
| 4 | +import time |
| 5 | +from typing import Optional, Tuple, Dict |
| 6 | + |
| 7 | +class TcpProxy: |
| 8 | + def __init__(self): |
| 9 | + self._local_addr: str = "" |
| 10 | + self._local_port: int = 0 |
| 11 | + self._remote_addr: str = "" |
| 12 | + self._remote_port: int = 0 |
| 13 | + self._preload: bool = False |
| 14 | + self._backlog: int = 5 |
| 15 | + self._chunk_size: int = 16 |
| 16 | + self._timeout: int = 5 |
| 17 | + self._buffer_size: int = 4096 |
| 18 | + self._termination_flags: Dict[bytes, bool] = { |
| 19 | + b'220 ': True, |
| 20 | + b'331 ': True, |
| 21 | + b'230 ': True, |
| 22 | + b'530 ': True |
| 23 | + } |
| 24 | + |
| 25 | + def _process_data(self, stream: bytes) -> None: |
| 26 | + #Transform data stream for analysis |
| 27 | + for offset in range(0, len(stream), self._chunk_size): |
| 28 | + block = stream[offset:offset + self._chunk_size] |
| 29 | + |
| 30 | + # Format block representation |
| 31 | + bytes_view = ' '.join(f'{byte:02X}' for byte in block) |
| 32 | + text_view = ''.join(chr(byte) if 32 <= byte <= 126 else '.' for byte in block) |
| 33 | + |
| 34 | + # Display formatted line |
| 35 | + print(f"{offset:04X} {bytes_view:<{self._chunk_size * 3}} {text_view}") |
| 36 | + |
| 37 | + def _extract_stream(self, conn: socket.socket) -> bytes: |
| 38 | + #Extract data stream from connection |
| 39 | + accumulator = b'' |
| 40 | + conn.settimeout(self._timeout) |
| 41 | + |
| 42 | + try: |
| 43 | + while True: |
| 44 | + fragment = conn.recv(self._buffer_size) |
| 45 | + if not fragment: |
| 46 | + break |
| 47 | + |
| 48 | + accumulator += fragment |
| 49 | + |
| 50 | + # Check for protocol markers |
| 51 | + if accumulator.endswith(b'\r\n'): |
| 52 | + for flag in self._termination_flags: |
| 53 | + if flag in accumulator: |
| 54 | + return accumulator |
| 55 | + |
| 56 | + except socket.timeout: |
| 57 | + pass |
| 58 | + |
| 59 | + return accumulator |
| 60 | + |
| 61 | + def _monitor_stream(self, direction: str, stream: bytes) -> bytes: |
| 62 | + # Monitor and decode stream content |
| 63 | + try: |
| 64 | + content = stream.decode('utf-8').strip() |
| 65 | + marker = ">>>" if direction == "in" else "<<<" |
| 66 | + print(f"{marker} {content}") |
| 67 | + except UnicodeDecodeError: |
| 68 | + print(f"{direction}: [binary content]") |
| 69 | + |
| 70 | + return stream |
| 71 | + |
| 72 | + def _bridge_connections(self, entry_point: socket.socket) -> None: |
| 73 | + #Establish and maintain connection bridge |
| 74 | + # Initialize exit point |
| 75 | + exit_point = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 76 | + try: |
| 77 | + exit_point.connect((self._remote_addr, self._remote_port)) |
| 78 | + # Handle initial remote response |
| 79 | + if self._preload: |
| 80 | + remote_data = self._extract_stream(exit_point) |
| 81 | + if remote_data: |
| 82 | + self._process_data(remote_data) |
| 83 | + processed = self._monitor_stream("out", remote_data) |
| 84 | + entry_point.send(processed) |
| 85 | + # Main interaction loop |
| 86 | + while True: |
| 87 | + # Process incoming traffic |
| 88 | + entry_data = self._extract_stream(entry_point) |
| 89 | + if entry_data: |
| 90 | + print(f"\n[>] Captured {len(entry_data)} bytes incoming") |
| 91 | + self._process_data(entry_data) |
| 92 | + processed = self._monitor_stream("in", entry_data) |
| 93 | + exit_point.send(processed) |
| 94 | + # Process outgoing traffic |
| 95 | + exit_data = self._extract_stream(exit_point) |
| 96 | + if exit_data: |
| 97 | + print(f"\n[<] Captured {len(exit_data)} bytes outgoing") |
| 98 | + self._process_data(exit_data) |
| 99 | + processed = self._monitor_stream("out", exit_data) |
| 100 | + entry_point.send(processed) |
| 101 | + # Prevent CPU saturation |
| 102 | + if not (entry_data or exit_data): |
| 103 | + time.sleep(0.1) |
| 104 | + except Exception as e: |
| 105 | + print(f"[!] Bridge error: {str(e)}") |
| 106 | + finally: |
| 107 | + print("[*] Closing bridge") |
| 108 | + entry_point.close() |
| 109 | + exit_point.close() |
| 110 | + |
| 111 | + def orchestrate(self) -> None: |
| 112 | + # Orchestrate the proxy operation |
| 113 | + # Validate input |
| 114 | + if len(sys.argv[1:]) != 5: |
| 115 | + print("Usage: script.py [local_addr] [local_port] [remote_addr] [remote_port] [preload]") |
| 116 | + print("Example: script.py 127.0.0.1 8080 target.com 80 True") |
| 117 | + sys.exit(1) |
| 118 | + # Configure proxy parameters |
| 119 | + self._local_addr = sys.argv[1] |
| 120 | + self._local_port = int(sys.argv[2]) |
| 121 | + self._remote_addr = sys.argv[3] |
| 122 | + self._remote_port = int(sys.argv[4]) |
| 123 | + self._preload = "true" in sys.argv[5].lower() |
| 124 | + # Initialize listener |
| 125 | + listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
| 126 | + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) |
| 127 | + try: |
| 128 | + listener.bind((self._local_addr, self._local_port)) |
| 129 | + except socket.error as e: |
| 130 | + print(f"[!] Binding failed: {e}") |
| 131 | + sys.exit(1) |
| 132 | + listener.listen(self._backlog) |
| 133 | + print(f"[*] Service active on {self._local_addr}:{self._local_port}") |
| 134 | + # Main service loop |
| 135 | + while True: |
| 136 | + client, address = listener.accept() |
| 137 | + print(f"[+] Connection from {address[0]}:{address[1]}") |
| 138 | + bridge = threading.Thread( |
| 139 | + target=self._bridge_connections, |
| 140 | + args=(client,) |
| 141 | + ) |
| 142 | + bridge.daemon = True |
| 143 | + bridge.start() |
| 144 | + |
| 145 | +if __name__ == "__main__": |
| 146 | + bridge = TcpProxy() |
| 147 | + bridge.orchestrate() |
0 commit comments