Format with black and add flake8 config

This commit is contained in:
Magnus Walbeck 2020-08-12 23:45:50 +02:00
parent c379059ff4
commit 3d7a1ce738
Signed by: mwalbeck
GPG key ID: CCB78CFF3F950769
3 changed files with 159 additions and 139 deletions

3
.flake8 Normal file
View file

@ -0,0 +1,3 @@
[flake8]
select = C,E,F,W,B,B950
ignore = E203, E501, W503

View file

@ -1,3 +1,8 @@
{
"python.pythonPath": "./.venv/bin/python"
"python.pythonPath": ".venv/bin/python",
"editor.formatOnSave": true,
"[python]": {
"editor.tabSize": 4,
"editor.rulers": [88]
}
}

View file

@ -44,16 +44,14 @@ from docopt import docopt
CONFIGURATION = {}
MIMETYPES = [
'audio/ogg',
'audio/mpeg',
'video/mp4'
]
MIMETYPES = ["audio/ogg", "audio/mpeg", "video/mp4"]
def print_err(err):
print(Fore.RED + Style.BRIGHT + err +
Fore.RESET + Back.RESET + Style.RESET_ALL, file=sys.stderr)
print(
Fore.RED + Style.BRIGHT + err + Fore.RESET + Back.RESET + Style.RESET_ALL,
file=sys.stderr,
)
def print_green(s):
@ -61,26 +59,27 @@ def print_green(s):
def get_folder(shortname):
base = CONFIGURATION['podcast-directory']
base = CONFIGURATION["podcast-directory"]
return os.path.join(base, shortname)
def get_feed_file(shortname):
return os.path.join(get_folder(shortname), 'feed.json')
return os.path.join(get_folder(shortname), "feed.json")
def sort_feed(feed):
feed['episodes'] = sorted(feed['episodes'], key=lambda k: k['published'],
reverse=True)
feed["episodes"] = sorted(
feed["episodes"], key=lambda k: k["published"], reverse=True
)
return feed
def import_feed(url, shortname=''):
'''
def import_feed(url, shortname=""):
"""
creates a folder for the new feed, and then inserts a new feed.json
that will contain all the necessary information about this feed, and
all the episodes contained.
'''
"""
# configuration for this feed, will be written to file.
feed = {}
# get the feed.
@ -89,8 +88,7 @@ def import_feed(url, shortname=''):
if shortname:
folder = get_folder(shortname)
if os.path.exists(folder):
print_err(
'{} already exists'.format(folder))
print_err("{} already exists".format(folder))
exit(-1)
else:
os.makedirs(folder)
@ -98,63 +96,70 @@ def import_feed(url, shortname=''):
# we have to create one from the title
if not shortname:
# the rss advertises a title, lets use that.
if hasattr(d['feed'], 'title'):
title = d['feed']['title']
if hasattr(d["feed"], "title"):
title = d["feed"]["title"]
# still no succes, lets use the last part of the url
else:
title = url.rsplit('/', 1)[-1]
title = url.rsplit("/", 1)[-1]
# we wanna avoid any filename crazyness,
# so foldernames will be restricted to lowercase ascii letters,
# numbers, and dashes:
title = ''.join(ch for ch in title
if ch.isalnum() or ch == ' ')
shortname = title.replace(' ', '-').lower()
title = "".join(ch for ch in title if ch.isalnum() or ch == " ")
shortname = title.replace(" ", "-").lower()
if not shortname:
print_err('could not auto-deduce shortname.')
print_err('please provide one explicitly.')
print_err("could not auto-deduce shortname.")
print_err("please provide one explicitly.")
exit(-1)
folder = get_folder(shortname)
if os.path.exists(folder):
print_err(
'{} already exists'.format(folder))
print_err("{} already exists".format(folder))
exit(-1)
else:
os.makedirs(folder)
# we have succesfully generated a folder that we can store the files in
# trawl all the entries, and find links to audio files.
feed['episodes'] = episodes_from_feed(d)
feed['shortname'] = shortname
feed['title'] = d['feed']['title']
feed['url'] = url
feed['description'] = d['feed']['description']
feed["episodes"] = episodes_from_feed(d)
feed["shortname"] = shortname
feed["title"] = d["feed"]["title"]
feed["url"] = url
feed["description"] = d["feed"]["description"]
# write the configuration to a feed.json within the folder
feed_file = get_feed_file(shortname)
feed = sort_feed(feed)
with open(feed_file, 'x') as f:
with open(feed_file, "x") as f:
json.dump(feed, f, indent=4)
print('imported ' +
Fore.GREEN + feed['title'] + Fore.RESET + ' with shortname ' +
Fore.BLUE + feed['shortname'] + Fore.RESET)
print(
"imported "
+ Fore.GREEN
+ feed["title"]
+ Fore.RESET
+ " with shortname "
+ Fore.BLUE
+ feed["shortname"]
+ Fore.RESET
)
if "cover_image" in CONFIGURATION and CONFIGURATION["cover_image"]:
get_cover_image(shortname, d["feed"]["image"]["url"])
def update_feed(feed):
'''
"""
download the current feed, and insert previously unknown
episodes into our local config.
'''
d = feedparser.parse(feed['url'])
"""
d = feedparser.parse(feed["url"])
# only append new episodes!
for episode in episodes_from_feed(d):
found = False
for old_episode in feed['episodes']:
if episode['published'] == old_episode['published'] \
and episode['title'] == old_episode['title']:
for old_episode in feed["episodes"]:
if (
episode["published"] == old_episode["published"]
and episode["title"] == old_episode["title"]
):
found = True
if not found:
feed['episodes'].append(episode)
print('new episode.')
feed["episodes"].append(episode)
print("new episode.")
feed = sort_feed(feed)
overwrite_config(feed)
if "cover_image" in CONFIGURATION and CONFIGURATION["cover_image"]:
@ -162,9 +167,9 @@ def update_feed(feed):
def get_cover_image(shortname, url):
'''
"""
download the cover image of podcast
'''
"""
# Check if an image name is set in the config file
if "cover_image_name" in CONFIGURATION:
filename = CONFIGURATION["cover_image_name"]
@ -179,12 +184,12 @@ def get_cover_image(shortname, url):
def overwrite_config(feed):
'''
"""
after updating the feed, or downloading new items,
we want to update our local config to reflect that fact.
'''
filename = get_feed_file(feed['shortname'])
with open(filename, 'w') as f:
"""
filename = get_feed_file(feed["shortname"])
with open(filename, "w") as f:
json.dump(feed, f, indent=4)
@ -194,35 +199,37 @@ def episodes_from_feed(d):
# convert publishing time to unix time, so that we can sort
# this should be unix time, barring any timezone shenanigans
date = mktime(parsedate(entry.published))
if hasattr(entry, 'links'):
if hasattr(entry, "links"):
for link in entry.links:
if not hasattr(link, 'type'):
if not hasattr(link, "type"):
continue
if hasattr(link, 'type') and (link.type in MIMETYPES):
if hasattr(entry, 'title'):
if hasattr(link, "type") and (link.type in MIMETYPES):
if hasattr(entry, "title"):
episode_title = entry.title
else:
episode_title = link.href
if hasattr(entry, 'description'):
if hasattr(entry, "description"):
episode_description = entry.description
else:
episode_description = ""
episodes.append({
'title': episode_title,
'url': link.href,
'downloaded': False,
'listened': False,
'published': date,
'description': episode_description
})
episodes.append(
{
"title": episode_title,
"url": link.href,
"downloaded": False,
"listened": False,
"published": date,
"description": episode_description,
}
)
return episodes
def rename_episode(folder, published, title, url):
if 'date_format' in CONFIGURATION:
date_format = CONFIGURATION['date_format']
if "date_format" in CONFIGURATION:
date_format = CONFIGURATION["date_format"]
else:
date_format = "%Y-%m-%d"
@ -281,18 +288,18 @@ def escape_string(title):
def get_extenstion(url):
url = url.split("?")[0]
pattern = r'[.][\w]+$'
pattern = r"[.][\w]+$"
return re.search(pattern, url).group(0)
def get_original_filename(url):
url = url.split("?")[0]
pattern = r'[^\/]+$'
pattern = r"[^\/]+$"
return re.search(pattern, url).group(0)
def file_exists(shortname, filename):
base = CONFIGURATION['podcast-directory']
base = CONFIGURATION["podcast-directory"]
if os.path.exists(os.path.join(base, shortname, filename)):
return True
@ -313,34 +320,38 @@ def generic_episode_name(folder, url):
def download_multiple(feed, maxnum):
for episode in feed['episodes']:
for episode in feed["episodes"]:
if maxnum == 0:
break
if episode['downloaded']:
if episode["downloaded"]:
maxnum -= 1
if not episode['downloaded']:
if 'rename_episodes' in CONFIGURATION and CONFIGURATION['rename_episodes']:
filename = rename_episode(feed['shortname'], episode['published'],
episode["title"], episode["url"])
if not episode["downloaded"]:
if "rename_episodes" in CONFIGURATION and CONFIGURATION["rename_episodes"]:
filename = rename_episode(
feed["shortname"],
episode["published"],
episode["title"],
episode["url"],
)
else:
filename = generic_episode_name(feed['shortname'], episode['url'])
if download_single(feed['shortname'], episode['url'], filename) is True:
episode['downloaded'] = True
filename = generic_episode_name(feed["shortname"], episode["url"])
if download_single(feed["shortname"], episode["url"], filename) is True:
episode["downloaded"] = True
maxnum -= 1
overwrite_config(feed)
def download_single(folder, url, filename):
print(url)
base = CONFIGURATION['podcast-directory']
base = CONFIGURATION["podcast-directory"]
if 'connection_timeout' in CONFIGURATION:
connection_timeout = CONFIGURATION['connection_timeout']
if "connection_timeout" in CONFIGURATION:
connection_timeout = CONFIGURATION["connection_timeout"]
else:
connection_timeout = 10
if 'connection_retries' in CONFIGURATION:
connection_retries = CONFIGURATION['connection_retries']
if "connection_retries" in CONFIGURATION:
connection_retries = CONFIGURATION["connection_retries"]
else:
connection_retries = 3
@ -351,9 +362,9 @@ def download_single(folder, url, filename):
for i in range(connection_retries):
try:
r = requests.get(url.strip(), stream=True, timeout=connection_timeout)
size = int(r.headers.get('content-length'))
size = int(r.headers.get("content-length"))
progress = tqdm(total=size, unit="B", unit_scale=True)
with open(os.path.join(base, folder, filename), 'wb') as f:
with open(os.path.join(base, folder, filename), "wb") as f:
for chunk in r.iter_content(1024):
f.write(chunk)
progress.update(len(chunk))
@ -362,7 +373,7 @@ def download_single(folder, url, filename):
if progress:
progress.close()
if i == connection_retries-1:
if i == connection_retries - 1:
print("Connection to server timed out")
else:
print("Connection timed out, retrying...")
@ -373,7 +384,7 @@ def download_single(folder, url, filename):
if progress:
progress.close()
if i == connection_retries-1:
if i == connection_retries - 1:
print("Failed to establish connection with server")
else:
print("Connection failed, retrying...")
@ -392,34 +403,36 @@ def download_single(folder, url, filename):
def available_feeds():
'''
"""
podweasel will save each feed to its own folder. Each folder should
contain a json configuration file describing which elements
have been downloaded already, and how many will be kept.
'''
base = CONFIGURATION['podcast-directory']
paths = [p for p in os.listdir(base)
if os.path.isdir(get_folder(p)) and
os.path.isfile(get_feed_file(p))]
"""
base = CONFIGURATION["podcast-directory"]
paths = [
p
for p in os.listdir(base)
if os.path.isdir(get_folder(p)) and os.path.isfile(get_feed_file(p))
]
# for every folder, check wether a configuration file exists.
results = []
for shortname in paths:
with open(get_feed_file(shortname), 'r') as f:
with open(get_feed_file(shortname), "r") as f:
feed = json.load(f)
results.append(feed)
return sorted(results, key=lambda k: k['title'])
return sorted(results, key=lambda k: k["title"])
def find_feed(shortname):
'''
"""
all feeds are identified by their shortname, which is also the name of
the folder they will be stored in.
this function will find the correct folder, and parse the json file
within that folder to generate the feed data
'''
"""
feeds = available_feeds()
for feed in feeds:
if feed['shortname'] == shortname:
if feed["shortname"] == shortname:
return feed
return None
@ -428,40 +441,40 @@ def rename(shortname, newname):
folder = get_folder(shortname)
new_folder = get_folder(newname)
if not os.path.isdir(folder):
print_err('folder {0} not found'.format(folder))
print_err("folder {0} not found".format(folder))
exit(-1)
os.rename(folder, new_folder)
feed = find_feed(shortname)
feed['shortname'] = newname
feed["shortname"] = newname
overwrite_config(feed)
def pretty_print_feeds(feeds):
format_str = Fore.GREEN + '{0:45.45} |'
format_str += Fore.BLUE + ' {1:40}' + Fore.RESET + Back.RESET
print(format_str.format('title', 'shortname'))
print('='*80)
format_str = Fore.GREEN + "{0:45.45} |"
format_str += Fore.BLUE + " {1:40}" + Fore.RESET + Back.RESET
print(format_str.format("title", "shortname"))
print("=" * 80)
for feed in feeds:
format_str = Fore.GREEN + '{0:40.40} {1:3d}{2:1.1} |'
format_str += Fore.BLUE + ' {3:40}' + Fore.RESET + Back.RESET
format_str = Fore.GREEN + "{0:40.40} {1:3d}{2:1.1} |"
format_str += Fore.BLUE + " {3:40}" + Fore.RESET + Back.RESET
feed = sort_feed(feed)
amount = len([ep for ep in feed['episodes'] if ep['downloaded']])
dl = '' if feed['episodes'][0]['downloaded'] else '*'
print(format_str.format(feed['title'], amount, dl, feed['shortname']))
amount = len([ep for ep in feed["episodes"] if ep["downloaded"]])
dl = "" if feed["episodes"][0]["downloaded"] else "*"
print(format_str.format(feed["title"], amount, dl, feed["shortname"]))
def pretty_print_episodes(feed):
format_str = Fore.GREEN + '{0:40} |'
format_str += Fore.BLUE + ' {1:20}' + Fore.RESET + Back.RESET
for e in feed['episodes'][:20]:
status = 'Downloaded' if e['downloaded'] else 'Not Downloaded'
print(format_str.format(e['title'][:40], status))
format_str = Fore.GREEN + "{0:40} |"
format_str += Fore.BLUE + " {1:20}" + Fore.RESET + Back.RESET
for e in feed["episodes"][:20]:
status = "Downloaded" if e["downloaded"] else "Not Downloaded"
print(format_str.format(e["title"][:40], status))
def main():
global CONFIGURATION
colorama.init()
arguments = docopt(__doc__, version='p0d 0.01')
arguments = docopt(__doc__, version="p0d 0.01")
# before we do anything with the commands,
# find the configuration file
@ -474,62 +487,61 @@ def main():
print("invalid json in configuration file.")
exit(-1)
# handle the commands
if arguments['import']:
if arguments['<shortname>'] is None:
import_feed(arguments['<feed-url>'])
if arguments["import"]:
if arguments["<shortname>"] is None:
import_feed(arguments["<feed-url>"])
else:
import_feed(arguments['<feed-url>'],
shortname=arguments['<shortname>'])
import_feed(arguments["<feed-url>"], shortname=arguments["<shortname>"])
exit(0)
if arguments['feeds']:
if arguments["feeds"]:
pretty_print_feeds(available_feeds())
exit(0)
if arguments['episodes']:
feed = find_feed(arguments['<shortname>'])
if arguments["episodes"]:
feed = find_feed(arguments["<shortname>"])
if feed:
pretty_print_episodes(feed)
exit(0)
else:
print_err("feed {} not found".format(arguments['<shortname>']))
print_err("feed {} not found".format(arguments["<shortname>"]))
exit(-1)
if arguments['update']:
if arguments['<shortname>']:
feed = find_feed(arguments['<shortname>'])
if arguments["update"]:
if arguments["<shortname>"]:
feed = find_feed(arguments["<shortname>"])
if feed:
print_green('updating {}'.format(feed['title']))
print_green("updating {}".format(feed["title"]))
update_feed(feed)
exit(0)
else:
print_err("feed {} not found".format(arguments['<shortname>']))
print_err("feed {} not found".format(arguments["<shortname>"]))
exit(-1)
else:
for feed in available_feeds():
print_green('updating {}'.format(feed['title']))
print_green("updating {}".format(feed["title"]))
update_feed(feed)
exit(0)
if arguments['download']:
if arguments['--how-many']:
maxnum = int(arguments['--how-many'])
elif 'maxnum' in CONFIGURATION:
maxnum = CONFIGURATION['maxnum']
if arguments["download"]:
if arguments["--how-many"]:
maxnum = int(arguments["--how-many"])
elif "maxnum" in CONFIGURATION:
maxnum = CONFIGURATION["maxnum"]
else:
maxnum = -1
# download episodes for a specific feed
if arguments['<shortname>']:
feed = find_feed(arguments['<shortname>'])
if arguments["<shortname>"]:
feed = find_feed(arguments["<shortname>"])
if feed:
download_multiple(feed, maxnum)
exit(0)
else:
print_err("feed {} not found".format(arguments['<shortname>']))
print_err("feed {} not found".format(arguments["<shortname>"]))
exit(-1)
# download episodes for all feeds.
else:
for feed in available_feeds():
download_multiple(feed, maxnum)
exit(0)
if arguments['rename']:
rename(arguments['<shortname>'], arguments['<newname>'])
if arguments["rename"]:
rename(arguments["<shortname>"], arguments["<newname>"])
if __name__ == "__main__":