399 lines
13 KiB
Python
399 lines
13 KiB
Python
import logging
|
|
import pkgutil
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from queue import Queue
|
|
from threading import Event
|
|
|
|
from pyvirtualdisplay import Display
|
|
from selenium.common.exceptions import (ElementClickInterceptedException,
|
|
NoSuchElementException,
|
|
TimeoutException)
|
|
from selenium.webdriver import Firefox
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.common.keys import Keys
|
|
from selenium.webdriver.firefox.options import Options
|
|
from selenium.webdriver.firefox.webdriver import WebDriver
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
|
|
from talked import config
|
|
from talked.ffmpeg import assemble_command
|
|
|
|
|
|
def start(
|
|
token: str,
|
|
queue: Queue,
|
|
recording: Event,
|
|
nextcloud_version: int,
|
|
audio_only: bool,
|
|
grid_view: bool,
|
|
) -> None:
|
|
msg_queue = queue
|
|
|
|
# Assemble link for call to record
|
|
call_link = assemble_call_link(config["base_url"], token) # type: ignore
|
|
|
|
# Make sure an instance of Pulseaudio is running.
|
|
logging.info("Starting pulseaudio")
|
|
subprocess.run(["pulseaudio", "--start"], check=True) # nosec
|
|
|
|
logging.info("Starting virtual x server")
|
|
with Display(
|
|
backend="xvfb",
|
|
size=(config["video_width"], config["video_height"]),
|
|
color_depth=config["color_depth"],
|
|
):
|
|
logging.info("Starting browser")
|
|
logging.info(call_link)
|
|
browser = launch_browser(call_link, msg_queue, nextcloud_version, grid_view)
|
|
logging.info("Starting ffmpeg process")
|
|
|
|
try:
|
|
ffmpeg_command, filename = assemble_command(audio_only)
|
|
except RuntimeError:
|
|
msg_queue.put(
|
|
{
|
|
"status": "error",
|
|
"message": (
|
|
"There was an issue with the recording configuration, "
|
|
"please contact an administrator."
|
|
),
|
|
}
|
|
)
|
|
graceful_shutdown(browser)
|
|
|
|
ffmpeg = subprocess.Popen(ffmpeg_command) # nosec
|
|
logging.info("Recording has started")
|
|
|
|
msg_queue.put(
|
|
{
|
|
"status": "ok",
|
|
"message": "Recording has started.",
|
|
}
|
|
)
|
|
|
|
recording.wait()
|
|
|
|
logging.info("Stop ffmpeg process")
|
|
ffmpeg.terminate()
|
|
logging.info("Stop browser")
|
|
browser.close()
|
|
logging.info("Recording has stopped")
|
|
msg_queue.put(
|
|
{
|
|
"status": "ok",
|
|
"message": "Recording has stopped.",
|
|
}
|
|
)
|
|
|
|
if config.get("finalise_recording_script", ""):
|
|
logging.info("Running finalise recording script")
|
|
try:
|
|
subprocess.run( # nosec
|
|
[
|
|
config["finalise_recording_script"],
|
|
f"{config['recording_dir']}/{filename}",
|
|
],
|
|
check=True,
|
|
)
|
|
except subprocess.CalledProcessError:
|
|
logging.error(
|
|
"The finalise recording script failed to run successfully!"
|
|
)
|
|
|
|
logging.info("Done!")
|
|
|
|
|
|
def launch_browser(
|
|
call_link: str, msg_queue: Queue, nextcloud_version: int, grid_view: bool
|
|
) -> WebDriver:
|
|
logging.info("Configuring browser options")
|
|
options = Options()
|
|
options.set_preference("media.navigator.permission.disabled", True)
|
|
options.set_preference("privacy.webrtc.legacyGlobalIndicator", False)
|
|
options.set_preference("full-screen-api.warning.timeout", 0)
|
|
options.add_argument("--kiosk")
|
|
options.add_argument(f"--width={config['video_width']}")
|
|
options.add_argument(f"--height={config['video_height']}")
|
|
|
|
logging.info("Creating browser")
|
|
driver = Firefox(options=options)
|
|
logging.info("Navigate to call link")
|
|
driver.get(call_link)
|
|
|
|
# Check if loaded page is a valid Talk room
|
|
is_valid_talk_room(driver, msg_queue)
|
|
|
|
# Change the name of the recording user
|
|
change_name_of_user(driver, nextcloud_version)
|
|
|
|
join_call(driver, msg_queue, nextcloud_version)
|
|
|
|
# Get page body to send keyboard shortcuts
|
|
page = driver.find_element(By.TAG_NAME, "body")
|
|
|
|
# Press escape to remove focus from chat.
|
|
page.send_keys(Keys.ESCAPE)
|
|
# Mute the talked user
|
|
mute_user(driver, nextcloud_version)
|
|
|
|
# If grid view is set to False, switch to speaker view.
|
|
if not grid_view:
|
|
switch_to_speaker_view(driver, nextcloud_version)
|
|
|
|
close_sidebar(driver, nextcloud_version)
|
|
|
|
# Go fullscreen
|
|
page.send_keys("f")
|
|
|
|
# Load custom CSS used to improve the recording
|
|
load_custom_css(driver)
|
|
|
|
# Give it some time to properly connect to participants.
|
|
time.sleep(5)
|
|
return driver
|
|
|
|
|
|
def assemble_call_link(base_url: str, token: str) -> str:
|
|
return base_url + "/index.php/call/" + token
|
|
|
|
|
|
def is_valid_talk_room(driver: WebDriver, msg_queue: Queue) -> None:
|
|
"""Checks if the loaded page is a valid Talk room.
|
|
It looks for the start call / join call button, if it isn't there
|
|
it throws a TimeoutException notifies the HTTP api
|
|
and shuts down the browser.
|
|
|
|
Accepts the current browser driver.
|
|
"""
|
|
|
|
try:
|
|
WebDriverWait(driver, 10).until(
|
|
EC.presence_of_element_located(
|
|
(By.CSS_SELECTOR, ".app-talk .top-bar .top-bar__button")
|
|
)
|
|
)
|
|
except TimeoutException:
|
|
msg_queue.put(
|
|
{
|
|
"status": "error",
|
|
"message": "Failed to load the Talk room.",
|
|
}
|
|
)
|
|
logging.warning("Failed to load the Talk room.")
|
|
graceful_shutdown(driver)
|
|
|
|
|
|
def change_name_of_user(driver: WebDriver, nextcloud_version: int) -> None:
|
|
logging.info("Changing name of recording user")
|
|
|
|
if nextcloud_version >= 24:
|
|
edit_name = WebDriverWait(driver, 10).until(
|
|
EC.visibility_of_element_located(
|
|
(By.CSS_SELECTOR, ".username-form button .pencil-icon")
|
|
)
|
|
)
|
|
else:
|
|
edit_name = WebDriverWait(driver, 10).until(
|
|
EC.visibility_of_element_located(
|
|
(By.CSS_SELECTOR, ".username-form button.icon-rename")
|
|
)
|
|
)
|
|
|
|
edit_name.click()
|
|
driver.find_element(By.CSS_SELECTOR, "input.username-form__input").send_keys(
|
|
"Talked" + Keys.ENTER
|
|
)
|
|
|
|
|
|
def join_call(driver: WebDriver, msg_queue: Queue, nextcloud_version: int) -> None:
|
|
# Wait for the green Join Call button to appear then click it
|
|
logging.info("Waiting for join call button to appear")
|
|
|
|
if nextcloud_version >= 24:
|
|
join_call_button_css_selector = (
|
|
"#call_button.button-vue--vue-success:not(:disabled)"
|
|
)
|
|
else:
|
|
join_call_button_css_selector = "button.top-bar__button.success:not(:disabled)"
|
|
|
|
try:
|
|
join_call_button = WebDriverWait(driver, 10).until(
|
|
EC.presence_of_element_located(
|
|
(By.CSS_SELECTOR, join_call_button_css_selector)
|
|
)
|
|
)
|
|
except TimeoutException:
|
|
msg_queue.put(
|
|
{
|
|
"status": "error",
|
|
"message": "There doesn't seem to be an active call in the room.",
|
|
}
|
|
)
|
|
logging.warning("There doesn't seem to be an active call in the room.")
|
|
graceful_shutdown(driver)
|
|
|
|
logging.info("Joining call")
|
|
join_call_button.click()
|
|
|
|
if nextcloud_version >= 23:
|
|
logging.info("Handling device checker screen")
|
|
|
|
if nextcloud_version >= 24:
|
|
device_checker_join_call_button_css_selector = (
|
|
".device-checker #call_button.button-vue--vue-success:not(:disabled)"
|
|
)
|
|
else:
|
|
device_checker_join_call_button_css_selector = (
|
|
".device-checker #call_button.success"
|
|
)
|
|
|
|
try:
|
|
device_checker_join_call_button = WebDriverWait(driver, 10).until(
|
|
EC.presence_of_element_located(
|
|
(By.CSS_SELECTOR, device_checker_join_call_button_css_selector)
|
|
)
|
|
)
|
|
except TimeoutException:
|
|
msg_queue.put(
|
|
{
|
|
"status": "error",
|
|
"message": "There doesn't seem to be an active call in the room.",
|
|
}
|
|
)
|
|
logging.warning("There doesn't seem to be an active call in the room.")
|
|
graceful_shutdown(driver)
|
|
|
|
device_checker_join_call_button.click()
|
|
|
|
# Wait for the call to initiate
|
|
try:
|
|
WebDriverWait(driver, 10).until(
|
|
EC.presence_of_element_located((By.CSS_SELECTOR, ".top-bar.in-call"))
|
|
)
|
|
except TimeoutException:
|
|
msg_queue.put(
|
|
{
|
|
"status": "error",
|
|
"message": "Failed to initiate call.",
|
|
}
|
|
)
|
|
logging.warning("Failed to initiate call.")
|
|
graceful_shutdown(driver)
|
|
|
|
|
|
def mute_user(driver: WebDriver, nextcloud_version: int) -> None:
|
|
logging.info("Muting Talked user")
|
|
|
|
if nextcloud_version >= 25:
|
|
mute_button_css_selector = "#muteWrapper .button-vue:not(.no-audio-available)"
|
|
else:
|
|
mute_button_css_selector = "#mute:not(.audio-disabled)"
|
|
|
|
try:
|
|
driver.find_element(By.CSS_SELECTOR, mute_button_css_selector).click()
|
|
except NoSuchElementException:
|
|
logging.info(("Mute button wasn't found. Assuming we are already muted."))
|
|
|
|
|
|
def switch_to_speaker_view(driver: WebDriver, nextcloud_version: int) -> None:
|
|
# Switch to speaker view
|
|
logging.info("Switching to speaker view")
|
|
|
|
if nextcloud_version >= 23:
|
|
driver.find_element(
|
|
By.CSS_SELECTOR, ".local-media-controls button.action-item__menutoggle"
|
|
).click()
|
|
|
|
if nextcloud_version >= 25:
|
|
speaker_view_button_css_selector = "button.action-button .promoted-view-icon"
|
|
else:
|
|
speaker_view_button_css_selector = "button.action-button .icon-promoted-view"
|
|
|
|
try:
|
|
WebDriverWait(driver, 2).until(
|
|
EC.presence_of_element_located(
|
|
(
|
|
By.CSS_SELECTOR,
|
|
speaker_view_button_css_selector,
|
|
)
|
|
)
|
|
).click()
|
|
except TimeoutException:
|
|
logging.info(
|
|
(
|
|
"Speaker view button wasn't found. "
|
|
"Assuming we are already in speaker view."
|
|
)
|
|
)
|
|
else:
|
|
try:
|
|
driver.find_element(
|
|
By.CSS_SELECTOR, ".top-bar.in-call button.icon-promoted-view"
|
|
).click()
|
|
except NoSuchElementException:
|
|
logging.info(
|
|
(
|
|
"Speaker view button wasn't found. "
|
|
"Assuming we are already in speaker view."
|
|
)
|
|
)
|
|
|
|
|
|
def close_sidebar(driver: WebDriver, nextcloud_version: int) -> None:
|
|
# Close the sidebar
|
|
logging.info("Closing sidebar")
|
|
|
|
if nextcloud_version >= 25:
|
|
close_sidebar_button_css_selector = "button.app-sidebar__close"
|
|
leave_call_button_css_selector = ".top-bar.in-call .top-bar__button .video-off-icon"
|
|
else:
|
|
close_sidebar_button_css_selector = "a.app-sidebar__close"
|
|
leave_call_button_css_selector = ".top-bar.in-call .top-bar__button .icon-leave-call"
|
|
|
|
try:
|
|
driver.find_element(By.CSS_SELECTOR, close_sidebar_button_css_selector).click()
|
|
except ElementClickInterceptedException:
|
|
logging.info("Assuming toast is covering close button")
|
|
close_toasts(driver)
|
|
driver.find_element(By.CSS_SELECTOR, close_sidebar_button_css_selector).click()
|
|
|
|
# Wait for sidebar to close
|
|
WebDriverWait(driver, 10).until(
|
|
EC.visibility_of_element_located(
|
|
(
|
|
By.CSS_SELECTOR,
|
|
leave_call_button_css_selector,
|
|
)
|
|
)
|
|
)
|
|
|
|
|
|
def close_toasts(driver: WebDriver) -> None:
|
|
while True:
|
|
logging.info("Closing toast")
|
|
try:
|
|
driver.find_element(By.CSS_SELECTOR, "span.toast-close").click()
|
|
except NoSuchElementException:
|
|
logging.info("No more open toasts")
|
|
break
|
|
|
|
|
|
def load_custom_css(driver: WebDriver) -> None:
|
|
logging.info("Loading custom CSS")
|
|
|
|
javascript = pkgutil.get_data("talked", "static/custom_css.js")
|
|
|
|
if javascript:
|
|
driver.execute_script(javascript.decode("UTF-8"))
|
|
else:
|
|
logging.info("The custom CSS couldn't be loaded")
|
|
|
|
|
|
def graceful_shutdown(driver: WebDriver) -> None:
|
|
logging.info("Shutting down...")
|
|
driver.close()
|
|
sys.exit()
|