printer-enclosure/printer_enclosure/main.py

138 lines
3.8 KiB
Python

import argparse
import logging
import sys
from typing import Any
import tomli
import paho.mqtt.client as mqtt
import pigpio
from printer_enclosure import __version__
# Shared pigpio handle, created when this module is imported; main() and the
# set_* helpers use it for every GPIO call.
pi = pigpio.pi()
# Runtime configuration. Starts empty; main() rebinds it to the parsed TOML
# config file before any MQTT callback can run.
config: dict[str, Any] = {}
def setup_arguments():
    """Build the command-line parser.

    Returns:
        An ``argparse.ArgumentParser`` with a single required option,
        ``-c`` / ``--config``, holding the path to the config file.
    """
    cli = argparse.ArgumentParser()
    config_option = {"required": True, "help": "Path to config file"}
    cli.add_argument("-c", "--config", **config_option)
    return cli
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback (installed as ``client.on_connect`` in ``main``).

    Logs the result code, publishes the package version under
    ``<prefix>/version`` and subscribes to the three command topics
    (fan state, fan speed, light state).

    Args:
        client: MQTT client used to publish and subscribe.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; only logged.
    """
    logging.info("Connected with result code - %s", rc)
    prefix = config["mqtt_topic_prefix"]
    client.publish(f"{prefix}/version", __version__)
    for command_suffix in ("fan/state/set", "fan/speed/set", "light/state/set"):
        client.subscribe(f"{prefix}/{command_suffix}")
def main():
    """Run the printer-enclosure service.

    Parses the command line, loads the TOML config file into the module-level
    ``config``, configures logging and the three GPIO pins, connects to the
    MQTT broker, applies the initial fan/light state and then blocks in the
    MQTT network loop.

    Exits with status 1 when the config file is not valid TOML or when the
    pigpio handle reports that it is not connected.
    """
    global config

    parser = setup_arguments()
    args = parser.parse_args()

    with open(args.config, "rb") as config_file:
        try:
            config = tomli.load(config_file)
        except ValueError:
            logging.critical("Invalid toml in config file.")
            # Non-zero status so a supervisor/shell sees the failure;
            # a bare sys.exit() would report success (status 0).
            sys.exit(1)

    logging.basicConfig(
        format="%(asctime)s - %(message)s", level=config["loglevel"].upper()
    )
    logging.info("printer-enclosure - %s", __version__)

    # Fail early with a clear message instead of an obscure error from the
    # first GPIO call when the pigpio daemon is unreachable.
    if not pi.connected:
        logging.critical("Could not connect to the pigpio daemon.")
        sys.exit(1)

    # Set GPIO pin modes
    pi.set_mode(config["pwm_gpio_pin"], pigpio.OUTPUT)
    pi.set_mode(config["fan_gpio_pin"], pigpio.OUTPUT)
    pi.set_mode(config["light_gpio_pin"], pigpio.OUTPUT)

    client = mqtt.Client(client_id=config["mqtt_client_id"], clean_session=False)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(config["host"], config["port"], 60)

    # Initial state: fan off (speed preset to 40 percent), light off.
    set_fan_state(client, "OFF")
    set_fan_speed(client, 40)
    set_light_state(client, "OFF")

    client.loop_forever()
def on_message(client, userdata, message):
    """MQTT message callback: route a command topic to its handler.

    Messages on any topic other than the three ``.../set`` command topics
    are ignored.

    Args:
        client: MQTT client, passed through to the handler.
        userdata: Unused.
        message: Received message; its ``topic`` selects the handler.
    """
    routes = (
        ("fan/state/set", process_fan_command),
        ("fan/speed/set", process_fan_speed_command),
        ("light/state/set", process_light_command),
    )
    for command_suffix, handler in routes:
        if message.topic == f"{config['mqtt_topic_prefix']}/{command_suffix}":
            handler(client, message)
            return
def process_fan_command(client, message):
    """Apply a fan on/off command.

    Only the exact payloads ``b"ON"`` and ``b"OFF"`` are acted upon; any
    other payload is ignored.

    Args:
        client: MQTT client, passed on to ``set_fan_state``.
        message: Received message whose ``payload`` carries the command.
    """
    payload = message.payload
    if payload not in (b"ON", b"OFF"):
        return
    set_fan_state(client, "ON" if payload == b"ON" else "OFF")
def process_fan_speed_command(client, message):
    """Apply a fan speed command.

    The payload must decode to an integer percentage in the range 0-100.
    Malformed or out-of-range payloads are logged and ignored, so a bad
    message cannot raise out of the MQTT callback.

    Args:
        client: MQTT client, passed on to ``set_fan_speed``.
        message: Received message whose ``payload`` carries the percentage.
    """
    try:
        fan_speed_percentage = int(message.payload.decode())
    except ValueError:
        # UnicodeDecodeError subclasses ValueError, so undecodable bytes are
        # handled here as well as non-numeric text.
        logging.error("Invalid fan speed payload: %r", message.payload)
        return

    if not 0 <= fan_speed_percentage <= 100:
        logging.error("Fan speed percentage can only be between 0-100")
        return

    set_fan_speed(client, fan_speed_percentage)
def process_light_command(client, message):
    """Apply a light on/off command.

    Only the exact payloads ``b"ON"`` and ``b"OFF"`` are acted upon; any
    other payload is ignored.

    Args:
        client: MQTT client, passed on to ``set_light_state``.
        message: Received message whose ``payload`` carries the command.
    """
    payload = message.payload
    if payload not in (b"ON", b"OFF"):
        return
    set_light_state(client, "ON" if payload == b"ON" else "OFF")
def set_fan_state(client, fan_state):
    """Switch the fan GPIO pin and report the new state.

    ``"ON"`` writes 1 to the configured fan pin and ``"OFF"`` writes 0. Any
    other value leaves the pin untouched. In every case the value is then
    published (retained) on ``<prefix>/fan/state`` and logged.

    Args:
        client: MQTT client used to publish the state.
        fan_state: Requested state, ``"ON"`` or ``"OFF"``.
    """
    if fan_state in ("ON", "OFF"):
        pi.write(config["fan_gpio_pin"], 1 if fan_state == "ON" else 0)
    state_topic = f"{config['mqtt_topic_prefix']}/fan/state"
    client.publish(state_topic, fan_state, retain=True)
    logging.info("Turned fan %s", fan_state)
def set_fan_speed(client, fan_speed_percentage):
    """Set the fan PWM output and report the new speed.

    Calls ``pi.hardware_PWM`` on the configured PWM pin and frequency with
    the percentage multiplied by 10000 as the duty argument, then publishes
    the percentage (retained) on ``<prefix>/fan/speed/state`` and logs it.

    Args:
        client: MQTT client used to publish the speed.
        fan_speed_percentage: Requested fan speed in percent.
    """
    gpio = config["pwm_gpio_pin"]
    frequency = config["pwm_frequency"]
    # NOTE(review): the x10000 scale presumably maps 0-100 percent onto
    # pigpio's duty-cycle range — confirm against the pigpio documentation.
    duty_cycle = fan_speed_percentage * 10000
    pi.hardware_PWM(gpio, frequency, duty_cycle)

    state_topic = f"{config['mqtt_topic_prefix']}/fan/speed/state"
    client.publish(state_topic, fan_speed_percentage, retain=True)
    logging.info("Set fan speed to %s procent", fan_speed_percentage)
def set_light_state(client, light_state):
    """Switch the light GPIO pin and report the new state.

    ``"ON"`` writes 1 to the configured light pin and ``"OFF"`` writes 0. Any
    other value leaves the pin untouched. In every case the value is then
    published (retained) on ``<prefix>/light/state`` and logged.

    Args:
        client: MQTT client used to publish the state.
        light_state: Requested state, ``"ON"`` or ``"OFF"``.
    """
    if light_state in ("ON", "OFF"):
        pi.write(config["light_gpio_pin"], 1 if light_state == "ON" else 0)
    state_topic = f"{config['mqtt_topic_prefix']}/light/state"
    client.publish(state_topic, light_state, retain=True)
    logging.info("Turned light %s", light_state)