138 lines
3.8 KiB
Python
138 lines
3.8 KiB
Python
import argparse
|
|
import logging
|
|
import sys
|
|
from typing import Any
|
|
|
|
import tomli
|
|
import paho.mqtt.client as mqtt
|
|
import pigpio
|
|
|
|
from printer_enclosure import __version__
|
|
|
|
# Shared pigpio handle, created once at import time. main() and the set_*
# helpers use it for every GPIO call.
pi = pigpio.pi()
|
|
# Service configuration. Starts empty and is replaced in main() with the
# parsed TOML config file; the MQTT callbacks and GPIO helpers read from it.
config: dict[str, Any] = {}
|
|
|
|
|
|
def setup_arguments():
    """Build the command-line parser for the service.

    Returns:
        An ``argparse.ArgumentParser`` with one mandatory option,
        ``-c``/``--config``, that holds the path to the config file.
    """
    cli = argparse.ArgumentParser()
    config_option = {"required": True, "help": "Path to config file"}
    cli.add_argument("-c", "--config", **config_option)
    return cli
|
|
|
|
|
|
def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: announce the version and subscribe to commands.

    Logs the result code, publishes ``__version__`` to ``<prefix>/version``
    and subscribes to the fan state, fan speed and light state ``set``
    topics, where ``<prefix>`` is ``config["mqtt_topic_prefix"]``.

    Args:
        client: Client object the publish/subscribe calls are made on.
        userdata: Unused.
        flags: Unused.
        rc: Result code of the connection attempt; only logged.
    """
    logging.info("Connected with result code - %s", rc)

    prefix = config["mqtt_topic_prefix"]
    client.publish(f"{prefix}/version", __version__)

    # Command topics this service listens on, relative to the prefix.
    for command_topic in ("fan/state/set", "fan/speed/set", "light/state/set"):
        client.subscribe(f"{prefix}/{command_topic}")
|
|
|
|
|
|
def main():
    """Run the printer-enclosure service until the MQTT loop ends.

    Parses the command line, loads the TOML config file into the module-level
    ``config``, configures logging and the GPIO pin modes, connects to the
    MQTT broker, applies the start-up defaults (fan off, fan speed 40, light
    off) and then blocks in ``client.loop_forever()``.

    Raises:
        SystemExit: With status 1 when the config file is not valid TOML.
    """
    global config

    parser = setup_arguments()
    args = parser.parse_args()

    with open(args.config, "rb") as config_file:
        try:
            config = tomli.load(config_file)
        except ValueError:
            logging.critical("Invalid toml in config file.")
            # A bare sys.exit() reports success (status 0); a broken config
            # is a failure, so exit non-zero for supervisors and scripts.
            sys.exit(1)

    logging.basicConfig(
        format="%(asctime)s - %(message)s", level=config["loglevel"].upper()
    )

    logging.info("printer-enclosure - %s", __version__)

    # Set GPIO pin modes
    pi.set_mode(config["pwm_gpio_pin"], pigpio.OUTPUT)
    pi.set_mode(config["fan_gpio_pin"], pigpio.OUTPUT)
    pi.set_mode(config["light_gpio_pin"], pigpio.OUTPUT)

    client = mqtt.Client(client_id=config["mqtt_client_id"], clean_session=False)
    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(config["host"], config["port"], 60)

    # Put the hardware into a known state and publish it before entering
    # the network loop.
    set_fan_state(client, "OFF")
    set_fan_speed(client, 40)
    set_light_state(client, "OFF")

    client.loop_forever()
|
|
|
|
|
|
def on_message(client, userdata, message):
    """MQTT message callback: route a command to its handler by topic.

    The topic is compared against the three ``set`` topics under
    ``config["mqtt_topic_prefix"]``; the first match has its handler called
    with ``(client, message)``. Messages on any other topic are ignored.

    Args:
        client: Client object, passed through to the handler.
        userdata: Unused.
        message: Received message; its ``topic`` selects the handler.
    """
    prefix = config["mqtt_topic_prefix"]
    routes = (
        ("fan/state/set", process_fan_command),
        ("fan/speed/set", process_fan_speed_command),
        ("light/state/set", process_light_command),
    )
    for topic_suffix, handler in routes:
        if message.topic == f"{prefix}/{topic_suffix}":
            handler(client, message)
            return
|
|
|
|
|
|
def process_fan_command(client, message):
    """Handle a fan on/off command.

    A payload of ``b"ON"`` or ``b"OFF"`` is forwarded to ``set_fan_state`` as
    ``"ON"`` / ``"OFF"``. Any other payload is ignored.

    Args:
        client: Client object, passed through to ``set_fan_state``.
        message: Received message; only its ``payload`` is read.
    """
    payload = message.payload
    for state in ("ON", "OFF"):
        if payload == state.encode():
            set_fan_state(client, state)
            return
|
|
|
|
|
|
def process_fan_speed_command(client, message):
    """Handle a fan speed command.

    The payload is expected to be an integer percentage in text form. A
    payload that cannot be decoded or parsed as an integer, or that lies
    outside 0-100, is logged as an error and ignored. Otherwise the value is
    passed to ``set_fan_speed``.

    Args:
        client: Client object, passed through to ``set_fan_speed``.
        message: Received message; only its ``payload`` is read.
    """
    try:
        fan_speed_percentage = int(message.payload.decode())
    except ValueError:
        # Payloads arrive from the broker and may be anything. Without this
        # guard the ValueError (UnicodeDecodeError is a subclass) would
        # propagate out of the message callback.
        logging.error("Invalid fan speed payload: %r", message.payload)
        return

    if not 0 <= fan_speed_percentage <= 100:
        logging.error("Fan speed percentage can only be between 0-100")
        return

    set_fan_speed(client, fan_speed_percentage)
|
|
|
|
|
|
def process_light_command(client, message):
    """Handle a light on/off command.

    A payload of ``b"ON"`` or ``b"OFF"`` is forwarded to ``set_light_state``
    as ``"ON"`` / ``"OFF"``. Any other payload is ignored.

    Args:
        client: Client object, passed through to ``set_light_state``.
        message: Received message; only its ``payload`` is read.
    """
    payload = message.payload
    for state in ("ON", "OFF"):
        if payload == state.encode():
            set_light_state(client, state)
            return
|
|
|
|
|
|
def set_fan_state(client, fan_state):
    """Switch the fan pin and publish/log the state.

    Writes 1 to ``config["fan_gpio_pin"]`` for ``"ON"`` and 0 for ``"OFF"``.
    Any other value leaves the pin untouched. In every case ``fan_state`` is
    then published (retained) to ``<prefix>/fan/state`` and logged.

    Args:
        client: Client object used to publish the state.
        fan_state: Requested state, ``"ON"`` or ``"OFF"``.
    """
    if fan_state in ("ON", "OFF"):
        pin_level = 1 if fan_state == "ON" else 0
        pi.write(config["fan_gpio_pin"], pin_level)

    state_topic = f"{config['mqtt_topic_prefix']}/fan/state"
    client.publish(state_topic, fan_state, retain=True)
    logging.info("Turned fan %s", fan_state)
|
|
|
|
|
|
def set_fan_speed(client, fan_speed_percentage):
    """Apply a fan speed through hardware PWM and publish/log it.

    Calls ``pi.hardware_PWM`` on ``config["pwm_gpio_pin"]`` at
    ``config["pwm_frequency"]`` with a duty value of
    ``fan_speed_percentage * 10000``, then publishes the percentage
    (retained) to ``<prefix>/fan/speed/state`` and logs it.

    Args:
        client: Client object used to publish the speed.
        fan_speed_percentage: Requested speed; callers in this file pass an
            int in the range 0-100.
    """
    pwm_pin = config["pwm_gpio_pin"]
    pwm_frequency = config["pwm_frequency"]
    # NOTE(review): pigpio documents the duty argument as 0-1000000, so a
    # percentage is scaled by 10000 - confirm against the pigpio reference.
    duty = fan_speed_percentage * 10000
    pi.hardware_PWM(pwm_pin, pwm_frequency, duty)

    speed_topic = f"{config['mqtt_topic_prefix']}/fan/speed/state"
    client.publish(speed_topic, fan_speed_percentage, retain=True)
    logging.info("Set fan speed to %s procent", fan_speed_percentage)
|
|
|
|
|
|
def set_light_state(client, light_state):
    """Switch the light pin and publish/log the state.

    Writes 1 to ``config["light_gpio_pin"]`` for ``"ON"`` and 0 for
    ``"OFF"``. Any other value leaves the pin untouched. In every case
    ``light_state`` is then published (retained) to ``<prefix>/light/state``
    and logged.

    Args:
        client: Client object used to publish the state.
        light_state: Requested state, ``"ON"`` or ``"OFF"``.
    """
    if light_state in ("ON", "OFF"):
        pin_level = 1 if light_state == "ON" else 0
        pi.write(config["light_gpio_pin"], pin_level)

    state_topic = f"{config['mqtt_topic_prefix']}/light/state"
    client.publish(state_topic, light_state, retain=True)
    logging.info("Turned light %s", light_state)
|