Source code for intranet.utils.locking

from typing import Dict, Iterable, List, Union

from django.db.models import Manager, Model, QuerySet


[docs]def lock_on(items: Iterable[Union[Model, Manager, QuerySet]]) -> None: """Given an iterable of ``Model`` instances, ``Manager``s, and/or ``QuerySet``s, locks the corresponding database rows. More specifically, this uses the Django ORM's ``select_for_update()`` method, which translates to a ``SELECT FOR UPDATE`` SQL query. For more information on what this actually does in PostgreSQL (used as the database backend in all environments) see PostgreSQL's documentation on locking at https://www.postgresql.org/docs/current/explicit-locking.html#LOCKING-ROWS. As described in Django's documentation at https://docs.djangoproject.com/en/stable/ref/models/queryset/#django.db.models.query.QuerySet.select_for_update, the ``select_for_update`` locks prevent other transactions from acquiring locks until this transaction is complete. This MUST by run in a transaction. A straightforward way to do this is to use the ``django.db.transaction.atomic`` wrapper. Args: items: An iterable of ``Model`` instances, ``Manager``s, and/or ``QuerySet``s representing the database rows to lock. """ querysets_by_model: Dict[str, List[Union[Manager, QuerySet]]] = {} objects_by_model: Dict[str, List[Model]] = {} # First, we go through and categorize everything. Put instances in objects_by_model and # Managers/QuerySets in querysets_by_model. # Both are categorized by the dotted path to their class. for item in items: model_class = item.model if isinstance(item, (Manager, QuerySet)) else item.__class__ model_fullname = model_class.__module__ + "." + model_class.__qualname__ if isinstance(item, (Manager, QuerySet)): querysets_by_model.setdefault(model_fullname, []) querysets_by_model[model_fullname].append(item.all().nocache()) else: objects_by_model.setdefault(model_fullname, []) objects_by_model[model_fullname].append(item) # Now we need to convert all the lists of instances to QuerySets. This is fairly # straightforward -- we get the PK (primary key) of each, then construct a QuerySet that # filters for those PKs, then add it to querysets_by_model. for model_fullname, objects in objects_by_model.items(): # We only create the lists if we're about to put something in them, so this should never fail # unless the above code is modified in a way that breaks it assert objects # Get the model class and the PKs model_class = objects[0].__class__ object_pks = [obj.pk for obj in objects] # Now construct a QuerySet and add it to the list querysets_by_model.setdefault(model_fullname, []) querysets_by_model[model_fullname].append(model_class.objects.filter(pk__in=object_pks).nocache()) # Now, with all lists of QuerySets to lock in querysets_by_model, we actually do the locking. # We sort by the dotted name to the model class in an attempt to prevent deadlocks. If all workers # attempt to lock rows in the same order, they should never deadlock. for model_fullname in sorted(querysets_by_model.keys()): qs_list = querysets_by_model[model_fullname] # If this assertion fails, the above code has been modified in a way that breaks it assert qs_list # First, if there are multiple QuerySets for a given model, we need to combine them into one. combined_qs = qs_list.pop(0) if qs_list: # all=False explicitly only selects distinct values combined_qs = combined_qs.union(*qs_list, all=False) # Now we actually do the locking. We order by PK first in an attempt to prevent deadlocks. _ = list(combined_qs.order_by("pk").select_for_update().values_list("pk"))