pi-soulra/pi_soulra/main.py

111 lines
2.7 KiB
Python

import argparse
import logging
from threading import Event
import os
import subprocess
import paho.mqtt.client as mqtt
from pi_soulra import __version__
from pi_soulra import __file__
power_state = Event()
def setup_arguments():
parser = argparse.ArgumentParser()
parser.add_argument(
"-H",
"--host",
required=True,
help="Hostname or IP address of MQTT broker",
)
parser.add_argument(
"-p",
"--port",
type=int,
default=1883,
help="Port for MQTT broker",
)
parser.add_argument(
"-l",
"--loglevel",
choices=["critical", "error", "warning", "info", "debug"],
default="warning",
help="Port for MQTT broker",
)
return parser
def on_connect(client, userdata, flags, rc):
logging.info("Connected with result code - %s", rc)
client.publish("speakers/pi-soulra/version", __version__)
client.subscribe("speakers/pi-soulra/command")
def on_message(client, userdata, message):
if message.payload == b"power_on" and not power_state.is_set():
power_state.set()
send(["power", "aux"])
client.publish("speakers/pi-soulra/state", "on")
logging.info("Sent power_on signal")
elif message.payload == b"power_off" and power_state.is_set():
power_state.clear()
send(["power"])
client.publish("speakers/pi-soulra/state", "off")
logging.info("Sent power_off signal")
elif message.payload == b"volume_up":
send(["volumeup"])
logging.info("Sent volume-up signal")
elif message.payload == b"volume_down":
send(["volumedown"])
logging.info("Sent volume-down signal")
elif message.payload == b"aux":
send(["aux"])
logging.info("Sent aux signal")
elif message.payload == b"bass":
send(["bass"])
logging.info("Sent bass signal")
elif message.payload == b"shutdown":
logging.info("Shutting down...")
subprocess.run(["systemctl", "poweroff"], check=False)
def send(commands):
for command in commands:
subprocess.run(
["ir-ctl", "-s", f"{os.path.dirname(__file__)}/ir_codes/{command}"],
check=False,
)
def main():
parser = setup_arguments()
args = parser.parse_args()
logging.basicConfig(format="%(asctime)s - %(message)s", level=args.loglevel.upper())
logging.debug(args)
logging.info("Pi-Soulra - %s", __version__)
client = mqtt.Client(client_id="pi-soulra", clean_session=False)
client.connect(args.host, args.port, 60)
client.on_connect = on_connect
client.on_message = on_message
client.loop_forever()
main()