matrix-org_synapse/synapse/media/media_repository.py

1255 lines
45 KiB
Python

# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import logging
import os
import shutil
from io import BytesIO
from typing import IO, TYPE_CHECKING, Dict, List, Optional, Set, Tuple
import attr
from matrix_common.types.mxc_uri import MXCUri
import twisted.internet.error
import twisted.web.http
from twisted.internet.defer import Deferred
from synapse.api.errors import (
Codes,
FederationDeniedError,
HttpResponseException,
NotFoundError,
RequestSendFailed,
SynapseError,
cs_error,
)
from synapse.config.repository import ThumbnailRequirement
from synapse.http.server import respond_with_json
from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread
from synapse.logging.opentracing import trace
from synapse.media._base import (
FileInfo,
Responder,
ThumbnailInfo,
get_filename_from_headers,
respond_404,
respond_with_responder,
)
from synapse.media.filepath import MediaFilePaths
from synapse.media.media_storage import MediaStorage
from synapse.media.storage_provider import StorageProviderWrapper
from synapse.media.thumbnailer import Thumbnailer, ThumbnailError
from synapse.media.url_previewer import UrlPreviewer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.media_repository import LocalMedia, RemoteMedia
from synapse.types import UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import random_string
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
# How often to run the background job to update the "recently accessed"
# attribute of local and remote media.
UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000 # 1 minute
# How often to run the background job to check for local and remote media
# that should be purged according to the configured media retention settings.
MEDIA_RETENTION_CHECK_PERIOD_MS = 60 * 60 * 1000 # 1 hour
class MediaRepository:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.auth = hs.get_auth()
self.client = hs.get_federation_client()
self.clock = hs.get_clock()
self.server_name = hs.hostname
self.store = hs.get_datastores().main
self.max_upload_size = hs.config.media.max_upload_size
self.max_image_pixels = hs.config.media.max_image_pixels
self.unused_expiration_time = hs.config.media.unused_expiration_time
self.max_pending_media_uploads = hs.config.media.max_pending_media_uploads
Thumbnailer.set_limits(self.max_image_pixels)
self.primary_base_path: str = hs.config.media.media_store_path
self.filepaths: MediaFilePaths = MediaFilePaths(self.primary_base_path)
self.dynamic_thumbnails = hs.config.media.dynamic_thumbnails
self.thumbnail_requirements = hs.config.media.thumbnail_requirements
self.remote_media_linearizer = Linearizer(name="media_remote")
self.recently_accessed_remotes: Set[Tuple[str, str]] = set()
self.recently_accessed_locals: Set[str] = set()
self.federation_domain_whitelist = (
hs.config.federation.federation_domain_whitelist
)
self.prevent_media_downloads_from = hs.config.media.prevent_media_downloads_from
# List of StorageProviders where we should search for media and
# potentially upload to.
storage_providers = []
for (
clz,
provider_config,
wrapper_config,
) in hs.config.media.media_storage_providers:
backend = clz(hs, provider_config)
provider = StorageProviderWrapper(
backend,
store_local=wrapper_config.store_local,
store_remote=wrapper_config.store_remote,
store_synchronous=wrapper_config.store_synchronous,
)
storage_providers.append(provider)
self.media_storage: MediaStorage = MediaStorage(
self.hs, self.primary_base_path, self.filepaths, storage_providers
)
self.clock.looping_call(
self._start_update_recently_accessed, UPDATE_RECENTLY_ACCESSED_TS
)
# Media retention configuration options
self._media_retention_local_media_lifetime_ms = (
hs.config.media.media_retention_local_media_lifetime_ms
)
self._media_retention_remote_media_lifetime_ms = (
hs.config.media.media_retention_remote_media_lifetime_ms
)
# Check whether local or remote media retention is configured
if (
hs.config.media.media_retention_local_media_lifetime_ms is not None
or hs.config.media.media_retention_remote_media_lifetime_ms is not None
):
# Run the background job to apply media retention rules routinely,
# with the duration between runs dictated by the homeserver config.
self.clock.looping_call(
self._start_apply_media_retention_rules,
MEDIA_RETENTION_CHECK_PERIOD_MS,
)
if hs.config.media.url_preview_enabled:
self.url_previewer: Optional[UrlPreviewer] = UrlPreviewer(
hs, self, self.media_storage
)
else:
self.url_previewer = None
def _start_update_recently_accessed(self) -> Deferred:
return run_as_background_process(
"update_recently_accessed_media", self._update_recently_accessed
)
def _start_apply_media_retention_rules(self) -> Deferred:
return run_as_background_process(
"apply_media_retention_rules", self._apply_media_retention_rules
)
async def _update_recently_accessed(self) -> None:
remote_media = self.recently_accessed_remotes
self.recently_accessed_remotes = set()
local_media = self.recently_accessed_locals
self.recently_accessed_locals = set()
await self.store.update_cached_last_access_time(
local_media, remote_media, self.clock.time_msec()
)
def mark_recently_accessed(self, server_name: Optional[str], media_id: str) -> None:
"""Mark the given media as recently accessed.
Args:
server_name: Origin server of media, or None if local
media_id: The media ID of the content
"""
if server_name:
self.recently_accessed_remotes.add((server_name, media_id))
else:
self.recently_accessed_locals.add(media_id)
@trace
async def create_media_id(self, auth_user: UserID) -> Tuple[str, int]:
"""Create and store a media ID for a local user and return the MXC URI and its
expiration.
Args:
auth_user: The user_id of the uploader
Returns:
A tuple containing the MXC URI of the stored content and the timestamp at
which the MXC URI expires.
"""
media_id = random_string(24)
now = self.clock.time_msec()
await self.store.store_local_media_id(
media_id=media_id,
time_now_ms=now,
user_id=auth_user,
)
return f"mxc://{self.server_name}/{media_id}", now + self.unused_expiration_time
@trace
async def reached_pending_media_limit(self, auth_user: UserID) -> Tuple[bool, int]:
"""Check if the user is over the limit for pending media uploads.
Args:
auth_user: The user_id of the uploader
Returns:
A tuple with a boolean and an integer indicating whether the user has too
many pending media uploads and the timestamp at which the first pending
media will expire, respectively.
"""
pending, first_expiration_ts = await self.store.count_pending_media(
user_id=auth_user
)
return pending >= self.max_pending_media_uploads, first_expiration_ts
@trace
async def verify_can_upload(self, media_id: str, auth_user: UserID) -> None:
"""Verify that the media ID can be uploaded to by the given user. This
function checks that:
* the media ID exists
* the media ID does not already have content
* the user uploading is the same as the one who created the media ID
* the media ID has not expired
Args:
media_id: The media ID to verify
auth_user: The user_id of the uploader
"""
media = await self.store.get_local_media(media_id)
if media is None:
raise SynapseError(404, "Unknow media ID", errcode=Codes.NOT_FOUND)
if media.user_id != auth_user.to_string():
raise SynapseError(
403,
"Only the creator of the media ID can upload to it",
errcode=Codes.FORBIDDEN,
)
if media.media_length is not None:
raise SynapseError(
409,
"Media ID already has content",
errcode=Codes.CANNOT_OVERWRITE_MEDIA,
)
expired_time_ms = self.clock.time_msec() - self.unused_expiration_time
if media.created_ts < expired_time_ms:
raise NotFoundError("Media ID has expired")
@trace
async def update_content(
self,
media_id: str,
media_type: str,
upload_name: Optional[str],
content: IO,
content_length: int,
auth_user: UserID,
) -> None:
"""Update the content of the given media ID.
Args:
media_id: The media ID to replace.
media_type: The content type of the file.
upload_name: The name of the file, if provided.
content: A file like object that is the content to store
content_length: The length of the content
auth_user: The user_id of the uploader
"""
file_info = FileInfo(server_name=None, file_id=media_id)
fname = await self.media_storage.store_file(content, file_info)
logger.info("Stored local media in file %r", fname)
await self.store.update_local_media(
media_id=media_id,
media_type=media_type,
upload_name=upload_name,
media_length=content_length,
user_id=auth_user,
)
try:
await self._generate_thumbnails(None, media_id, media_id, media_type)
except Exception as e:
logger.info("Failed to generate thumbnails: %s", e)
@trace
async def create_content(
self,
media_type: str,
upload_name: Optional[str],
content: IO,
content_length: int,
auth_user: UserID,
) -> MXCUri:
"""Store uploaded content for a local user and return the mxc URL
Args:
media_type: The content type of the file.
upload_name: The name of the file, if provided.
content: A file like object that is the content to store
content_length: The length of the content
auth_user: The user_id of the uploader
Returns:
The mxc url of the stored content
"""
media_id = random_string(24)
file_info = FileInfo(server_name=None, file_id=media_id)
fname = await self.media_storage.store_file(content, file_info)
logger.info("Stored local media in file %r", fname)
await self.store.store_local_media(
media_id=media_id,
media_type=media_type,
time_now_ms=self.clock.time_msec(),
upload_name=upload_name,
media_length=content_length,
user_id=auth_user,
)
try:
await self._generate_thumbnails(None, media_id, media_id, media_type)
except Exception as e:
logger.info("Failed to generate thumbnails: %s", e)
return MXCUri(self.server_name, media_id)
def respond_not_yet_uploaded(self, request: SynapseRequest) -> None:
respond_with_json(
request,
504,
cs_error("Media has not been uploaded yet", code=Codes.NOT_YET_UPLOADED),
send_cors=True,
)
async def get_local_media_info(
self, request: SynapseRequest, media_id: str, max_timeout_ms: int
) -> Optional[LocalMedia]:
"""Gets the info dictionary for given local media ID. If the media has
not been uploaded yet, this function will wait up to ``max_timeout_ms``
milliseconds for the media to be uploaded.
Args:
request: The incoming request.
media_id: The media ID of the content. (This is the same as
the file_id for local content.)
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
Either the info dictionary for the given local media ID or
``None``. If ``None``, then no further processing is necessary as
this function will send the necessary JSON response.
"""
wait_until = self.clock.time_msec() + max_timeout_ms
while True:
# Get the info for the media
media_info = await self.store.get_local_media(media_id)
if not media_info:
logger.info("Media %s is unknown", media_id)
respond_404(request)
return None
if media_info.quarantined_by:
logger.info("Media %s is quarantined", media_id)
respond_404(request)
return None
# The file has been uploaded, so stop looping
if media_info.media_length is not None:
return media_info
# Check if the media ID has expired and still hasn't been uploaded to.
now = self.clock.time_msec()
expired_time_ms = now - self.unused_expiration_time
if media_info.created_ts < expired_time_ms:
logger.info("Media %s has expired without being uploaded", media_id)
respond_404(request)
return None
if now >= wait_until:
break
await self.clock.sleep(0.5)
logger.info("Media %s has not yet been uploaded", media_id)
self.respond_not_yet_uploaded(request)
return None
async def get_local_media(
self,
request: SynapseRequest,
media_id: str,
name: Optional[str],
max_timeout_ms: int,
) -> None:
"""Responds to requests for local media, if exists, or returns 404.
Args:
request: The incoming request.
media_id: The media ID of the content. (This is the same as
the file_id for local content.)
name: Optional name that, if specified, will be used as
the filename in the Content-Disposition header of the response.
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
Resolves once a response has successfully been written to request
"""
media_info = await self.get_local_media_info(request, media_id, max_timeout_ms)
if not media_info:
return
self.mark_recently_accessed(None, media_id)
media_type = media_info.media_type
if not media_type:
media_type = "application/octet-stream"
media_length = media_info.media_length
upload_name = name if name else media_info.upload_name
url_cache = media_info.url_cache
file_info = FileInfo(None, media_id, url_cache=bool(url_cache))
responder = await self.media_storage.fetch_media(file_info)
await respond_with_responder(
request, responder, media_type, media_length, upload_name
)
async def get_remote_media(
self,
request: SynapseRequest,
server_name: str,
media_id: str,
name: Optional[str],
max_timeout_ms: int,
) -> None:
"""Respond to requests for remote media.
Args:
request: The incoming request.
server_name: Remote server_name where the media originated.
media_id: The media ID of the content (as defined by the remote server).
name: Optional name that, if specified, will be used as
the filename in the Content-Disposition header of the response.
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
Resolves once a response has successfully been written to request
"""
if (
self.federation_domain_whitelist is not None
and server_name not in self.federation_domain_whitelist
):
raise FederationDeniedError(server_name)
# Don't let users download media from domains listed in the config, even
# if we might have the media to serve. This is Trust & Safety tooling to
# block some servers' media from being accessible to local users.
# See `prevent_media_downloads_from` config docs for more info.
if server_name in self.prevent_media_downloads_from:
respond_404(request)
return
self.mark_recently_accessed(server_name, media_id)
# We linearize here to ensure that we don't try and download remote
# media multiple times concurrently
key = (server_name, media_id)
async with self.remote_media_linearizer.queue(key):
responder, media_info = await self._get_remote_media_impl(
server_name, media_id, max_timeout_ms
)
# We deliberately stream the file outside the lock
if responder and media_info:
upload_name = name if name else media_info.upload_name
await respond_with_responder(
request,
responder,
media_info.media_type,
media_info.media_length,
upload_name,
)
else:
respond_404(request)
async def get_remote_media_info(
self, server_name: str, media_id: str, max_timeout_ms: int
) -> RemoteMedia:
"""Gets the media info associated with the remote file, downloading
if necessary.
Args:
server_name: Remote server_name where the media originated.
media_id: The media ID of the content (as defined by the remote server).
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
The media info of the file
"""
if (
self.federation_domain_whitelist is not None
and server_name not in self.federation_domain_whitelist
):
raise FederationDeniedError(server_name)
# We linearize here to ensure that we don't try and download remote
# media multiple times concurrently
key = (server_name, media_id)
async with self.remote_media_linearizer.queue(key):
responder, media_info = await self._get_remote_media_impl(
server_name, media_id, max_timeout_ms
)
# Ensure we actually use the responder so that it releases resources
if responder:
with responder:
pass
return media_info
async def _get_remote_media_impl(
self, server_name: str, media_id: str, max_timeout_ms: int
) -> Tuple[Optional[Responder], RemoteMedia]:
"""Looks for media in local cache, if not there then attempt to
download from remote server.
Args:
server_name: Remote server_name where the media originated.
media_id: The media ID of the content (as defined by the
remote server).
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
A tuple of responder and the media info of the file.
"""
media_info = await self.store.get_cached_remote_media(server_name, media_id)
# file_id is the ID we use to track the file locally. If we've already
# seen the file then reuse the existing ID, otherwise generate a new
# one.
# If we have an entry in the DB, try and look for it
if media_info:
file_id = media_info.filesystem_id
file_info = FileInfo(server_name, file_id)
if media_info.quarantined_by:
logger.info("Media is quarantined")
raise NotFoundError()
if not media_info.media_type:
media_info = attr.evolve(
media_info, media_type="application/octet-stream"
)
responder = await self.media_storage.fetch_media(file_info)
if responder:
return responder, media_info
# Failed to find the file anywhere, lets download it.
try:
media_info = await self._download_remote_file(
server_name, media_id, max_timeout_ms
)
except SynapseError:
raise
except Exception as e:
# An exception may be because we downloaded media in another
# process, so let's check if we magically have the media.
media_info = await self.store.get_cached_remote_media(server_name, media_id)
if not media_info:
raise e
file_id = media_info.filesystem_id
if not media_info.media_type:
media_info = attr.evolve(media_info, media_type="application/octet-stream")
file_info = FileInfo(server_name, file_id)
# We generate thumbnails even if another process downloaded the media
# as a) it's conceivable that the other download request dies before it
# generates thumbnails, but mainly b) we want to be sure the thumbnails
# have finished being generated before responding to the client,
# otherwise they'll request thumbnails and get a 404 if they're not
# ready yet.
await self._generate_thumbnails(
server_name, media_id, file_id, media_info.media_type
)
responder = await self.media_storage.fetch_media(file_info)
return responder, media_info
async def _download_remote_file(
self,
server_name: str,
media_id: str,
max_timeout_ms: int,
) -> RemoteMedia:
"""Attempt to download the remote file from the given server name,
using the given file_id as the local id.
Args:
server_name: Originating server
media_id: The media ID of the content (as defined by the
remote server). This is different than the file_id, which is
locally generated.
max_timeout_ms: the maximum number of milliseconds to wait for the
media to be uploaded.
Returns:
The media info of the file.
"""
file_id = random_string(24)
file_info = FileInfo(server_name=server_name, file_id=file_id)
with self.media_storage.store_into_file(file_info) as (f, fname, finish):
try:
length, headers = await self.client.download_media(
server_name,
media_id,
output_stream=f,
max_size=self.max_upload_size,
max_timeout_ms=max_timeout_ms,
)
except RequestSendFailed as e:
logger.warning(
"Request failed fetching remote media %s/%s: %r",
server_name,
media_id,
e,
)
raise SynapseError(502, "Failed to fetch remote media")
except HttpResponseException as e:
logger.warning(
"HTTP error fetching remote media %s/%s: %s",
server_name,
media_id,
e.response,
)
if e.code == twisted.web.http.NOT_FOUND:
raise e.to_synapse_error()
raise SynapseError(502, "Failed to fetch remote media")
except SynapseError:
logger.warning(
"Failed to fetch remote media %s/%s", server_name, media_id
)
raise
except NotRetryingDestination:
logger.warning("Not retrying destination %r", server_name)
raise SynapseError(502, "Failed to fetch remote media")
except Exception:
logger.exception(
"Failed to fetch remote media %s/%s", server_name, media_id
)
raise SynapseError(502, "Failed to fetch remote media")
await finish()
if b"Content-Type" in headers:
media_type = headers[b"Content-Type"][0].decode("ascii")
else:
media_type = "application/octet-stream"
upload_name = get_filename_from_headers(headers)
time_now_ms = self.clock.time_msec()
# Multiple remote media download requests can race (when using
# multiple media repos), so this may throw a violation constraint
# exception. If it does we'll delete the newly downloaded file from
# disk (as we're in the ctx manager).
#
# However: we've already called `finish()` so we may have also
# written to the storage providers. This is preferable to the
# alternative where we call `finish()` *after* this, where we could
# end up having an entry in the DB but fail to write the files to
# the storage providers.
await self.store.store_cached_remote_media(
origin=server_name,
media_id=media_id,
media_type=media_type,
time_now_ms=time_now_ms,
upload_name=upload_name,
media_length=length,
filesystem_id=file_id,
)
logger.info("Stored remote media in file %r", fname)
return RemoteMedia(
media_origin=server_name,
media_id=media_id,
media_type=media_type,
media_length=length,
upload_name=upload_name,
created_ts=time_now_ms,
filesystem_id=file_id,
last_access_ts=time_now_ms,
quarantined_by=None,
)
def _get_thumbnail_requirements(
self, media_type: str
) -> Tuple[ThumbnailRequirement, ...]:
scpos = media_type.find(";")
if scpos > 0:
media_type = media_type[:scpos]
return self.thumbnail_requirements.get(media_type, ())
def _generate_thumbnail(
self,
thumbnailer: Thumbnailer,
t_width: int,
t_height: int,
t_method: str,
t_type: str,
) -> Optional[BytesIO]:
m_width = thumbnailer.width
m_height = thumbnailer.height
if m_width * m_height >= self.max_image_pixels:
logger.info(
"Image too large to thumbnail %r x %r > %r",
m_width,
m_height,
self.max_image_pixels,
)
return None
if thumbnailer.transpose_method is not None:
m_width, m_height = thumbnailer.transpose()
if t_method == "crop":
return thumbnailer.crop(t_width, t_height, t_type)
elif t_method == "scale":
t_width, t_height = thumbnailer.aspect(t_width, t_height)
t_width = min(m_width, t_width)
t_height = min(m_height, t_height)
return thumbnailer.scale(t_width, t_height, t_type)
return None
async def generate_local_exact_thumbnail(
self,
media_id: str,
t_width: int,
t_height: int,
t_method: str,
t_type: str,
url_cache: bool,
) -> Optional[str]:
input_path = await self.media_storage.ensure_media_is_in_local_cache(
FileInfo(None, media_id, url_cache=url_cache)
)
try:
thumbnailer = Thumbnailer(input_path)
except ThumbnailError as e:
logger.warning(
"Unable to generate a thumbnail for local media %s using a method of %s and type of %s: %s",
media_id,
t_method,
t_type,
e,
)
return None
with thumbnailer:
t_byte_source = await defer_to_thread(
self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer,
t_width,
t_height,
t_method,
t_type,
)
if t_byte_source:
try:
file_info = FileInfo(
server_name=None,
file_id=media_id,
url_cache=url_cache,
thumbnail=ThumbnailInfo(
width=t_width,
height=t_height,
method=t_method,
type=t_type,
length=t_byte_source.tell(),
),
)
output_path = await self.media_storage.store_file(
t_byte_source, file_info
)
finally:
t_byte_source.close()
logger.info("Stored thumbnail in file %r", output_path)
t_len = os.path.getsize(output_path)
await self.store.store_local_thumbnail(
media_id, t_width, t_height, t_type, t_method, t_len
)
return output_path
# Could not generate thumbnail.
return None
async def generate_remote_exact_thumbnail(
self,
server_name: str,
file_id: str,
media_id: str,
t_width: int,
t_height: int,
t_method: str,
t_type: str,
) -> Optional[str]:
input_path = await self.media_storage.ensure_media_is_in_local_cache(
FileInfo(server_name, file_id)
)
try:
thumbnailer = Thumbnailer(input_path)
except ThumbnailError as e:
logger.warning(
"Unable to generate a thumbnail for remote media %s from %s using a method of %s and type of %s: %s",
media_id,
server_name,
t_method,
t_type,
e,
)
return None
with thumbnailer:
t_byte_source = await defer_to_thread(
self.hs.get_reactor(),
self._generate_thumbnail,
thumbnailer,
t_width,
t_height,
t_method,
t_type,
)
if t_byte_source:
try:
file_info = FileInfo(
server_name=server_name,
file_id=file_id,
thumbnail=ThumbnailInfo(
width=t_width,
height=t_height,
method=t_method,
type=t_type,
length=t_byte_source.tell(),
),
)
output_path = await self.media_storage.store_file(
t_byte_source, file_info
)
finally:
t_byte_source.close()
logger.info("Stored thumbnail in file %r", output_path)
t_len = os.path.getsize(output_path)
await self.store.store_remote_media_thumbnail(
server_name,
media_id,
file_id,
t_width,
t_height,
t_type,
t_method,
t_len,
)
return output_path
# Could not generate thumbnail.
return None
@trace
async def _generate_thumbnails(
self,
server_name: Optional[str],
media_id: str,
file_id: str,
media_type: str,
url_cache: bool = False,
) -> Optional[dict]:
"""Generate and store thumbnails for an image.
Args:
server_name: The server name if remote media, else None if local
media_id: The media ID of the content. (This is the same as
the file_id for local content)
file_id: Local file ID
media_type: The content type of the file
url_cache: If we are thumbnailing images downloaded for the URL cache,
used exclusively by the url previewer
Returns:
Dict with "width" and "height" keys of original image or None if the
media cannot be thumbnailed.
"""
requirements = self._get_thumbnail_requirements(media_type)
if not requirements:
return None
input_path = await self.media_storage.ensure_media_is_in_local_cache(
FileInfo(server_name, file_id, url_cache=url_cache)
)
try:
thumbnailer = Thumbnailer(input_path)
except ThumbnailError as e:
logger.warning(
"Unable to generate thumbnails for remote media %s from %s of type %s: %s",
media_id,
server_name,
media_type,
e,
)
return None
with thumbnailer:
m_width = thumbnailer.width
m_height = thumbnailer.height
if m_width * m_height >= self.max_image_pixels:
logger.info(
"Image too large to thumbnail %r x %r > %r",
m_width,
m_height,
self.max_image_pixels,
)
return None
if thumbnailer.transpose_method is not None:
m_width, m_height = await defer_to_thread(
self.hs.get_reactor(), thumbnailer.transpose
)
# We deduplicate the thumbnail sizes by ignoring the cropped versions if
# they have the same dimensions of a scaled one.
thumbnails: Dict[Tuple[int, int, str], str] = {}
for requirement in requirements:
if requirement.method == "crop":
thumbnails.setdefault(
(requirement.width, requirement.height, requirement.media_type),
requirement.method,
)
elif requirement.method == "scale":
t_width, t_height = thumbnailer.aspect(
requirement.width, requirement.height
)
t_width = min(m_width, t_width)
t_height = min(m_height, t_height)
thumbnails[
(t_width, t_height, requirement.media_type)
] = requirement.method
# Now we generate the thumbnails for each dimension, store it
for (t_width, t_height, t_type), t_method in thumbnails.items():
# Generate the thumbnail
if t_method == "crop":
t_byte_source = await defer_to_thread(
self.hs.get_reactor(),
thumbnailer.crop,
t_width,
t_height,
t_type,
)
elif t_method == "scale":
t_byte_source = await defer_to_thread(
self.hs.get_reactor(),
thumbnailer.scale,
t_width,
t_height,
t_type,
)
else:
logger.error("Unrecognized method: %r", t_method)
continue
if not t_byte_source:
continue
file_info = FileInfo(
server_name=server_name,
file_id=file_id,
url_cache=url_cache,
thumbnail=ThumbnailInfo(
width=t_width,
height=t_height,
method=t_method,
type=t_type,
length=t_byte_source.tell(),
),
)
with self.media_storage.store_into_file(file_info) as (
f,
fname,
finish,
):
try:
await self.media_storage.write_to_file(t_byte_source, f)
await finish()
finally:
t_byte_source.close()
t_len = os.path.getsize(fname)
# Write to database
if server_name:
# Multiple remote media download requests can race (when
# using multiple media repos), so this may throw a violation
# constraint exception. If it does we'll delete the newly
# generated thumbnail from disk (as we're in the ctx
# manager).
#
# However: we've already called `finish()` so we may have
# also written to the storage providers. This is preferable
# to the alternative where we call `finish()` *after* this,
# where we could end up having an entry in the DB but fail
# to write the files to the storage providers.
try:
await self.store.store_remote_media_thumbnail(
server_name,
media_id,
file_id,
t_width,
t_height,
t_type,
t_method,
t_len,
)
except Exception as e:
thumbnail_exists = (
await self.store.get_remote_media_thumbnail(
server_name,
media_id,
t_width,
t_height,
t_type,
)
)
if not thumbnail_exists:
raise e
else:
await self.store.store_local_thumbnail(
media_id, t_width, t_height, t_type, t_method, t_len
)
return {"width": m_width, "height": m_height}
async def _apply_media_retention_rules(self) -> None:
"""
Purge old local and remote media according to the media retention rules
defined in the homeserver config.
"""
# Purge remote media
if self._media_retention_remote_media_lifetime_ms is not None:
# Calculate a threshold timestamp derived from the configured lifetime. Any
# media that has not been accessed since this timestamp will be removed.
remote_media_threshold_timestamp_ms = (
self.clock.time_msec() - self._media_retention_remote_media_lifetime_ms
)
logger.info(
"Purging remote media last accessed before"
f" {remote_media_threshold_timestamp_ms}"
)
await self.delete_old_remote_media(
before_ts=remote_media_threshold_timestamp_ms
)
# And now do the same for local media
if self._media_retention_local_media_lifetime_ms is not None:
# This works the same as the remote media threshold
local_media_threshold_timestamp_ms = (
self.clock.time_msec() - self._media_retention_local_media_lifetime_ms
)
logger.info(
"Purging local media last accessed before"
f" {local_media_threshold_timestamp_ms}"
)
await self.delete_old_local_media(
before_ts=local_media_threshold_timestamp_ms,
keep_profiles=True,
delete_quarantined_media=False,
delete_protected_media=False,
)
async def delete_old_remote_media(self, before_ts: int) -> Dict[str, int]:
old_media = await self.store.get_remote_media_ids(
before_ts, include_quarantined_media=False
)
deleted = 0
for origin, media_id, file_id in old_media:
key = (origin, media_id)
logger.info("Deleting: %r", key)
# TODO: Should we delete from the backup store
async with self.remote_media_linearizer.queue(key):
full_path = self.filepaths.remote_media_filepath(origin, file_id)
try:
os.remove(full_path)
except OSError as e:
logger.warning("Failed to remove file: %r", full_path)
if e.errno == errno.ENOENT:
pass
else:
continue
thumbnail_dir = self.filepaths.remote_media_thumbnail_dir(
origin, file_id
)
shutil.rmtree(thumbnail_dir, ignore_errors=True)
await self.store.delete_remote_media(origin, media_id)
deleted += 1
return {"deleted": deleted}
async def delete_local_media_ids(
self, media_ids: List[str]
) -> Tuple[List[str], int]:
"""
Delete the given local or remote media ID from this server
Args:
media_id: The media ID to delete.
Returns:
A tuple of (list of deleted media IDs, total deleted media IDs).
"""
return await self._remove_local_media_from_disk(media_ids)
async def delete_old_local_media(
self,
before_ts: int,
size_gt: int = 0,
keep_profiles: bool = True,
delete_quarantined_media: bool = False,
delete_protected_media: bool = False,
) -> Tuple[List[str], int]:
"""
Delete local or remote media from this server by size and timestamp. Removes
media files, any thumbnails and cached URLs.
Args:
before_ts: Unix timestamp in ms.
Files that were last used before this timestamp will be deleted.
size_gt: Size of the media in bytes. Files that are larger will be deleted.
keep_profiles: Switch to delete also files that are still used in image data
(e.g user profile, room avatar). If false these files will be deleted.
delete_quarantined_media: If True, media marked as quarantined will be deleted.
delete_protected_media: If True, media marked as protected will be deleted.
Returns:
A tuple of (list of deleted media IDs, total deleted media IDs).
"""
old_media = await self.store.get_local_media_ids(
before_ts,
size_gt,
keep_profiles,
include_quarantined_media=delete_quarantined_media,
include_protected_media=delete_protected_media,
)
return await self._remove_local_media_from_disk(old_media)
async def _remove_local_media_from_disk(
self, media_ids: List[str]
) -> Tuple[List[str], int]:
"""
Delete local or remote media from this server. Removes media files,
any thumbnails and cached URLs.
Args:
media_ids: List of media_id to delete
Returns:
A tuple of (list of deleted media IDs, total deleted media IDs).
"""
removed_media = []
for media_id in media_ids:
logger.info("Deleting media with ID '%s'", media_id)
full_path = self.filepaths.local_media_filepath(media_id)
try:
os.remove(full_path)
except OSError as e:
logger.warning("Failed to remove file: %r: %s", full_path, e)
if e.errno == errno.ENOENT:
pass
else:
continue
thumbnail_dir = self.filepaths.local_media_thumbnail_dir(media_id)
shutil.rmtree(thumbnail_dir, ignore_errors=True)
await self.store.delete_remote_media(self.server_name, media_id)
await self.store.delete_url_cache((media_id,))
await self.store.delete_url_cache_media((media_id,))
removed_media.append(media_id)
return removed_media, len(removed_media)