podweasel/podweasel/__init__.py

564 lines
17 KiB
Python
Executable File

#!/usr/bin/env python3
"""podweasel - podcatcher for the terminal
Usage:
podweasel import <feed-url> [<shortname>] [-c=<path>]
podweasel update [<shortname>] [-c=<path>]
podweasel feeds [-c=<path>]
podweasel episodes <shortname> [-c=<path>]
podweasel download [<shortname> --how-many=<n>] [-c=<path>]
podweasel rename <shortname> <newname> [-c=<path>]
Options:
-c --config=<path> Specify an alternate config file [default: ~/.podweasel.json]
-h --help Show this help
"""
# (C) 2015 Bastian Reitemeier
# mail(at)brtmr.de
import sys
from sys import exit
import os
import os.path
from os.path import expanduser
import re
import json
from time import time, mktime, gmtime, strftime, sleep
from email.utils import parsedate
import mimetypes
import colorama
from colorama import Fore, Back, Style
import feedparser
import requests
from tqdm import tqdm
from docopt import docopt
# RSS datetimes follow RFC 2822, same as email headers.
# this is the chain of stackoverflow posts that led me to believe this is true.
# http://stackoverflow.com/questions/11993258/
# what-is-the-correct-format-for-rss-feed-pubdate
# http://stackoverflow.com/questions/885015/
# how-to-parse-a-rfc-2822-date-time-into-a-python-datetime
CONFIGURATION = {}
MIMETYPES = [
"audio/ogg",
"audio/mpeg",
"audio/mp3",
"audio/opus",
"audio/aac",
"video/mp4",
]
def print_err(err):
print(
Fore.RED + Style.BRIGHT + err + Fore.RESET + Back.RESET + Style.RESET_ALL,
file=sys.stderr,
)
def print_green(s):
print(Fore.GREEN + s + Fore.RESET)
def get_folder(shortname):
base = CONFIGURATION["podcast_directory"]
return os.path.join(base, shortname)
def get_feed_file(shortname):
return os.path.join(get_folder(shortname), "feed.json")
def sort_feed(feed):
feed["episodes"] = sorted(
feed["episodes"], key=lambda k: k["published"], reverse=True
)
return feed
def import_feed(url, shortname=""):
"""
creates a folder for the new feed, and then inserts a new feed.json
that will contain all the necessary information about this feed, and
all the episodes contained.
"""
# configuration for this feed, will be written to file.
feed = {}
# get the feed.
d = feedparser.parse(url)
if shortname:
folder = get_folder(shortname)
if os.path.exists(folder):
print_err("{} already exists".format(folder))
exit(-1)
else:
os.makedirs(folder)
# if the user did not specify a folder name,
# we have to create one from the title
if not shortname:
# the rss advertises a title, lets use that.
if hasattr(d["feed"], "title"):
title = d["feed"]["title"]
# still no succes, lets use the last part of the url
else:
title = url.rsplit("/", 1)[-1]
# we wanna avoid any filename crazyness,
# so foldernames will be restricted to lowercase ascii letters,
# numbers, and dashes:
title = "".join(ch for ch in title if ch.isalnum() or ch == " ")
shortname = title.replace(" ", "-").lower()
if not shortname:
print_err("could not auto-deduce shortname.")
print_err("please provide one explicitly.")
exit(-1)
folder = get_folder(shortname)
if os.path.exists(folder):
print_err("{} already exists".format(folder))
exit(-1)
else:
os.makedirs(folder)
# we have succesfully generated a folder that we can store the files in
# trawl all the entries, and find links to audio files.
feed["episodes"] = episodes_from_feed(d)
feed["shortname"] = shortname
feed["title"] = d["feed"]["title"]
feed["url"] = url
feed["description"] = d["feed"]["description"]
# write the configuration to a feed.json within the folder
feed_file = get_feed_file(shortname)
feed = sort_feed(feed)
with open(feed_file, "x") as f:
json.dump(feed, f, indent=4)
print(
"imported "
+ Fore.GREEN
+ feed["title"]
+ Fore.RESET
+ " with shortname "
+ Fore.BLUE
+ feed["shortname"]
+ Fore.RESET
)
if "cover_image" in CONFIGURATION and CONFIGURATION["cover_image"]:
get_cover_image(shortname, d["feed"]["image"]["url"])
def update_feed(feed):
"""
download the current feed, and insert previously unknown
episodes into our local config.
"""
d = feedparser.parse(feed["url"])
# only append new episodes!
for episode in episodes_from_feed(d):
found = False
for old_episode in feed["episodes"]:
if (
episode["published"] == old_episode["published"]
and episode["title"] == old_episode["title"]
):
found = True
if not found:
feed["episodes"].append(episode)
print("new episode.")
feed = sort_feed(feed)
overwrite_config(feed)
if "cover_image" in CONFIGURATION and CONFIGURATION["cover_image"]:
get_cover_image(feed["shortname"], d["feed"]["image"]["url"])
def get_cover_image(shortname, url):
"""
download the cover image of podcast
"""
# Check if an image name is set in the config file
if "cover_image_name" in CONFIGURATION:
filename = CONFIGURATION["cover_image_name"]
else:
filename = "folder"
filename += get_extenstion(url)
# Only download if the image doesn't exists in the podcast folder
if not file_exists(shortname, filename):
download_single(shortname, url, filename)
def overwrite_config(feed):
"""
after updating the feed, or downloading new items,
we want to update our local config to reflect that fact.
"""
filename = get_feed_file(feed["shortname"])
with open(filename, "w") as f:
json.dump(feed, f, indent=4)
def episodes_from_feed(d):
episodes = []
for entry in d.entries:
# convert publishing time to unix time, so that we can sort
# this should be unix time, barring any timezone shenanigans
date = mktime(parsedate(entry.published))
if hasattr(entry, "links"):
for link in entry.links:
if not hasattr(link, "type"):
continue
if hasattr(link, "type") and (link.type in MIMETYPES):
if hasattr(entry, "title"):
episode_title = entry.title
else:
episode_title = link.href
if hasattr(entry, "description"):
episode_description = entry.description
else:
episode_description = ""
episodes.append(
{
"title": episode_title,
"url": link.href,
"downloaded": False,
"listened": False,
"published": date,
"description": episode_description,
}
)
return episodes
def rename_episode(folder, published, title, url):
if "date_format" in CONFIGURATION:
date_format = CONFIGURATION["date_format"]
else:
date_format = "%Y-%m-%d"
# Use published date and escaped title as filename
safe_title = escape_string(title) + get_extenstion(url)
if date_format:
published_date = strftime(date_format, gmtime(published))
else:
published_date = None
filename = construct_filename(safe_title, published_date)
if not file_exists(folder, filename):
return filename
# If filename exists change title to original filename
original_title = get_original_filename(url)
filename = construct_filename(original_title, published_date)
if not file_exists(folder, filename):
return filename
# If filename exists change date to current and title to escaped title
if date_format:
current_date = strftime(date_format, gmtime())
else:
current_date = None
filename = construct_filename(safe_title, current_date)
if not file_exists(folder, filename):
return filename
# If filename exists change date to current and title to original filename
filename = construct_filename(original_title, current_date)
if not file_exists(folder, filename):
return filename
# If filename exists change date to current epoch and original filename
return construct_filename(original_title, int(time()))
def construct_filename(title, date=None):
if date is None:
return title
return "{} - {}".format(date, title)
def escape_string(title):
pattern = r'[\|#:%&{}\\/<>*?$!\'"@]'
return re.sub(pattern, "_", title)
def get_extenstion(url):
response = requests.head(url, allow_redirects=True, timeout=10)
mimetype = response.headers["Content-Type"]
extension = mimetypes.guess_extension(mimetype)
if not extension:
url = url.split("?")[0]
pattern = r"[.][\w]+$"
extension = re.search(pattern, url).group(0)
if extension == ".jpe":
extension = ".jpg"
return extension
def get_original_filename(url):
url = url.split("?")[0]
pattern = r"[^\/]+$"
return re.search(pattern, url).group(0)
def file_exists(shortname, filename):
base = CONFIGURATION["podcast_directory"]
if os.path.exists(os.path.join(base, shortname, filename)):
return True
return False
def remove_file(path):
os.remove(path)
def generic_episode_name(folder, url):
filename = get_original_filename(url)
if not file_exists(folder, filename):
return filename
return construct_filename(filename, int(time()))
def download_multiple(feed, maxnum):
for episode in feed["episodes"]:
if maxnum == 0:
break
if episode["downloaded"]:
maxnum -= 1
if not episode["downloaded"]:
if "rename_episodes" in CONFIGURATION and CONFIGURATION["rename_episodes"]:
filename = rename_episode(
feed["shortname"],
episode["published"],
episode["title"],
episode["url"],
)
else:
filename = generic_episode_name(feed["shortname"], episode["url"])
if download_single(feed["shortname"], episode["url"], filename) is True:
episode["downloaded"] = True
maxnum -= 1
overwrite_config(feed)
def download_single(folder, url, filename):
print(url)
base = CONFIGURATION["podcast_directory"]
if "connection_timeout" in CONFIGURATION:
connection_timeout = CONFIGURATION["connection_timeout"]
else:
connection_timeout = 10
if "connection_retries" in CONFIGURATION:
connection_retries = CONFIGURATION["connection_retries"]
else:
connection_retries = 3
print_green("{:s} downloading".format(filename))
for i in range(connection_retries):
try:
r = requests.get(url.strip(), stream=True, timeout=connection_timeout)
size = int(r.headers.get("content-length"))
progress = tqdm(total=size, unit="B", unit_scale=True)
with open(os.path.join(base, folder, filename), "wb") as f:
for chunk in r.iter_content(1024):
f.write(chunk)
progress.update(len(chunk))
progress.close()
except requests.Timeout:
if progress:
progress.close()
if i == connection_retries - 1:
print("Connection to server timed out")
else:
print("Connection timed out, retrying...")
sleep(1)
continue
except requests.ConnectionError:
if progress:
progress.close()
if i == connection_retries - 1:
print("Failed to establish connection with server")
else:
print("Connection failed, retrying...")
sleep(1)
continue
else:
print("done.")
break
else:
if file_exists(folder, filename):
remove_file(os.path.join(base, folder, filename))
return False
return True
def available_feeds():
"""
podweasel will save each feed to its own folder. Each folder should
contain a json configuration file describing which elements
have been downloaded already, and how many will be kept.
"""
base = CONFIGURATION["podcast_directory"]
paths = [
p
for p in os.listdir(base)
if os.path.isdir(get_folder(p)) and os.path.isfile(get_feed_file(p))
]
# for every folder, check wether a configuration file exists.
results = []
for shortname in paths:
with open(get_feed_file(shortname), "r") as f:
feed = json.load(f)
results.append(feed)
return sorted(results, key=lambda k: k["title"])
def find_feed(shortname):
"""
all feeds are identified by their shortname, which is also the name of
the folder they will be stored in.
this function will find the correct folder, and parse the json file
within that folder to generate the feed data
"""
feeds = available_feeds()
for feed in feeds:
if feed["shortname"] == shortname:
return feed
return None
def rename(shortname, newname):
folder = get_folder(shortname)
new_folder = get_folder(newname)
if not os.path.isdir(folder):
print_err("folder {0} not found".format(folder))
exit(-1)
os.rename(folder, new_folder)
feed = find_feed(shortname)
feed["shortname"] = newname
overwrite_config(feed)
def pretty_print_feeds(feeds):
format_str = Fore.GREEN + "{0:45.45} |"
format_str += Fore.BLUE + " {1:40}" + Fore.RESET + Back.RESET
print(format_str.format("title", "shortname"))
print("=" * 80)
for feed in feeds:
format_str = Fore.GREEN + "{0:40.40} {1:3d}{2:1.1} |"
format_str += Fore.BLUE + " {3:40}" + Fore.RESET + Back.RESET
feed = sort_feed(feed)
amount = len([ep for ep in feed["episodes"] if ep["downloaded"]])
dl = "" if feed["episodes"][0]["downloaded"] else "*"
print(format_str.format(feed["title"], amount, dl, feed["shortname"]))
def pretty_print_episodes(feed):
format_str = Fore.GREEN + "{0:40} |"
format_str += Fore.BLUE + " {1:20}" + Fore.RESET + Back.RESET
for e in feed["episodes"][:20]:
status = "Downloaded" if e["downloaded"] else "Not Downloaded"
print(format_str.format(e["title"][:40], status))
def main():
global CONFIGURATION
colorama.init()
arguments = docopt(__doc__, version="p0d 0.01")
# before we do anything with the commands,
# find the configuration file
configfile = expanduser(arguments["--config"])
with open(configfile) as conf_file:
try:
CONFIGURATION = json.load(conf_file)
except ValueError:
print("invalid json in configuration file.")
exit(-1)
# handle the commands
if arguments["import"]:
if arguments["<shortname>"] is None:
import_feed(arguments["<feed-url>"])
else:
import_feed(arguments["<feed-url>"], shortname=arguments["<shortname>"])
exit(0)
if arguments["feeds"]:
pretty_print_feeds(available_feeds())
exit(0)
if arguments["episodes"]:
feed = find_feed(arguments["<shortname>"])
if feed:
pretty_print_episodes(feed)
exit(0)
else:
print_err("feed {} not found".format(arguments["<shortname>"]))
exit(-1)
if arguments["update"]:
if arguments["<shortname>"]:
feed = find_feed(arguments["<shortname>"])
if feed:
print_green("updating {}".format(feed["title"]))
update_feed(feed)
exit(0)
else:
print_err("feed {} not found".format(arguments["<shortname>"]))
exit(-1)
else:
for feed in available_feeds():
print_green("updating {}".format(feed["title"]))
update_feed(feed)
exit(0)
if arguments["download"]:
if arguments["--how-many"]:
maxnum = int(arguments["--how-many"])
elif "maxnum" in CONFIGURATION:
maxnum = CONFIGURATION["maxnum"]
else:
maxnum = -1
# download episodes for a specific feed
if arguments["<shortname>"]:
feed = find_feed(arguments["<shortname>"])
if feed:
download_multiple(feed, maxnum)
exit(0)
else:
print_err("feed {} not found".format(arguments["<shortname>"]))
exit(-1)
# download episodes for all feeds.
else:
for feed in available_feeds():
download_multiple(feed, maxnum)
exit(0)
if arguments["rename"]:
rename(arguments["<shortname>"], arguments["<newname>"])
if __name__ == "__main__":
main()