matrix-org_synapse/synapse/storage/util/partial_state_events_tracke...

188 lines
7.1 KiB
Python

# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from collections import defaultdict
from typing import Collection, Dict, Set
from twisted.internet import defer
from twisted.internet.defer import Deferred
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.opentracing import trace_with_opname
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.room import RoomWorkerStore
from synapse.util import unwrapFirstError
from synapse.util.cancellation import cancellable
logger = logging.getLogger(__name__)
class PartialStateEventsTracker:
    """Keeps track of which events have partial state, after a partial-state join"""

    def __init__(self, store: EventsWorkerStore):
        """
        Args:
            store: the datastore which is queried (via `get_partial_state_events`)
                to find out which events still have partial state.
        """
        self._store = store
        # a map from event id to a set of Deferreds which are waiting for that event to be
        # un-partial-stated.
        self._observers: Dict[str, Set[Deferred[None]]] = defaultdict(set)

    def notify_un_partial_stated(self, event_id: str) -> None:
        """Notify that we now have full state for a given event

        Called by the state-resynchronization loop whenever we resynchronize the state
        for a particular event. Unblocks any callers to await_full_state() for that
        event.

        Args:
            event_id: the event that now has full state.
        """
        # Take ownership of the whole set of observers: once popped, nobody else
        # can reach these Deferreds via self._observers.
        observers = self._observers.pop(event_id, None)
        if not observers:
            return

        logger.info(
            "Notifying %i things waiting for un-partial-stating of event %s",
            len(observers),
            event_id,
        )

        with PreserveLoggingContext():
            for o in observers:
                # An observer may already have fired while still being registered:
                # await_full_state() calls back observers itself when its re-check
                # of the database shows the event is no longer partial-stated, but
                # only deregisters them once *all* of the events it is waiting for
                # are done. Calling back a Deferred twice raises
                # AlreadyCalledError (which would also stop us notifying the
                # remaining observers), so skip anything that has already fired.
                if not o.called:
                    o.callback(None)

    @trace_with_opname("PartialStateEventsTracker.await_full_state")
    @cancellable
    async def await_full_state(self, event_ids: Collection[str]) -> None:
        """Wait for all the given events to have full state.

        Returns immediately if none of the events are partial-stated; otherwise
        blocks until notify_un_partial_stated() has been called (or the database
        reports full state) for every one of them.

        Args:
            event_ids: the list of event ids that we want full state for
        """
        # first try the happy path: if there are no partial-state events, we can return
        # quickly
        partial_state_event_ids = [
            ev
            for ev, p in (await self._store.get_partial_state_events(event_ids)).items()
            if p
        ]

        if not partial_state_event_ids:
            return

        logger.info(
            "Awaiting un-partial-stating of events %s",
            partial_state_event_ids,
            stack_info=True,
        )

        # create an observer for each lazy-joined event
        observers: Dict[str, Deferred[None]] = {
            event_id: Deferred() for event_id in partial_state_event_ids
        }
        for event_id, observer in observers.items():
            self._observers[event_id].add(observer)

        try:
            # some of them may have been un-lazy-joined between us checking the db and
            # registering the observer, in which case we'd wait forever for the
            # notification. Call back the observers now.
            for event_id, partial in (
                await self._store.get_partial_state_events(observers.keys())
            ).items():
                # there may have been a call to notify_un_partial_stated during the
                # db query, so the observers may already have been called.
                if not partial and not observers[event_id].called:
                    observers[event_id].callback(None)

            # wait for every observer to fire; the first failure (if any) is
            # unwrapped and re-raised here.
            await make_deferred_yieldable(
                defer.gatherResults(
                    observers.values(),
                    consumeErrors=True,
                )
            ).addErrback(unwrapFirstError)
            logger.info("Events %s all un-partial-stated", observers.keys())
        finally:
            # remove any observers we created. This should happen when the notification
            # is received, but that might not happen for two reasons:
            #   (a) we're bailing out early on an exception (including us being
            #       cancelled during the await)
            #   (b) the event got de-lazy-joined before we set up the observer.
            for event_id, observer in observers.items():
                # use .get() rather than indexing, so that we don't re-create an
                # entry in the defaultdict for an event which has been notified.
                observer_set = self._observers.get(event_id)
                if observer_set:
                    observer_set.discard(observer)
                    if not observer_set:
                        del self._observers[event_id]
class PartialCurrentStateTracker:
    """Keeps track of which rooms have partial state, after partial-state joins"""

    def __init__(self, store: RoomWorkerStore):
        """
        Args:
            store: the datastore which is asked (via `is_partial_state_room`)
                whether a room still has partial state.
        """
        # room id -> the Deferreds of callers currently blocked in
        # await_full_state() for that room.
        self._observers: Dict[str, Set[Deferred[None]]] = defaultdict(set)
        self._store = store

    def notify_un_partial_stated(self, room_id: str) -> None:
        """Notify that we now have full current state for a given room

        Wakes up every caller of await_full_state() which is waiting on the room.

        Args:
            room_id: the room that now has full current state.
        """
        waiters = self._observers.pop(room_id, None)
        if waiters:
            logger.info(
                "Notifying %i things waiting for un-partial-stating of room %s",
                len(waiters),
                room_id,
            )
            with PreserveLoggingContext():
                for waiter in waiters:
                    waiter.callback(None)

    def _discard_observer(self, room_id: str, observer: "Deferred[None]") -> None:
        """Deregister `observer` for `room_id`, dropping the room's entry once empty."""
        registered = self._observers.get(room_id)
        if registered is None:
            # the room has already been notified (or was never registered).
            return

        registered.discard(observer)
        if len(registered) == 0:
            self._observers.pop(room_id, None)

    @trace_with_opname("PartialCurrentStateTracker.await_full_state")
    @cancellable
    async def await_full_state(self, room_id: str) -> None:
        """Wait for the given room to have full current state.

        Returns straight after the database check if the room is not
        partial-stated.

        Args:
            room_id: the room that we want full current state for.
        """
        # Register before touching the database: that way a notification which
        # arrives while the query is in flight still reaches us.
        d: Deferred[None] = Deferred()
        room_observers = self._observers.setdefault(room_id, set())
        room_observers.add(d)

        try:
            if await self._store.is_partial_state_room(room_id):
                logger.info(
                    "Awaiting un-partial-stating of room %s",
                    room_id,
                    stack_info=True,
                )

                await make_deferred_yieldable(d)

                logger.info("Room has un-partial-stated")
        finally:
            # Always clean up after ourselves, whether we finished normally,
            # returned early, or were interrupted by an exception.
            self._discard_observer(room_id, d)