diff --git a/async_interval.py b/async_interval.py
new file mode 100644
index 0000000..8f9cfcf
--- /dev/null
+++ b/async_interval.py
@@ -0,0 +1,66 @@
+import asyncio
+import types
+
+# Calls fn in intervals given by the interval parameter (in seconds)
+# fn is called with an argument giving the elapsed time since start
+# Returns a Task object
+
+def async_interval(fn, interval, end = 0):
+ async def run(fn, interval):
+ try:
+ start = loop.time()
+ while True:
+ await asyncio.sleep(interval)
+ fn( loop.time() - start )
+ except asyncio.CancelledError:
+ pass
+
+ def stop():
+ try: task.cancel()
+ except asyncio.CancelledError: pass
+
+ loop = asyncio.get_running_loop()
+ task = asyncio.create_task( run(fn, interval) )
+ if (end > 0): loop.call_later(end, stop)
+ return task
+
+if __name__ == '__main__':
+ import unittest
+
+ class Test(unittest.IsolatedAsyncioTestCase):
+ async def test_cancel(self):
+ def empty(): pass
+ i = async_interval(empty, 1)
+ await asyncio.sleep(0) # need to wait, otherwise CancelledError is raised anyway
+ i.cancel()
+ await i # wait for interval completion
+ self.assertEqual(i.done(), True)
+ # cancelled() is False since the wrapped coroutine doesn't propagate the CancelledError
+ self.assertEqual(i.cancelled(), False)
+
+ async def test_interval(self):
+ times = []
+ def fn(time): times.append(time)
+
+ i = async_interval(fn, 1/4)
+ await asyncio.sleep(1)
+ i.cancel()
+ await i
+ # print(times)
+ self.assertEqual(len(times), 3)
+ self.assertAlmostEqual(times[0], 0.25, places=2)
+ self.assertAlmostEqual(times[1], 0.50, places=2)
+ self.assertAlmostEqual(times[2], 0.75, places=2)
+
+ async def test_end(self):
+ times = []
+ def fn(time): times.append(time)
+
+ i = async_interval(fn, 1/4, 1)
+ await i
+ self.assertEqual(len(times), 3)
+ self.assertAlmostEqual(times[0], 0.25, places=2)
+ self.assertAlmostEqual(times[1], 0.50, places=2)
+ self.assertAlmostEqual(times[2], 0.75, places=2)
+
+ unittest.main()
\ No newline at end of file
diff --git a/async_prompt.py b/async_prompt.py
deleted file mode 100644
index 78f2a68..0000000
--- a/async_prompt.py
+++ /dev/null
@@ -1,71 +0,0 @@
-import asyncio
-import sys
-import termios
-import tty
-import builtins
-import io
-
-# async prompt for a single keystroke
-class AsyncPrompt:
- def __init__(self):
- self.echo = True
- self.echo_end = '\n'
- self.waiting_for_input = False
- self.queue = asyncio.Queue()
- asyncio.get_running_loop().add_reader(sys.stdin, self.on_input)
-
- def __del__(self):
- # restore tty if object goes away
- if self.waiting_for_input:
- self.tty_restore()
-
- def tty_input(self):
- fd = sys.stdin.fileno()
- self.original_ttyattrs = termios.tcgetattr(fd) # save ttyattrs
- tty.setraw(fd) # set raw input mode on tty
-
- def tty_restore(self):
- fd = sys.stdin.fileno()
- termios.tcsetattr(fd, termios.TCSADRAIN, self.original_ttyattrs)
-
- def on_input(self):
- if self.waiting_for_input:
- key = sys.stdin.read(1)
- sys.stdin.seek(0, io.SEEK_END) # disard rest of input (by seeking to end of stream)
- self.tty_restore()
- if ord(key) == 3: # catch Control-C
- raise KeyboardInterrupt()
- if self.echo:
- if ord(key) == 27:
- print('^[', end=self.echo_end) # Don't echo ESC as it is, this would start an escape sequence in the terminal
- else:
- print(key, end=self.echo_end) # echo the input character (with newline)
- self.queue.put_nowait(key)
- self.waiting_for_input = False
- else:
- sys.stdin.readline() # discard input
-
- async def prompt(self, message = '? ', echo = True, echo_end = '\n'):
- self.echo = echo
- # print prompt
- print(message, end = '', flush=True)
- self.tty_input()
- # set flag to capture input
- self.waiting_for_input = True
- # wait until input is received
- return await self.queue.get()
-
- async def wait_for(self, chars, message = '? ', echo = True, echo_end = '\n'):
- res = None
- while res not in chars:
- res = await self.prompt(message, echo)
- return res
-
- def print(self, *objects, sep=' ', end='\n', file=None, flush=False):
- if self.waiting_for_input:
- self.tty_restore()
- builtins.print() # newline
- builtins.print(*objects, sep=sep, end=end, file=file, flush=flush)
- self.tty_input()
- else:
- builtins.print(*objects, sep=sep, end=end, file=file, flush=flush)
\ No newline at end of file
diff --git a/async_queue.py b/async_queue.py
new file mode 100644
index 0000000..baa9526
--- /dev/null
+++ b/async_queue.py
@@ -0,0 +1,319 @@
+import asyncio
+
+# Like asyncio.Queue with support for reordering and removing elements
+class Queue(asyncio.Queue):
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.order = []
+
+ def _rebuild(self):
+ # remove all items
+ try:
+ while True: super().get_nowait()
+ except asyncio.QueueEmpty:
+ pass
+ # add items in new order
+ for item in self.order:
+ try:
+ super().put_nowait(item)
+ except asyncio.QueueFull:
+ pass
+
+ # get the current queue as list
+ def list(self):
+ return self.order.copy()
+
+ # swap two items by index; supports negative indices
+ def swap(self, idx1, idx2):
+ if idx1 < -len(self.order) or idx1 > len(self.order)-1:
+ raise IndexError('index 1 out of bounds')
+ if idx2 < -len(self.order) or idx2 > len(self.order)-1:
+ raise IndexError('index 2 out of bounds')
+ if (idx1 == idx2): return
+ self.order[idx1], self.order[idx2] = self.order[idx2], self.order[idx1]
+ self._rebuild()
+
+ # move an item to new position in queue
+ def move(self, idx, new_idx):
+ if idx < -len(self.order) or idx > len(self.order)-1:
+ raise IndexError('index out of bounds')
+ if new_idx < -len(self.order) or new_idx > len(self.order)-1:
+ raise IndexError('target index out of bounds')
+
+ # normalize negative indices
+ if idx < 0: idx = len(self.order) + idx
+ if new_idx < 0: new_idx = len(self.order) + new_idx
+
+ if (idx == new_idx): return
+ item = self.order.pop(idx)
+ self.order.insert(new_idx, item)
+ self._rebuild()
+
+ def index(self, *args):
+ return self.order.index(*args)
+
+ def __iter__(self):
+ return self.order.copy().__iter__()
+
+ # remove an item from the queue; supports negative indices
+ def pop(self, idx = -1):
+ if idx < -len(self.order) or idx > len(self.order)-1:
+ raise IndexError('index out of bounds')
+ item = self.order.pop(idx)
+ # print('remove item', item)
+ self._rebuild()
+ return item
+
+ # insert an item at an arbitrary position into the queue
+ def insert(self, idx, item):
+ if idx < -len(self.order) or idx > len(self.order):
+ raise IndexError('index out of bounds')
+ self.order.insert(idx, item)
+ self._rebuild()
+
+ def put_nowait(self, item):
+ # print('put_nowait')
+ super().put_nowait(item)
+ self.order.append(item)
+
+ def get_nowait(self):
+ # print('get_nowait')
+ item = super().get_nowait()
+ self.order.pop(0)
+ return item
+
+ # no need to implement get() and put()
+ # as these are implemented in terms of get_nowait() and put_nowait()
+
+
+if __name__ == '__main__':
+ import unittest
+
+ class Test(unittest.IsolatedAsyncioTestCase):
+ def get_all(self, q):
+ out = []
+ while not q.empty():
+ out.append( q.get_nowait() )
+ return out
+
+ async def test_put(self):
+ q = Queue()
+ await q.put('one')
+ await q.put('two')
+ await q.put('three')
+ self.assertEqual(q.list(), ['one', 'two', 'three'])
+ self.assertEqual( self.get_all(q), ['one', 'two', 'three'])
+
+ async def test_put_nowait(self):
+ q = Queue()
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ self.assertEqual(q.list(), ['one', 'two', 'three'])
+ self.assertEqual( self.get_all(q), ['one', 'two', 'three'])
+
+ async def test_get(self):
+ q = Queue()
+ get_task = asyncio.gather(
+ asyncio.create_task(q.get()),
+ asyncio.create_task(q.get()),
+ asyncio.create_task(q.get())
+ )
+ await q.put('one')
+ await q.put('two')
+ await q.put('three')
+ self.assertEqual( await get_task, ['one', 'two', 'three'] )
+
+ async def test_get_nowait(self):
+ q = Queue()
+ with self.assertRaises(asyncio.QueueEmpty):
+ q.get_nowait()
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ self.assertEqual( q.get_nowait(), 'one' )
+ self.assertEqual( q.get_nowait(), 'two' )
+ self.assertEqual( q.get_nowait(), 'three' )
+ with self.assertRaises(asyncio.QueueEmpty):
+ q.get_nowait()
+
+ async def test_pop(self):
+ q = Queue()
+ with self.assertRaises(IndexError):
+ q.pop(0)
+
+ q.put_nowait('one')
+ with self.assertRaises(IndexError): q.pop(1)
+ with self.assertRaises(IndexError): q.pop(-2)
+ self.assertEqual(q.pop(0), 'one')
+ self.assertEqual(q.list(), [])
+
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ self.assertEqual(q.pop(1), 'two')
+ self.assertEqual(q.list(), ['one', 'three'])
+ self.assertEqual(self.get_all(q), ['one', 'three'])
+ self.assertEqual(q.empty(), True)
+
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ self.assertEqual(q.pop(0), 'one')
+ self.assertEqual(q.pop(-1), 'three')
+ self.assertEqual(q.list(), ['two'])
+ self.assertEqual(self.get_all(q), ['two'])
+
+ async def test_insert(self):
+ q = Queue()
+ q.put_nowait('one')
+ q.put_nowait('three')
+ q.insert(1, 'two')
+ self.assertEqual(q.list(), ['one', 'two', 'three'])
+ self.assertEqual(self.get_all(q), ['one', 'two', 'three'])
+
+ q.insert(0, 'one')
+ q.insert(1, 'two')
+ q.insert(2, 'three')
+ q.insert(0, 'zero')
+ self.assertEqual(q.list(), ['zero', 'one', 'two', 'three'])
+ self.assertEqual(self.get_all(q), ['zero', 'one', 'two', 'three'])
+
+ with self.assertRaises(IndexError): q.insert(-1, 'one')
+ with self.assertRaises(IndexError): q.insert(1, 'one')
+ q.insert(0, 'one')
+ self.assertEqual(q.list(), ['one'])
+ self.assertEqual(self.get_all(q), ['one'])
+
+ async def test_swap(self):
+ q = Queue()
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ q.swap(1, 2)
+ self.assertEqual(q.list(), ['zero', 'two', 'one', 'three'])
+ self.assertEqual(self.get_all(q), ['zero', 'two', 'one', 'three'])
+
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ q.swap(-1, 1)
+ self.assertEqual(q.list(), ['zero', 'three', 'two', 'one'])
+ self.assertEqual(self.get_all(q), ['zero', 'three', 'two', 'one'])
+
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ with self.assertRaises(IndexError): q.swap(0, 4)
+ with self.assertRaises(IndexError): q.swap(4, 0)
+ with self.assertRaises(IndexError): q.swap(0, -5)
+ with self.assertRaises(IndexError): q.swap(-5, 0)
+
+ q.swap(0, 0)
+ q.swap(1, 1)
+ q.swap(2, 2)
+ q.swap(3, 3)
+ self.assertEqual(q.list(), ['zero', 'one', 'two', 'three'])
+ self.assertEqual(self.get_all(q), ['zero', 'one', 'two', 'three'])
+
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ q.swap(1, 0)
+ self.assertEqual(q.list(), ['one', 'zero', 'two', 'three'])
+ self.assertEqual(self.get_all(q), ['one', 'zero', 'two', 'three'])
+
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ q.swap(1, -1)
+ self.assertEqual(q.list(), ['zero', 'three', 'two', 'one'])
+ self.assertEqual(self.get_all(q), ['zero', 'three', 'two', 'one'])
+
+ async def test_move(self):
+ q = Queue()
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ with self.assertRaises(IndexError): q.move(0, 4)
+ with self.assertRaises(IndexError): q.move(0, -5)
+ with self.assertRaises(IndexError): q.move(4, 0)
+ with self.assertRaises(IndexError): q.move(-5, 0)
+ q.move(1, 1)
+ self.assertEqual(q.list(), ['zero', 'one', 'two', 'three'])
+ self.assertEqual(self.get_all(q), ['zero', 'one', 'two', 'three'])
+
+ # move top to bottom
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ q.move(0, -1)
+ self.assertEqual(q.list(), ['one', 'two', 'three', 'zero'])
+ self.assertEqual(self.get_all(q), ['one', 'two', 'three', 'zero'])
+
+ # move bottom to top
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ q.move(-1, 0)
+ self.assertEqual(q.list(), ['three', 'zero', 'one', 'two'])
+ self.assertEqual(self.get_all(q), ['three', 'zero', 'one', 'two'])
+
+ # swap items next to each other
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ q.move(1, 2)
+ self.assertEqual(q.list(), ['zero', 'two', 'one', 'three'])
+ self.assertEqual(self.get_all(q), ['zero', 'two', 'one', 'three'])
+
+ # move to bottom
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ q.move(1, -1)
+ self.assertEqual(q.list(), ['zero', 'two', 'three', 'one'])
+ self.assertEqual(self.get_all(q), ['zero', 'two', 'three', 'one'])
+
+ # move to top
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ q.move(2, 0)
+ self.assertEqual(q.list(), ['two', 'zero', 'one', 'three'])
+ self.assertEqual(self.get_all(q), ['two', 'zero', 'one', 'three'])
+
+ async def test_index(self):
+ q = Queue()
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ self.assertEqual(q.index('zero'), 0)
+ self.assertEqual(q.index('one'), 1)
+ self.assertEqual(q.index('two'), 2)
+ self.assertEqual(q.index('three'), 3)
+ with self.assertRaises(ValueError): q.index('none')
+
+ async def test_iter(self):
+ q = Queue()
+ q.put_nowait('zero')
+ q.put_nowait('one')
+ q.put_nowait('two')
+ q.put_nowait('three')
+ items = [x for x in q]
+ self.assertEqual(items, ['zero', 'one', 'two', 'three'])
+
+ unittest.main()
+
\ No newline at end of file
diff --git a/frpc.sh b/frpc.sh
index 25542f7..e9a0627 100755
--- a/frpc.sh
+++ b/frpc.sh
@@ -10,16 +10,22 @@ trap quit EXIT
# find frpc
frpc=$(which "frpc")
if [[ -z $frpc ]]; then
- frpc=$(which "../frp-mac/latest/frpcx")
+ frpc=$(which "../frp-mac/latest/frpc")
if [[ -z $frpc ]]; then
- >&2 echo "frpc not found; try installing with 'brew install frpc'"
+ >&2 echo "Error: frpc not found; Try installing with 'brew install frpc'"
exit 1
fi
fi
->&2 echo "Using frpc: $frpc"
->&2 echo "Plotter URL (via frp): wss://plotter.process.tools"
+if [[ ! -f ./frp.auth ]]; then
+ >&2 echo "Error: Missing frp auth file: frp.auth"
+ exit 1
+fi
+
source frp.auth # read auth token from file
export FRP_AUTH_TOKEN
+>&2 echo "Using frpc: $frpc"
+>&2 echo "Plotter URL (via frp): wss://plotter.process.tools"
+
$frpc -c frpc.toml
diff --git a/header_timer.py b/header_timer.py
new file mode 100644
index 0000000..a501cec
--- /dev/null
+++ b/header_timer.py
@@ -0,0 +1,50 @@
+"""Provides a Textual application header widget."""
+from datetime import datetime
+
+from rich.text import Text
+from textual.widgets import Header
+from textual.widgets._header import HeaderIcon, HeaderTitle, HeaderClockSpace
+from textual.reactive import Reactive
+
+class HeaderClock(HeaderClockSpace):
+ """Display a clock on the right of the header."""
+
+ DEFAULT_CSS = """
+ HeaderClock {
+ background: $foreground-darken-1 5%;
+ color: $text;
+ text-opacity: 85%;
+ content-align: center middle;
+ }
+ """
+
+ time_seconds: Reactive[str] = Reactive(0)
+
+ def render(self):
+ """Render the header clock.
+
+ Returns:
+ The rendered clock.
+ """
+ if self.time_seconds == 0: return Text('--:--')
+ hours = int(self.time_seconds / 3600)
+ minutes = int((self.time_seconds - 3600 * hours) / 60)
+ seconds = int((self.time_seconds - 3600 * hours) % 60)
+ if hours == 0:
+ return Text(f'{minutes:02}:{seconds:02}')
+ else:
+ return Text(f'{hours:02}:{minutes:02}:{seconds:02}')
+
+class HeaderTimer(Header):
+
+ time_seconds: Reactive[str] = Reactive(0)
+ """Time of the Clock in seconds."""
+
+ def compose(self):
+ yield HeaderIcon().data_bind(Header.icon)
+ yield HeaderTitle()
+ yield (
+ HeaderClock().data_bind(HeaderTimer.time_seconds)
+ if self._show_clock
+ else HeaderClockSpace()
+ )
diff --git a/hotkey_button.py b/hotkey_button.py
new file mode 100644
index 0000000..50241c7
--- /dev/null
+++ b/hotkey_button.py
@@ -0,0 +1,46 @@
+from textual.widgets import Button
+
+class HotkeyButton(Button, can_focus=True):
+ idx = 0
+
+ def __init__(self, hotkey=None, hotkey_description = None, **kwargs):
+ if 'label' not in kwargs and hotkey_description != None:
+ kwargs['label'] = hotkey_description
+ super().__init__(**kwargs)
+
+ self.hotkey = hotkey
+ self.hotkey_description = hotkey_description
+ HotkeyButton.idx += 1
+ self.idx = HotkeyButton.idx
+ self.app_action = 'press_hotkeybutton_' + str(self.idx)
+
+ def update_hotkey(self, hotkey = None, hotkey_description = None, label = None):
+ if label == None and hotkey_description != None: self.label = hotkey_description
+ self.unbind_hotkey()
+ self.hotkey = hotkey
+ self.hotkey_description = hotkey_description
+ self.bind_hotkey()
+
+ def bind_hotkey(self):
+ if self.hotkey == None: return
+ self.app.bind(self.hotkey, self.app_action, description=self.hotkey_description)
+ print("hotkey BOUND for " + str(self.idx))
+
+ def unbind_hotkey(self):
+ if self.hotkey == None: return
+ self.app.unbind(self.hotkey)
+ print("hotkey UNBOUND for " + str(self.idx))
+
+ def on_mount(self):
+ def press_me(): self.press()
+ setattr(self.app, 'action_' + self.app_action, press_me)
+ self.bind_hotkey()
+
+ def on_button_pressed(self):
+ print('pressed button ' + str(self.idx))
+
+ def watch_disabled(self, disabled_state):
+ super().watch_disabled(disabled_state)
+ print('disabled', disabled_state)
+ if disabled_state: self.unbind_hotkey()
+ else: self.bind_hotkey()
\ No newline at end of file
diff --git a/main.py b/main.py
old mode 100755
new mode 100644
index fb38a6f..f0a9e76
--- a/main.py
+++ b/main.py
@@ -1,30 +1,12 @@
-#!/usr/bin/env python
-
-import asyncio
-import websockets
-import ssl
-import os.path
-import time
-import json
-import traceback
-import sys
-import signal
-import spooler
-import async_prompt
-from tty_colors import COL
-import zc
-import porkbun
-
-
-USE_ZEROCONF = 0
-ZEROCONF_HOSTNAME = 'plotter'
-
USE_PORKBUN = 1
PORKBUN_ROOT_DOMAIN = 'process.tools'
PORKBUN_SUBDOMAIN = 'plotter-local'
PORKBUN_TTL = 600
PORKBUN_SSL_OUTFILE = 'cert/process.tools.pem'
+USE_ZEROCONF = 0
+ZEROCONF_HOSTNAME = 'plotter'
+
BIND_IP = '0.0.0.0'
PORT = 0 # Use 0 for default ports (80 for http, 443 for ssl/tls)
USE_SSL = 1
@@ -38,44 +20,80 @@
SHOW_CONNECTION_EVENTS = 0 # Print when clients connect/disconnect
MAX_MESSAGE_SIZE_MB = 5 # in MB (Default in websockets lib is 2)
-prompt = None
+QUEUE_HEADERS = ['#', 'Client', 'Hash', 'Lines', 'Layers', 'Travel', 'Ink', 'Format', 'Speed', 'Duration', 'Status']
+
+import textual
+from textual import on
+from textual.events import Key
+from textual.app import App as TextualApp
+from textual.widgets import Button, DataTable, RichLog, Footer, Header, Static, ProgressBar, Rule
+from textual.widgets.data_table import RowDoesNotExist
+from textual.containers import Horizontal, Vertical
+from hotkey_button import HotkeyButton
+from header_timer import HeaderTimer
+
+import asyncio
+import websockets
+import spooler
+import json
+import math
+import subprocess
+import porkbun
+import functools
+
+
+app = None
+ssl_context = None
num_clients = 0
clients = []
-ssl_context = None
-def status_str(status):
- match status['status']:
- case 'setup':
- return(f'{COL.BOLD}{COL.BLUE}Setup{COL.OFF}')
- case 'waiting':
- return(f'{COL.BOLD}Waiting for jobs{COL.OFF}')
- case 'confirm_plot':
- return(f'{COL.BOLD}{COL.YELLOW}Confirm to plot {status["job_str"]}{COL.OFF}')
- case 'plotting':
- return(f'{COL.BOLD}{COL.GREEN}Plotting [{status["job"]}]{COL.OFF}')
-
-def col_num(n):
- if n > 0:
- return f'{COL.BOLD}{COL.GREEN}{n}{COL.OFF}'
- else:
- return f'{COL.BOLD}{n}{COL.OFF}'
+# Status simply shows up in the header
def print_status():
- s = spooler.status()
- print(f' Jobs: {col_num(s["queue_size"])} | Clients: {col_num(len(clients))} | Status: {status_str(s)}\n')
+ app.update_header()
-def setup_prompt():
- global prompt
- global print
- prompt = async_prompt.AsyncPrompt()
- print = prompt.print # replace global print function
-
-def remove_prompt():
- global prompt
- del prompt # force destructor, causes terminal to restore
+def setup_ssl():
+ import ssl
+ import os.path
+
+ if USE_SSL:
+ global ssl_context
+ try:
+ cert_file = os.path.join( os.path.dirname(__file__), SSL_CERT )
+ key_file = None if SSL_KEY == None else os.path.join( os.path.dirname(__file__), SSL_KEY )
+ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ ssl_context.load_cert_chain(cert_file, key_file)
+ print(f'TLS enabled with certificate: {SSL_CERT}{"" if SSL_KEY == None else " + " + SSL_KEY}')
+ except FileNotFoundError:
+ print(f'Certificate not found, TLS disabled')
+ ssl_context = None
+ except:
+ print(f'Error establishing TLS context, TLS disabled')
+ ssl_context = None
+ global PORT
+ if PORT == 0: PORT = 80 if ssl_context == None else 443
-def disable_sigint():
- signal.signal(signal.SIGINT, lambda *args: None)
+async def handle_connection(ws):
+ global num_clients
+ num_clients += 1
+ clients.append(ws)
+ remote_address = ws.remote_address # store remote address (might not be available on disconnect)
+ if SHOW_CONNECTION_EVENTS:
+ print(f'({num_clients}) Connected: {remote_address[0]}:{remote_address[1]}')
+ print_status()
+ # await send_current_queue_size(ws)
+ try:
+ # The iterator exits normally when the connection is closed with close code 1000 (OK) or 1001 (going away). It raises a ConnectionClosedError when the connection is closed with any other code.
+ async for message in ws:
+ # print(f'Message ({ws.remote_address[0]}:{ws.remote_address[1]}):', message)
+ await handle_message(message, ws)
+ except websockets.exceptions.ConnectionClosedError:
+ pass
+ num_clients -= 1
+ clients.remove(ws)
+ if SHOW_CONNECTION_EVENTS:
+ print(f'({num_clients}) Disconnected: {remote_address[0]}:{remote_address[1]} ({ws.close_code}{(" " + ws.close_reason).rstrip()})')
+ print_status()
async def send_msg(msg, ws):
if type(msg) is dict: msg = json.dumps(msg)
@@ -83,103 +101,488 @@ async def send_msg(msg, ws):
await ws.send(msg)
except (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK):
pass
-
+
async def on_queue_size(size):
+ app.update_job_queue()
+ app.update_header()
cbs = []
for ws in clients:
cbs.append( send_msg({'type': 'queue_length', 'length': size}, ws) )
await asyncio.gather(*cbs)
async def send_current_queue_size(ws):
- await send_msg( {'type': 'queue_length', 'length': spooler.queue_size()}, ws )
+ await send_msg( {'type': 'queue_length', 'length': spooler.num_jobs()}, ws )
async def handle_message(message, ws):
async def on_queue_position(pos, job):
- await send_msg( {'type': 'queue_position', 'position': pos, 'id': job['id']}, ws )
+ await send_msg( {'type': 'queue_position', 'position': pos}, ws )
async def on_done(job):
- await send_msg( {'type': 'job_done', 'id': job['id']}, ws )
+ await send_msg( {'type': 'job_done'}, ws )
async def on_cancel(job):
- await send_msg( {'type': 'job_canceled', 'id': job['id']}, ws )
+ await send_msg( {'type': 'job_canceled'}, ws )
async def on_error(msg, job):
- await send_msg( {'type': 'error', 'msg': msg, 'id': job['id']}, ws )
- msg = json.loads(message)
+ await send_msg( {'type': 'error', 'msg': msg}, ws )
+
+ try:
+ msg = json.loads(message)
+ except JSONDecodeError:
+ return
+
if msg['type'] == 'echo':
await ws.send(message)
elif msg['type'] == 'plot':
- qsize = spooler.queue_size()
+ qsize = spooler.num_jobs()
result = await spooler.enqueue(msg, on_queue_position, on_done, on_cancel, on_error)
- if result and qsize > 0: print_status() # Don't print status if queue is empty -> Status will be printed by spooler
+ if result: print_status()
elif msg['type'] == 'cancel':
result = await spooler.cancel(msg['client'])
if result: print_status()
-async def handle_connection(ws):
- global num_clients
- num_clients += 1
- clients.append(ws)
- remote_address = ws.remote_address # store remote address (might not be available on disconnect)
- if SHOW_CONNECTION_EVENTS:
- print(f'({num_clients}) Connected: {remote_address[0]}:{remote_address[1]}')
- print_status()
- await send_current_queue_size(ws)
- try:
- # The iterator exits normally when the connection is closed with close code 1000 (OK) or 1001 (going away). It raises a ConnectionClosedError when the connection is closed with any other code.
- async for message in ws:
- # print(f'Message ({ws.remote_address[0]}:{ws.remote_address[1]}):', message)
- await handle_message(message, ws)
- except websockets.exceptions.ConnectionClosedError:
- pass
- num_clients -= 1
- clients.remove(ws)
- if SHOW_CONNECTION_EVENTS:
- print(f'({num_clients}) Disconnected: {remote_address[0]}:{remote_address[1]} ({ws.close_code}{(" " + ws.close_reason).rstrip()})')
- print_status()
-
-def setup_ssl():
- if USE_SSL:
- global ssl_context
- try:
- cert_file = os.path.join( os.path.dirname(__file__), SSL_CERT )
- key_file = None if SSL_KEY == None else os.path.join( os.path.dirname(__file__), SSL_KEY )
- ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
- ssl_context.load_cert_chain(cert_file, key_file)
- print(f'TLS enabled with certificate: {SSL_CERT}{"" if SSL_KEY == None else " + " + SSL_KEY}')
- except FileNotFoundError:
- print(f'Certificate not found, TLS disabled')
- ssl_context = None
- except:
- print(f'Error establishing TLS context, TLS disabled')
- ssl_context = None
- global PORT
- if PORT == 0: PORT = 80 if ssl_context == None else 443
-
-async def main():
- setup_prompt() # needs to be called within event loop
+async def run_server(app):
async with websockets.serve(handle_connection, BIND_IP, PORT, ping_interval=PING_INTERVAL, ping_timeout=PING_TIMEOUT, ssl=ssl_context, max_size=MAX_MESSAGE_SIZE_MB*(2**20)):
print(f'Server running on {"ws" if ssl_context == None else "wss"}://{BIND_IP}:{PORT}')
print()
spooler.set_queue_size_cb(on_queue_size)
# await asyncio.Future() # run forever
- await spooler.start(prompt, print_status) # run forever
+ await spooler.start(app) # run forever
-def quit():
- print('Quitting...')
- remove_prompt()
- if USE_ZEROCONF: zc.remove_zeroconf_service()
+class MyDataTable(DataTable):
+ def on_click(self, event):
+ self.app.on_queue_click(event)
-if __name__ == '__main__':
- try:
- if USE_PORKBUN:
- porkbun.ddns_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SUBDOMAIN, PORKBUN_TTL)
- porkbun.cert_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SSL_OUTFILE)
- print()
- setup_ssl() # Updates global PORT
- if USE_ZEROCONF: zc.add_zeroconf_service(ZEROCONF_HOSTNAME, PORT)
- asyncio.run(main())
- except KeyboardInterrupt:
+class App(TextualApp):
+ prompt_future = None
+
+ def compose(self):
+ global header, queue, log, footer
+ header = HeaderTimer(icon = '🖨️', show_clock = True, time_format = '%H:%M')
+
+ queue = MyDataTable(id = 'queue')
+
+ log = RichLog(markup=True)
+ footer = Footer(id="footer", show_command_palette=True)
+
+ global job_current, job_status,job_progress
+ job_current = DataTable()
+ job_status = Static(spooler.status()['status_desc'])
+ job_progress = ProgressBar()
+
+ global col_left, col_right, job, commands, commands_1, commands_2, commands_3, commands_4, commands_5
+ global b_pos, b_neg, b_align, b_cycle, b_home, b_plus, b_minus, b_preview
+
+ yield header
+ # yield HotkeyButton('p', 'Press')
+ # yield HotkeyButton('x', 'Something')
+ with Horizontal():
+ with Vertical() as col_left:
+ with Vertical() as job:
+ yield job_current
+ yield job_status
+ yield job_progress
+ # yield Rule()
+ with Horizontal(id='commands') as commands:
+ with Vertical() as commands_1:
+ yield (b_pos := HotkeyButton(label='Plot', id="pos"))
+ with Vertical() as commands_2:
+ yield (b_align := HotkeyButton('a', 'Align', label='Align', id='align'))
+ yield (b_cycle := HotkeyButton('c', 'Cycle', label='Cycle', id='cycle'))
+ yield (b_home := HotkeyButton('h', 'Home', label='Home', id='home'))
+ with Vertical() as commands_3:
+ yield (b_plus := HotkeyButton(label='+10', id='plus'))
+ yield (b_minus := HotkeyButton(label='-10', id='minus'))
+ with Vertical() as commands_4:
+ yield (b_preview := HotkeyButton('v', 'Preview', label='Preview', id='preview'))
+ with Vertical() as commands_5:
+ yield (b_neg := HotkeyButton(label='Cancel', id='neg'))
+ yield queue
+ with Vertical() as col_right:
+ yield log
+ yield footer
+
+ def on_mount(self):
+ self.title = "Plotter"
+ # self.theme = "textual-dark"
+ header.tall = True
+ col_left.styles.width = '3fr'
+ col_right.styles.width = '2fr'
+
+ # self.query_one('#footer').show_command_palette=False
+ log.border_title = 'Log'
+ log.styles.border = ('solid', 'white')
+
+ job.border_title = 'Job'
+ job.styles.border = ('solid', 'white')
+ job.styles.height = 22
+
+ job_current.styles.height = 3
+ job_current.add_columns(*QUEUE_HEADERS)
+ job_current.cursor_type = 'none'
+ job_status.styles.margin = 1
+ job_progress.styles.margin = 1
+ job_progress.styles.width = '100%'
+ job_progress.query_one('#bar').styles.width = '1fr'
+ job_progress.styles.display = 'none'
+
+ commands.styles.margin = (3, 0, 0, 0)
+
+ for button in commands.query('Button'):
+ button.styles.width = '100%'
+ button.styles.margin = (0, 1);
+
+ for col in commands.query('Vertical'):
+ col.styles.align_horizontal = 'center'
+ # col.styles.border = ('vkey', 'white')
+
+ commands_2.styles.width = '0.5625fr'
+ for button in commands_2.query('Button'):
+ button.styles.min_width = 9
+
+ commands_3.styles.width = '0.3125fr'
+ for button in commands_3.query('Button'):
+ button.styles.min_width = 5
+
+ commands_4.styles.width = '0.6875fr'
+ for button in commands_4.query('Button'):
+ button.styles.min_width = 11
+
+ queue.border_title = 'Queue'
+ queue.styles.border = ('solid', 'white')
+ queue.styles.height = '1fr'
+ queue.add_columns(*QUEUE_HEADERS)
+ queue.cursor_type = 'row'
+ queue.zebra_stripes = True
+ queue.show_cursor = False
+
+ self.update_header()
+
+ b_pos.disabled = True
+ b_neg.disabled = True
+ b_align.disabled = True
+ b_cycle.disabled = True
+ b_home.disabled = True
+ b_plus.disabled = True
+ b_minus.disabled = True
+ b_preview.disabled = True
+
+ self.bind('t', 'enqueue_test_job', description = 'Test job')
+ self.bind('o', 'open_svg_folder', description = 'Open SVG folder')
+
+ setup_ssl()
+ # log.write(log.styles.height)
+
+ global server_task
+ server_task = asyncio.create_task(run_server(self))
+
+ def on_server_task_exit(task):
+ print('[red]Server task exit')
+ if not task.cancelled(): # not a intentional exit
+ ex = task.exception()
+ if ex != None:
+ import traceback
+ print('Server task exited with exception:')
+ print(''.join(traceback.format_exception(ex)))
+ global server_task_exception
+ server_task_exception = ex
+ self.exit() # This line can be removed, exception will then be show inside app log area
+
+ server_task.add_done_callback(on_server_task_exit)
+
+ # global spooler_task
+ # spooler_task = asyncio.create_task(spooler.start(self))
+
+ async def action_enqueue_test_job(self):
+ from test_job import test_job
+ job = test_job()
+ await spooler.enqueue(test_job())
+
+ def action_open_svg_folder(self):
+ sub_coro = asyncio.create_subprocess_exec('open', spooler.STATUS_FOLDERS['waiting'], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+ asyncio.create_task(sub_coro)
+
+ def on_resize(self, event):
+ pass
+
+ def on_key(self):
pass
- except:
- traceback.print_exception( sys.exception() )
- finally:
- disable_sigint() # prevent another Control-C
- quit()
\ No newline at end of file
+
+ def print(self, *args, sep=' ', end='\n'):
+ if len(args) == 1: log.write(args[0])
+ else: log.write( sep.join(map(str, args)) + end)
+
+ def update_header(self):
+ status = spooler.status()
+ self.title = status['status_desc']
+ self.sub_title = f'{num_clients} Clients – {spooler.num_jobs()} Jobs'
+ total_secs = functools.reduce(lambda acc, x: acc + x['time_estimate'], spooler.jobs(), 0)
+ header.time_seconds = total_secs
+
+ def bind(self, *args, **kwargs):
+ super().bind(*args, **kwargs)
+ self.refresh_bindings()
+
+ def unbind(self, key):
+ # self._bindings.key_to_bindings is a dict of keys to lists of Binding objects
+ self._bindings.key_to_bindings.pop(key, None)
+ self.refresh_bindings()
+
+ # # bindings: [ (key, desc), ... ]
+ # # This not a coroutine (no async). It returns a future, which can be awaited from coroutines
+ # def prompt(self, bindings, message):
+ # # setup bindings
+ # self.print(message)
+ # self.print(bindings)
+ # self.update_bindings([ ('y', 'prompt_response("y")', 'Yes'), ('n', 'prompt_response("n")', 'No') ])
+ #
+ # # return a future that eventually resolves to the result
+ # loop = asyncio.get_running_loop()
+ # self.prompt_future = loop.create_future()
+ # return self.prompt_future
+
+ def preview_job(self, job):
+ if job != None and 'save_path' in job:
+ print(f'Preview job \\[{job["client"]}]: {job["save_path"]}')
+ sub_coro = asyncio.create_subprocess_exec('qlmanage', '-p', job['save_path'], stdin=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+ asyncio.create_task(sub_coro)
+
+ def adjust_job_speed(self, job, delta):
+ if job != None:
+ speed = job['speed'] if 'speed' in job else 100
+ speed += delta
+ speed = int(speed / 10) * 10
+ speed = max( min(speed, 100), 10 )
+ print(f'Adjust job speed \\[{job["client"]}]: {speed}')
+ job['speed'] = speed
+ if (job == spooler.current_job()): self.update_current_job()
+
+ @on(Button.Pressed, '#commands Button')
+ def on_button(self, event):
+ id = event.button.id
+ if id == 'preview':
+ self.preview_job( spooler.current_job() )
+ return
+ if id == 'plus':
+ self.adjust_job_speed( spooler.current_job(), 10 )
+ return
+ if id == 'minus':
+ self.adjust_job_speed( spooler.current_job(), -10 )
+ return
+ if id == 'neg' and spooler.status()['status'] == 'plotting':
+ print('[yellow]Interrupting...')
+ spooler.request_plot_pause()
+ return
+
+ if self.prompt_future != None and not self.prompt_future.done():
+ if id == None and event.button.hotkey_description:
+ id = str(event.button.hotkey_description).lower()
+ if id == None and event.button.label:
+ id = str(event.button.label).lower()
+
+ self.prompt_future.set_result({
+ 'id': id, # use button id, hotkey description (lowercase), or button label (lowercase)
+ 'button': event.button
+ })
+
+ @on(Key)
+ async def on_queue_hotkey(self, event):
+ if event.key in ['up', 'down'] and queue.row_count > 0:
+ queue.show_cursor = True
+ if queue.show_cursor == False:
+ return
+
+ if event.key in ['backspace', 'i', 'k', '1', '0', 'space']:
+ if queue.row_count == 0: return # nothing in list
+ client = queue.ordered_rows[queue.cursor_row].key.value
+
+ if (event.key == 'backspace'):
+ # if this is the current job, and we haven't started, cancel the prompt to start
+ if spooler.current_client() == client and spooler.status()['status'] == 'confirm_plot':
+ self.cancel_prompt_ui()
+ # handle all other cases (even plots that are running)
+ else:
+ await spooler.cancel(client)
+ elif (event.key == 'i'):
+ await spooler.move(client, max(queue.cursor_row - 1, 0))
+ queue.move_cursor(row=queue.get_row_index(client))
+ elif (event.key == 'k'):
+ new_row = queue.cursor_row + 1
+ await spooler.move(client, new_row)
+ queue.move_cursor(row=queue.get_row_index(client))
+ elif (event.key == '1'):
+ await spooler.move(client, 0)
+ queue.move_cursor(row=queue.get_row_index(client))
+ elif (event.key == '0'):
+ await spooler.move(client, -1)
+ queue.move_cursor(row=queue.get_row_index(client))
+ elif (event.key == 'space'):
+ self.preview_job( spooler.job_by_client(client) )
+
+ def on_queue_click(self, event):
+ if queue.row_count > 0:
+ queue.show_cursor = True
+
+ def job_to_row(self, job, idx):
+ return (idx, job['client'], job['hash'][:5], job['stats']['count'], job['stats']['layer_count'], int(job['stats']['travel'])/1000, int(job['stats']['travel_ink'])/1000, job['format'], job['speed'], f'{math.floor(job["time_estimate"]/60)}:{round(job["time_estimate"]%60):02}', job['status'])
+
+ def update_current_job(self):
+ job = spooler.current_job()
+ job_current.clear()
+ if job != None:
+ job_current.add_row( *self.job_to_row(job, 1), key=job['client'] )
+
+ def update_job_queue(self):
+ if queue.row_count == 0: queue.show_cursor = False
+ # remember selected client
+ client = None
+ if queue.row_count > 0 and queue.show_cursor:
+ client = queue.ordered_rows[queue.cursor_row].key.value
+ # print('selected client:', client)
+
+ queue.clear()
+ for idx, job in enumerate(spooler.jobs()):
+ queue.add_row( *self.job_to_row(job, idx+1), key=job['client'] )
+
+ # recall client (if possible)
+ if client:
+ try:
+ # print('select row:', queue.get_row_index(client))
+ queue.move_cursor(row=queue.get_row_index(client))
+ except RowDoesNotExist:
+ # print('row does not exist')
+ queue.show_cursor = False
+
+ def cancel_prompt_ui(self):
+ if self.prompt_future != None and not self.prompt_future.done():
+ self.prompt_future.set_result(False)
+
+ # This not a coroutine (no async). It returns a future, which can be awaited from coroutines
+ def prompt_ui(self, variant, message = ''):
+ # print('PROMPT', variant)
+
+ if len(message) > 0: message = ' – ' + message
+ job_status.update(spooler.status()['status_desc'] + message)
+ self.update_current_job()
+
+ match variant:
+ case 'setup':
+ b_pos.variant = 'default'
+ b_pos.disabled = True
+
+ b_neg.update_hotkey('d', 'Done')
+ b_neg.variant = 'success'
+ b_neg.disabled = False
+
+ b_align.disabled = False
+ b_cycle.disabled = False
+ b_home.disabled = True
+ b_plus.disabled = True
+ b_minus.disabled = True
+ b_preview.disabled = True
+ case 'waiting':
+ b_pos.disabled = True
+ b_neg.disabled = True
+
+ b_align.disabled = False
+ b_cycle.disabled = False
+ b_home.disabled = True
+ b_plus.disabled = True
+ b_minus.disabled = True
+ b_preview.disabled = True
+ case 'start_plot':
+ b_pos.update_hotkey('p', 'Plot')
+ b_pos.variant = 'success'
+ b_pos.disabled = False
+
+ b_neg.update_hotkey('escape', 'Cancel')
+ b_neg.variant = 'error'
+ b_neg.disabled = False
+
+ b_align.disabled = False
+ b_cycle.disabled = False
+ b_home.disabled = True
+ b_plus.disabled = False
+ b_minus.disabled = False
+ b_preview.disabled = False
+ case 'plotting':
+ b_pos.disabled = True
+
+ b_neg.update_hotkey('escape', 'Pause')
+ b_neg.variant = 'warning'
+ b_neg.disabled = False
+
+ b_align.disabled = True
+ b_cycle.disabled = True
+ b_home.disabled = True
+ b_plus.disabled = True
+ b_minus.disabled = True
+ b_preview.disabled = False
+ case 'repeat_plot':
+ b_pos.update_hotkey('r', 'Repeat')
+ b_pos.variant = 'primary'
+ b_pos.disabled = False
+
+ b_neg.update_hotkey('d', 'Done')
+ b_neg.variant = 'success'
+ b_neg.disabled = False
+
+ b_align.disabled = False
+ b_cycle.disabled = False
+ b_home.disabled = True
+ b_plus.disabled = False
+ b_minus.disabled = False
+ b_preview.disabled = False
+ case 'resume_plot':
+ b_pos.update_hotkey('p', 'Continue')
+ b_pos.variant = 'primary'
+ b_pos.disabled = False
+
+ b_neg.update_hotkey('d', 'Done')
+ b_neg.variant = 'warning'
+ b_neg.disabled = False
+
+ b_align.disabled = False
+ b_cycle.disabled = False
+ b_home.disabled = False
+ b_plus.disabled = True
+ b_minus.disabled = True
+ b_preview.disabled = False
+ case _:
+ raise ValueError('Invalid prompt variant')
+
+ # return a future that eventually resolves to the result
+ # reuse the future if it isn't done. allows for updating the prompt
+ if self.prompt_future == None or self.prompt_future.done():
+ loop = asyncio.get_running_loop()
+ self.prompt_future = loop.create_future()
+
+ if variant == 'plotting': self.prompt_future.set_result(True)
+
+ return self.prompt_future
+
+
+
+if __name__ == "__main__":
+ global print
+ global tprint
+ global server_task_exception
+
+ tprint = print
+ server_task_exception = None
+
+ if USE_PORKBUN:
+ porkbun.ddns_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SUBDOMAIN, PORKBUN_TTL)
+ porkbun.cert_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SSL_OUTFILE)
+ print()
+
+ if USE_ZEROCONF: zc.add_zeroconf_service(ZEROCONF_HOSTNAME, PORT)
+
+ app = App()
+ print = app.print
+ app.tprint = tprint
+
+ app.run()
+
+ print = tprint # restore print function
+ if server_task_exception != None:
+ print()
+ print("Server task exited with exception:")
+ raise server_task_exception
\ No newline at end of file
diff --git a/main_old.py b/main_old.py
new file mode 100755
index 0000000..5206ed7
--- /dev/null
+++ b/main_old.py
@@ -0,0 +1,186 @@
+#!/usr/bin/env python
+
+import asyncio
+import websockets
+import ssl
+import os.path
+import time
+import json
+import traceback
+import sys
+import signal
+import spooler
+import async_prompt
+from tty_colors import COL
+import zc
+import porkbun
+
+
+USE_ZEROCONF = 0
+ZEROCONF_HOSTNAME = 'plotter'
+
+USE_PORKBUN = 1
+PORKBUN_ROOT_DOMAIN = 'process.tools'
+PORKBUN_SUBDOMAIN = 'plotter-local'
+PORKBUN_TTL = 600
+PORKBUN_SSL_OUTFILE = 'cert/process.tools.pem'
+
+BIND_IP = '0.0.0.0'
+PORT = 0 # Use 0 for default ports (80 for http, 443 for ssl/tls)
+USE_SSL = 1
+# SSL_CERT = 'cert/localhost.pem' # Certificate file in pem format (can contain private key as well)
+# SSL_KEY = None # Private key file in pem format (If None, the key needs to be contained in SSL_CERT)
+SSL_CERT = 'cert/process.tools.pem'
+SSL_KEY = None
+
+PING_INTERVAL = 10
+PING_TIMEOUT = 5
+SHOW_CONNECTION_EVENTS = 0 # Print when clients connect/disconnect
+MAX_MESSAGE_SIZE_MB = 5 # in MB (Default in websockets lib is 2)
+
+prompt = None
+num_clients = 0
+clients = []
+ssl_context = None
+
+def status_str(status):
+ match status['status']:
+ case 'setup':
+ return(f'{COL.BOLD}{COL.BLUE}Setup{COL.OFF}')
+ case 'waiting':
+ return(f'{COL.BOLD}Waiting for jobs{COL.OFF}')
+ case 'confirm_plot':
+ return(f'{COL.BOLD}{COL.YELLOW}Confirm to plot {status["job_str"]}{COL.OFF}')
+ case 'plotting':
+ return(f'{COL.BOLD}{COL.GREEN}Plotting [{status["job"]}]{COL.OFF}')
+
+def col_num(n):
+ if n > 0:
+ return f'{COL.BOLD}{COL.GREEN}{n}{COL.OFF}'
+ else:
+ return f'{COL.BOLD}{n}{COL.OFF}'
+
+def print_status():
+ s = spooler.status()
+ print(f' Jobs: {col_num(s["queue_size"])} | Clients: {col_num(len(clients))} | Status: {status_str(s)}\n')
+
+def setup_prompt():
+ global prompt
+ global print
+ prompt = async_prompt.AsyncPrompt()
+ print = prompt.print # replace global print function
+
+def remove_prompt():
+ global prompt
+ del prompt # force destructor, causes terminal to restore
+
+def disable_sigint():
+ signal.signal(signal.SIGINT, lambda *args: None)
+
+async def send_msg(msg, ws):
+ if type(msg) is dict: msg = json.dumps(msg)
+ try:
+ await ws.send(msg)
+ except (websockets.exceptions.ConnectionClosedError, websockets.exceptions.ConnectionClosedOK):
+ pass
+
+async def on_queue_size(size):
+ cbs = []
+ for ws in clients:
+ cbs.append( send_msg({'type': 'queue_length', 'length': size}, ws) )
+ await asyncio.gather(*cbs)
+
+async def send_current_queue_size(ws):
+ await send_msg( {'type': 'queue_length', 'length': spooler.queue_size()}, ws )
+
+async def handle_message(message, ws):
+ async def on_queue_position(pos, job):
+ await send_msg( {'type': 'queue_position', 'position': pos}, ws )
+ async def on_done(job):
+ await send_msg( {'type': 'job_done'}, ws )
+ async def on_cancel(job):
+ await send_msg( {'type': 'job_canceled'}, ws )
+ async def on_error(msg, job):
+ await send_msg( {'type': 'error', 'msg': msg}, ws )
+ msg = json.loads(message)
+ if msg['type'] == 'echo':
+ await ws.send(message)
+ elif msg['type'] == 'plot':
+ qsize = spooler.queue_size()
+ result = await spooler.enqueue(msg, on_queue_position, on_done, on_cancel, on_error)
+ if result and qsize > 0: print_status() # Don't print status if queue is empty -> Status will be printed by spooler
+ elif msg['type'] == 'cancel':
+ result = await spooler.cancel(msg['client'])
+ if result: print_status()
+
+async def handle_connection(ws):
+ global num_clients
+ num_clients += 1
+ clients.append(ws)
+ remote_address = ws.remote_address # store remote address (might not be available on disconnect)
+ if SHOW_CONNECTION_EVENTS:
+ print(f'({num_clients}) Connected: {remote_address[0]}:{remote_address[1]}')
+ print_status()
+ await send_current_queue_size(ws)
+ try:
+ # The iterator exits normally when the connection is closed with close code 1000 (OK) or 1001 (going away). It raises a ConnectionClosedError when the connection is closed with any other code.
+ async for message in ws:
+ # print(f'Message ({ws.remote_address[0]}:{ws.remote_address[1]}):', message)
+ await handle_message(message, ws)
+ except websockets.exceptions.ConnectionClosedError:
+ pass
+ num_clients -= 1
+ clients.remove(ws)
+ if SHOW_CONNECTION_EVENTS:
+ print(f'({num_clients}) Disconnected: {remote_address[0]}:{remote_address[1]} ({ws.close_code}{(" " + ws.close_reason).rstrip()})')
+ print_status()
+
+def setup_ssl():
+ if USE_SSL:
+ global ssl_context
+ try:
+ cert_file = os.path.join( os.path.dirname(__file__), SSL_CERT )
+ key_file = None if SSL_KEY == None else os.path.join( os.path.dirname(__file__), SSL_KEY )
+ ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
+ ssl_context.load_cert_chain(cert_file, key_file)
+ print(f'TLS enabled with certificate: {SSL_CERT}{"" if SSL_KEY == None else " + " + SSL_KEY}')
+ except FileNotFoundError:
+ print(f'Certificate not found, TLS disabled')
+ ssl_context = None
+ except:
+ print(f'Error establishing TLS context, TLS disabled')
+ ssl_context = None
+ global PORT
+ if PORT == 0: PORT = 80 if ssl_context == None else 443
+
+async def main():
+ setup_prompt() # needs to be called within event loop
+ async with websockets.serve(handle_connection, BIND_IP, PORT, ping_interval=PING_INTERVAL, ping_timeout=PING_TIMEOUT, ssl=ssl_context, max_size=MAX_MESSAGE_SIZE_MB*(2**20)):
+ print(f'Server running on {"ws" if ssl_context == None else "wss"}://{BIND_IP}:{PORT}')
+ print()
+ spooler.set_queue_size_cb(on_queue_size)
+ # await asyncio.Future() # run forever
+ await spooler.start(prompt, print_status) # run forever
+
+def quit():
+ print('Quitting...')
+ remove_prompt()
+ if USE_ZEROCONF: zc.remove_zeroconf_service()
+
+if __name__ == '__main__':
+ try:
+ if USE_PORKBUN:
+ porkbun.ddns_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SUBDOMAIN, PORKBUN_TTL)
+ porkbun.cert_update(PORKBUN_ROOT_DOMAIN, PORKBUN_SSL_OUTFILE)
+ print()
+ setup_ssl() # Updates global PORT
+ if USE_ZEROCONF: zc.add_zeroconf_service(ZEROCONF_HOSTNAME, PORT)
+ asyncio.run(main())
+ except KeyboardInterrupt:
+ print('*** Ctrl-C pressed ***')
+ pass
+ except:
+ traceback.print_exception( sys.exception() )
+ finally:
+ disable_sigint() # prevent another Control-C
+ quit()
\ No newline at end of file
diff --git a/porkbun.py b/porkbun.py
index fa979b2..7f839dd 100644
--- a/porkbun.py
+++ b/porkbun.py
@@ -15,6 +15,9 @@ def get_lanip():
return ipaddrlist[-1]
def get_config():
+ if not os.path.exists(CONFIG_FILE):
+ print(f"Error: Porkbun (DNS Service) config file is missing: {CONFIG_FILE}")
+ exit()
with open(CONFIG_FILE) as f:
api_config = json.load(f)
return api_config
diff --git a/requirements.txt b/requirements.txt
index 2a1fbff..7eff3a3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,7 @@
websockets==10.3
zeroconf==0.39.1
+textual==0.86.1
+textual-dev==1.6.1
# pyaxidraw module
#
diff --git a/spooler.py b/spooler.py
index 9bb4a9f..66ab4ef 100644
--- a/spooler.py
+++ b/spooler.py
@@ -1,45 +1,79 @@
+TESTING = False # Don't actually connect to AxiDraw, just simulate plotting
+REPEAT_JOBS = True # Ask to repeat a plot after a sucessful print
+RESUME_QUEUE = True # Resume plotting queue after quitting/restarting
+ALIGN_AFTER = True # Align plotter after success or error
+ALIGN_AFTER_PAUSE = False # Align plotter after pause (programmatic, stop button, keyboard interrupt)
+
+PEN_POS_UP = 60 # Default: 60
+PEN_POS_DOWN = 40 # Default: 40
+MIN_SPEED = 10 # percent
+SIMULATION_TIMEOUT = 8 # seconds
+
+STATUS_FOLDERS = {
+ 'waiting' : 'svgs/0_waiting',
+ 'plotting' : 'svgs/0_waiting',
+ 'canceled' : 'svgs/1_canceled',
+ 'finished' : 'svgs/2_finished',
+ 'error' : 'svgs/3_error'
+}
+
+# KEY_DONE = ( 'd', '(D)one' )
+# KEY_REPEAT = ( 'r', '(R)epeat' )
+# KEY_START_PLOT = ( 'p', '(P)lot' )
+# KEY_RESTART_PLOT = ( 'p', '(P)lot from start' )
+# KEY_ALIGN = ( 'a', '(A)lign' )
+# KEY_CYCLE = ( 'c', '(C)ycle' )
+# KEY_CANCEL = ( chr(27), '(Esc) Cancel Job' )
+# KEY_RESUME = ( 'r', '(R)esume' )
+# KEY_HOME = ( 'h', '(H)ome' )
+
+STATUS_DESC = {
+ 'setup': 'Setting up',
+ 'waiting': 'Waiting for jobs',
+ 'paused': 'Plot paused',
+ 'confirm_plot': 'Confirm job',
+ 'plotting': 'Plotting'
+}
+
import asyncio
from pyaxidraw import axidraw
-from tty_colors import COL
from datetime import datetime, timezone
import math
import os
from capture_output import capture_output
+import re
+import hashlib
+import async_queue
+import xml.etree.ElementTree as ElementTree
-FOLDER_WAITING ='svgs/0_waiting'
-FOLDER_CANCELED ='svgs/1_canceled'
-FOLDER_FINISHED ='svgs/2_finished'
-PEN_POS_UP = 60 # Default: 60
-PEN_POS_DOWN = 40 # Default: 40
-MIN_SPEED = 10 # percent
-
-KEY_DONE = [ 'd', '(D)one' ]
-KEY_REPEAT = [ 'r', '(R)epeat' ]
-KEY_START_PLOT = [ 'p', '(P)lot' ]
-KEY_RESTART_PLOT = [ 'p', '(P)lot from start' ]
-KEY_ALIGN = [ 'a', '(A)lign' ]
-KEY_CYCLE = [ 'c', '(C)ycle' ]
-KEY_CANCEL = [ chr(27), '(Esc) Cancel Job' ]
-KEY_RESUME = [ 'r', '(R)esume' ]
-KEY_HOME = [ 'h', '(H)ome' ]
-
-REPEAT_JOBS = True # Ask to repeat a plot after a sucessful print
-TESTING = True # Don't actually connect to AxiDraw, just simulate plotting
queue_size_cb = None
-queue = asyncio.Queue() # an async FIFO queue
-jobs = {} # an index to all unfinished jobs by client id (in queue or current_job) (insertion order is preserved in dict since python 3.7)
-current_job = None
-_status = 'setup' # setup | waiting | confirm_plot | plotting
+# queue = asyncio.Queue() # an async FIFO queue
+queue = async_queue.Queue() # an async FIFO queue that can be reordered
+_jobs = {} # an index to all unfinished jobs by client id (in queue or _current_job)
+_current_job = None
+_status = 'waiting' # waiting | confirm_plot | plotting
+
-async def callback(fn, *args):
+# Helper function calls async function fn with args
+# Returns a coroutine (because of async def)
+async def callback(fn, *args, **kwargs):
if callable(fn):
- await fn(*args)
+ await fn(*args, **kwargs)
+
+# async def _notify_queue_positions():
+# cbs = []
+# for i, client in enumerate(_jobs):
+# job = _jobs[client]
+# if i == 0 and _status == 'plotting': i = -1
+# if 'position_notified' not in job or job['position_notified'] != i:
+# job['position_notified'] = i
+# cbs.append( callback(job['queue_position_cb'], i, job) )
+# await asyncio.gather(*cbs) # run callbacks concurrently
async def _notify_queue_positions():
cbs = []
- for i, client in enumerate(jobs):
- job = jobs[client]
+ for i, job in enumerate(jobs()):
if i == 0 and _status == 'plotting': i = -1
if 'position_notified' not in job or job['position_notified'] != i:
job['position_notified'] = i
@@ -47,118 +81,284 @@ async def _notify_queue_positions():
await asyncio.gather(*cbs) # run callbacks concurrently
async def _notify_queue_size():
- await callback(queue_size_cb, len(jobs))
+ await callback(queue_size_cb, num_jobs())
def set_queue_size_cb(cb):
global queue_size_cb
queue_size_cb = cb
-def queue_size():
- return len(jobs)
+def num_jobs():
+ return len(_jobs)
+
+def current_job():
+ return _current_job
+
+def current_client():
+ if _current_job == None: return None
+ return _current_job['client']
+
+def job_by_client(client):
+ if client not in _jobs: return None
+ return _jobs[client]
+
+def jobs():
+ lst = queue.list()
+ if (_current_job != None and not _current_job['cancel']):
+ lst.insert(0, _current_job)
+ return lst
def status():
return {
'status': _status,
- 'job': current_job['client'] if current_job != None else None,
- 'job_str': job_str(current_job) if current_job != None else None,
- 'queue_size': queue_size(),
+ 'status_desc': STATUS_DESC[_status],
+ 'job': _current_job['client'] if _current_job != None else None,
+ 'job_str': job_str(_current_job) if _current_job != None else None,
+ 'queue_size': num_jobs(),
}
-def timestamp(date = None):
+def timestamp_str_full(date = None):
if date == None:
# make timezone aware timestamp: https://stackoverflow.com/a/39079819
date = datetime.now(timezone.utc)
date = date.replace(tzinfo=date.astimezone().tzinfo)
return date.strftime("%Y%m%d_%H%M%S.%f_UTC%z")
-# status: 'waiting' | 'canceled' | 'finished'
-def save_svg(job, status):
- if status not in ['waiting', 'canceled', 'finished']:
- return False
- filename = f'{job["received"]}_{job["client"][0:10]}_{job["hash"][0:5]}.svg'
- files = {
- 'waiting': os.path.join(FOLDER_WAITING, filename),
- 'canceled': os.path.join(FOLDER_CANCELED, filename),
- 'finished': os.path.join(FOLDER_FINISHED, filename),
- }
- for key, file in files.items():
- if key == status:
- os.makedirs( os.path.dirname(file), exist_ok=True)
- with open(file, 'w', encoding='utf-8') as f: f.write(job['svg'])
- else:
- try:
- os.remove(file)
- except:
- pass
+def timestamp_str(date = None):
+ if date == None:
+ # make timezone aware timestamp: https://stackoverflow.com/a/39079819
+ date = datetime.now(None) # None ... use current timezone
+ date = date.replace(tzinfo=date.astimezone().tzinfo)
+ return date.strftime("%Y%m%d_%H%M%S.%f")[:-3] # trim last three digits of microsecond
+
+
+def save_svg(job, overwrite_existing = False):
+ if job['status'] not in STATUS_FOLDERS.keys(): return False
+
+ min = int(job["time_estimate"] / 60) if "time_estimate" in job else 0
+ sec = math.ceil(job["time_estimate"] % 60) if "time_estimate" in job else 0
+ position = f'{(job["position"] + 1):03}_' if 'position' in job and job['status'] in ['waiting', 'plotting'] else ''
+
+ ink = f'{(job["stats"]["travel_ink"] / 1000):.1f}' if "stats" in job else 0
+ travel = f'{(job["stats"]["travel"] / 1000):.1f}' if "stats" in job else 0
+ filename = f'{position}{job["received"]}_[{job["client"][0:10]}]_{job["hash"][0:5]}_{travel}m_{min}m{sec}s.svg'
+ filename = os.path.join(STATUS_FOLDERS[job['status']], filename)
+
+ # remove previous save
+ if 'save_path' in job and job['save_path'] != filename:
+ try:
+ # print('removing', job['save_path'])
+ os.remove(job['save_path'])
+ except:
+ pass
+
+ # save file
+ if not os.path.isfile(filename) or overwrite_existing:
+ # print('writing', filename)
+ os.makedirs( os.path.dirname(filename), exist_ok=True)
+ with open(filename, 'w', encoding='utf-8') as f: f.write(job['svg'])
+ job['save_path'] = filename
return True
+
+# def save_svg_async(*args, **kwargs):
+# return asyncio.to_thread(save_svg, *args, **kwargs)
+
+
+# Updated pre version 4 SVGs, so they are compatible with resume queue
+def update_svg(job):
+ match = re.search('tg:version="(\\d+)"', job['svg'])
+ if match != None and int(match.group(1)) >= 4: return
-def save_svg_async(*args):
- return asyncio.to_thread(save_svg, *args)
+ MARKER = 'xmlns:tg="https://sketch.process.studio/turtle-graphics"'
+ idx = job['svg'].find(MARKER)
+ if idx == -1: return
+ idx += len(MARKER)
+ insert = f'\n tg:version="4" tg:layer_count="1" tg:oob_count="{job['stats']['oob_count']}" tg:short_count="{job['stats']['short_count']}" tg:format="{job['format']}" tg:width_mm="{job['size'][0]}" tg:height_mm="{job['size'][1]}" tg:speed="{job['speed']}" tg:author="{job['client']}" tg:timestamp="{job['timestamp']}"'
+
+ job['svg'] = job['svg'][:idx] + insert + job['svg'][idx:]
+ job['hash'] = hashlib.sha1(job['svg'].encode('utf-8')).hexdigest()
-# job: 'client', 'lines'
+# job {'type': 'plot, 'client', 'id', 'svg', stats, timestamp, hash, speed, format, size, received?}
+# adds to job: { 'cancel', time_estimate', 'layers', received }
# todo: don't wait on callbacks
async def enqueue(job, queue_position_cb = None, done_cb = None, cancel_cb = None, error_cb = None):
# the client might be in queue (or currently plotting)
- if job['client'] in jobs:
+ if job['client'] in _jobs:
await callback( error_cb, 'Cannot add job, you already have a job queued!', job )
return False
+ job['status'] = 'waiting'
job['cancel'] = False
# save callbacks
job['queue_position_cb'] = queue_position_cb
job['done_cb'] = done_cb
job['cancel_cb'] = cancel_cb
job['error_cb'] = error_cb
- job['received'] = timestamp()
+ if 'received' not in job or job['received'] == None:
+ job['received'] = timestamp_str()
# speed
if 'speed' in job: job['speed'] = max( min(job['speed'], 100), MIN_SPEED ) # limit speed (MIN_SPEED, 100)
else: job['speed'] = 100
# format
- if 'format' not in job: job['format'] = 'A3_LANDSCAPE'
+ if 'format' not in job: job['format'] = 'A4_LANDSCAPE'
# add to jobs index
- jobs[ job['client'] ] = job
- await _notify_queue_size() # notify new queue size
- await _notify_queue_positions()
- print(f'New job [{job["client"]}]')
- sim = await simulate_async(job) # run simulation
+ _jobs[ job['client'] ] = job
+ print(f'New job \\[{job["client"]}] {job["hash"][0:5]}')
+ try:
+ async with asyncio.timeout(SIMULATION_TIMEOUT):
+ sim = await simulate_async(job) # run simulation
+ except TimeoutError:
+ print(f'⚠️ [red]Timeout on simulating job \\[{job["client"]}] {job["hash"][0:5]}')
+ job['status'] = 'error'
+ save_svg(job)
+ await callback( error_cb, 'Cannot add job, it took to long to simulate!', job )
+ return False
+
job['time_estimate'] = sim['time_estimate']
job['layers'] = sim['layers']
+
+ update_svg(job)
await queue.put(job)
- await save_svg_async(job, 'waiting')
+ job['position'] = job_pos(job)
+ save_svg(job)
+
+ await _notify_queue_size() # notify new queue size
+ await _notify_queue_positions()
return True
async def cancel(client, force = False):
+ global _current_job
if not force:
- if current_job != None and current_job['client'] == client:
- await callback( current_job['error_cb'], 'Cannot cancel, already plotting!', current_job )
+ if _current_job != None and _current_job['client'] == client:
+ await callback( _current_job['error_cb'], 'Cannot cancel, already plotting!', _current_job )
return False
- if client not in jobs: return False
- job = jobs[client]
+ # remove from job index
+ if client not in _jobs: return False
+ job = _jobs[client]
+ if job == _current_job and _status == 'plotting': return # can't cancel if plotting
+ job['status'] = 'canceled'
job['cancel'] = True # set cancel flag
- del jobs[client] # remove from index
+ job['position'] = None
+ del _jobs[client]
+
+ # remove from queue
+ # if job is the current job, it has already been taken from the top of the queue
+ if job == _current_job:
+ _current_job = None
+ else:
+ queue.pop( queue.index(job) )
await callback( job['cancel_cb'], job ) # notify canceled job
await _notify_queue_size() # notify new queue size
await _notify_queue_positions() # notify queue positions (might have changed for some)
- print(f'❌ {COL.RED}Canceled job [{job["client"]}]{COL.OFF}')
- await save_svg_async(job, 'canceled')
+ print(f'❌ [red]Canceled job \\[{job["client"]}]')
+ save_svg(job)
+ update_positions_and_save()
return True
+async def cancel_current_job(force = True):
+ return await cancel(_current_job['client'], force = force)
+
async def finish_current_job():
- await callback( current_job['done_cb'], current_job ) # notify job done
- del jobs[ current_job['client'] ] # remove from jobs index
+ global _current_job
+ _current_job['status'] = 'finished'
+ _current_job['position'] = None
+ await callback( _current_job['done_cb'], _current_job ) # notify job done
+ del _jobs[ _current_job['client'] ] # remove from jobs index
+ finished_job = _current_job
+ _current_job = None
await _notify_queue_positions() # notify queue positions. current job is 0
await _notify_queue_size() # notify queue size
- print(f'✅ {COL.GREEN}Finished job [{current_job["client"]}]{COL.OFF}')
+ print(f'✅ [green]Finished job \\[{finished_job["client"]}]')
_status = 'waiting'
- await save_svg_async(current_job, 'finished')
+ save_svg(finished_job)
+ update_positions_and_save()
return True
+# current job: 0 (if plotting or waiting, check job['status'])
+def job_pos(job):
+ if (job == _current_job): return 0
+ try:
+ return queue.index(job) + 1
+ except ValueError:
+ return None
+
+def update_position(job):
+ if job == None: return
+ job['position'] = job_pos(job)
+
+def update_positions_and_save():
+ # print('updating positions')
+ for job in _jobs.values():
+ update_position(job)
+ # print(job['client'], job['position'])
+ save_svg(job, overwrite_existing=False)
+
+# positions
+# 0 .. current job
+# 1 .. first in queue (idx 0)
+# last .. num_jobs()-1
+
+# plot['status']: 'waiting'|'plotting'|'paused'|'ok'|'error'|'finished'|'canceled'
+async def move(client, new_pos):
+ global _current_job
+ # print('move', client, new_pos)
+ job = _jobs[client]
+ current_pos = job_pos(job)
+ # print('move: current pos', current_pos)
+ # cannot move if job is already plotting
+ if job['status'] in ['plotting', 'paused']:
+ # print('move: already plotting, can\'t move')
+ return
+
+ # normalize new_pos
+ if new_pos < 0: new_pos = num_jobs() + new_pos
+
+ # clamp to lower bound
+ if new_pos < 0: new_pos = 0
+
+ # can't take place of the plotting (or paused job)
+ if new_pos == 0 and _status in ['plotting', 'paused']: new_pos = 1
+
+ # print(f'move from {current_pos} to {new_pos}')
+
+ # clamp to upper bound
+ if new_pos > num_jobs()-1: new_pos = num_jobs()-1
+
+ # nothing to do
+ if new_pos == current_pos:
+ # print('move: nothing to do')
+ return
+
+ # move job from queue to current job
+ if new_pos == 0:
+ # print('move to top')
+ queue.pop(current_pos-1) # remove from current position
+ queue.insert(0, _current_job) # move current job (pos 0) to first waiting position
+ _current_job = job # set to current job
+ prompt_ui('start_plot', f'Ready to plot job \\[{_current_job["client"]}] ?')
+ # move current job to queue
+ elif current_pos == 0:
+ # print('move from top')
+ old_current_job = _current_job
+ _current_job = queue.pop(0) # new current job is next in line
+ queue.insert(new_pos-1, old_current_job)
+ prompt_ui('start_plot', f'Ready to plot job \\[{_current_job["client"]}] ?')
+ # move within queue
+ else:
+ # print('move within queue')
+ queue.move(current_pos-1, new_pos-1)
+
+ # update 'position' attribute of all jobs and save svgs
+ update_positions_and_save()
+
+ await _notify_queue_size()
+ await _notify_queue_positions() # notify queue positions (might have changed for some)
+
def job_str(job):
- info = '[' + str(job["client"])[0:10] + ']'
+ info = '[' + str(job["client"])[0:10] + '] ' + job['hash'][0:5]
speed_and_format = f'{job["speed"]}%, {job["format"]}, {math.floor(job["time_estimate"]/60)}:{round(job["time_estimate"]%60):02} min'
if 'stats' in job:
stats = job['stats']
@@ -179,6 +379,9 @@ def job_str(job):
103: 'Stopped by keyboard interrupt',
104: 'Lost USB connectivity'
}
+PLOTTER_PAUSED = [ 1, 102, 103 ];
+PLOTTER_OK = [ 0 ];
+PLOTTER_ERROR = [ 101, 104 ];
def get_error_msg(code):
if code in PLOTTER_ERRORS:
@@ -190,7 +393,7 @@ def print_axidraw(*args):
out = ' '.join(args)
lines = out.split('\n')
for line in lines:
- print(f"{COL.GREY}[AxiDraw] " + line + COL.OFF)
+ print(f"[gray50]\\[AxiDraw] " + line)
# Raise pen and disable XY stepper motors
def align():
@@ -216,8 +419,14 @@ def cycle():
ad.plot_run()
return ad.errors.code
-def plot(job, align_after = True, options_cb = None, return_ad = False):
+def request_plot_pause():
+ global _current_ad
+ if _current_ad != None:
+ _current_ad.transmit_pause_request()
+
+def plot(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False):
if 'svg' not in job: return 0
+ job['status'] = 'plotting'
speed = job['speed'] / 100
with capture_output(print_axidraw, print_axidraw):
ad = axidraw.AxiDraw()
@@ -234,12 +443,22 @@ def plot(job, align_after = True, options_cb = None, return_ad = False):
ad.options.pen_pos_down = PEN_POS_DOWN
if callable(options_cb): options_cb(ad.options)
if TESTING: ad.options.preview = True
+ global _current_ad
+ _current_ad = ad # for request_plot_pause()
job['output_svg'] = ad.plot_run(output=True)
- if align_after: align()
+ _current_ad = None
+ if (ad.errors.code in PLOTTER_PAUSED and align_after_pause) or \
+ (ad.errors.code not in PLOTTER_PAUSED and align_after):
+ align()
+
+ if ad.errors.code in PLOTTER_PAUSED: job['status'] = 'paused'
+ elif ad.errors.code in PLOTTER_OK: job['status'] = 'ok'
+ elif ad.errors.code in PLOTTER_ERROR: job['status'] = 'error'
+
if return_ad: return ad
else: return ad.errors.code
-def resume_home(job, align_after = True, options_cb = None, return_ad = False):
+def resume_home(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False):
if 'output_svg' not in job: return 0
orig_svg = job['svg'] # save original svg
job['svg'] = job['output_svg'] # set last output svg as input
@@ -248,11 +467,11 @@ def _options_cb(options):
if callable(options_cb): options_cb(options)
options.mode = 'res_home'
- res = plot(job, align_after, _options_cb, return_ad)
+ res = plot(job, align_after, align_after_pause, _options_cb, return_ad)
job['svg'] = orig_svg # restore original svg
return res
-def resume_plot(job, align_after = True, options_cb = None, return_ad = False):
+def resume_plot(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False):
if 'output_svg' not in job: return 0
orig_svg = job['svg'] # save original svg
job['svg'] = job['output_svg'] # set last output svg as input
@@ -260,8 +479,8 @@ def resume_plot(job, align_after = True, options_cb = None, return_ad = False):
def _options_cb(options):
if callable(options_cb): options_cb(options)
options.mode = 'res_plot'
-
- res = plot(job, align_after, _options_cb, return_ad)
+
+ res = plot(job, align_after, align_after_pause, _options_cb, return_ad)
job['svg'] = orig_svg # restore original svg
return res
@@ -289,21 +508,23 @@ def update_stats(ad):
def _options_cb(options):
options.preview = True
- ad = plot(job, align_after=False, options_cb=_options_cb, return_ad=True)
+ ad = plot(job, align_after=False, align_after_pause=False, options_cb=_options_cb, return_ad=True)
update_stats(ad)
while ad.errors.code == 1: # Paused programmatically
- ad = resume_plot(job, align_after=False, options_cb=_options_cb, return_ad=True)
+ ad = resume_plot(job, align_after=False, align_after_pause=False, options_cb=_options_cb, return_ad=True)
update_stats(ad)
+ job['status'] = 'waiting' # reset status (has been set to 'plotting' by plot())
+
return stats
-async def plot_async(*args):
- return await asyncio.to_thread(plot, *args)
+async def plot_async(*args, **kwargs):
+ return await asyncio.to_thread(plot, *args, **kwargs)
-async def simulate_async(*args):
- return await asyncio.to_thread(simulate, *args)
+async def simulate_async(*args, **kwargs):
+ return await asyncio.to_thread(simulate, *args, **kwargs)
async def align_async():
return await asyncio.to_thread(align)
@@ -311,146 +532,259 @@ async def align_async():
async def cycle_async():
return await asyncio.to_thread(cycle)
-async def resume_plot_async(*args):
- return await asyncio.to_thread(resume_plot, *args)
+async def resume_plot_async(*args, **kwargs):
+ return await asyncio.to_thread(resume_plot, *args, **kwargs)
-async def resume_home_async(*args):
- return await asyncio.to_thread(resume_home, *args)
+async def resume_home_async(*args, **kwargs):
+ return await asyncio.to_thread(resume_home, *args, **kwargs)
+
+async def prompt_setup(message = 'Press \'Done\' when ready'):
+ while True:
+ res = await prompt_ui('setup', message)
+ res = res['id']
+ if res == 'align': # Align
+ print('Aligning...')
+ await align_async() # -> prompt again
+ elif res == 'cycle': # Cycle
+ print('Cycling...')
+ await cycle_async() # -> prompt again
+ elif res == 'neg' : # Finish
+ return True
+
+async def prompt_waiting(message = 'Setup as needed'):
+ while True:
+ res = await prompt_ui('waiting', message)
+ if not res:
+ # print('prompt cancelled')
+ return False # the prompt was intentionally cancelled
+
+ res = res['id']
+ if res == 'align': # Align
+ print('Aligning...')
+ await align_async() # -> prompt again
+ elif res == 'cycle': # Cycle
+ print('Cycling...')
+ await cycle_async() # -> prompt again
+
+def cancel_prompt_waiting():
+ cancel_prompt_ui()
async def prompt_start_plot(message):
- message += f' {KEY_START_PLOT[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_CANCEL[1]} ?'
while True:
- res = await prompt.wait_for( [KEY_START_PLOT[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_CANCEL[0]], message, echo=True )
- if res == KEY_START_PLOT[0]: # Start Plot
+ res = await prompt_ui('start_plot', message)
+ if not res: return False # the prompt was intentionally cancelled -> Cancel plotting
+
+ res = res['id']
+ if res == 'pos': # Start Plot
return True
- elif res == KEY_ALIGN[0]: # Align
+ elif res == 'align': # Align
print('Aligning...')
await align_async() # -> prompt again
- elif res == KEY_CYCLE[0]: # Cycle
+ elif res == 'cycle': # Cycle
print('Cycling...')
await cycle_async() # -> prompt again
- elif res == KEY_CANCEL[0]: # Cancel
+ elif res == 'neg': # Cancel
return False
+async def prompt_plotting(message = ''):
+ return await prompt_ui('plotting', message);
+
async def prompt_repeat_plot(message):
- message += f' {KEY_REPEAT[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_DONE[1]} ?'
while True:
- res = await prompt.wait_for( [KEY_REPEAT[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_DONE[0]], message, echo=True )
- if res == KEY_REPEAT[0]: # Start Plot
+ res = await prompt_ui('repeat_plot', message)
+ res = res['id']
+ if res == 'pos': # Start Plot
return True
- elif res == KEY_ALIGN[0]: # Align
+ elif res == 'align': # Align
print('Aligning...')
await align_async() # -> prompt again
- elif res == KEY_CYCLE[0]: # Cycle
+ elif res == 'cycle': # Cycle
print('Cycling...')
await cycle_async() # -> prompt again
- elif res == KEY_DONE[0]: # Finish
+ elif res == 'neg': # Done
return False
async def prompt_resume_plot(message, job):
- message += f' {KEY_RESUME[1]}, {KEY_HOME[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_RESTART_PLOT[1]}, {KEY_DONE[1]} ?'
while True:
- res = await prompt.wait_for( [KEY_RESUME[0],KEY_HOME[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_RESTART_PLOT[0],KEY_DONE[0]], message, echo=True )
- if res == KEY_RESUME[0]: # Resume Plot
- return 'resume'
- elif res == KEY_RESTART_PLOT[0]: # Restart plot
- return 'restart'
- elif res == KEY_HOME[0]: # Home
+ res = await prompt_ui('resume_plot', message)
+ res = res['id']
+
+ if res == 'pos': # Resume Plot
+ return True
+ elif res == 'home': # Home
print('Returning home...')
await resume_home_async(job) # -> prompt again
- elif res == KEY_ALIGN[0]: # Align
+ elif res == 'align': # Align
print('Aligning...')
await align_async() # -> prompt again
- elif res == KEY_CYCLE[0]: # Cycle
+ elif res == 'cycle': # Cycle
print('Cycling...')
await cycle_async() # -> prompt again
- elif res == KEY_DONE[0]: # Finish
+ elif res == 'neg': # Done
return False
-async def prompt_setup(message = 'Setup Plotter:'):
- message += f' {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_DONE[1]} ?'
- while True:
- res = await prompt.wait_for( [KEY_ALIGN[0],KEY_CYCLE[0],KEY_DONE[0]], message, echo=True )
- if res == KEY_ALIGN[0]: # Align
- print('Aligning...')
- await align_async() # -> prompt again
- elif res == KEY_CYCLE[0]: # Cycle
- print('Cycling...')
- await cycle_async() # -> prompt again
- elif res == KEY_DONE[0] : # Finish
- return True
+def svg_to_job(svg, filename = None):
+ NS = 'https://sketch.process.studio/turtle-graphics'
+ root = ElementTree.fromstring(svg)
+
+ def attr(attr, ns = NS):
+ return root.get(attr if ns == None else "{" + ns + "}" + attr)
+
+ received_ts = None
+ if filename != None:
+ match = re.search('\\d{8}_\\d{6}', os.path.basename(filename))
+ if match != None: received_ts = match.group(0)
+
+ job = {
+ 'loaded_from_file': True,
+ 'client': attr('author'),
+ 'id': "XYZ",
+ 'svg': svg,
+ 'stats': {
+ 'count': int(attr('count')),
+ 'layer_count': int(attr('layer_count')),
+ 'oob_count': int(attr('oob_count')),
+ 'short_count': int(attr('short_count')),
+ 'travel': int(attr('travel')),
+ 'travel_ink': int(attr('travel_ink')),
+ 'travel_blank': int(attr('travel_blank'))
+ },
+ 'timestamp': attr('timestamp'),
+ 'speed': int(attr('speed')),
+ 'format': attr('format'),
+ 'size': [int(attr('width_mm')), int(attr('height_mm'))],
+ 'hash': hashlib.sha1(svg.encode('utf-8')).hexdigest(),
+ 'received': received_ts,
+ 'save_path': filename,
+ }
+
+ return job
+
+async def resume_queue_from_disk():
+ import xml.etree.ElementTree as ElementTree
+ try:
+ list = sorted(os.listdir(STATUS_FOLDERS['waiting']))
+ list = [ os.path.join(STATUS_FOLDERS['waiting'], x) for x in list if x.endswith('.svg') ]
+ except FileNotFoundError:
+ list = []
+
+ resumable_jobs = []
+ for filename in list:
+ # print('Loading ', filename)
+ try:
+ with open(filename, 'r') as file: svg = file.read()
+ job = svg_to_job(svg, filename)
+ resumable_jobs.append(job)
+ except:
+ print('Error resuming ', filename)
+
+ if len(resumable_jobs) > 0: print(f"Resuming {len(resumable_jobs)} jobs...")
+ else: print("No jobs to resume")
+ for job in resumable_jobs:
+ await enqueue(job)
-async def start(_prompt, print_status):
- global current_job
+def set_status(status):
+ global _status
+ _status = status
+ print_status()
+
+async def start(app):
+ global _current_job
global _status
+
global print
- global prompt
- prompt = _prompt # make this global
- print = prompt.print # replace global print function
+ print = app.print
+
+ global tprint
+ tprint = app.tprint
+
+ global prompt_ui, cancel_prompt_ui
+ prompt_ui = app.prompt_ui
+ cancel_prompt_ui = app.cancel_prompt_ui
- if TESTING: print(f'{COL.YELLOW}TESTING MODE enabled{COL.OFF}')
+ global print_status
+ print_status = app.update_header
+
+ if TESTING: print('[yellow]TESTING MODE enabled')
+ if RESUME_QUEUE: await resume_queue_from_disk()
await align_async()
- await prompt_setup()
- _status = 'waiting'
+ # await prompt_setup()
while True:
# get the next job from the queue, waits until a job becomes available
if queue.empty():
- print_status()
- current_job = await queue.get()
- if not current_job['cancel']: # skip if job is canceled
- _status = 'confirm_plot'
- print_status()
- ready = await prompt_start_plot(f'Ready to plot job {job_str(current_job)}?')
+ set_status('waiting')
+ asyncio.create_task( prompt_waiting() ) # this allows align/cycle
+ _current_job = await queue.get()
+ cancel_prompt_waiting()
+ update_positions_and_save()
+
+ if not _current_job['cancel']: # skip if job is canceled
+ set_status('confirm_plot')
+ ready = await prompt_start_plot(f'[green]Ready to plot[/green] job \\[{_current_job["client"]}] ?')
if not ready:
- await cancel(current_job['client'], force = True)
- _status = 'waiting'
+ await cancel_current_job()
+ set_status('waiting')
+ _current_job = None
continue # skip over rest of the loop
# plot (and retry on error or repeat)
loop = 0 # number or tries/repetitions
+ layer = 0 # number of programmatic pauses (error 1)
+ interrupt = 0 # number of stops by button press (error 102) or keyboard interrupt (103)
resume = False # flag indicating resume (vs. plotting from start)
while True:
- if (resume):
- print(f'🖨️ {COL.YELLOW}Resuming job [{current_job["client"]}] ...{COL.OFF}')
- _status = 'plotting'
- error = await resume_plot_async(current_job)
+ set_status('plotting')
+ await prompt_plotting(f'\\[{_current_job["client"]}]') # this returns immediately
+ if (resume == 'skip_to_repeat'):
+ error = 0
+ elif resume:
+ print(f'🖨️ [yellow]Resuming job \\[{_current_job["client"]}] ...')
+ error = await resume_plot_async(_current_job)
else:
loop += 1
- print(f'🖨️ {COL.YELLOW}Plotting job [{current_job["client"]}] ...{COL.OFF}')
- _status = 'plotting'
+ print(f'🖨️ [yellow]Plotting job \\[{_current_job["client"]}] ...')
await _notify_queue_positions() # notify plotting
- error = await plot_async(current_job)
+ error = await plot_async(_current_job)
resume = False
# No error
if error == 0:
if REPEAT_JOBS:
- print(f'{COL.BLUE}Done ({loop}x) job [{current_job["client"]}]{COL.OFF}')
- _status = 'confirm_plot'
- repeat = await prompt_repeat_plot(f'{COL.BLUE}Repeat{COL.OFF} job [{current_job["client"]}] ?')
+ print(f'[blue]Done ({loop}x) job \\[{_current_job["client"]}]')
+ set_status('confirm_plot')
+ layer = 0
+ interrupt = 0
+ repeat = await prompt_repeat_plot(f'[yellow]Repeat ({loop+1}) job[/yellow] \\[{_current_job["client"]}] ?')
if repeat: continue
await finish_current_job()
break
# Paused programmatically (1), Stopped by pause button press (102) or Stopped by keyboard interrupt (103)
- elif error in [1, 102, 103]:
- print(f'{COL.YELLOW}Plotter: {get_error_msg(error)}{COL.OFF}')
- _status = 'confirm_plot'
- ready = await prompt_resume_plot(f'{COL.YELLOW}Resume{COL.OFF} job [{current_job["client"]}] ?', current_job)
- if not ready:
- await finish_current_job()
- break
- if ready == 'resume': resume = True
+ elif error in PLOTTER_PAUSED:
+ print(f'[yellow]Plotter: {get_error_msg(error)}')
+ set_status('paused')
+ if error in [1]:
+ layer += 1
+ prompt = f"[blue]Continue layer ({layer+1}/{_current_job['layers']})[/blue]"
+ elif error in [102, 103]:
+ interrupt += 1
+ prompt = f"[blue]Continue ({interrupt+1}) interrupted job[/blue]"
+ if _current_job['layers'] > 1: prompt += f" layer ({layer+1}/{_current_job['layers']})"
+ ready = await prompt_resume_plot(f'{prompt} \\[{_current_job["client"]}] ?', _current_job)
+ if ready: resume = True
+ else:
+ resume = 'skip_to_repeat' # Skip to asking to repeat job
+ _current_job['status'] = 'ok' # set status to 'successfully printed'
# Errors
else:
- print(f'{COL.RED}Plotter: {get_error_msg(error)}{COL.OFF}')
- _status = 'confirm_plot'
- ready = await prompt_start_plot(f'{COL.RED}Retry{COL.OFF} job [{current_job["client"]}] ?')
+ print(f'[red]Plotter: {get_error_msg(error)}')
+ set_status('confirm_plot')
+ ready = await prompt_start_plot(f'[red]Retry job \\[{_current_job["client"]}] ?')
if not ready:
- await cancel(current_job['client'], force = True)
+ await cancel(_current_job['client'], force = True)
break
-
- _status = 'waiting'
- current_job = None
+
+ set_status('waiting')
+ _current_job = None
diff --git a/spooler_old.py b/spooler_old.py
new file mode 100644
index 0000000..4ff49e1
--- /dev/null
+++ b/spooler_old.py
@@ -0,0 +1,528 @@
+import asyncio
+from pyaxidraw import axidraw
+from tty_colors import COL
+from datetime import datetime, timezone
+import math
+import os
+from capture_output import capture_output
+import re
+import hashlib
+
+FOLDER_WAITING ='svgs/0_waiting'
+FOLDER_CANCELED ='svgs/1_canceled'
+FOLDER_FINISHED ='svgs/2_finished'
+PEN_POS_UP = 60 # Default: 60
+PEN_POS_DOWN = 40 # Default: 40
+MIN_SPEED = 10 # percent
+
+KEY_DONE = [ 'd', '(D)one' ]
+KEY_REPEAT = [ 'r', '(R)epeat' ]
+KEY_START_PLOT = [ 'p', '(P)lot' ]
+KEY_RESTART_PLOT = [ 'p', '(P)lot from start' ]
+KEY_ALIGN = [ 'a', '(A)lign' ]
+KEY_CYCLE = [ 'c', '(C)ycle' ]
+KEY_CANCEL = [ chr(27), '(Esc) Cancel Job' ]
+KEY_RESUME = [ 'r', '(R)esume' ]
+KEY_HOME = [ 'h', '(H)ome' ]
+
+TESTING = False # Don't actually connect to AxiDraw, just simulate plotting
+REPEAT_JOBS = True # Ask to repeat a plot after a sucessful print
+RESUME_QUEUE = True # Resume plotting queue after quitting/restarting
+ALIGN_AFTER = True # Align plotter after success or error
+ALIGN_AFTER_PAUSE = False # Align plotter after pause (programmatic, stop button, keyboard interrupt)
+
+queue_size_cb = None
+queue = asyncio.Queue() # an async FIFO queue
+jobs = {} # an index to all unfinished jobs by client id (in queue or current_job) (insertion order is preserved in dict since python 3.7)
+current_job = None
+_status = 'setup' # setup | waiting | confirm_plot | plotting
+
+async def callback(fn, *args):
+ if callable(fn):
+ await fn(*args)
+
+async def _notify_queue_positions():
+ cbs = []
+ for i, client in enumerate(jobs):
+ job = jobs[client]
+ if i == 0 and _status == 'plotting': i = -1
+ if 'position_notified' not in job or job['position_notified'] != i:
+ job['position_notified'] = i
+ cbs.append( callback(job['queue_position_cb'], i, job) )
+ await asyncio.gather(*cbs) # run callbacks concurrently
+
+async def _notify_queue_size():
+ await callback(queue_size_cb, len(jobs))
+
+def set_queue_size_cb(cb):
+ global queue_size_cb
+ queue_size_cb = cb
+
+def queue_size():
+ return len(jobs)
+
+def status():
+ return {
+ 'status': _status,
+ 'job': current_job['client'] if current_job != None else None,
+ 'job_str': job_str(current_job) if current_job != None else None,
+ 'queue_size': queue_size(),
+ }
+
+def timestamp(date = None):
+ if date == None:
+ # make timezone aware timestamp: https://stackoverflow.com/a/39079819
+ date = datetime.now(timezone.utc)
+ date = date.replace(tzinfo=date.astimezone().tzinfo)
+ return date.strftime("%Y%m%d_%H%M%S.%f_UTC%z")
+
+# status: 'waiting' | 'canceled' | 'finished'
+def save_svg(job, status):
+ if status not in ['waiting', 'canceled', 'finished']:
+ return False
+ filename = f'{job["received"]}_{job["client"][0:10]}_{job["hash"][0:5]}.svg'
+ files = {
+ 'waiting': os.path.join(FOLDER_WAITING, filename),
+ 'canceled': os.path.join(FOLDER_CANCELED, filename),
+ 'finished': os.path.join(FOLDER_FINISHED, filename),
+ }
+ for key, file in files.items():
+ if key == status:
+ os.makedirs( os.path.dirname(file), exist_ok=True)
+ with open(file, 'w', encoding='utf-8') as f: f.write(job['svg'])
+ else:
+ try:
+ os.remove(file)
+ except:
+ pass
+ return True
+
+def save_svg_async(*args, **kwargs):
+ return asyncio.to_thread(save_svg, *args, **kwargs)
+
+
+# Updated pre version 4 SVGs, so they are compatible with resume queue
+def update_svg(job):
+ match = re.search('tg:version="(\\d+)"', job['svg'])
+ if match != None and int(match.group(1)) >= 4: return
+
+ MARKER = 'xmlns:tg="https://sketch.process.studio/turtle-graphics"'
+ idx = job['svg'].find(MARKER)
+ if idx == -1: return
+ idx += len(MARKER)
+ insert = f'\n tg:version="4" tg:layer_count="1" tg:oob_count="{job['stats']['oob_count']}" tg:short_count="{job['stats']['short_count']}" tg:format="{job['format']}" tg:width_mm="{job['size'][0]}" tg:height_mm="{job['size'][1]}" tg:speed="{job['speed']}" tg:author="{job['client']}" tg:timestamp="{job['timestamp']}"'
+
+ job['svg'] = job['svg'][:idx] + insert + job['svg'][idx:]
+ job['hash'] = hashlib.sha1(job['svg'].encode('utf-8')).hexdigest()
+
+# job {'type': 'plot, 'client', 'id', 'svg', stats, timestamp, hash, speed, format, size, received?}
+# adds to job: { 'cancel', time_estimate', 'layers', received }
+# todo: don't wait on callbacks
+async def enqueue(job, queue_position_cb = None, done_cb = None, cancel_cb = None, error_cb = None):
+ # the client might be in queue (or currently plotting)
+ if job['client'] in jobs:
+ await callback( error_cb, 'Cannot add job, you already have a job queued!', job )
+ return False
+
+ job['cancel'] = False
+ # save callbacks
+ job['queue_position_cb'] = queue_position_cb
+ job['done_cb'] = done_cb
+ job['cancel_cb'] = cancel_cb
+ job['error_cb'] = error_cb
+ if 'received' not in job or job['received'] == None:
+ job['received'] = timestamp()
+
+ # speed
+ if 'speed' in job: job['speed'] = max( min(job['speed'], 100), MIN_SPEED ) # limit speed (MIN_SPEED, 100)
+ else: job['speed'] = 100
+ # format
+ if 'format' not in job: job['format'] = 'A3_LANDSCAPE'
+
+ # add to jobs index
+ jobs[ job['client'] ] = job
+ await _notify_queue_size() # notify new queue size
+ await _notify_queue_positions()
+ print(f'New job [{job["client"]}] {job["hash"][0:5]}')
+ sim = await simulate_async(job) # run simulation
+ job['time_estimate'] = sim['time_estimate']
+ job['layers'] = sim['layers']
+
+ update_svg(job)
+ await queue.put(job)
+ await save_svg_async(job, 'waiting')
+ return True
+
+async def cancel(client, force = False):
+ if not force:
+ if current_job != None and current_job['client'] == client:
+ await callback( current_job['error_cb'], 'Cannot cancel, already plotting!', current_job )
+ return False
+
+ if client not in jobs: return False
+ job = jobs[client]
+ job['cancel'] = True # set cancel flag
+ del jobs[client] # remove from index
+
+ await callback( job['cancel_cb'], job ) # notify canceled job
+ await _notify_queue_size() # notify new queue size
+ await _notify_queue_positions() # notify queue positions (might have changed for some)
+ print(f'❌ {COL.RED}Canceled job [{job["client"]}]{COL.OFF}')
+ await save_svg_async(job, 'canceled')
+ return True
+
+async def finish_current_job():
+ await callback( current_job['done_cb'], current_job ) # notify job done
+ del jobs[ current_job['client'] ] # remove from jobs index
+ await _notify_queue_positions() # notify queue positions. current job is 0
+ await _notify_queue_size() # notify queue size
+ print(f'✅ {COL.GREEN}Finished job [{current_job["client"]}]{COL.OFF}')
+ _status = 'waiting'
+ await save_svg_async(current_job, 'finished')
+ return True
+
+def job_str(job):
+ info = '[' + str(job["client"])[0:10] + '] ' + job['hash'][0:5]
+ speed_and_format = f'{job["speed"]}%, {job["format"]}, {math.floor(job["time_estimate"]/60)}:{round(job["time_estimate"]%60):02} min'
+ if 'stats' in job:
+ stats = job['stats']
+ layers = f'{job["layers"]} layers, ' if 'layers' in job and job['layers'] > 1 else ''
+ if 'count' in stats and 'travel' in stats and 'travel_ink' in stats:
+ info += f' ({stats["count"]} lines, {layers}{int(stats["travel_ink"])}/{int(stats["travel"])} mm, {speed_and_format})'
+ else:
+ info += f' ({speed_and_format})'
+ return info
+
+
+# Return codes
+PLOTTER_ERRORS = {
+ 0: 'No error; operation nominal',
+ 1: 'Paused programmatically',
+ 101: 'Failed to connect',
+ 102: 'Stopped by pause button press',
+ 103: 'Stopped by keyboard interrupt',
+ 104: 'Lost USB connectivity'
+}
+PLOTTER_PAUSED = [ 1, 102, 103 ];
+
+def get_error_msg(code):
+ if code in PLOTTER_ERRORS:
+ return PLOTTER_ERRORS[code]
+ else:
+ return f'Unkown error (Code {code})'
+
+def print_axidraw(*args):
+ out = ' '.join(args)
+ lines = out.split('\n')
+ for line in lines:
+ print(f"{COL.GREY}[AxiDraw] " + line + COL.OFF)
+
+# Raise pen and disable XY stepper motors
+def align():
+ with capture_output(print_axidraw, print_axidraw):
+ ad = axidraw.AxiDraw()
+ ad.plot_setup()
+ ad.options.mode = 'align' # A setup mode: Raise pen, disable XY stepper motors
+ ad.options.pen_pos_up = PEN_POS_UP
+ ad.options.pen_pos_down = PEN_POS_DOWN
+ if TESTING: ad.options.preview = True
+ ad.plot_run()
+ return ad.errors.code
+
+# Cycle the pen down and back up
+def cycle():
+ with capture_output(print_axidraw, print_axidraw):
+ ad = axidraw.AxiDraw()
+ ad.plot_setup()
+ ad.options.mode = 'cycle' # A setup mode: Lower and then raise the pen
+ ad.options.pen_pos_up = PEN_POS_UP
+ ad.options.pen_pos_down = PEN_POS_DOWN
+ if TESTING: ad.options.preview = True
+ ad.plot_run()
+ return ad.errors.code
+
+def plot(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False):
+ if 'svg' not in job: return 0
+ speed = job['speed'] / 100
+ with capture_output(print_axidraw, print_axidraw):
+ ad = axidraw.AxiDraw()
+ ad.plot_setup(job['svg'])
+ ad.options.model = 2 # A3
+ ad.options.reordering = 4 # No reordering
+ ad.options.auto_rotate = True # (This is the default) Drawings that are taller than wide will be rotated 90 deg to the left
+ ad.options.speed_pendown = int(110 * speed)
+ ad.options.speed_penup = int(110 * speed)
+ ad.options.accel = int(100 * speed)
+ ad.options.pen_rate_lower = int(100 * speed)
+ ad.options.pen_rate_raise = int(100 * speed)
+ ad.options.pen_pos_up = PEN_POS_UP
+ ad.options.pen_pos_down = PEN_POS_DOWN
+ if callable(options_cb): options_cb(ad.options)
+ if TESTING: ad.options.preview = True
+ job['output_svg'] = ad.plot_run(output=True)
+ if (ad.errors.code in PLOTTER_PAUSED and align_after_pause) or \
+ (ad.errors.code not in PLOTTER_PAUSED and align_after):
+ align()
+ if return_ad: return ad
+ else: return ad.errors.code
+
+def resume_home(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False):
+ if 'output_svg' not in job: return 0
+ orig_svg = job['svg'] # save original svg
+ job['svg'] = job['output_svg'] # set last output svg as input
+
+ def _options_cb(options):
+ if callable(options_cb): options_cb(options)
+ options.mode = 'res_home'
+
+ res = plot(job, align_after, align_after_pause, _options_cb, return_ad)
+ job['svg'] = orig_svg # restore original svg
+ return res
+
+def resume_plot(job, align_after = ALIGN_AFTER, align_after_pause = ALIGN_AFTER_PAUSE, options_cb = None, return_ad = False):
+ if 'output_svg' not in job: return 0
+ orig_svg = job['svg'] # save original svg
+ job['svg'] = job['output_svg'] # set last output svg as input
+
+ def _options_cb(options):
+ if callable(options_cb): options_cb(options)
+ options.mode = 'res_plot'
+
+ res = plot(job, align_after, align_after_pause, _options_cb, return_ad)
+ job['svg'] = orig_svg # restore original svg
+ return res
+
+def simulate(job):
+ if 'svg' not in job: return 0
+ speed = job['speed'] / 100
+
+ stats = {
+ 'error_code': None,
+ 'time_estimate': 0,
+ 'distance_total': 0,
+ 'distance_pendown': 0,
+ 'pen_lifts': 0,
+ 'layers': 0
+ }
+
+ def update_stats(ad):
+ stats['error_code'] = ad.errors.code
+ stats['time_estimate'] += ad.time_estimate
+ stats['distance_total'] += ad.distance_total
+ stats['distance_pendown'] += ad.distance_pendown
+ stats['pen_lifts'] += ad.pen_lifts
+ stats['layers'] += 1
+
+ def _options_cb(options):
+ options.preview = True
+
+ ad = plot(job, align_after=False, align_after_pause=False, options_cb=_options_cb, return_ad=True)
+ update_stats(ad)
+
+ while ad.errors.code == 1: # Paused programmatically
+ ad = resume_plot(job, align_after=False, align_after_pause=False, options_cb=_options_cb, return_ad=True)
+ update_stats(ad)
+
+ return stats
+
+
+async def plot_async(*args, **kwargs):
+ return await asyncio.to_thread(plot, *args, **kwargs)
+
+async def simulate_async(*args, **kwargs):
+ return await asyncio.to_thread(simulate, *args, **kwargs)
+
+async def align_async():
+ return await asyncio.to_thread(align)
+
+async def cycle_async():
+ return await asyncio.to_thread(cycle)
+
+async def resume_plot_async(*args, **kwargs):
+ return await asyncio.to_thread(resume_plot, *args, **kwargs)
+
+async def resume_home_async(*args, **kwargs):
+ return await asyncio.to_thread(resume_home, *args, **kwargs)
+
+async def prompt_start_plot(message):
+ message += f' {KEY_START_PLOT[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_CANCEL[1]} ?'
+ while True:
+ res = await prompt.wait_for( [KEY_START_PLOT[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_CANCEL[0]], message, echo=True )
+ if res == KEY_START_PLOT[0]: # Start Plot
+ return True
+ elif res == KEY_ALIGN[0]: # Align
+ print('Aligning...')
+ await align_async() # -> prompt again
+ elif res == KEY_CYCLE[0]: # Cycle
+ print('Cycling...')
+ await cycle_async() # -> prompt again
+ elif res == KEY_CANCEL[0]: # Cancel
+ return False
+
+async def prompt_repeat_plot(message):
+ message += f' {KEY_REPEAT[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_DONE[1]} ?'
+ while True:
+ res = await prompt.wait_for( [KEY_REPEAT[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_DONE[0]], message, echo=True )
+ if res == KEY_REPEAT[0]: # Start Plot
+ return True
+ elif res == KEY_ALIGN[0]: # Align
+ print('Aligning...')
+ await align_async() # -> prompt again
+ elif res == KEY_CYCLE[0]: # Cycle
+ print('Cycling...')
+ await cycle_async() # -> prompt again
+ elif res == KEY_DONE[0]: # Finish
+ return False
+
+async def prompt_resume_plot(message, job):
+ message += f' {KEY_RESUME[1]}, {KEY_HOME[1]}, {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_RESTART_PLOT[1]}, {KEY_DONE[1]} ?'
+ while True:
+ res = await prompt.wait_for( [KEY_RESUME[0],KEY_HOME[0],KEY_ALIGN[0],KEY_CYCLE[0],KEY_RESTART_PLOT[0],KEY_DONE[0]], message, echo=True )
+ if res == KEY_RESUME[0]: # Resume Plot
+ return 'resume'
+ elif res == KEY_RESTART_PLOT[0]: # Restart plot
+ return 'restart'
+ elif res == KEY_HOME[0]: # Home
+ print('Returning home...')
+ await resume_home_async(job) # -> prompt again
+ elif res == KEY_ALIGN[0]: # Align
+ print('Aligning...')
+ await align_async() # -> prompt again
+ elif res == KEY_CYCLE[0]: # Cycle
+ print('Cycling...')
+ await cycle_async() # -> prompt again
+ elif res == KEY_DONE[0]: # Finish
+ return False
+
+async def prompt_setup(message = 'Setup Plotter:'):
+ message += f' {KEY_ALIGN[1]}, {KEY_CYCLE[1]}, {KEY_DONE[1]} ?'
+ while True:
+ res = await prompt.wait_for( [KEY_ALIGN[0],KEY_CYCLE[0],KEY_DONE[0]], message, echo=True )
+ if res == KEY_ALIGN[0]: # Align
+ print('Aligning...')
+ await align_async() # -> prompt again
+ elif res == KEY_CYCLE[0]: # Cycle
+ print('Cycling...')
+ await cycle_async() # -> prompt again
+ elif res == KEY_DONE[0] : # Finish
+ return True
+
+async def resume_queue():
+ import xml.etree.ElementTree as ElementTree
+ list = sorted(os.listdir(FOLDER_WAITING))
+ list = [ os.path.join(FOLDER_WAITING, x) for x in list if x.endswith('.svg') ]
+ resumable_jobs = []
+ for filename in list:
+ # print('Loading ', filename)
+ try:
+ with open(filename, 'r') as file:
+ svg = file.read()
+ root = ElementTree.fromstring(svg)
+ def attr(attr, ns = 'https://sketch.process.studio/turtle-graphics'):
+ return root.get(attr if ns == None else "{" + ns + "}" + attr)
+ match = re.search('\\d{8}_\\d{6}.\\d{6}_UTC[+-]\\d{4}', os.path.basename(filename))
+ received_ts = None if match == None else match.group(0)
+ job = {
+ 'loaded_from_file': True,
+ 'client': attr('author'),
+ 'id': "XYZ",
+ 'svg': svg,
+ 'stats': {
+ 'count': int(attr('count')),
+ 'layer_count': int(attr('layer_count')),
+ 'oob_count': int(attr('oob_count')),
+ 'short_count': int(attr('short_count')),
+ 'travel': int(attr('travel')),
+ 'travel_ink': int(attr('travel_ink')),
+ 'travel_blank': int(attr('travel_blank'))
+ },
+ 'timestamp': attr('timestamp'),
+ 'speed': int(attr('speed')),
+ 'format': attr('format'),
+ 'size': [int(attr('width_mm')), int(attr('height_mm'))],
+ 'hash': hashlib.sha1(svg.encode('utf-8')).hexdigest(),
+ 'received': received_ts
+ }
+ resumable_jobs.append(job)
+ except:
+ print('Error resuming ', filename)
+
+ if len(resumable_jobs) > 0: print(f"Resuming {len(resumable_jobs)} jobs...")
+ else: print("No jobs to resume")
+ for job in resumable_jobs:
+ await enqueue(job)
+
+
+async def start(_prompt, print_status):
+ global current_job
+ global _status
+ global print
+ global prompt
+ prompt = _prompt # make this global
+ print = prompt.print # replace global print function
+
+ if TESTING: print(f'{COL.YELLOW}TESTING MODE enabled{COL.OFF}')
+ if RESUME_QUEUE: await resume_queue()
+
+ await align_async()
+ await prompt_setup()
+ _status = 'waiting'
+
+ while True:
+ # get the next job from the queue, waits until a job becomes available
+ if queue.empty():
+ print_status()
+ current_job = await queue.get()
+ if not current_job['cancel']: # skip if job is canceled
+ _status = 'confirm_plot'
+ print_status()
+ ready = await prompt_start_plot(f'Ready to plot job {job_str(current_job)}?')
+ if not ready:
+ await cancel(current_job['client'], force = True)
+ _status = 'waiting'
+ continue # skip over rest of the loop
+
+ # plot (and retry on error or repeat)
+ loop = 0 # number or tries/repetitions
+ resume = False # flag indicating resume (vs. plotting from start)
+ while True:
+ if (resume):
+ print(f'🖨️ {COL.YELLOW}Resuming job [{current_job["client"]}] ...{COL.OFF}')
+ _status = 'plotting'
+ error = await resume_plot_async(current_job)
+ else:
+ loop += 1
+ print(f'🖨️ {COL.YELLOW}Plotting job [{current_job["client"]}] ...{COL.OFF}')
+ _status = 'plotting'
+ await _notify_queue_positions() # notify plotting
+ error = await plot_async(current_job)
+ resume = False
+ # No error
+ if error == 0:
+ if REPEAT_JOBS:
+ print(f'{COL.BLUE}Done ({loop}x) job [{current_job["client"]}]{COL.OFF}')
+ _status = 'confirm_plot'
+ repeat = await prompt_repeat_plot(f'{COL.BLUE}Repeat{COL.OFF} job [{current_job["client"]}] ?')
+ if repeat: continue
+ await finish_current_job()
+ break
+ # Paused programmatically (1), Stopped by pause button press (102) or Stopped by keyboard interrupt (103)
+ elif error in PLOTTER_PAUSED:
+ print(f'{COL.YELLOW}Plotter: {get_error_msg(error)}{COL.OFF}')
+ _status = 'confirm_plot'
+ ready = await prompt_resume_plot(f'{COL.YELLOW}Resume{COL.OFF} job [{current_job["client"]}] ?', current_job)
+ if not ready:
+ await finish_current_job()
+ break
+ if ready == 'resume': resume = True
+ # Errors
+ else:
+ print(f'{COL.RED}Plotter: {get_error_msg(error)}{COL.OFF}')
+ _status = 'confirm_plot'
+ ready = await prompt_start_plot(f'{COL.RED}Retry{COL.OFF} job [{current_job["client"]}] ?')
+ if not ready:
+ await cancel(current_job['client'], force = True)
+ break
+
+ _status = 'waiting'
+ current_job = None
diff --git a/start b/start
index 3c8311e..871caeb 100755
--- a/start
+++ b/start
@@ -11,17 +11,35 @@ quit() {
trap quit EXIT
usage() {
- echo "Usage: $(basename $0) [-f <0|1>] [-n <0|1>]"
- echo " -f ... Start frpc (Default 1)"
- echo " -n ... Start ngrok (Default 0)"
+ echo "Usage: $(basename $0) [-d] [-f <0|1>] [-n <0|1>]"
+ echo " -h ... Show this help"
+ echo " -d ... Dev Mode"
+ echo " -c ... Console (Run in second terminal to see output from Dev Mode)"
+ echo " -f ... Start frpc (Default 1 except when using -d)"
+ echo " -n ... Start ngrok (Default 0)"
exit 1
}
+dev=false
start_frpc=true
start_ngrok=false
-while getopts ":f:n:h" opts; do
+# A letter followed by a colon means the switch takes an argument
+while getopts "hcdf:n:" opts; do
case "${opts}" in
+ h)
+ usage
+ exit 1
+ ;;
+ c)
+ textual console -x event -x debug
+ exit 0
+ ;;
+ d)
+ dev=true
+ start_frpc=true
+ start_ngrok=false
+ ;;
f)
f=${OPTARG}
[[ $f -eq 0 || $f -eq 1 ]] || usage
@@ -32,14 +50,14 @@ while getopts ":f:n:h" opts; do
[[ $n -eq 0 || $n -eq 1 ]] || usage
[[ $n -eq 1 || -z $n ]] && start_ngrok=true || start_ngrok=false
;;
- h)
- usage
- ;;
*)
usage
+ exit 1
;;
esac
done
+
+# remove switches from $@
shift $((OPTIND-1))
if $start_frpc; then
@@ -56,5 +74,15 @@ if $start_ngrok; then
ngrok_pid=$!
fi
+# Try to activate venv
+if [[ -f ./venv/bin/activate ]]; then
+ echo "Activating virtual env..."
+ source ./venv/bin/activate
+fi
+
# run plotter-server
-python main.py
+if $dev; then
+ textual run --dev main.py
+else
+ python main.py
+fi
diff --git a/test_job.py b/test_job.py
new file mode 100644
index 0000000..1f27904
--- /dev/null
+++ b/test_job.py
@@ -0,0 +1,83 @@
+test_square = '''
+
+
+'''
+
+test_wide = '''
+
+
+'''
+
+test_high = '''
+
+
+'''
+
+test_layers = '''
+
+
+'''
+
+from spooler import svg_to_job
+import uuid
+
+def test_job(name = 'layers'):
+ svg = globals()['test_' + name]
+ client = 'Test-' + uuid.uuid4().hex[:3]
+ svg = svg.replace('tg:author=""', f'tg:author="{client}"')
+ job = svg_to_job(svg)
+ return job