diff --git a/async_interval.py b/async_interval.py new file mode 100644 index 0000000..8f9cfcf --- /dev/null +++ b/async_interval.py @@ -0,0 +1,66 @@ +import asyncio +import types + +# Calls fn in intervals given by the interval parameter (in seconds) +# fn is called with an argument giving the elapsed time since start +# Returns a Task object + +def async_interval(fn, interval, end = 0): + async def run(fn, interval): + try: + start = loop.time() + while True: + await asyncio.sleep(interval) + fn( loop.time() - start ) + except asyncio.CancelledError: + pass + + def stop(): + try: task.cancel() + except asyncio.CancelledError: pass + + loop = asyncio.get_running_loop() + task = asyncio.create_task( run(fn, interval) ) + if (end > 0): loop.call_later(end, stop) + return task + +if __name__ == '__main__': + import unittest + + class Test(unittest.IsolatedAsyncioTestCase): + async def test_cancel(self): + def empty(): pass + i = async_interval(empty, 1) + await asyncio.sleep(0) # need to wait, otherwise CancelledError is raised anyway + i.cancel() + await i # wait for interval completion + self.assertEqual(i.done(), True) + # cancelled() is False since the wrapped coroutine doesn't propagate the CancelledError + self.assertEqual(i.cancelled(), False) + + async def test_interval(self): + times = [] + def fn(time): times.append(time) + + i = async_interval(fn, 1/4) + await asyncio.sleep(1) + i.cancel() + await i + # print(times) + self.assertEqual(len(times), 3) + self.assertAlmostEqual(times[0], 0.25, places=2) + self.assertAlmostEqual(times[1], 0.50, places=2) + self.assertAlmostEqual(times[2], 0.75, places=2) + + async def test_end(self): + times = [] + def fn(time): times.append(time) + + i = async_interval(fn, 1/4, 1) + await i + self.assertEqual(len(times), 3) + self.assertAlmostEqual(times[0], 0.25, places=2) + self.assertAlmostEqual(times[1], 0.50, places=2) + self.assertAlmostEqual(times[2], 0.75, places=2) + + unittest.main() \ No newline at end of file diff --git a/async_prompt.py b/async_prompt.py deleted file mode 100644 index 78f2a68..0000000 --- a/async_prompt.py +++ /dev/null @@ -1,71 +0,0 @@ -import asyncio -import sys -import termios -import tty -import builtins -import io - -# async prompt for a single keystroke -class AsyncPrompt: - def __init__(self): - self.echo = True - self.echo_end = '\n' - self.waiting_for_input = False - self.queue = asyncio.Queue() - asyncio.get_running_loop().add_reader(sys.stdin, self.on_input) - - def __del__(self): - # restore tty if object goes away - if self.waiting_for_input: - self.tty_restore() - - def tty_input(self): - fd = sys.stdin.fileno() - self.original_ttyattrs = termios.tcgetattr(fd) # save ttyattrs - tty.setraw(fd) # set raw input mode on tty - - def tty_restore(self): - fd = sys.stdin.fileno() - termios.tcsetattr(fd, termios.TCSADRAIN, self.original_ttyattrs) - - def on_input(self): - if self.waiting_for_input: - key = sys.stdin.read(1) - sys.stdin.seek(0, io.SEEK_END) # disard rest of input (by seeking to end of stream) - self.tty_restore() - if ord(key) == 3: # catch Control-C - raise KeyboardInterrupt() - if self.echo: - if ord(key) == 27: - print('^[', end=self.echo_end) # Don't echo ESC as it is, this would start an escape sequence in the terminal - else: - print(key, end=self.echo_end) # echo the input character (with newline) - self.queue.put_nowait(key) - self.waiting_for_input = False - else: - sys.stdin.readline() # discard input - - async def prompt(self, message = '? ', echo = True, echo_end = '\n'): - self.echo = echo - # print prompt - print(message, end = '', flush=True) - self.tty_input() - # set flag to capture input - self.waiting_for_input = True - # wait until input is received - return await self.queue.get() - - async def wait_for(self, chars, message = '? ', echo = True, echo_end = '\n'): - res = None - while res not in chars: - res = await self.prompt(message, echo) - return res - - def print(self, *objects, sep=' ', end='\n', file=None, flush=False): - if self.waiting_for_input: - self.tty_restore() - builtins.print() # newline - builtins.print(*objects, sep=sep, end=end, file=file, flush=flush) - self.tty_input() - else: - builtins.print(*objects, sep=sep, end=end, file=file, flush=flush) \ No newline at end of file diff --git a/async_queue.py b/async_queue.py new file mode 100644 index 0000000..baa9526 --- /dev/null +++ b/async_queue.py @@ -0,0 +1,319 @@ +import asyncio + +# Like asyncio.Queue with support for reordering and removing elements +class Queue(asyncio.Queue): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.order = [] + + def _rebuild(self): + # remove all items + try: + while True: super().get_nowait() + except asyncio.QueueEmpty: + pass + # add items in new order + for item in self.order: + try: + super().put_nowait(item) + except asyncio.QueueFull: + pass + + # get the current queue as list + def list(self): + return self.order.copy() + + # swap two items by index; supports negative indices + def swap(self, idx1, idx2): + if idx1 < -len(self.order) or idx1 > len(self.order)-1: + raise IndexError('index 1 out of bounds') + if idx2 < -len(self.order) or idx2 > len(self.order)-1: + raise IndexError('index 2 out of bounds') + if (idx1 == idx2): return + self.order[idx1], self.order[idx2] = self.order[idx2], self.order[idx1] + self._rebuild() + + # move an item to new position in queue + def move(self, idx, new_idx): + if idx < -len(self.order) or idx > len(self.order)-1: + raise IndexError('index out of bounds') + if new_idx < -len(self.order) or new_idx > len(self.order)-1: + raise IndexError('target index out of bounds') + + # normalize negative indices + if idx < 0: idx = len(self.order) + idx + if new_idx < 0: new_idx = len(self.order) + new_idx + + if (idx == new_idx): return + item = self.order.pop(idx) + self.order.insert(new_idx, item) + self._rebuild() + + def index(self, *args): + return self.order.index(*args) + + def __iter__(self): + return self.order.copy().__iter__() + + # remove an item from the queue; supports negative indices + def pop(self, idx = -1): + if idx < -len(self.order) or idx > len(self.order)-1: + raise IndexError('index out of bounds') + item = self.order.pop(idx) + # print('remove item', item) + self._rebuild() + return item + + # insert an item at an arbitrary position into the queue + def insert(self, idx, item): + if idx < -len(self.order) or idx > len(self.order): + raise IndexError('index out of bounds') + self.order.insert(idx, item) + self._rebuild() + + def put_nowait(self, item): + # print('put_nowait') + super().put_nowait(item) + self.order.append(item) + + def get_nowait(self): + # print('get_nowait') + item = super().get_nowait() + self.order.pop(0) + return item + + # no need to implement get() and put() + # as these are implemented in terms of get_nowait() and put_nowait() + + +if __name__ == '__main__': + import unittest + + class Test(unittest.IsolatedAsyncioTestCase): + def get_all(self, q): + out = [] + while not q.empty(): + out.append( q.get_nowait() ) + return out + + async def test_put(self): + q = Queue() + await q.put('one') + await q.put('two') + await q.put('three') + self.assertEqual(q.list(), ['one', 'two', 'three']) + self.assertEqual( self.get_all(q), ['one', 'two', 'three']) + + async def test_put_nowait(self): + q = Queue() + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + self.assertEqual(q.list(), ['one', 'two', 'three']) + self.assertEqual( self.get_all(q), ['one', 'two', 'three']) + + async def test_get(self): + q = Queue() + get_task = asyncio.gather( + asyncio.create_task(q.get()), + asyncio.create_task(q.get()), + asyncio.create_task(q.get()) + ) + await q.put('one') + await q.put('two') + await q.put('three') + self.assertEqual( await get_task, ['one', 'two', 'three'] ) + + async def test_get_nowait(self): + q = Queue() + with self.assertRaises(asyncio.QueueEmpty): + q.get_nowait() + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + self.assertEqual( q.get_nowait(), 'one' ) + self.assertEqual( q.get_nowait(), 'two' ) + self.assertEqual( q.get_nowait(), 'three' ) + with self.assertRaises(asyncio.QueueEmpty): + q.get_nowait() + + async def test_pop(self): + q = Queue() + with self.assertRaises(IndexError): + q.pop(0) + + q.put_nowait('one') + with self.assertRaises(IndexError): q.pop(1) + with self.assertRaises(IndexError): q.pop(-2) + self.assertEqual(q.pop(0), 'one') + self.assertEqual(q.list(), []) + + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + self.assertEqual(q.pop(1), 'two') + self.assertEqual(q.list(), ['one', 'three']) + self.assertEqual(self.get_all(q), ['one', 'three']) + self.assertEqual(q.empty(), True) + + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + self.assertEqual(q.pop(0), 'one') + self.assertEqual(q.pop(-1), 'three') + self.assertEqual(q.list(), ['two']) + self.assertEqual(self.get_all(q), ['two']) + + async def test_insert(self): + q = Queue() + q.put_nowait('one') + q.put_nowait('three') + q.insert(1, 'two') + self.assertEqual(q.list(), ['one', 'two', 'three']) + self.assertEqual(self.get_all(q), ['one', 'two', 'three']) + + q.insert(0, 'one') + q.insert(1, 'two') + q.insert(2, 'three') + q.insert(0, 'zero') + self.assertEqual(q.list(), ['zero', 'one', 'two', 'three']) + self.assertEqual(self.get_all(q), ['zero', 'one', 'two', 'three']) + + with self.assertRaises(IndexError): q.insert(-1, 'one') + with self.assertRaises(IndexError): q.insert(1, 'one') + q.insert(0, 'one') + self.assertEqual(q.list(), ['one']) + self.assertEqual(self.get_all(q), ['one']) + + async def test_swap(self): + q = Queue() + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + q.swap(1, 2) + self.assertEqual(q.list(), ['zero', 'two', 'one', 'three']) + self.assertEqual(self.get_all(q), ['zero', 'two', 'one', 'three']) + + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + q.swap(-1, 1) + self.assertEqual(q.list(), ['zero', 'three', 'two', 'one']) + self.assertEqual(self.get_all(q), ['zero', 'three', 'two', 'one']) + + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + with self.assertRaises(IndexError): q.swap(0, 4) + with self.assertRaises(IndexError): q.swap(4, 0) + with self.assertRaises(IndexError): q.swap(0, -5) + with self.assertRaises(IndexError): q.swap(-5, 0) + + q.swap(0, 0) + q.swap(1, 1) + q.swap(2, 2) + q.swap(3, 3) + self.assertEqual(q.list(), ['zero', 'one', 'two', 'three']) + self.assertEqual(self.get_all(q), ['zero', 'one', 'two', 'three']) + + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + q.swap(1, 0) + self.assertEqual(q.list(), ['one', 'zero', 'two', 'three']) + self.assertEqual(self.get_all(q), ['one', 'zero', 'two', 'three']) + + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + q.swap(1, -1) + self.assertEqual(q.list(), ['zero', 'three', 'two', 'one']) + self.assertEqual(self.get_all(q), ['zero', 'three', 'two', 'one']) + + async def test_move(self): + q = Queue() + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + with self.assertRaises(IndexError): q.move(0, 4) + with self.assertRaises(IndexError): q.move(0, -5) + with self.assertRaises(IndexError): q.move(4, 0) + with self.assertRaises(IndexError): q.move(-5, 0) + q.move(1, 1) + self.assertEqual(q.list(), ['zero', 'one', 'two', 'three']) + self.assertEqual(self.get_all(q), ['zero', 'one', 'two', 'three']) + + # move top to bottom + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + q.move(0, -1) + self.assertEqual(q.list(), ['one', 'two', 'three', 'zero']) + self.assertEqual(self.get_all(q), ['one', 'two', 'three', 'zero']) + + # move bottom to top + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + q.move(-1, 0) + self.assertEqual(q.list(), ['three', 'zero', 'one', 'two']) + self.assertEqual(self.get_all(q), ['three', 'zero', 'one', 'two']) + + # swap items next to each other + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + q.move(1, 2) + self.assertEqual(q.list(), ['zero', 'two', 'one', 'three']) + self.assertEqual(self.get_all(q), ['zero', 'two', 'one', 'three']) + + # move to bottom + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + q.move(1, -1) + self.assertEqual(q.list(), ['zero', 'two', 'three', 'one']) + self.assertEqual(self.get_all(q), ['zero', 'two', 'three', 'one']) + + # move to top + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + q.move(2, 0) + self.assertEqual(q.list(), ['two', 'zero', 'one', 'three']) + self.assertEqual(self.get_all(q), ['two', 'zero', 'one', 'three']) + + async def test_index(self): + q = Queue() + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + self.assertEqual(q.index('zero'), 0) + self.assertEqual(q.index('one'), 1) + self.assertEqual(q.index('two'), 2) + self.assertEqual(q.index('three'), 3) + with self.assertRaises(ValueError): q.index('none') + + async def test_iter(self): + q = Queue() + q.put_nowait('zero') + q.put_nowait('one') + q.put_nowait('two') + q.put_nowait('three') + items = [x for x in q] + self.assertEqual(items, ['zero', 'one', 'two', 'three']) + + unittest.main() + \ No newline at end of file diff --git a/frpc.sh b/frpc.sh index 25542f7..e9a0627 100755 --- a/frpc.sh +++ b/frpc.sh @@ -10,16 +10,22 @@ trap quit EXIT # find frpc frpc=$(which "frpc") if [[ -z $frpc ]]; then - frpc=$(which "../frp-mac/latest/frpcx") + frpc=$(which "../frp-mac/latest/frpc") if [[ -z $frpc ]]; then - >&2 echo "frpc not found; try installing with 'brew install frpc'" + >&2 echo "Error: frpc not found; Try installing with 'brew install frpc'" exit 1 fi fi ->&2 echo "Using frpc: $frpc" ->&2 echo "Plotter URL (via frp): wss://plotter.process.tools" +if [[ ! -f ./frp.auth ]]; then + >&2 echo "Error: Missing frp auth file: frp.auth" + exit 1 +fi + source frp.auth # read auth token from file export FRP_AUTH_TOKEN +>&2 echo "Using frpc: $frpc" +>&2 echo "Plotter URL (via frp): wss://plotter.process.tools" + $frpc -c frpc.toml diff --git a/header_timer.py b/header_timer.py new file mode 100644 index 0000000..a501cec --- /dev/null +++ b/header_timer.py @@ -0,0 +1,50 @@ +"""Provides a Textual application header widget.""" +from datetime import datetime + +from rich.text import Text +from textual.widgets import Header +from textual.widgets._header import HeaderIcon, HeaderTitle, HeaderClockSpace +from textual.reactive import Reactive + +class HeaderClock(HeaderClockSpace): + """Display a clock on the right of the header.""" + + DEFAULT_CSS = """ + HeaderClock { + background: $foreground-darken-1 5%; + color: $text; + text-opacity: 85%; + content-align: center middle; + } + """ + + time_seconds: Reactive[str] = Reactive(0) + + def render(self): + """Render the header clock. + + Returns: + The rendered clock. + """ + if self.time_seconds == 0: return Text('--:--') + hours = int(self.time_seconds / 3600) + minutes = int((self.time_seconds - 3600 * hours) / 60) + seconds = int((self.time_seconds - 3600 * hours) % 60) + if hours == 0: + return Text(f'{minutes:02}:{seconds:02}') + else: + return Text(f'{hours:02}:{minutes:02}:{seconds:02}') + +class HeaderTimer(Header): + + time_seconds: Reactive[str] = Reactive(0) + """Time of the Clock in seconds.""" + + def compose(self): + yield HeaderIcon().data_bind(Header.icon) + yield HeaderTitle() + yield ( + HeaderClock().data_bind(HeaderTimer.time_seconds) + if self._show_clock + else HeaderClockSpace() + ) diff --git a/hotkey_button.py b/hotkey_button.py new file mode 100644 index 0000000..50241c7 --- /dev/null +++ b/hotkey_button.py @@ -0,0 +1,46 @@ +from textual.widgets import Button + +class HotkeyButton(Button, can_focus=True): + idx = 0 + + def __init__(self, hotkey=None, hotkey_description = None, **kwargs): + if 'label' not in kwargs and hotkey_description != None: + kwargs['label'] = hotkey_description + super().__init__(**kwargs) + + self.hotkey = hotkey + self.hotkey_description = hotkey_description + HotkeyButton.idx += 1 + self.idx = HotkeyButton.idx + self.app_action = 'press_hotkeybutton_' + str(self.idx) + + def update_hotkey(self, hotkey = None, hotkey_description = None, label = None): + if label == None and hotkey_description != None: self.label = hotkey_description + self.unbind_hotkey() + self.hotkey = hotkey + self.hotkey_description = hotkey_description + self.bind_hotkey() + + def bind_hotkey(self): + if self.hotkey == None: return + self.app.bind(self.hotkey, self.app_action, description=self.hotkey_description) + print("hotkey BOUND for " + str(self.idx)) + + def unbind_hotkey(self): + if self.hotkey == None: return + self.app.unbind(self.hotkey) + print("hotkey UNBOUND for " + str(self.idx)) + + def on_mount(self): + def press_me(): self.press() + setattr(self.app, 'action_' + self.app_action, press_me) + self.bind_hotkey() + + def on_button_pressed(self): + print('pressed button ' + str(self.idx)) + + def watch_disabled(self, disabled_state): + super().watch_disabled(disabled_state) + print('disabled', disabled_state) + if disabled_state: self.unbind_hotkey() + else: self.bind_hotkey() \ No newline at end of file diff --git a/main.py b/main.py old mode 100755 new mode 100644 index fb38a6f..f0a9e76 --- a/main.py +++ b/main.py @@ -1,30 +1,12 @@ -#!/usr/bin/env python - -import asyncio -import websockets -import ssl -import os.path -import time -import json -import traceback -import sys -import signal -import spooler -import async_prompt -from tty_colors import COL -import zc -import porkbun - - -USE_ZEROCONF = 0 -ZEROCONF_HOSTNAME = 'plotter' - USE_PORKBUN = 1 PORKBUN_ROOT_DOMAIN = 'process.tools' PORKBUN_SUBDOMAIN = 'plotter-local' PORKBUN_TTL = 600 PORKBUN_SSL_OUTFILE = 'cert/process.tools.pem' +USE_ZEROCONF = 0 +ZEROCONF_HOSTNAME = 'plotter' + BIND_IP = '0.0.0.0' PORT = 0 # Use 0 for default ports (80 for http, 443 for ssl/tls) USE_SSL = 1 @@ -38,44 +20,80 @@ SHOW_CONNECTION_EVENTS = 0 # Print when clients connect/disconnect MAX_MESSAGE_SIZE_MB = 5 # in MB (Default in websockets lib is 2) -prompt = None +QUEUE_HEADERS = ['#', 'Client', 'Hash', 'Lines', 'Layers', 'Travel', 'Ink', 'Format', 'Speed', 'Duration', 'Status'] + +import textual +from textual import on +from textual.events import Key +from textual.app import App as TextualApp +from textual.widgets import Button, DataTable, RichLog, Footer, Header, Static, ProgressBar, Rule +from textual.widgets.data_table import RowDoesNotExist +from textual.containers import Horizontal, Vertical +from hotkey_button import HotkeyButton +from header_timer import HeaderTimer + +import asyncio +import websockets +import spooler +import json +import math +import subprocess +import porkbun +import functools + + +app = None +ssl_context = None num_clients = 0 clients = [] -ssl_context = None -def status_str(status): - match status['status']: - case 'setup': - return(f'{COL.BOLD}{COL.BLUE}Setup{COL.OFF}') - case 'waiting': - return(f'{COL.BOLD}Waiting for jobs{COL.OFF}') - case 'confirm_plot': - return(f'{COL.BOLD}{COL.YELLOW}Confirm to plot {status["job_str"]}{COL.OFF}') - case 'plotting': - return(f'{COL.BOLD}{COL.GREEN}Plotting [{status["job"]}]{COL.OFF}') - -def col_num(n): - if n > 0: - return f'{COL.BOLD}{COL.GREEN}{n}{COL.OFF}' - else: - return f'{COL.BOLD}{n}{COL.OFF}' +# Status simply shows up in the header def print_status(): - s = spooler.status() - print(f' Jobs: {col_num(s["queue_size"])} | Clients: {col_num(len(clients))} | Status: {status_str(s)}\n') + app.update_header() -def setup_prompt(): - global prompt - global print - prompt = async_prompt.AsyncPrompt() - print = prompt.print # replace global print function - -def remove_prompt(): - global prompt - del prompt # force destructor, causes terminal to restore +def setup_ssl(): + import ssl + import os.path + + if USE_SSL: + global ssl_context + try: + cert_file = os.path.join( os.path.dirname(__file__), SSL_CERT ) + key_file = None if SSL_KEY == None else os.path.join( os.path.dirname(__file__), SSL_KEY ) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(cert_file, key_file) + print(f'TLS enabled with certificate: {SSL_CERT}{"" if SSL_KEY == None else " + " + SSL_KEY}') + except FileNotFoundError: + print(f'Certificate not found, TLS disabled') + ssl_context = None + except: + print(f'Error establishing TLS context, TLS disabled') + ssl_context = None + global PORT + if PORT == 0: PORT = 80 if ssl_context == None else 443 -def disable_sigint(): - signal.signal(signal.SIGINT, lambda *args: None) +async def handle_connection(ws): + global num_clients + num_clients += 1 + clients.append(ws) + remote_address = ws.remote_address # store remote address (might not be available on disconnect) + if SHOW_CONNECTION_EVENTS: + print(f'({num_clients}) Connected: {remote_address[0]}:{remote_address[1]}') + print_status() + # await send_current_queue_size(ws) + try: + # The iterator exits normally when the connection is closed with close code 1000 (OK) or 1001 (going away). It raises a ConnectionClosedError when the connection is closed with any other code. + async for message in ws: + # print(f'Message ({ws.remote_address[0]}:{ws.remote_address[1]}):', message) + await handle_message(message, ws) + except websockets.exceptions.ConnectionClosedError: + pass + num_clients -= 1 + clients.remove(ws) + if SHOW_CONNECTION_EVENTS: + print(f'({num_clients}) Disconnected: {remote_address[0]}:{remote_address[1]} ({ws.close_code}{(" " + ws.close_reason).rstrip()})') + print_status() async def send_msg(msg, ws): if type(msg) is dict: msg = json.dumps(msg) @@ -83,103 +101,488 @@ async def send_msg(msg, ws): await ws.send(msg) except (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK): pass - + async def on_queue_size(size): + app.update_job_queue() + app.update_header() cbs = [] for ws in clients: cbs.append( send_msg({'type': 'queue_length', 'length': size}, ws) ) await asyncio.gather(*cbs) async def send_current_queue_size(ws): - await send_msg( {'type': 'queue_length', 'length': spooler.queue_size()}, ws ) + await send_msg( {'type': 'queue_length', 'length': spooler.num_jobs()}, ws ) async def handle_message(message, ws): async def on_queue_position(pos, job): - await send_msg( {'type': 'queue_position', 'position': pos, 'id': job['id']}, ws ) + await send_msg( {'type': 'queue_position', 'position': pos}, ws ) async def on_done(job): - await send_msg( {'type': 'job_done', 'id': job['id']}, ws ) + await send_msg( {'type': 'job_done'}, ws ) async def on_cancel(job): - await send_msg( {'type': 'job_canceled', 'id': job['id']}, ws ) + await send_msg( {'type': 'job_canceled'}, ws ) async def on_error(msg, job): - await send_msg( {'type': 'error', 'msg': msg, 'id': job['id']}, ws ) - msg = json.loads(message) + await send_msg( {'type': 'error', 'msg': msg}, ws ) + + try: + msg = json.loads(message) + except JSONDecodeError: + return + if msg['type'] == 'echo': await ws.send(message) elif msg['type'] == 'plot': - qsize = spooler.queue_size() + qsize = spooler.num_jobs() result = await spooler.enqueue(msg, on_queue_position, on_done, on_cancel, on_error) - if result and qsize > 0: print_status() # Don't print status if queue is empty -> Status will be printed by spooler + if result: print_status() elif msg['type'] == 'cancel': result = await spooler.cancel(msg['client']) if result: print_status() -async def handle_connection(ws): - global num_clients - num_clients += 1 - clients.append(ws) - remote_address = ws.remote_address # store remote address (might not be available on disconnect) - if SHOW_CONNECTION_EVENTS: - print(f'({num_clients}) Connected: {remote_address[0]}:{remote_address[1]}') - print_status() - await send_current_queue_size(ws) - try: - # The iterator exits normally when the connection is closed with close code 1000 (OK) or 1001 (going away). It raises a ConnectionClosedError when the connection is closed with any other code. - async for message in ws: - # print(f'Message ({ws.remote_address[0]}:{ws.remote_address[1]}):', message) - await handle_message(message, ws) - except websockets.exceptions.ConnectionClosedError: - pass - num_clients -= 1 - clients.remove(ws) - if SHOW_CONNECTION_EVENTS: - print(f'({num_clients}) Disconnected: {remote_address[0]}:{remote_address[1]} ({ws.close_code}{(" " + ws.close_reason).rstrip()})') - print_status() - -def setup_ssl(): - if USE_SSL: - global ssl_context - try: - cert_file = os.path.join( os.path.dirname(__file__), SSL_CERT ) - key_file = None if SSL_KEY == None else os.path.join( os.path.dirname(__file__), SSL_KEY ) - ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) - ssl_context.load_cert_chain(cert_file, key_file) - print(f'TLS enabled with certificate: {SSL_CERT}{"" if SSL_KEY == None else " + " + SSL_KEY}') - except FileNotFoundError: - print(f'Certificate not found, TLS disabled') - ssl_context = None - except: - print(f'Error establishing TLS context, TLS disabled') - ssl_context = None - global PORT - if PORT == 0: PORT = 80 if ssl_context == None else 443 - -async def main(): - setup_prompt() # needs to be called within event loop +async def run_server(app): async with websockets.serve(handle_connection, BIND_IP, PORT, ping_interval=PING_INTERVAL, ping_timeout=PING_TIMEOUT, ssl=ssl_context, max_size=MAX_MESSAGE_SIZE_MB*(2**20)): print(f'Server running on {"ws" if ssl_context == None else "wss"}://{BIND_IP}:{PORT}') print() spooler.set_queue_size_cb(on_queue_size) # await asyncio.Future() # run forever - await spooler.start(prompt, print_status) # run forever + await spooler.start(app) # run forever -def quit(): - print('Quitting...') - remove_prompt() - if USE_ZEROCONF: zc.remove_zeroconf_service() +class MyDataTable(DataTable): + def on_click(self, event): + self.app.on_queue_click(event) -if __name__ == '__main__': - try: - if USE_PORKBUN: - porkbun.ddns_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SUBDOMAIN, PORKBUN_TTL) - porkbun.cert_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SSL_OUTFILE) - print() - setup_ssl() # Updates global PORT - if USE_ZEROCONF: zc.add_zeroconf_service(ZEROCONF_HOSTNAME, PORT) - asyncio.run(main()) - except KeyboardInterrupt: +class App(TextualApp): + prompt_future = None + + def compose(self): + global header, queue, log, footer + header = HeaderTimer(icon = '🖨️', show_clock = True, time_format = '%H:%M') + + queue = MyDataTable(id = 'queue') + + log = RichLog(markup=True) + footer = Footer(id="footer", show_command_palette=True) + + global job_current, job_status,job_progress + job_current = DataTable() + job_status = Static(spooler.status()['status_desc']) + job_progress = ProgressBar() + + global col_left, col_right, job, commands, commands_1, commands_2, commands_3, commands_4, commands_5 + global b_pos, b_neg, b_align, b_cycle, b_home, b_plus, b_minus, b_preview + + yield header + # yield HotkeyButton('p', 'Press') + # yield HotkeyButton('x', 'Something') + with Horizontal(): + with Vertical() as col_left: + with Vertical() as job: + yield job_current + yield job_status + yield job_progress + # yield Rule() + with Horizontal(id='commands') as commands: + with Vertical() as commands_1: + yield (b_pos := HotkeyButton(label='Plot', id="pos")) + with Vertical() as commands_2: + yield (b_align := HotkeyButton('a', 'Align', label='Align', id='align')) + yield (b_cycle := HotkeyButton('c', 'Cycle', label='Cycle', id='cycle')) + yield (b_home := HotkeyButton('h', 'Home', label='Home', id='home')) + with Vertical() as commands_3: + yield (b_plus := HotkeyButton(label='+10', id='plus')) + yield (b_minus := HotkeyButton(label='-10', id='minus')) + with Vertical() as commands_4: + yield (b_preview := HotkeyButton('v', 'Preview', label='Preview', id='preview')) + with Vertical() as commands_5: + yield (b_neg := HotkeyButton(label='Cancel', id='neg')) + yield queue + with Vertical() as col_right: + yield log + yield footer + + def on_mount(self): + self.title = "Plotter" + # self.theme = "textual-dark" + header.tall = True + col_left.styles.width = '3fr' + col_right.styles.width = '2fr' + + # self.query_one('#footer').show_command_palette=False + log.border_title = 'Log' + log.styles.border = ('solid', 'white') + + job.border_title = 'Job' + job.styles.border = ('solid', 'white') + job.styles.height = 22 + + job_current.styles.height = 3 + job_current.add_columns(*QUEUE_HEADERS) + job_current.cursor_type = 'none' + job_status.styles.margin = 1 + job_progress.styles.margin = 1 + job_progress.styles.width = '100%' + job_progress.query_one('#bar').styles.width = '1fr' + job_progress.styles.display = 'none' + + commands.styles.margin = (3, 0, 0, 0) + + for button in commands.query('Button'): + button.styles.width = '100%' + button.styles.margin = (0, 1); + + for col in commands.query('Vertical'): + col.styles.align_horizontal = 'center' + # col.styles.border = ('vkey', 'white') + + commands_2.styles.width = '0.5625fr' + for button in commands_2.query('Button'): + button.styles.min_width = 9 + + commands_3.styles.width = '0.3125fr' + for button in commands_3.query('Button'): + button.styles.min_width = 5 + + commands_4.styles.width = '0.6875fr' + for button in commands_4.query('Button'): + button.styles.min_width = 11 + + queue.border_title = 'Queue' + queue.styles.border = ('solid', 'white') + queue.styles.height = '1fr' + queue.add_columns(*QUEUE_HEADERS) + queue.cursor_type = 'row' + queue.zebra_stripes = True + queue.show_cursor = False + + self.update_header() + + b_pos.disabled = True + b_neg.disabled = True + b_align.disabled = True + b_cycle.disabled = True + b_home.disabled = True + b_plus.disabled = True + b_minus.disabled = True + b_preview.disabled = True + + self.bind('t', 'enqueue_test_job', description = 'Test job') + self.bind('o', 'open_svg_folder', description = 'Open SVG folder') + + setup_ssl() + # log.write(log.styles.height) + + global server_task + server_task = asyncio.create_task(run_server(self)) + + def on_server_task_exit(task): + print('[red]Server task exit') + if not task.cancelled(): # not a intentional exit + ex = task.exception() + if ex != None: + import traceback + print('Server task exited with exception:') + print(''.join(traceback.format_exception(ex))) + global server_task_exception + server_task_exception = ex + self.exit() # This line can be removed, exception will then be show inside app log area + + server_task.add_done_callback(on_server_task_exit) + + # global spooler_task + # spooler_task = asyncio.create_task(spooler.start(self)) + + async def action_enqueue_test_job(self): + from test_job import test_job + job = test_job() + await spooler.enqueue(test_job()) + + def action_open_svg_folder(self): + sub_coro = asyncio.create_subprocess_exec('open', spooler.STATUS_FOLDERS['waiting'], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + asyncio.create_task(sub_coro) + + def on_resize(self, event): + pass + + def on_key(self): pass - except: - traceback.print_exception( sys.exception() ) - finally: - disable_sigint() # prevent another Control-C - quit() \ No newline at end of file + + def print(self, *args, sep=' ', end='\n'): + if len(args) == 1: log.write(args[0]) + else: log.write( sep.join(map(str, args)) + end) + + def update_header(self): + status = spooler.status() + self.title = status['status_desc'] + self.sub_title = f'{num_clients} Clients – {spooler.num_jobs()} Jobs' + total_secs = functools.reduce(lambda acc, x: acc + x['time_estimate'], spooler.jobs(), 0) + header.time_seconds = total_secs + + def bind(self, *args, **kwargs): + super().bind(*args, **kwargs) + self.refresh_bindings() + + def unbind(self, key): + # self._bindings.key_to_bindings is a dict of keys to lists of Binding objects + self._bindings.key_to_bindings.pop(key, None) + self.refresh_bindings() + + # # bindings: [ (key, desc), ... ] + # # This not a coroutine (no async). It returns a future, which can be awaited from coroutines + # def prompt(self, bindings, message): + # # setup bindings + # self.print(message) + # self.print(bindings) + # self.update_bindings([ ('y', 'prompt_response("y")', 'Yes'), ('n', 'prompt_response("n")', 'No') ]) + # + # # return a future that eventually resolves to the result + # loop = asyncio.get_running_loop() + # self.prompt_future = loop.create_future() + # return self.prompt_future + + def preview_job(self, job): + if job != None and 'save_path' in job: + print(f'Preview job \\[{job["client"]}]: {job["save_path"]}') + sub_coro = asyncio.create_subprocess_exec('qlmanage', '-p', job['save_path'], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + asyncio.create_task(sub_coro) + + def adjust_job_speed(self, job, delta): + if job != None: + speed = job['speed'] if 'speed' in job else 100 + speed += delta + speed = int(speed / 10) * 10 + speed = max( min(speed, 100), 10 ) + print(f'Adjust job speed \\[{job["client"]}]: {speed}') + job['speed'] = speed + if (job == spooler.current_job()): self.update_current_job() + + @on(Button.Pressed, '#commands Button') + def on_button(self, event): + id = event.button.id + if id == 'preview': + self.preview_job( spooler.current_job() ) + return + if id == 'plus': + self.adjust_job_speed( spooler.current_job(), 10 ) + return + if id == 'minus': + self.adjust_job_speed( spooler.current_job(), -10 ) + return + if id == 'neg' and spooler.status()['status'] == 'plotting': + print('[yellow]Interrupting...') + spooler.request_plot_pause() + return + + if self.prompt_future != None and not self.prompt_future.done(): + if id == None and event.button.hotkey_description: + id = str(event.button.hotkey_description).lower() + if id == None and event.button.label: + id = str(event.button.label).lower() + + self.prompt_future.set_result({ + 'id': id, # use button id, hotkey description (lowercase), or button label (lowercase) + 'button': event.button + }) + + @on(Key) + async def on_queue_hotkey(self, event): + if event.key in ['up', 'down'] and queue.row_count > 0: + queue.show_cursor = True + if queue.show_cursor == False: + return + + if event.key in ['backspace', 'i', 'k', '1', '0', 'space']: + if queue.row_count == 0: return # nothing in list + client = queue.ordered_rows[queue.cursor_row].key.value + + if (event.key == 'backspace'): + # if this is the current job, and we haven't started, cancel the prompt to start + if spooler.current_client() == client and spooler.status()['status'] == 'confirm_plot': + self.cancel_prompt_ui() + # handle all other cases (even plots that are running) + else: + await spooler.cancel(client) + elif (event.key == 'i'): + await spooler.move(client, max(queue.cursor_row - 1, 0)) + queue.move_cursor(row=queue.get_row_index(client)) + elif (event.key == 'k'): + new_row = queue.cursor_row + 1 + await spooler.move(client, new_row) + queue.move_cursor(row=queue.get_row_index(client)) + elif (event.key == '1'): + await spooler.move(client, 0) + queue.move_cursor(row=queue.get_row_index(client)) + elif (event.key == '0'): + await spooler.move(client, -1) + queue.move_cursor(row=queue.get_row_index(client)) + elif (event.key == 'space'): + self.preview_job( spooler.job_by_client(client) ) + + def on_queue_click(self, event): + if queue.row_count > 0: + queue.show_cursor = True + + def job_to_row(self, job, idx): + return (idx, job['client'], job['hash'][:5], job['stats']['count'], job['stats']['layer_count'], int(job['stats']['travel'])/1000, int(job['stats']['travel_ink'])/1000, job['format'], job['speed'], f'{math.floor(job["time_estimate"]/60)}:{round(job["time_estimate"]%60):02}', job['status']) + + def update_current_job(self): + job = spooler.current_job() + job_current.clear() + if job != None: + job_current.add_row( *self.job_to_row(job, 1), key=job['client'] ) + + def update_job_queue(self): + if queue.row_count == 0: queue.show_cursor = False + # remember selected client + client = None + if queue.row_count > 0 and queue.show_cursor: + client = queue.ordered_rows[queue.cursor_row].key.value + # print('selected client:', client) + + queue.clear() + for idx, job in enumerate(spooler.jobs()): + queue.add_row( *self.job_to_row(job, idx+1), key=job['client'] ) + + # recall client (if possible) + if client: + try: + # print('select row:', queue.get_row_index(client)) + queue.move_cursor(row=queue.get_row_index(client)) + except RowDoesNotExist: + # print('row does not exist') + queue.show_cursor = False + + def cancel_prompt_ui(self): + if self.prompt_future != None and not self.prompt_future.done(): + self.prompt_future.set_result(False) + + # This not a coroutine (no async). It returns a future, which can be awaited from coroutines + def prompt_ui(self, variant, message = ''): + # print('PROMPT', variant) + + if len(message) > 0: message = ' – ' + message + job_status.update(spooler.status()['status_desc'] + message) + self.update_current_job() + + match variant: + case 'setup': + b_pos.variant = 'default' + b_pos.disabled = True + + b_neg.update_hotkey('d', 'Done') + b_neg.variant = 'success' + b_neg.disabled = False + + b_align.disabled = False + b_cycle.disabled = False + b_home.disabled = True + b_plus.disabled = True + b_minus.disabled = True + b_preview.disabled = True + case 'waiting': + b_pos.disabled = True + b_neg.disabled = True + + b_align.disabled = False + b_cycle.disabled = False + b_home.disabled = True + b_plus.disabled = True + b_minus.disabled = True + b_preview.disabled = True + case 'start_plot': + b_pos.update_hotkey('p', 'Plot') + b_pos.variant = 'success' + b_pos.disabled = False + + b_neg.update_hotkey('escape', 'Cancel') + b_neg.variant = 'error' + b_neg.disabled = False + + b_align.disabled = False + b_cycle.disabled = False + b_home.disabled = True + b_plus.disabled = False + b_minus.disabled = False + b_preview.disabled = False + case 'plotting': + b_pos.disabled = True + + b_neg.update_hotkey('escape', 'Pause') + b_neg.variant = 'warning' + b_neg.disabled = False + + b_align.disabled = True + b_cycle.disabled = True + b_home.disabled = True + b_plus.disabled = True + b_minus.disabled = True + b_preview.disabled = False + case 'repeat_plot': + b_pos.update_hotkey('r', 'Repeat') + b_pos.variant = 'primary' + b_pos.disabled = False + + b_neg.update_hotkey('d', 'Done') + b_neg.variant = 'success' + b_neg.disabled = False + + b_align.disabled = False + b_cycle.disabled = False + b_home.disabled = True + b_plus.disabled = False + b_minus.disabled = False + b_preview.disabled = False + case 'resume_plot': + b_pos.update_hotkey('p', 'Continue') + b_pos.variant = 'primary' + b_pos.disabled = False + + b_neg.update_hotkey('d', 'Done') + b_neg.variant = 'warning' + b_neg.disabled = False + + b_align.disabled = False + b_cycle.disabled = False + b_home.disabled = False + b_plus.disabled = True + b_minus.disabled = True + b_preview.disabled = False + case _: + raise ValueError('Invalid prompt variant') + + # return a future that eventually resolves to the result + # reuse the future if it isn't done. allows for updating the prompt + if self.prompt_future == None or self.prompt_future.done(): + loop = asyncio.get_running_loop() + self.prompt_future = loop.create_future() + + if variant == 'plotting': self.prompt_future.set_result(True) + + return self.prompt_future + + + +if __name__ == "__main__": + global print + global tprint + global server_task_exception + + tprint = print + server_task_exception = None + + if USE_PORKBUN: + porkbun.ddns_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SUBDOMAIN, PORKBUN_TTL) + porkbun.cert_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SSL_OUTFILE) + print() + + if USE_ZEROCONF: zc.add_zeroconf_service(ZEROCONF_HOSTNAME, PORT) + + app = App() + print = app.print + app.tprint = tprint + + app.run() + + print = tprint # restore print function + if server_task_exception != None: + print() + print("Server task exited with exception:") + raise server_task_exception \ No newline at end of file diff --git a/main_old.py b/main_old.py new file mode 100755 index 0000000..5206ed7 --- /dev/null +++ b/main_old.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python + +import asyncio +import websockets +import ssl +import os.path +import time +import json +import traceback +import sys +import signal +import spooler +import async_prompt +from tty_colors import COL +import zc +import porkbun + + +USE_ZEROCONF = 0 +ZEROCONF_HOSTNAME = 'plotter' + +USE_PORKBUN = 1 +PORKBUN_ROOT_DOMAIN = 'process.tools' +PORKBUN_SUBDOMAIN = 'plotter-local' +PORKBUN_TTL = 600 +PORKBUN_SSL_OUTFILE = 'cert/process.tools.pem' + +BIND_IP = '0.0.0.0' +PORT = 0 # Use 0 for default ports (80 for http, 443 for ssl/tls) +USE_SSL = 1 +# SSL_CERT = 'cert/localhost.pem' # Certificate file in pem format (can contain private key as well) +# SSL_KEY = None # Private key file in pem format (If None, the key needs to be contained in SSL_CERT) +SSL_CERT = 'cert/process.tools.pem' +SSL_KEY = None + +PING_INTERVAL = 10 +PING_TIMEOUT = 5 +SHOW_CONNECTION_EVENTS = 0 # Print when clients connect/disconnect +MAX_MESSAGE_SIZE_MB = 5 # in MB (Default in websockets lib is 2) + +prompt = None +num_clients = 0 +clients = [] +ssl_context = None + +def status_str(status): + match status['status']: + case 'setup': + return(f'{COL.BOLD}{COL.BLUE}Setup{COL.OFF}') + case 'waiting': + return(f'{COL.BOLD}Waiting for jobs{COL.OFF}') + case 'confirm_plot': + return(f'{COL.BOLD}{COL.YELLOW}Confirm to plot {status["job_str"]}{COL.OFF}') + case 'plotting': + return(f'{COL.BOLD}{COL.GREEN}Plotting [{status["job"]}]{COL.OFF}') + +def col_num(n): + if n > 0: + return f'{COL.BOLD}{COL.GREEN}{n}{COL.OFF}' + else: + return f'{COL.BOLD}{n}{COL.OFF}' + +def print_status(): + s = spooler.status() + print(f' Jobs: {col_num(s["queue_size"])} | Clients: {col_num(len(clients))} | Status: {status_str(s)}\n') + +def setup_prompt(): + global prompt + global print + prompt = async_prompt.AsyncPrompt() + print = prompt.print # replace global print function + +def remove_prompt(): + global prompt + del prompt # force destructor, causes terminal to restore + +def disable_sigint(): + signal.signal(signal.SIGINT, lambda *args: None) + +async def send_msg(msg, ws): + if type(msg) is dict: msg = json.dumps(msg) + try: + await ws.send(msg) + except (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK): + pass + +async def on_queue_size(size): + cbs = [] + for ws in clients: + cbs.append( send_msg({'type': 'queue_length', 'length': size}, ws) ) + await asyncio.gather(*cbs) + +async def send_current_queue_size(ws): + await send_msg( {'type': 'queue_length', 'length': spooler.queue_size()}, ws ) + +async def handle_message(message, ws): + async def on_queue_position(pos, job): + await send_msg( {'type': 'queue_position', 'position': pos}, ws ) + async def on_done(job): + await send_msg( {'type': 'job_done'}, ws ) + async def on_cancel(job): + await send_msg( {'type': 'job_canceled'}, ws ) + async def on_error(msg, job): + await send_msg( {'type': 'error', 'msg': msg}, ws ) + msg = json.loads(message) + if msg['type'] == 'echo': + await ws.send(message) + elif msg['type'] == 'plot': + qsize = spooler.queue_size() + result = await spooler.enqueue(msg, on_queue_position, on_done, on_cancel, on_error) + if result and qsize > 0: print_status() # Don't print status if queue is empty -> Status will be printed by spooler + elif msg['type'] == 'cancel': + result = await spooler.cancel(msg['client']) + if result: print_status() + +async def handle_connection(ws): + global num_clients + num_clients += 1 + clients.append(ws) + remote_address = ws.remote_address # store remote address (might not be available on disconnect) + if SHOW_CONNECTION_EVENTS: + print(f'({num_clients}) Connected: {remote_address[0]}:{remote_address[1]}') + print_status() + await send_current_queue_size(ws) + try: + # The iterator exits normally when the connection is closed with close code 1000 (OK) or 1001 (going away). It raises a ConnectionClosedError when the connection is closed with any other code. + async for message in ws: + # print(f'Message ({ws.remote_address[0]}:{ws.remote_address[1]}):', message) + await handle_message(message, ws) + except websockets.exceptions.ConnectionClosedError: + pass + num_clients -= 1 + clients.remove(ws) + if SHOW_CONNECTION_EVENTS: + print(f'({num_clients}) Disconnected: {remote_address[0]}:{remote_address[1]} ({ws.close_code}{(" " + ws.close_reason).rstrip()})') + print_status() + +def setup_ssl(): + if USE_SSL: + global ssl_context + try: + cert_file = os.path.join( os.path.dirname(__file__), SSL_CERT ) + key_file = None if SSL_KEY == None else os.path.join( os.path.dirname(__file__), SSL_KEY ) + ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + ssl_context.load_cert_chain(cert_file, key_file) + print(f'TLS enabled with certificate: {SSL_CERT}{"" if SSL_KEY == None else " + " + SSL_KEY}') + except FileNotFoundError: + print(f'Certificate not found, TLS disabled') + ssl_context = None + except: + print(f'Error establishing TLS context, TLS disabled') + ssl_context = None + global PORT + if PORT == 0: PORT = 80 if ssl_context == None else 443 + +async def main(): + setup_prompt() # needs to be called within event loop + async with websockets.serve(handle_connection, BIND_IP, PORT, ping_interval=PING_INTERVAL, ping_timeout=PING_TIMEOUT, ssl=ssl_context, max_size=MAX_MESSAGE_SIZE_MB*(2**20)): + print(f'Server running on {"ws" if ssl_context == None else "wss"}://{BIND_IP}:{PORT}') + print() + spooler.set_queue_size_cb(on_queue_size) + # await asyncio.Future() # run forever + await spooler.start(prompt, print_status) # run forever + +def quit(): + print('Quitting...') + remove_prompt() + if USE_ZEROCONF: zc.remove_zeroconf_service() + +if __name__ == '__main__': + try: + if USE_PORKBUN: + porkbun.ddns_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SUBDOMAIN, PORKBUN_TTL) + porkbun.cert_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SSL_OUTFILE) + print() + setup_ssl() # Updates global PORT + if USE_ZEROCONF: zc.add_zeroconf_service(ZEROCONF_HOSTNAME, PORT) + asyncio.run(main()) + except KeyboardInterrupt: + print('*** Ctrl-C pressed ***') + pass + except: + traceback.print_exception( sys.exception() ) + finally: + disable_sigint() # prevent another Control-C + quit() \ No newline at end of file diff --git a/porkbun.py b/porkbun.py index fa979b2..7f839dd 100644 --- a/porkbun.py +++ b/porkbun.py @@ -15,6 +15,9 @@ def get_lanip(): return ipaddrlist[-1] def get_config(): + if not os.path.exists(CONFIG_FILE): + print(f"Error: Porkbun (DNS Service) config file is missing: {CONFIG_FILE}") + exit() with open(CONFIG_FILE) as f: api_config = json.load(f) return api_config diff --git a/requirements.txt b/requirements.txt index 2a1fbff..7eff3a3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,7 @@ websockets==10.3 zeroconf==0.39.1 +textual==0.86.1 +textual-dev==1.6.1 # pyaxidraw module # diff --git a/spooler.py b/spooler.py index 9bb4a9f..66ab4ef 100644 --- a/spooler.py +++ b/spooler.py @@ -1,45 +1,79 @@ +TESTING = False # Don't actually connect to AxiDraw, just simulate plotting +REPEAT_JOBS = True # Ask to repeat a plot after a sucessful print +RESUME_QUEUE = True # Resume plotting queue after quitting/restarting +ALIGN_AFTER = True # Align plotter after success or error +ALIGN_AFTER_PAUSE = False # Align plotter after pause (programmatic, stop button, keyboard interrupt) + +PEN_POS_UP = 60 # Default: 60 +PEN_POS_DOWN = 40 # Default: 40 +MIN_SPEED = 10 # percent +SIMULATION_TIMEOUT = 8 # seconds + +STATUS_FOLDERS = { + 'waiting' : 'svgs/0_waiting', + 'plotting' : 'svgs/0_waiting', + 'canceled' : 'svgs/1_canceled', + 'finished' : 'svgs/2_finished', + 'error' : 'svgs/3_error' +} + +# KEY_DONE = ( 'd', '(D)one' ) +# KEY_REPEAT = ( 'r', '(R)epeat' ) +# KEY_START_PLOT = ( 'p', '(P)lot' ) +# KEY_RESTART_PLOT = ( 'p', '(P)lot from start' ) +# KEY_ALIGN = ( 'a', '(A)lign' ) +# KEY_CYCLE = ( 'c', '(C)ycle' ) +# KEY_CANCEL = ( chr(27), '(Esc) Cancel Job' ) +# KEY_RESUME = ( 'r', '(R)esume' ) +# KEY_HOME = ( 'h', '(H)ome' ) + +STATUS_DESC = { + 'setup': 'Setting up', + 'waiting': 'Waiting for jobs', + 'paused': 'Plot paused', + 'confirm_plot': 'Confirm job', + 'plotting': 'Plotting' +} + import asyncio from pyaxidraw import axidraw -from tty_colors import COL from datetime import datetime, timezone import math import os from capture_output import capture_output +import re +import hashlib +import async_queue +import xml.etree.ElementTree as ElementTree -FOLDER_WAITING ='svgs/0_waiting' -FOLDER_CANCELED ='svgs/1_canceled' -FOLDER_FINISHED ='svgs/2_finished' -PEN_POS_UP = 60 # Default: 60 -PEN_POS_DOWN = 40 # Default: 40 -MIN_SPEED = 10 # percent - -KEY_DONE = [ 'd', '(D)one' ] -KEY_REPEAT = [ 'r', '(R)epeat' ] -KEY_START_PLOT = [ 'p', '(P)lot' ] -KEY_RESTART_PLOT = [ 'p', '(P)lot from start' ] -KEY_ALIGN = [ 'a', '(A)lign' ] -KEY_CYCLE = [ 'c', '(C)ycle' ] -KEY_CANCEL = [ chr(27), '(Esc) Cancel Job' ] -KEY_RESUME = [ 'r', '(R)esume' ] -KEY_HOME = [ 'h', '(H)ome' ] - -REPEAT_JOBS = True # Ask to repeat a plot after a sucessful print -TESTING = True # Don't actually connect to AxiDraw, just simulate plotting queue_size_cb = None -queue = asyncio.Queue() # an async FIFO queue -jobs = {} # an index to all unfinished jobs by client id (in queue or current_job) (insertion order is preserved in dict since python 3.7) -current_job = None -_status = 'setup' # setup | waiting | confirm_plot | plotting +# queue = asyncio.Queue() # an async FIFO queue +queue = async_queue.Queue() # an async FIFO queue that can be reordered +_jobs = {} # an index to all unfinished jobs by client id (in queue or _current_job) +_current_job = None +_status = 'waiting' # waiting | confirm_plot | plotting + -async def callback(fn, *args): +# Helper function calls async function fn with args +# Returns a coroutine (because of async def) +async def callback(fn, *args, **kwargs): if callable(fn): - await fn(*args) + await fn(*args, **kwargs) + +# async def _notify_queue_positions(): +# cbs = [] +# for i, client in enumerate(_jobs): +# job = _jobs[client] +# if i == 0 and _status == 'plotting': i = -1 +# if 'position_notified' not in job or job['position_notified'] != i: +# job['position_notified'] = i +# cbs.append( callback(job['queue_position_cb'], i, job) ) +# await asyncio.gather(*cbs) # run callbacks concurrently async def _notify_queue_positions(): cbs = [] - for i, client in enumerate(jobs): - job = jobs[client] + for i, job in enumerate(jobs()): if i == 0 and _status == 'plotting': i = -1 if 'position_notified' not in job or job['position_notified'] != i: job['position_notified'] = i @@ -47,118 +81,284 @@ async def _notify_queue_positions(): await asyncio.gather(*cbs) # run callbacks concurrently async def _notify_queue_size(): - await callback(queue_size_cb, len(jobs)) + await callback(queue_size_cb, num_jobs()) def set_queue_size_cb(cb): global queue_size_cb queue_size_cb = cb -def queue_size(): - return len(jobs) +def num_jobs(): + return len(_jobs) + +def current_job(): + return _current_job + +def current_client(): + if _current_job == None: return None + return _current_job['client'] + +def job_by_client(client): + if client not in _jobs: return None + return _jobs[client] + +def jobs(): + lst = queue.list() + if (_current_job != None and not _current_job['cancel']): + lst.insert(0, _current_job) + return lst def status(): return { 'status': _status, - 'job': current_job['client'] if current_job != None else None, - 'job_str': job_str(current_job) if current_job != None else None, - 'queue_size': queue_size(), + 'status_desc': STATUS_DESC[_status], + 'job': _current_job['client'] if _current_job != None else None, + 'job_str': job_str(_current_job) if _current_job != None else None, + 'queue_size': num_jobs(), } -def timestamp(date = None): +def timestamp_str_full(date = None): if date == None: # make timezone aware timestamp: https://stackoverflow.com/a/39079819 date = datetime.now(timezone.utc) date = date.replace(tzinfo=date.astimezone().tzinfo) return date.strftime("%Y%m%d_%H%M%S.%f_UTC%z") -# status: 'waiting' | 'canceled' | 'finished' -def save_svg(job, status): - if status not in ['waiting', 'canceled', 'finished']: - return False - filename = f'{job["received"]}_{job["client"][0:10]}_{job["hash"][0:5]}.svg' - files = { - 'waiting': os.path.join(FOLDER_WAITING, filename), - 'canceled': os.path.join(FOLDER_CANCELED, filename), - 'finished': os.path.join(FOLDER_FINISHED, filename), - } - for key, file in files.items(): - if key == status: - os.makedirs( os.path.dirname(file), exist_ok=True) - with open(file, 'w', encoding='utf-8') as f: f.write(job['svg']) - else: - try: - os.remove(file) - except: - pass +def timestamp_str(date = None): + if date == None: + # make timezone aware timestamp: https://stackoverflow.com/a/39079819 + date = datetime.now(None) # None ... use current timezone + date = date.replace(tzinfo=date.astimezone().tzinfo) + return date.strftime("%Y%m%d_%H%M%S.%f")[:-3] # trim last three digits of microsecond + + +def save_svg(job, overwrite_existing = False): + if job['status'] not in STATUS_FOLDERS.keys(): return False + + min = int(job["time_estimate"] / 60) if "time_estimate" in job else 0 + sec = math.ceil(job["time_estimate"] % 60) if "time_estimate" in job else 0 + position = f'{(job["position"] + 1):03}_' if 'position' in job and job['status'] in ['waiting', 'plotting'] else '' + + ink = f'{(job["stats"]["travel_ink"] / 1000):.1f}' if "stats" in job else 0 + travel = f'{(job["stats"]["travel"] / 1000):.1f}' if "stats" in job else 0 + filename = f'{position}{job["received"]}_[{job["client"][0:10]}]_{job["hash"][0:5]}_{travel}m_{min}m{sec}s.svg' + filename = os.path.join(STATUS_FOLDERS[job['status']], filename) + + # remove previous save + if 'save_path' in job and job['save_path'] != filename: + try: + # print('removing', job['save_path']) + os.remove(job['save_path']) + except: + pass + + # save file + if not os.path.isfile(filename) or overwrite_existing: + # print('writing', filename) + os.makedirs( os.path.dirname(filename), exist_ok=True) + with open(filename, 'w', encoding='utf-8') as f: f.write(job['svg']) + job['save_path'] = filename return True + +# def save_svg_async(*args, **kwargs): +# return asyncio.to_thread(save_svg, *args, **kwargs) + + +# Updated pre version 4 SVGs, so they are compatible with resume queue +def update_svg(job): + match = re.search('tg:version="(\\d+)"', job['svg']) + if match != None and int(match.group(1)) >= 4: return -def save_svg_async(*args): - return asyncio.to_thread(save_svg, *args) + MARKER = 'xmlns:tg="https://sketch.process.studio/turtle-graphics"' + idx = job['svg'].find(MARKER) + if idx == -1: return + idx += len(MARKER) + insert = f'\n tg:version="4" tg:layer_count="1" tg:oob_count="{job['stats']['oob_count']}" tg:short_count="{job['stats']['short_count']}" tg:format="{job['format']}" tg:width_mm="{job['size'][0]}" tg:height_mm="{job['size'][1]}" tg:speed="{job['speed']}" tg:author="{job['client']}" tg:timestamp="{job['timestamp']}"' + + job['svg'] = job['svg'][:idx] + insert + job['svg'][idx:] + job['hash'] = hashlib.sha1(job['svg'].encode('utf-8')).hexdigest() -# job: 'client', 'lines' +# job {'type': 'plot, 'client', 'id', 'svg', stats, timestamp, hash, speed, format, size, received?} +# adds to job: { 'cancel', time_estimate', 'layers', received } # todo: don't wait on callbacks async def enqueue(job, queue_position_cb = None, done_cb = None, cancel_cb = None, error_cb = None): # the client might be in queue (or currently plotting) - if job['client'] in jobs: + if job['client'] in _jobs: await callback( error_cb, 'Cannot add job, you already have a job queued!', job ) return False + job['status'] = 'waiting' job['cancel'] = False # save callbacks job['queue_position_cb'] = queue_position_cb job['done_cb'] = done_cb job['cancel_cb'] = cancel_cb job['error_cb'] = error_cb - job['received'] = timestamp() + if 'received' not in job or job['received'] == None: + job['received'] = timestamp_str() # speed if 'speed' in job: job['speed'] = max( min(job['speed'], 100), MIN_SPEED ) # limit speed (MIN_SPEED, 100) else: job['speed'] = 100 # format - if 'format' not in job: job['format'] = 'A3_LANDSCAPE' + if 'format' not in job: job['format'] = 'A4_LANDSCAPE' # add to jobs index - jobs[ job['client'] ] = job - await _notify_queue_size() # notify new queue size - await _notify_queue_positions() - print(f'New job [{job["client"]}]') - sim = await simulate_async(job) # run simulation + _jobs[ job['client'] ] = job + print(f'New job \\[{job["client"]}] {job["hash"][0:5]}') + try: + async with asyncio.timeout(SIMULATION_TIMEOUT): + sim = await simulate_async(job) # run simulation + except TimeoutError: + print(f'⚠️ [red]Timeout on simulating job \\[{job["client"]}] {job["hash"][0:5]}') + job['status'] = 'error' + save_svg(job) + await callback( error_cb, 'Cannot add job, it took to long to simulate!', job ) + return False + job['time_estimate'] = sim['time_estimate'] job['layers'] = sim['layers'] + + update_svg(job) await queue.put(job) - await save_svg_async(job, 'waiting') + job['position'] = job_pos(job) + save_svg(job) + + await _notify_queue_size() # notify new queue size + await _notify_queue_positions() return True async def cancel(client, force = False): + global _current_job if not force: - if current_job != None and current_job['client'] == client: - await callback( current_job['error_cb'], 'Cannot cancel, already plotting!', current_job ) + if _current_job != None and _current_job['client'] == client: + await callback( _current_job['error_cb'], 'Cannot cancel, already plotting!', _current_job ) return False - if client not in jobs: return False - job = jobs[client] + # remove from job index + if client not in _jobs: return False + job = _jobs[client] + if job == _current_job and _status == 'plotting': return # can't cancel if plotting + job['status'] = 'canceled' job['cancel'] = True # set cancel flag - del jobs[client] # remove from index + job['position'] = None + del _jobs[client] + + # remove from queue + # if job is the current job, it has already been taken from the top of the queue + if job == _current_job: + _current_job = None + else: + queue.pop( queue.index(job) ) await callback( job['cancel_cb'], job ) # notify canceled job await _notify_queue_size() # notify new queue size await _notify_queue_positions() # notify queue positions (might have changed for some) - print(f'❌ {COL.RED}Canceled job [{job["client"]}]{COL.OFF}') - await save_svg_async(job, 'canceled') + print(f'❌ [red]Canceled job \\[{job["client"]}]') + save_svg(job) + update_positions_and_save() return True +async def cancel_current_job(force = True): + return await cancel(_current_job['client'], force = force) + async def finish_current_job(): - await callback( current_job['done_cb'], current_job ) # notify job done - del jobs[ current_job['client'] ] # remove from jobs index + global _current_job + _current_job['status'] = 'finished' + _current_job['position'] = None + await callback( _current_job['done_cb'], _current_job ) # notify job done + del _jobs[ _current_job['client'] ] # remove from jobs index + finished_job = _current_job + _current_job = None await _notify_queue_positions() # notify queue positions. current job is 0 await _notify_queue_size() # notify queue size - print(f'✅ {COL.GREEN}Finished job [{current_job["client"]}]{COL.OFF}') + print(f'✅ [green]Finished job \\[{finished_job["client"]}]') _status = 'waiting' - await save_svg_async(current_job, 'finished') + save_svg(finished_job) + update_positions_and_save() return True +# current job: 0 (if plotting or waiting, check job['status']) +def job_pos(job): + if (job == _current_job): return 0 + try: + return queue.index(job) + 1 + except ValueError: + return None + +def update_position(job): + if job == None: return + job['position'] = job_pos(job) + +def update_positions_and_save(): + # print('updating positions') + for job in _jobs.values(): + update_position(job) + # print(job['client'], job['position']) + save_svg(job, overwrite_existing=False) + +# positions +# 0 .. current job +# 1 .. first in queue (idx 0) +# last .. num_jobs()-1 + +# plot['status']: 'waiting'|'plotting'|'paused'|'ok'|'error'|'finished'|'canceled' +async def move(client, new_pos): + global _current_job + # print('move', client, new_pos) + job = _jobs[client] + current_pos = job_pos(job) + # print('move: current pos', current_pos) + # cannot move if job is already plotting + if job['status'] in ['plotting', 'paused']: + # print('move: already plotting, can\'t move') + return + + # normalize new_pos + if new_pos < 0: new_pos = num_jobs() + new_pos + + # clamp to lower bound + if new_pos < 0: new_pos = 0 + + # can't take place of the plotting (or paused job) + if new_pos == 0 and _status in ['plotting', 'paused']: new_pos = 1 + + # print(f'move from {current_pos} to {new_pos}') + + # clamp to upper bound + if new_pos > num_jobs()-1: new_pos = num_jobs()-1 + + # nothing to do + if new_pos == current_pos: + # print('move: nothing to do') + return + + # move job from queue to current job + if new_pos == 0: + # print('move to top') + queue.pop(current_pos-1) # remove from current position + queue.insert(0, _current_job) # move current job (pos 0) to first waiting position + _current_job = job # set to current job + prompt_ui('start_plot', f'Ready to plot job \\[{_current_job["client"]}] ?') + # move current job to queue + elif current_pos == 0: + # print('move from top') + old_current_job = _current_job + _current_job = queue.pop(0) # new current job is next in line + queue.insert(new_pos-1, old_current_job) + prompt_ui('start_plot', f'Ready to plot job \\[{_current_job["client"]}] ?') + # move within queue + else: + # print('move within queue') + queue.move(current_pos-1, new_pos-1) + + # update 'position' attribute of all jobs and save svgs + update_positions_and_save() + + await _notify_queue_size() + await _notify_queue_positions() # notify queue positions (might have changed for some) + def job_str(job): - info = '[' + str(job["client"])[0:10] + ']' + info = '[' + str(job["client"])[0:10] + '] ' + job['hash'][0:5] speed_and_format = f'{job["speed"]}%, {job["format"]}, {math.floor(job["time_estimate"]/60)}:{round(job["time_estimate"]%60):02} min' if 'stats' in job: stats = job['stats'] @@ -179,6 +379,9 @@ def job_str(job): 103: 'Stopped by keyboard interrupt', 104: 'Lost USB connectivity' } +PLOTTER_PAUSED = [ 1, 102, 103 ]; +PLOTTER_OK = [ 0 ]; +PLOTTER_ERROR = [ 101, 104 ]; def get_error_msg(code): if code in PLOTTER_ERRORS: @@ -190,7 +393,7 @@ def print_axidraw(*args): out = ' '.join(args) lines = out.split('\n') for line in lines: - print(f"{COL.GREY}[AxiDraw] " + line + COL.OFF) + print(f"[gray50]\\[AxiDraw] " + line) # Raise pen and disable XY stepper motors def align(): @@ -216,8 +419,14 @@ def cycle(): ad.plot_run() return ad.errors.code -def plot(job, align_after = True, options_cb = None, return_ad = False): +def request_plot_pause(): + global _current_ad + if _current_ad != None: + _current_ad.transmit_pause_request() + +def plot(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False): if 'svg' not in job: return 0 + job['status'] = 'plotting' speed = job['speed'] / 100 with capture_output(print_axidraw, print_axidraw): ad = axidraw.AxiDraw() @@ -234,12 +443,22 @@ def plot(job, align_after = True, options_cb = None, return_ad = False): ad.options.pen_pos_down = PEN_POS_DOWN if callable(options_cb): options_cb(ad.options) if TESTING: ad.options.preview = True + global _current_ad + _current_ad = ad # for request_plot_pause() job['output_svg'] = ad.plot_run(output=True) - if align_after: align() + _current_ad = None + if (ad.errors.code in PLOTTER_PAUSED and align_after_pause) or \ + (ad.errors.code not in PLOTTER_PAUSED and align_after): + align() + + if ad.errors.code in PLOTTER_PAUSED: job['status'] = 'paused' + elif ad.errors.code in PLOTTER_OK: job['status'] = 'ok' + elif ad.errors.code in PLOTTER_ERROR: job['status'] = 'error' + if return_ad: return ad else: return ad.errors.code -def resume_home(job, align_after = True, options_cb = None, return_ad = False): +def resume_home(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False): if 'output_svg' not in job: return 0 orig_svg = job['svg'] # save original svg job['svg'] = job['output_svg'] # set last output svg as input @@ -248,11 +467,11 @@ def _options_cb(options): if callable(options_cb): options_cb(options) options.mode = 'res_home' - res = plot(job, align_after, _options_cb, return_ad) + res = plot(job, align_after, align_after_pause, _options_cb, return_ad) job['svg'] = orig_svg # restore original svg return res -def resume_plot(job, align_after = True, options_cb = None, return_ad = False): +def resume_plot(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False): if 'output_svg' not in job: return 0 orig_svg = job['svg'] # save original svg job['svg'] = job['output_svg'] # set last output svg as input @@ -260,8 +479,8 @@ def resume_plot(job, align_after = True, options_cb = None, return_ad = False): def _options_cb(options): if callable(options_cb): options_cb(options) options.mode = 'res_plot' - - res = plot(job, align_after, _options_cb, return_ad) + + res = plot(job, align_after, align_after_pause, _options_cb, return_ad) job['svg'] = orig_svg # restore original svg return res @@ -289,21 +508,23 @@ def update_stats(ad): def _options_cb(options): options.preview = True - ad = plot(job, align_after=False, options_cb=_options_cb, return_ad=True) + ad = plot(job, align_after=False, align_after_pause=False, options_cb=_options_cb, return_ad=True) update_stats(ad) while ad.errors.code == 1: # Paused programmatically - ad = resume_plot(job, align_after=False, options_cb=_options_cb, return_ad=True) + ad = resume_plot(job, align_after=False, align_after_pause=False, options_cb=_options_cb, return_ad=True) update_stats(ad) + job['status'] = 'waiting' # reset status (has been set to 'plotting' by plot()) + return stats -async def plot_async(*args): - return await asyncio.to_thread(plot, *args) +async def plot_async(*args, **kwargs): + return await asyncio.to_thread(plot, *args, **kwargs) -async def simulate_async(*args): - return await asyncio.to_thread(simulate, *args) +async def simulate_async(*args, **kwargs): + return await asyncio.to_thread(simulate, *args, **kwargs) async def align_async(): return await asyncio.to_thread(align) @@ -311,146 +532,259 @@ async def align_async(): async def cycle_async(): return await asyncio.to_thread(cycle) -async def resume_plot_async(*args): - return await asyncio.to_thread(resume_plot, *args) +async def resume_plot_async(*args, **kwargs): + return await asyncio.to_thread(resume_plot, *args, **kwargs) -async def resume_home_async(*args): - return await asyncio.to_thread(resume_home, *args) +async def resume_home_async(*args, **kwargs): + return await asyncio.to_thread(resume_home, *args, **kwargs) + +async def prompt_setup(message = 'Press \'Done\' when ready'): + while True: + res = await prompt_ui('setup', message) + res = res['id'] + if res == 'align': # Align + print('Aligning...') + await align_async() # -> prompt again + elif res == 'cycle': # Cycle + print('Cycling...') + await cycle_async() # -> prompt again + elif res == 'neg' : # Finish + return True + +async def prompt_waiting(message = 'Setup as needed'): + while True: + res = await prompt_ui('waiting', message) + if not res: + # print('prompt cancelled') + return False # the prompt was intentionally cancelled + + res = res['id'] + if res == 'align': # Align + print('Aligning...') + await align_async() # -> prompt again + elif res == 'cycle': # Cycle + print('Cycling...') + await cycle_async() # -> prompt again + +def cancel_prompt_waiting(): + cancel_prompt_ui() async def prompt_start_plot(message): - message += f' {KEY_START_PLOT[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_CANCEL[1]} ?' while True: - res = await prompt.wait_for( [KEY_START_PLOT[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_CANCEL[0]], message, echo=True ) - if res == KEY_START_PLOT[0]: # Start Plot + res = await prompt_ui('start_plot', message) + if not res: return False # the prompt was intentionally cancelled -> Cancel plotting + + res = res['id'] + if res == 'pos': # Start Plot return True - elif res == KEY_ALIGN[0]: # Align + elif res == 'align': # Align print('Aligning...') await align_async() # -> prompt again - elif res == KEY_CYCLE[0]: # Cycle + elif res == 'cycle': # Cycle print('Cycling...') await cycle_async() # -> prompt again - elif res == KEY_CANCEL[0]: # Cancel + elif res == 'neg': # Cancel return False +async def prompt_plotting(message = ''): + return await prompt_ui('plotting', message); + async def prompt_repeat_plot(message): - message += f' {KEY_REPEAT[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_DONE[1]} ?' while True: - res = await prompt.wait_for( [KEY_REPEAT[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_DONE[0]], message, echo=True ) - if res == KEY_REPEAT[0]: # Start Plot + res = await prompt_ui('repeat_plot', message) + res = res['id'] + if res == 'pos': # Start Plot return True - elif res == KEY_ALIGN[0]: # Align + elif res == 'align': # Align print('Aligning...') await align_async() # -> prompt again - elif res == KEY_CYCLE[0]: # Cycle + elif res == 'cycle': # Cycle print('Cycling...') await cycle_async() # -> prompt again - elif res == KEY_DONE[0]: # Finish + elif res == 'neg': # Done return False async def prompt_resume_plot(message, job): - message += f' {KEY_RESUME[1]}, {KEY_HOME[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_RESTART_PLOT[1]}, {KEY_DONE[1]} ?' while True: - res = await prompt.wait_for( [KEY_RESUME[0],KEY_HOME[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_RESTART_PLOT[0],KEY_DONE[0]], message, echo=True ) - if res == KEY_RESUME[0]: # Resume Plot - return 'resume' - elif res == KEY_RESTART_PLOT[0]: # Restart plot - return 'restart' - elif res == KEY_HOME[0]: # Home + res = await prompt_ui('resume_plot', message) + res = res['id'] + + if res == 'pos': # Resume Plot + return True + elif res == 'home': # Home print('Returning home...') await resume_home_async(job) # -> prompt again - elif res == KEY_ALIGN[0]: # Align + elif res == 'align': # Align print('Aligning...') await align_async() # -> prompt again - elif res == KEY_CYCLE[0]: # Cycle + elif res == 'cycle': # Cycle print('Cycling...') await cycle_async() # -> prompt again - elif res == KEY_DONE[0]: # Finish + elif res == 'neg': # Done return False -async def prompt_setup(message = 'Setup Plotter:'): - message += f' {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_DONE[1]} ?' - while True: - res = await prompt.wait_for( [KEY_ALIGN[0],KEY_CYCLE[0],KEY_DONE[0]], message, echo=True ) - if res == KEY_ALIGN[0]: # Align - print('Aligning...') - await align_async() # -> prompt again - elif res == KEY_CYCLE[0]: # Cycle - print('Cycling...') - await cycle_async() # -> prompt again - elif res == KEY_DONE[0] : # Finish - return True +def svg_to_job(svg, filename = None): + NS = 'https://sketch.process.studio/turtle-graphics' + root = ElementTree.fromstring(svg) + + def attr(attr, ns = NS): + return root.get(attr if ns == None else "{" + ns + "}" + attr) + + received_ts = None + if filename != None: + match = re.search('\\d{8}_\\d{6}', os.path.basename(filename)) + if match != None: received_ts = match.group(0) + + job = { + 'loaded_from_file': True, + 'client': attr('author'), + 'id': "XYZ", + 'svg': svg, + 'stats': { + 'count': int(attr('count')), + 'layer_count': int(attr('layer_count')), + 'oob_count': int(attr('oob_count')), + 'short_count': int(attr('short_count')), + 'travel': int(attr('travel')), + 'travel_ink': int(attr('travel_ink')), + 'travel_blank': int(attr('travel_blank')) + }, + 'timestamp': attr('timestamp'), + 'speed': int(attr('speed')), + 'format': attr('format'), + 'size': [int(attr('width_mm')), int(attr('height_mm'))], + 'hash': hashlib.sha1(svg.encode('utf-8')).hexdigest(), + 'received': received_ts, + 'save_path': filename, + } + + return job + +async def resume_queue_from_disk(): + import xml.etree.ElementTree as ElementTree + try: + list = sorted(os.listdir(STATUS_FOLDERS['waiting'])) + list = [ os.path.join(STATUS_FOLDERS['waiting'], x) for x in list if x.endswith('.svg') ] + except FileNotFoundError: + list = [] + + resumable_jobs = [] + for filename in list: + # print('Loading ', filename) + try: + with open(filename, 'r') as file: svg = file.read() + job = svg_to_job(svg, filename) + resumable_jobs.append(job) + except: + print('Error resuming ', filename) + + if len(resumable_jobs) > 0: print(f"Resuming {len(resumable_jobs)} jobs...") + else: print("No jobs to resume") + for job in resumable_jobs: + await enqueue(job) -async def start(_prompt, print_status): - global current_job +def set_status(status): + global _status + _status = status + print_status() + +async def start(app): + global _current_job global _status + global print - global prompt - prompt = _prompt # make this global - print = prompt.print # replace global print function + print = app.print + + global tprint + tprint = app.tprint + + global prompt_ui, cancel_prompt_ui + prompt_ui = app.prompt_ui + cancel_prompt_ui = app.cancel_prompt_ui - if TESTING: print(f'{COL.YELLOW}TESTING MODE enabled{COL.OFF}') + global print_status + print_status = app.update_header + + if TESTING: print('[yellow]TESTING MODE enabled') + if RESUME_QUEUE: await resume_queue_from_disk() await align_async() - await prompt_setup() - _status = 'waiting' + # await prompt_setup() while True: # get the next job from the queue, waits until a job becomes available if queue.empty(): - print_status() - current_job = await queue.get() - if not current_job['cancel']: # skip if job is canceled - _status = 'confirm_plot' - print_status() - ready = await prompt_start_plot(f'Ready to plot job {job_str(current_job)}?') + set_status('waiting') + asyncio.create_task( prompt_waiting() ) # this allows align/cycle + _current_job = await queue.get() + cancel_prompt_waiting() + update_positions_and_save() + + if not _current_job['cancel']: # skip if job is canceled + set_status('confirm_plot') + ready = await prompt_start_plot(f'[green]Ready to plot[/green] job \\[{_current_job["client"]}] ?') if not ready: - await cancel(current_job['client'], force = True) - _status = 'waiting' + await cancel_current_job() + set_status('waiting') + _current_job = None continue # skip over rest of the loop # plot (and retry on error or repeat) loop = 0 # number or tries/repetitions + layer = 0 # number of programmatic pauses (error 1) + interrupt = 0 # number of stops by button press (error 102) or keyboard interrupt (103) resume = False # flag indicating resume (vs. plotting from start) while True: - if (resume): - print(f'🖨️ {COL.YELLOW}Resuming job [{current_job["client"]}] ...{COL.OFF}') - _status = 'plotting' - error = await resume_plot_async(current_job) + set_status('plotting') + await prompt_plotting(f'\\[{_current_job["client"]}]') # this returns immediately + if (resume == 'skip_to_repeat'): + error = 0 + elif resume: + print(f'🖨️ [yellow]Resuming job \\[{_current_job["client"]}] ...') + error = await resume_plot_async(_current_job) else: loop += 1 - print(f'🖨️ {COL.YELLOW}Plotting job [{current_job["client"]}] ...{COL.OFF}') - _status = 'plotting' + print(f'🖨️ [yellow]Plotting job \\[{_current_job["client"]}] ...') await _notify_queue_positions() # notify plotting - error = await plot_async(current_job) + error = await plot_async(_current_job) resume = False # No error if error == 0: if REPEAT_JOBS: - print(f'{COL.BLUE}Done ({loop}x) job [{current_job["client"]}]{COL.OFF}') - _status = 'confirm_plot' - repeat = await prompt_repeat_plot(f'{COL.BLUE}Repeat{COL.OFF} job [{current_job["client"]}] ?') + print(f'[blue]Done ({loop}x) job \\[{_current_job["client"]}]') + set_status('confirm_plot') + layer = 0 + interrupt = 0 + repeat = await prompt_repeat_plot(f'[yellow]Repeat ({loop+1}) job[/yellow] \\[{_current_job["client"]}] ?') if repeat: continue await finish_current_job() break # Paused programmatically (1), Stopped by pause button press (102) or Stopped by keyboard interrupt (103) - elif error in [1, 102, 103]: - print(f'{COL.YELLOW}Plotter: {get_error_msg(error)}{COL.OFF}') - _status = 'confirm_plot' - ready = await prompt_resume_plot(f'{COL.YELLOW}Resume{COL.OFF} job [{current_job["client"]}] ?', current_job) - if not ready: - await finish_current_job() - break - if ready == 'resume': resume = True + elif error in PLOTTER_PAUSED: + print(f'[yellow]Plotter: {get_error_msg(error)}') + set_status('paused') + if error in [1]: + layer += 1 + prompt = f"[blue]Continue layer ({layer+1}/{_current_job['layers']})[/blue]" + elif error in [102, 103]: + interrupt += 1 + prompt = f"[blue]Continue ({interrupt+1}) interrupted job[/blue]" + if _current_job['layers'] > 1: prompt += f" layer ({layer+1}/{_current_job['layers']})" + ready = await prompt_resume_plot(f'{prompt} \\[{_current_job["client"]}] ?', _current_job) + if ready: resume = True + else: + resume = 'skip_to_repeat' # Skip to asking to repeat job + _current_job['status'] = 'ok' # set status to 'successfully printed' # Errors else: - print(f'{COL.RED}Plotter: {get_error_msg(error)}{COL.OFF}') - _status = 'confirm_plot' - ready = await prompt_start_plot(f'{COL.RED}Retry{COL.OFF} job [{current_job["client"]}] ?') + print(f'[red]Plotter: {get_error_msg(error)}') + set_status('confirm_plot') + ready = await prompt_start_plot(f'[red]Retry job \\[{_current_job["client"]}] ?') if not ready: - await cancel(current_job['client'], force = True) + await cancel(_current_job['client'], force = True) break - - _status = 'waiting' - current_job = None + + set_status('waiting') + _current_job = None diff --git a/spooler_old.py b/spooler_old.py new file mode 100644 index 0000000..4ff49e1 --- /dev/null +++ b/spooler_old.py @@ -0,0 +1,528 @@ +import asyncio +from pyaxidraw import axidraw +from tty_colors import COL +from datetime import datetime, timezone +import math +import os +from capture_output import capture_output +import re +import hashlib + +FOLDER_WAITING ='svgs/0_waiting' +FOLDER_CANCELED ='svgs/1_canceled' +FOLDER_FINISHED ='svgs/2_finished' +PEN_POS_UP = 60 # Default: 60 +PEN_POS_DOWN = 40 # Default: 40 +MIN_SPEED = 10 # percent + +KEY_DONE = [ 'd', '(D)one' ] +KEY_REPEAT = [ 'r', '(R)epeat' ] +KEY_START_PLOT = [ 'p', '(P)lot' ] +KEY_RESTART_PLOT = [ 'p', '(P)lot from start' ] +KEY_ALIGN = [ 'a', '(A)lign' ] +KEY_CYCLE = [ 'c', '(C)ycle' ] +KEY_CANCEL = [ chr(27), '(Esc) Cancel Job' ] +KEY_RESUME = [ 'r', '(R)esume' ] +KEY_HOME = [ 'h', '(H)ome' ] + +TESTING = False # Don't actually connect to AxiDraw, just simulate plotting +REPEAT_JOBS = True # Ask to repeat a plot after a sucessful print +RESUME_QUEUE = True # Resume plotting queue after quitting/restarting +ALIGN_AFTER = True # Align plotter after success or error +ALIGN_AFTER_PAUSE = False # Align plotter after pause (programmatic, stop button, keyboard interrupt) + +queue_size_cb = None +queue = asyncio.Queue() # an async FIFO queue +jobs = {} # an index to all unfinished jobs by client id (in queue or current_job) (insertion order is preserved in dict since python 3.7) +current_job = None +_status = 'setup' # setup | waiting | confirm_plot | plotting + +async def callback(fn, *args): + if callable(fn): + await fn(*args) + +async def _notify_queue_positions(): + cbs = [] + for i, client in enumerate(jobs): + job = jobs[client] + if i == 0 and _status == 'plotting': i = -1 + if 'position_notified' not in job or job['position_notified'] != i: + job['position_notified'] = i + cbs.append( callback(job['queue_position_cb'], i, job) ) + await asyncio.gather(*cbs) # run callbacks concurrently + +async def _notify_queue_size(): + await callback(queue_size_cb, len(jobs)) + +def set_queue_size_cb(cb): + global queue_size_cb + queue_size_cb = cb + +def queue_size(): + return len(jobs) + +def status(): + return { + 'status': _status, + 'job': current_job['client'] if current_job != None else None, + 'job_str': job_str(current_job) if current_job != None else None, + 'queue_size': queue_size(), + } + +def timestamp(date = None): + if date == None: + # make timezone aware timestamp: https://stackoverflow.com/a/39079819 + date = datetime.now(timezone.utc) + date = date.replace(tzinfo=date.astimezone().tzinfo) + return date.strftime("%Y%m%d_%H%M%S.%f_UTC%z") + +# status: 'waiting' | 'canceled' | 'finished' +def save_svg(job, status): + if status not in ['waiting', 'canceled', 'finished']: + return False + filename = f'{job["received"]}_{job["client"][0:10]}_{job["hash"][0:5]}.svg' + files = { + 'waiting': os.path.join(FOLDER_WAITING, filename), + 'canceled': os.path.join(FOLDER_CANCELED, filename), + 'finished': os.path.join(FOLDER_FINISHED, filename), + } + for key, file in files.items(): + if key == status: + os.makedirs( os.path.dirname(file), exist_ok=True) + with open(file, 'w', encoding='utf-8') as f: f.write(job['svg']) + else: + try: + os.remove(file) + except: + pass + return True + +def save_svg_async(*args, **kwargs): + return asyncio.to_thread(save_svg, *args, **kwargs) + + +# Updated pre version 4 SVGs, so they are compatible with resume queue +def update_svg(job): + match = re.search('tg:version="(\\d+)"', job['svg']) + if match != None and int(match.group(1)) >= 4: return + + MARKER = 'xmlns:tg="https://sketch.process.studio/turtle-graphics"' + idx = job['svg'].find(MARKER) + if idx == -1: return + idx += len(MARKER) + insert = f'\n tg:version="4" tg:layer_count="1" tg:oob_count="{job['stats']['oob_count']}" tg:short_count="{job['stats']['short_count']}" tg:format="{job['format']}" tg:width_mm="{job['size'][0]}" tg:height_mm="{job['size'][1]}" tg:speed="{job['speed']}" tg:author="{job['client']}" tg:timestamp="{job['timestamp']}"' + + job['svg'] = job['svg'][:idx] + insert + job['svg'][idx:] + job['hash'] = hashlib.sha1(job['svg'].encode('utf-8')).hexdigest() + +# job {'type': 'plot, 'client', 'id', 'svg', stats, timestamp, hash, speed, format, size, received?} +# adds to job: { 'cancel', time_estimate', 'layers', received } +# todo: don't wait on callbacks +async def enqueue(job, queue_position_cb = None, done_cb = None, cancel_cb = None, error_cb = None): + # the client might be in queue (or currently plotting) + if job['client'] in jobs: + await callback( error_cb, 'Cannot add job, you already have a job queued!', job ) + return False + + job['cancel'] = False + # save callbacks + job['queue_position_cb'] = queue_position_cb + job['done_cb'] = done_cb + job['cancel_cb'] = cancel_cb + job['error_cb'] = error_cb + if 'received' not in job or job['received'] == None: + job['received'] = timestamp() + + # speed + if 'speed' in job: job['speed'] = max( min(job['speed'], 100), MIN_SPEED ) # limit speed (MIN_SPEED, 100) + else: job['speed'] = 100 + # format + if 'format' not in job: job['format'] = 'A3_LANDSCAPE' + + # add to jobs index + jobs[ job['client'] ] = job + await _notify_queue_size() # notify new queue size + await _notify_queue_positions() + print(f'New job [{job["client"]}] {job["hash"][0:5]}') + sim = await simulate_async(job) # run simulation + job['time_estimate'] = sim['time_estimate'] + job['layers'] = sim['layers'] + + update_svg(job) + await queue.put(job) + await save_svg_async(job, 'waiting') + return True + +async def cancel(client, force = False): + if not force: + if current_job != None and current_job['client'] == client: + await callback( current_job['error_cb'], 'Cannot cancel, already plotting!', current_job ) + return False + + if client not in jobs: return False + job = jobs[client] + job['cancel'] = True # set cancel flag + del jobs[client] # remove from index + + await callback( job['cancel_cb'], job ) # notify canceled job + await _notify_queue_size() # notify new queue size + await _notify_queue_positions() # notify queue positions (might have changed for some) + print(f'❌ {COL.RED}Canceled job [{job["client"]}]{COL.OFF}') + await save_svg_async(job, 'canceled') + return True + +async def finish_current_job(): + await callback( current_job['done_cb'], current_job ) # notify job done + del jobs[ current_job['client'] ] # remove from jobs index + await _notify_queue_positions() # notify queue positions. current job is 0 + await _notify_queue_size() # notify queue size + print(f'✅ {COL.GREEN}Finished job [{current_job["client"]}]{COL.OFF}') + _status = 'waiting' + await save_svg_async(current_job, 'finished') + return True + +def job_str(job): + info = '[' + str(job["client"])[0:10] + '] ' + job['hash'][0:5] + speed_and_format = f'{job["speed"]}%, {job["format"]}, {math.floor(job["time_estimate"]/60)}:{round(job["time_estimate"]%60):02} min' + if 'stats' in job: + stats = job['stats'] + layers = f'{job["layers"]} layers, ' if 'layers' in job and job['layers'] > 1 else '' + if 'count' in stats and 'travel' in stats and 'travel_ink' in stats: + info += f' ({stats["count"]} lines, {layers}{int(stats["travel_ink"])}/{int(stats["travel"])} mm, {speed_and_format})' + else: + info += f' ({speed_and_format})' + return info + + +# Return codes +PLOTTER_ERRORS = { + 0: 'No error; operation nominal', + 1: 'Paused programmatically', + 101: 'Failed to connect', + 102: 'Stopped by pause button press', + 103: 'Stopped by keyboard interrupt', + 104: 'Lost USB connectivity' +} +PLOTTER_PAUSED = [ 1, 102, 103 ]; + +def get_error_msg(code): + if code in PLOTTER_ERRORS: + return PLOTTER_ERRORS[code] + else: + return f'Unkown error (Code {code})' + +def print_axidraw(*args): + out = ' '.join(args) + lines = out.split('\n') + for line in lines: + print(f"{COL.GREY}[AxiDraw] " + line + COL.OFF) + +# Raise pen and disable XY stepper motors +def align(): + with capture_output(print_axidraw, print_axidraw): + ad = axidraw.AxiDraw() + ad.plot_setup() + ad.options.mode = 'align' # A setup mode: Raise pen, disable XY stepper motors + ad.options.pen_pos_up = PEN_POS_UP + ad.options.pen_pos_down = PEN_POS_DOWN + if TESTING: ad.options.preview = True + ad.plot_run() + return ad.errors.code + +# Cycle the pen down and back up +def cycle(): + with capture_output(print_axidraw, print_axidraw): + ad = axidraw.AxiDraw() + ad.plot_setup() + ad.options.mode = 'cycle' # A setup mode: Lower and then raise the pen + ad.options.pen_pos_up = PEN_POS_UP + ad.options.pen_pos_down = PEN_POS_DOWN + if TESTING: ad.options.preview = True + ad.plot_run() + return ad.errors.code + +def plot(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False): + if 'svg' not in job: return 0 + speed = job['speed'] / 100 + with capture_output(print_axidraw, print_axidraw): + ad = axidraw.AxiDraw() + ad.plot_setup(job['svg']) + ad.options.model = 2 # A3 + ad.options.reordering = 4 # No reordering + ad.options.auto_rotate = True # (This is the default) Drawings that are taller than wide will be rotated 90 deg to the left + ad.options.speed_pendown = int(110 * speed) + ad.options.speed_penup = int(110 * speed) + ad.options.accel = int(100 * speed) + ad.options.pen_rate_lower = int(100 * speed) + ad.options.pen_rate_raise = int(100 * speed) + ad.options.pen_pos_up = PEN_POS_UP + ad.options.pen_pos_down = PEN_POS_DOWN + if callable(options_cb): options_cb(ad.options) + if TESTING: ad.options.preview = True + job['output_svg'] = ad.plot_run(output=True) + if (ad.errors.code in PLOTTER_PAUSED and align_after_pause) or \ + (ad.errors.code not in PLOTTER_PAUSED and align_after): + align() + if return_ad: return ad + else: return ad.errors.code + +def resume_home(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False): + if 'output_svg' not in job: return 0 + orig_svg = job['svg'] # save original svg + job['svg'] = job['output_svg'] # set last output svg as input + + def _options_cb(options): + if callable(options_cb): options_cb(options) + options.mode = 'res_home' + + res = plot(job, align_after, align_after_pause, _options_cb, return_ad) + job['svg'] = orig_svg # restore original svg + return res + +def resume_plot(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False): + if 'output_svg' not in job: return 0 + orig_svg = job['svg'] # save original svg + job['svg'] = job['output_svg'] # set last output svg as input + + def _options_cb(options): + if callable(options_cb): options_cb(options) + options.mode = 'res_plot' + + res = plot(job, align_after, align_after_pause, _options_cb, return_ad) + job['svg'] = orig_svg # restore original svg + return res + +def simulate(job): + if 'svg' not in job: return 0 + speed = job['speed'] / 100 + + stats = { + 'error_code': None, + 'time_estimate': 0, + 'distance_total': 0, + 'distance_pendown': 0, + 'pen_lifts': 0, + 'layers': 0 + } + + def update_stats(ad): + stats['error_code'] = ad.errors.code + stats['time_estimate'] += ad.time_estimate + stats['distance_total'] += ad.distance_total + stats['distance_pendown'] += ad.distance_pendown + stats['pen_lifts'] += ad.pen_lifts + stats['layers'] += 1 + + def _options_cb(options): + options.preview = True + + ad = plot(job, align_after=False, align_after_pause=False, options_cb=_options_cb, return_ad=True) + update_stats(ad) + + while ad.errors.code == 1: # Paused programmatically + ad = resume_plot(job, align_after=False, align_after_pause=False, options_cb=_options_cb, return_ad=True) + update_stats(ad) + + return stats + + +async def plot_async(*args, **kwargs): + return await asyncio.to_thread(plot, *args, **kwargs) + +async def simulate_async(*args, **kwargs): + return await asyncio.to_thread(simulate, *args, **kwargs) + +async def align_async(): + return await asyncio.to_thread(align) + +async def cycle_async(): + return await asyncio.to_thread(cycle) + +async def resume_plot_async(*args, **kwargs): + return await asyncio.to_thread(resume_plot, *args, **kwargs) + +async def resume_home_async(*args, **kwargs): + return await asyncio.to_thread(resume_home, *args, **kwargs) + +async def prompt_start_plot(message): + message += f' {KEY_START_PLOT[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_CANCEL[1]} ?' + while True: + res = await prompt.wait_for( [KEY_START_PLOT[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_CANCEL[0]], message, echo=True ) + if res == KEY_START_PLOT[0]: # Start Plot + return True + elif res == KEY_ALIGN[0]: # Align + print('Aligning...') + await align_async() # -> prompt again + elif res == KEY_CYCLE[0]: # Cycle + print('Cycling...') + await cycle_async() # -> prompt again + elif res == KEY_CANCEL[0]: # Cancel + return False + +async def prompt_repeat_plot(message): + message += f' {KEY_REPEAT[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_DONE[1]} ?' + while True: + res = await prompt.wait_for( [KEY_REPEAT[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_DONE[0]], message, echo=True ) + if res == KEY_REPEAT[0]: # Start Plot + return True + elif res == KEY_ALIGN[0]: # Align + print('Aligning...') + await align_async() # -> prompt again + elif res == KEY_CYCLE[0]: # Cycle + print('Cycling...') + await cycle_async() # -> prompt again + elif res == KEY_DONE[0]: # Finish + return False + +async def prompt_resume_plot(message, job): + message += f' {KEY_RESUME[1]}, {KEY_HOME[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_RESTART_PLOT[1]}, {KEY_DONE[1]} ?' + while True: + res = await prompt.wait_for( [KEY_RESUME[0],KEY_HOME[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_RESTART_PLOT[0],KEY_DONE[0]], message, echo=True ) + if res == KEY_RESUME[0]: # Resume Plot + return 'resume' + elif res == KEY_RESTART_PLOT[0]: # Restart plot + return 'restart' + elif res == KEY_HOME[0]: # Home + print('Returning home...') + await resume_home_async(job) # -> prompt again + elif res == KEY_ALIGN[0]: # Align + print('Aligning...') + await align_async() # -> prompt again + elif res == KEY_CYCLE[0]: # Cycle + print('Cycling...') + await cycle_async() # -> prompt again + elif res == KEY_DONE[0]: # Finish + return False + +async def prompt_setup(message = 'Setup Plotter:'): + message += f' {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_DONE[1]} ?' + while True: + res = await prompt.wait_for( [KEY_ALIGN[0],KEY_CYCLE[0],KEY_DONE[0]], message, echo=True ) + if res == KEY_ALIGN[0]: # Align + print('Aligning...') + await align_async() # -> prompt again + elif res == KEY_CYCLE[0]: # Cycle + print('Cycling...') + await cycle_async() # -> prompt again + elif res == KEY_DONE[0] : # Finish + return True + +async def resume_queue(): + import xml.etree.ElementTree as ElementTree + list = sorted(os.listdir(FOLDER_WAITING)) + list = [ os.path.join(FOLDER_WAITING, x) for x in list if x.endswith('.svg') ] + resumable_jobs = [] + for filename in list: + # print('Loading ', filename) + try: + with open(filename, 'r') as file: + svg = file.read() + root = ElementTree.fromstring(svg) + def attr(attr, ns = 'https://sketch.process.studio/turtle-graphics'): + return root.get(attr if ns == None else "{" + ns + "}" + attr) + match = re.search('\\d{8}_\\d{6}.\\d{6}_UTC[+-]\\d{4}', os.path.basename(filename)) + received_ts = None if match == None else match.group(0) + job = { + 'loaded_from_file': True, + 'client': attr('author'), + 'id': "XYZ", + 'svg': svg, + 'stats': { + 'count': int(attr('count')), + 'layer_count': int(attr('layer_count')), + 'oob_count': int(attr('oob_count')), + 'short_count': int(attr('short_count')), + 'travel': int(attr('travel')), + 'travel_ink': int(attr('travel_ink')), + 'travel_blank': int(attr('travel_blank')) + }, + 'timestamp': attr('timestamp'), + 'speed': int(attr('speed')), + 'format': attr('format'), + 'size': [int(attr('width_mm')), int(attr('height_mm'))], + 'hash': hashlib.sha1(svg.encode('utf-8')).hexdigest(), + 'received': received_ts + } + resumable_jobs.append(job) + except: + print('Error resuming ', filename) + + if len(resumable_jobs) > 0: print(f"Resuming {len(resumable_jobs)} jobs...") + else: print("No jobs to resume") + for job in resumable_jobs: + await enqueue(job) + + +async def start(_prompt, print_status): + global current_job + global _status + global print + global prompt + prompt = _prompt # make this global + print = prompt.print # replace global print function + + if TESTING: print(f'{COL.YELLOW}TESTING MODE enabled{COL.OFF}') + if RESUME_QUEUE: await resume_queue() + + await align_async() + await prompt_setup() + _status = 'waiting' + + while True: + # get the next job from the queue, waits until a job becomes available + if queue.empty(): + print_status() + current_job = await queue.get() + if not current_job['cancel']: # skip if job is canceled + _status = 'confirm_plot' + print_status() + ready = await prompt_start_plot(f'Ready to plot job {job_str(current_job)}?') + if not ready: + await cancel(current_job['client'], force = True) + _status = 'waiting' + continue # skip over rest of the loop + + # plot (and retry on error or repeat) + loop = 0 # number or tries/repetitions + resume = False # flag indicating resume (vs. plotting from start) + while True: + if (resume): + print(f'🖨️ {COL.YELLOW}Resuming job [{current_job["client"]}] ...{COL.OFF}') + _status = 'plotting' + error = await resume_plot_async(current_job) + else: + loop += 1 + print(f'🖨️ {COL.YELLOW}Plotting job [{current_job["client"]}] ...{COL.OFF}') + _status = 'plotting' + await _notify_queue_positions() # notify plotting + error = await plot_async(current_job) + resume = False + # No error + if error == 0: + if REPEAT_JOBS: + print(f'{COL.BLUE}Done ({loop}x) job [{current_job["client"]}]{COL.OFF}') + _status = 'confirm_plot' + repeat = await prompt_repeat_plot(f'{COL.BLUE}Repeat{COL.OFF} job [{current_job["client"]}] ?') + if repeat: continue + await finish_current_job() + break + # Paused programmatically (1), Stopped by pause button press (102) or Stopped by keyboard interrupt (103) + elif error in PLOTTER_PAUSED: + print(f'{COL.YELLOW}Plotter: {get_error_msg(error)}{COL.OFF}') + _status = 'confirm_plot' + ready = await prompt_resume_plot(f'{COL.YELLOW}Resume{COL.OFF} job [{current_job["client"]}] ?', current_job) + if not ready: + await finish_current_job() + break + if ready == 'resume': resume = True + # Errors + else: + print(f'{COL.RED}Plotter: {get_error_msg(error)}{COL.OFF}') + _status = 'confirm_plot' + ready = await prompt_start_plot(f'{COL.RED}Retry{COL.OFF} job [{current_job["client"]}] ?') + if not ready: + await cancel(current_job['client'], force = True) + break + + _status = 'waiting' + current_job = None diff --git a/start b/start index 3c8311e..871caeb 100755 --- a/start +++ b/start @@ -11,17 +11,35 @@ quit() { trap quit EXIT usage() { - echo "Usage: $(basename $0) [-f <0|1>] [-n <0|1>]" - echo " -f ... Start frpc (Default 1)" - echo " -n ... Start ngrok (Default 0)" + echo "Usage: $(basename $0) [-d] [-f <0|1>] [-n <0|1>]" + echo " -h ... Show this help" + echo " -d ... Dev Mode" + echo " -c ... Console (Run in second terminal to see output from Dev Mode)" + echo " -f ... Start frpc (Default 1 except when using -d)" + echo " -n ... Start ngrok (Default 0)" exit 1 } +dev=false start_frpc=true start_ngrok=false -while getopts ":f:n:h" opts; do +# A letter followed by a colon means the switch takes an argument +while getopts "hcdf:n:" opts; do case "${opts}" in + h) + usage + exit 1 + ;; + c) + textual console -x event -x debug + exit 0 + ;; + d) + dev=true + start_frpc=true + start_ngrok=false + ;; f) f=${OPTARG} [[ $f -eq 0 || $f -eq 1 ]] || usage @@ -32,14 +50,14 @@ while getopts ":f:n:h" opts; do [[ $n -eq 0 || $n -eq 1 ]] || usage [[ $n -eq 1 || -z $n ]] && start_ngrok=true || start_ngrok=false ;; - h) - usage - ;; *) usage + exit 1 ;; esac done + +# remove switches from $@ shift $((OPTIND-1)) if $start_frpc; then @@ -56,5 +74,15 @@ if $start_ngrok; then ngrok_pid=$! fi +# Try to activate venv +if [[ -f ./venv/bin/activate ]]; then + echo "Activating virtual env..." + source ./venv/bin/activate +fi + # run plotter-server -python main.py +if $dev; then + textual run --dev main.py +else + python main.py +fi diff --git a/test_job.py b/test_job.py new file mode 100644 index 0000000..1f27904 --- /dev/null +++ b/test_job.py @@ -0,0 +1,83 @@ +test_square = ''' + + + + + + +''' + +test_wide = ''' + + + + + + +''' + +test_high = ''' + + + + + + +''' + +test_layers = ''' + + + + + + + + + + + + +''' + +from spooler import svg_to_job +import uuid + +def test_job(name = 'layers'): + svg = globals()['test_' + name] + client = 'Test-' + uuid.uuid4().hex[:3] + svg = svg.replace('tg:author=""', f'tg:author="{client}"') + job = svg_to_job(svg) + return job