1
0
Fork 0
mirror of https://gitlab.com/bramw/baserow.git synced 2025-03-20 07:03:07 +00:00
bramw_baserow/changelog/src/handler.py
2025-02-26 15:35:05 +00:00

258 lines
8.8 KiB
Python

import json
import os
from datetime import datetime, timezone
from json import JSONDecodeError
from pathlib import Path
from typing import Dict, List, Optional, Union
from domains import domain_types
from changelog_entry import changelog_entry_types
from pygit2 import Repository
# Line terminator appended to every line written to the generated changelog.
LINE_BREAK_CHARACTER = "\n"
# Prefix written in front of each nested bullet point of a changelog entry.
INDENT_CHARACTER = " "
# Maximum number of characters of the sanitised message that is used in an
# entry file name (the issue number prefix and ".json" suffix are not counted).
MAXIMUM_FILE_NAME_MESSAGE_LENGTH = 60
class ChangelogHandler:
    """
    Manages changelog entries stored as JSON files below a working directory
    and renders them into a markdown changelog.

    Layout relative to ``working_dir``:

    * ``entries/<release>/<entry type>/<file>.json`` - one file per entry
    * ``releases.json`` - release meta data, most recently written release first
    * ``changelog.md`` - the generated markdown changelog
    """

    UNRELEASED_FOLDER_NAME = "unreleased"

    @property
    def release_meta_data_file_path(self):
        """Path of the JSON file holding the release meta data."""

        return f"{self.working_dir}/releases.json"

    @property
    def entries_file_path(self):
        """Path of the folder containing one sub folder per release."""

        return f"{self.working_dir}/entries"

    @property
    def changelog_path(self):
        """Path of the generated markdown changelog."""

        return f"{self.working_dir}/changelog.md"

    def __init__(self, working_dir: str = os.path.dirname(__file__)):
        """
        :param working_dir: Folder containing the entries, the release meta data
            and the generated changelog. Defaults to the folder of this module.
        """

        self.working_dir = working_dir

    def add_entry(
        self,
        domain_type_name: str,
        changelog_entry_type_name: str,
        message: str,
        issue_number: Optional[int] = None,
        release: str = UNRELEASED_FOLDER_NAME,
        bullet_points: Optional[List[str]] = None,
    ) -> str:
        """
        Writes a new changelog entry file. An existing file with the same
        generated name is overwritten (a notice is printed).

        :param domain_type_name: Passed on to the entry type when building the
            entry dict.
        :param changelog_entry_type_name: Key into `changelog_entry_types`.
        :param message: The changelog message, also used for the file name.
        :param issue_number: Optional issue number, also used for the file name.
        :param release: Name of the release folder the entry is stored in.
        :param bullet_points: Optional nested bullet points of the entry.
        :raises KeyError: If `changelog_entry_type_name` is not a known type.
        :return: The path of the written entry file.
        """

        changelog_entry_type = changelog_entry_types[changelog_entry_type_name]

        # Build the entry before touching the file system, so that a failure
        # here can't leave an empty (or truncated) entry file behind.
        entry = changelog_entry_type().generate_entry_dict(
            domain_type_name, message, issue_number, bullet_points=bullet_points
        )

        path = Path(f"{self.entries_file_path}/{release}/{changelog_entry_type.type}")
        path.mkdir(parents=True, exist_ok=True)

        file_name = ChangelogHandler.generate_entry_file_name(message, issue_number)
        full_path = f"{path}/{file_name}"

        if os.path.isfile(full_path):
            print(f'Existing change log entry "{file_name}" is being overwritten')

        with open(full_path, "w", encoding="utf-8") as entry_file:
            json.dump(entry, entry_file, indent=4)

        return full_path

    def get_changelog_entries(
        self,
        release_name: str = UNRELEASED_FOLDER_NAME,
    ) -> Dict[str, List[Dict]]:
        """
        Loads all the entries of the provided release, grouped by the `type`
        stored in each entry. The path of every loaded file is printed.

        :param release_name: Name of the release folder to read.
        :raises KeyError: If an entry has a type that isn't in
            `changelog_entry_types`.
        :return: A dict containing a (possibly empty) list of entries for every
            known entry type. Within one folder, entries are ordered by file name.
        """

        base_path = f"{self.entries_file_path}/{release_name}"
        entries = {entry_type: [] for entry_type in changelog_entry_types.keys()}

        for category_dir_name in os.listdir(base_path):
            category_dir = f"{base_path}/{category_dir_name}"

            # Only folders hold entries. Placeholder or stray files such as
            # `.gitkeep` next to them must not be listed as if they were folders.
            if category_dir_name == ".gitkeep" or not os.path.isdir(category_dir):
                continue

            for entry_file_name in sorted(os.listdir(category_dir)):
                if entry_file_name == ".gitkeep":
                    continue

                entry_file_path = f"{category_dir}/{entry_file_name}"
                print(entry_file_path)

                with open(entry_file_path, "r", encoding="utf-8") as entry_file:
                    entry = json.load(entry_file)

                entries[entry["type"]].append(entry)

        return entries

    def get_releases_meta_data(self) -> Optional[Dict]:
        """
        :return: The parsed content of the release meta data file, or `None`
            (after printing a notice) if that file doesn't exist.
        """

        try:
            with open(
                self.release_meta_data_file_path, "r", encoding="utf-8"
            ) as releases_file:
                return json.load(releases_file)
        except FileNotFoundError:
            print("Tried to read release meta data, but no file was found")
            return None

    def order_release_folders(self, release_folders: List[str]) -> List[str]:
        """
        Orders the provided folder names like the releases in the release meta
        data. Folders without meta data are dropped, and a notice is printed for
        every release in the meta data that has no folder.

        :param release_folders: The release folder names that exist.
        :raises TypeError: If the release meta data file doesn't exist, because
            `get_releases_meta_data` returns `None` in that case.
        :return: The known release folder names in meta data order.
        """

        release_folders_ordered = []
        releases_meta_data = self.get_releases_meta_data()

        for release in releases_meta_data["releases"]:
            found_release = False

            for release_folder_name in release_folders:
                if release["name"] == release_folder_name:
                    release_folders_ordered.append(release_folder_name)
                    found_release = True

            if not found_release:
                print(
                    f"The release {release['name']} was not found and has been omitted "
                    f"from the changelog. Please check if the release folder exists."
                )

        return release_folders_ordered

    def generate_changelog_markdown_file(self):
        """
        (Re)generates the markdown changelog from the entries of all the
        releases listed in the release meta data. Unreleased entries are not
        included.
        """

        release_folders = os.listdir(self.entries_file_path)

        if ChangelogHandler.UNRELEASED_FOLDER_NAME in release_folders:
            release_folders.remove(ChangelogHandler.UNRELEASED_FOLDER_NAME)

        release_folders = self.order_release_folders(release_folders)

        domain_prefixes = {
            domain_type: domain().message_prefix
            for domain_type, domain in domain_types.items()
        }

        # Render everything in memory first. The existing changelog is only
        # replaced once all the entries were read and rendered successfully.
        chunks = [f"# Changelog{LINE_BREAK_CHARACTER}{LINE_BREAK_CHARACTER}"]

        for release_folder in release_folders:
            chunks.extend(self._render_release(release_folder, domain_prefixes))

        with open(self.changelog_path, "w", encoding="utf-8") as changelog_file:
            changelog_file.writelines(chunks)

    def _render_release(self, release_folder: str, domain_prefixes: Dict) -> List[str]:
        """Returns the markdown chunks of a single release, in output order."""

        entries = self.get_changelog_entries(release_folder)
        release_heading = f"## Released {release_folder}"
        chunks = [f"{release_heading}{LINE_BREAK_CHARACTER}{LINE_BREAK_CHARACTER}"]

        for entry_type in changelog_entry_types.values():
            entries_of_type = entries.get(entry_type.type, [])

            if not entries_of_type:
                continue

            heading = entry_type().markdown_heading
            chunks.append(f"{heading}{LINE_BREAK_CHARACTER}")

            for entry in entries_of_type:
                # Prefix the entry's message with the domain prefix.
                # Prefix with nothing if the `domain` doesn't exist,
                # for compatibility with older entries.
                domain_prefix = (
                    domain_prefixes[entry["domain"]] if "domain" in entry else ""
                )
                entry_message = f"{domain_prefix}{entry['message']}"
                entry_markdown_string = entry_type.get_markdown_string(
                    entry_message, entry["issue_number"]
                )
                chunks.append(f"{entry_markdown_string}{LINE_BREAK_CHARACTER}")

                # `or []` also covers entries that store `null` bullet points.
                for bullet_point in entry.get("bullet_points") or []:
                    chunks.append(
                        f"{INDENT_CHARACTER}* {bullet_point}{LINE_BREAK_CHARACTER}"
                    )

            # Blank line after every entry type section.
            chunks.append(LINE_BREAK_CHARACTER)

        # Extra blank line after every release.
        chunks.append(LINE_BREAK_CHARACTER)

        return chunks

    def move_entries_to_release_folder(
        self, name: Union[str, None] = None
    ) -> Optional[str]:
        """
        Renames the unreleased folder to the release name and creates a new,
        empty unreleased folder.

        :param name: The release name. Defaults to the current UTC date
            formatted as `%Y_%m_%d`.
        :return: The release name, or `None` if renaming or re-creating the
            folder failed with an `OSError`. Every such failure is reported as
            the release already existing.
        """

        release_name = name or datetime.now(tz=timezone.utc).strftime("%Y_%m_%d")
        unreleased_path = (
            f"{self.entries_file_path}/{ChangelogHandler.UNRELEASED_FOLDER_NAME}"
        )

        try:
            os.rename(unreleased_path, f"{self.entries_file_path}/{release_name}")
            os.mkdir(unreleased_path)
            return release_name
        except OSError:
            print(f'Release with name "{release_name}" already exists.')
            return None

    @staticmethod
    def generate_entry_file_name(
        message: str, issue_number: Optional[int] = None
    ) -> str:
        """
        Generates the file name of an entry: the optional issue number followed
        by the lower cased message with spaces replaced by underscores and all
        other non alphanumeric characters removed. The message part is cut off
        at `MAXIMUM_FILE_NAME_MESSAGE_LENGTH` characters.

        :param message: The changelog message.
        :param issue_number: Optional issue number used as file name prefix.
        :return: The file name, including the `.json` extension.
        """

        file_name = ""

        if issue_number is not None:
            file_name += f"{issue_number}_"

        # Sanitise message
        message = message.strip().replace(".", "").replace(" ", "_")
        message = "".join(
            e for e in message if e.isalnum() or e == "_"
        )  # Remove special chars
        message = message.lower()

        file_name += message[:MAXIMUM_FILE_NAME_MESSAGE_LENGTH]
        file_name += ".json"

        return file_name

    @staticmethod
    def get_issue_number() -> Union[int, None]:
        """
        Tries to derive an issue number from the shorthand name of the HEAD of
        the git repository in the current working directory, by parsing the part
        in front of the first `-` as an integer.

        :return: The issue number, or `None` if that part isn't an integer.
        """

        potential_issue_number = Repository(".").head.shorthand.split("-")[0]

        try:
            return int(potential_issue_number)
        except ValueError:
            return None

    def write_release_meta_data(self, name: str) -> None:
        """
        Adds a release with the provided name and the current UTC date to the
        front of the `releases` list in the release meta data file. The file and
        its parent folders are created if they don't exist, and an empty or
        unparsable file is treated as having no releases.

        :param name: The name of the release to add.
        """

        # Make sure the parent dirs exist
        path = Path(self.release_meta_data_file_path)
        path.parent.mkdir(parents=True, exist_ok=True)

        # Make sure the file exists
        path.touch(exist_ok=True)

        with open(
            self.release_meta_data_file_path, "r+", encoding="utf-8"
        ) as release_file:
            try:
                release_data = json.load(release_file)
            except JSONDecodeError:
                release_data = {}

            if "releases" not in release_data:
                release_data["releases"] = []

            release_data["releases"].insert(
                0,
                {
                    "name": name,
                    "created_at": datetime.now(tz=timezone.utc).strftime("%Y-%m-%d"),
                },
            )

            # Overwrite from the start and cut off whatever is left of the
            # previous content.
            release_file.seek(0)
            json.dump(release_data, release_file, indent=4)
            release_file.truncate()

    def is_release_name_unique(self, name: str) -> bool:
        """
        :param name: The release name to check.
        :return: `True` if the entries folder has no item with that name.
        """

        return name not in os.listdir(self.entries_file_path)