OctoPrint-Qrcodespoolswitcher/octoprint_QRCodeSpoolSwitcher/__init__.py
2024-05-16 20:57:33 +02:00

137 lines
4.4 KiB
Python

# coding=utf-8
from __future__ import absolute_import
import threading
import cv2
import numpy as np
import requests
from time import sleep
import octoprint.plugin
class QrcodespoolswitcherPlugin(
    octoprint.plugin.StartupPlugin,
    octoprint.plugin.SettingsPlugin,
    octoprint.plugin.AssetPlugin,
    octoprint.plugin.TemplatePlugin,
    octoprint.plugin.EventHandlerPlugin,
):
    """OctoPrint plugin that scans webcam snapshots for spool QR codes.

    After startup a background thread periodically downloads an image from
    the configured snapshot URL, looks for QR codes whose text contains the
    ``SpoolManager/selectSpoolByQRCode`` marker and records the spool id
    found at the end of that text as the selected spool.
    """

    # Substring that marks a decoded QR text as a spool-selection link.
    _QR_URL_MARKER = "SpoolManager/selectSpoolByQRCode"

    # Smallest allowed delay (seconds) between two scans; protects against a
    # missing or zero "frequency" setting turning the watcher into a busy loop.
    _MIN_FREQUENCY = 1

    def __init__(self):
        # URL the snapshot image is fetched from.
        self.snapshot = ""
        # Tool whose spool selection events are tracked.
        self.toolId = 0
        # Id of the spool currently considered selected (None = unknown).
        self.selectedSpoolId = None
        self.commitCurrentSpoolValues = True
        # Delay in seconds between two scans.
        self.frequency = 5
        # Loop flag of the watcher thread; set to False to make it exit.
        self.watching = False

    def get_settings_defaults(self):
        """Return the default values for the plugin settings."""
        return {
            "snapshot": "http://127.0.0.1:8080/?action=snapshot",
            "toolId": 0,
            "commitCurrentSpoolValues": True,
            "frequency": 5,
        }

    def get_assets(self):
        """Return the static asset files shipped with the plugin."""
        return {
            "js": ["js/QRCodeSpoolSwitcher.js"],
            "css": ["css/QRCodeSpoolSwitcher.css"],
            "less": ["less/QRCodeSpoolSwitcher.less"],
        }

    def get_update_information(self):
        """Return the software update configuration for this plugin."""
        return {
            "QRCodeSpoolSwitcher": {
                "displayName": "Qrcodespoolswitcher Plugin",
                "displayVersion": self._plugin_version,
                "type": "github_release",
                "user": "mwalbeck",
                "repo": "OctoPrint-Qrcodespoolswitcher",
                "current": self._plugin_version,
                "pip": "https://git.walbeck.it/mwalbeck/OctoPrint-Qrcodespoolswitcher/archive/{target_version}.zip",
            }
        }

    def on_after_startup(self):
        """Load the settings and start the background QR code watcher."""
        self.snapshot = self._settings.get(["snapshot"])
        self.toolId = self._settings.get_int(["toolId"])
        self.frequency = self._settings.get_int(["frequency"])
        self.commitCurrentSpoolValues = self._settings.get_boolean(
            ["commitCurrentSpoolValues"]
        )

        self._logger.info("Starting QR code watcher")
        # Daemon thread: the watcher loops indefinitely, so a non-daemon
        # thread would keep the interpreter alive when the server shuts down.
        watcher = threading.Thread(
            target=self.watch_for_qr,
            name="QRCodeSpoolSwitcherWatcher",
            daemon=True,
        )
        watcher.start()

    def on_event(self, event, payload):
        """Track spool selections made elsewhere for the configured tool.

        Only ``plugin_spoolmanager_spool_selected`` events whose ``toolId``
        matches ``self.toolId`` are considered; their ``databaseId`` becomes
        the currently selected spool id. Payloads lacking these keys are
        ignored instead of raising ``KeyError``.
        """
        if event != "plugin_spoolmanager_spool_selected" or not payload:
            return
        if payload.get("toolId") != self.toolId:
            return
        if "databaseId" in payload:
            self.selectedSpoolId = payload["databaseId"]

    def watch_for_qr(self):
        """Poll the snapshot URL for QR codes until ``self.watching`` is False.

        Runs in the watcher thread. Every iteration waits for the polling
        interval and then performs a single scan. Unexpected errors of a scan
        are logged and the loop continues, so one bad frame cannot stop the
        watcher for good.
        """
        self.watching = True
        qrcode_detector = cv2.QRCodeDetector()
        while self.watching:
            sleep(self._poll_interval())
            try:
                self._scan_once(qrcode_detector)
            except Exception:
                # Thread boundary: log with traceback and keep polling.
                self._logger.exception(
                    "Unexpected error while scanning for QR codes"
                )

    def _poll_interval(self):
        """Return the delay between scans, never below ``_MIN_FREQUENCY``."""
        try:
            return max(int(self.frequency), self._MIN_FREQUENCY)
        except (TypeError, ValueError):
            # The setting is missing or not numeric.
            return self._MIN_FREQUENCY

    def _scan_once(self, qrcode_detector):
        """Fetch one snapshot and select the first spool found in it."""
        try:
            response = requests.get(self.snapshot, timeout=2)
            response.raise_for_status()
        except requests.RequestException as e:
            self._logger.error(f"Failed to fetch image for QR code scaning: {e}")
            return

        if not response.content:
            # An empty buffer cannot be decoded into an image.
            self._logger.warning("Snapshot response was empty, skipping QR code scan")
            return

        image_array = np.asarray(bytearray(response.content), dtype=np.uint8)
        try:
            image = cv2.imdecode(image_array, cv2.IMREAD_UNCHANGED)
            if image is None:
                # imdecode signals undecodable data by returning None.
                self._logger.warning(
                    "Snapshot could not be decoded as an image, skipping QR code scan"
                )
                return
            retval, decoded_info, _points, _straight_qrcode = (
                qrcode_detector.detectAndDecodeMulti(image)
            )
        except cv2.error as e:
            self._logger.error(f"Failed to scan image for QR codes: {e}")
            return

        if not retval:
            return

        self._logger.info(decoded_info)
        spoolId = self._first_spool_id(decoded_info)
        if spoolId is None:
            return

        self._logger.info(f"Found spool {spoolId}")
        if spoolId != self.selectedSpoolId:
            self.select_spool(spoolId)

    def _first_spool_id(self, decoded_info):
        """Return the spool id of the first valid spool QR text, else None.

        A text qualifies when it contains ``_QR_URL_MARKER``; the id is the
        integer after the last ``/``. Texts with a non-numeric id are logged
        and skipped.
        """
        for text in decoded_info:
            if self._QR_URL_MARKER not in text:
                continue
            try:
                return int(text.rstrip("/").rsplit("/", 1)[-1])
            except ValueError:
                self._logger.warning(
                    f"Ignoring QR code with invalid spool id: {text}"
                )
        return None

    def select_spool(self, spoolId):
        """Record ``spoolId`` as the currently selected spool.

        The request that would forward the selection to SpoolManager is
        still disabled (see the commented-out code below), so for now this
        only updates the local state and writes a log entry.
        """
        self.selectedSpoolId = spoolId
        self._logger.info(f"Changed active spool to {spoolId}")
        # payload = {
        #     "databaseId": spoolId,
        #     "toolIndex": self.toolId,
        #     "commitCurrentSpoolValues": self.commitCurrentSpoolValues,
        # }
        # try:
        #     requests.put(
        #         "http://localhost:5000/plugin/SpoolManager/selectSpool", json=payload
        #     )
        # except requests.RequestException as e:
        #     self._logger.error(f"Error occured during request to change spool: {e}")
# Plugin control properties. NOTE(review): presumably read by OctoPrint's
# plugin loader; the consumer is not visible in this file.
__plugin_name__ = "QRCodeSpoolSwitcher"
# Version specifier for supported interpreters: any Python 3.x release.
__plugin_pythoncompat__ = ">=3,<4"
def __plugin_load__():
    """Create the plugin instance and register its hooks.

    Publishes the instance as the module-level ``__plugin_implementation__``
    and maps the software update ``check_config`` hook to the instance's
    ``get_update_information`` method via ``__plugin_hooks__``.
    """
    global __plugin_implementation__, __plugin_hooks__

    plugin = QrcodespoolswitcherPlugin()
    __plugin_implementation__ = plugin
    __plugin_hooks__ = {
        "octoprint.plugin.softwareupdate.check_config": plugin.get_update_information
    }