Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save tim-schilling/b04846542927f6e6be777612e4b89d57 to your computer and use it in GitHub Desktop.
Save tim-schilling/b04846542927f6e6be777612e4b89d57 to your computer and use it in GitHub Desktop.
Django update_or_create method that will only update when there are changes
import warnings
from django.db import models, transaction
from django.db.models.fields import Field
from django.db.models.utils import resolve_callables
class QuerySet(models.QuerySet):
"""A QuerySet with a conditional update or create method."""
def _update_fields(self, obj, values):
"""
Detect which fields have been updated in values compared to the
existing object.
"""
update_fields = set()
for k, v in resolve_callables(values):
model_field = self.model._meta.get_field(k)
# If the key is a related field, we need to compare the primary
# key of the related model rather than the related model itself to
# avoid another database query.
if model_field.is_relation and k == model_field.name:
# Translate the key to the attname rather than the field name
# and compare the primary key.
existing_value = getattr(obj, model_field.attname, None)
has_changed = (
# if v is none, we only need to know if the current _id is None.
existing_value != v
if v is None
else getattr(v, model_field.target_field.attname) != existing_value
)
else:
existing_value = getattr(obj, k, None)
has_changed = existing_value != v
# Check if the types of the existing value and the new value are the same.
# If they don't match, but are castable on save, django-simple-history
# will create a new historical record.
if (
has_changed
and existing_value is not None
and v is not None
and type(existing_value) is not type(v)
):
# This mismatch doesn't break anything, but we'll want to know about it quickly.
warnings.warn(
"Mismatching types used for field %s in update_or_create" % k,
RuntimeWarning,
stacklevel=2,
)
if has_changed:
setattr(obj, k, v)
update_fields.add(k)
return update_fields
def update_or_create(self, defaults=None, create_defaults=None, **kwargs):
"""
Look up an object with the given kwargs, updating one with defaults
if it exists, otherwise create a new one. Optionally, an object can
be created with different values than defaults by using
create_defaults.
Return a tuple (object, created), where created is a boolean
specifying whether an object was created.
"""
update_defaults = defaults or {}
if create_defaults is None:
create_defaults = update_defaults
self._for_write = True
with transaction.atomic(using=self.db):
# Lock the row so that a concurrent update is blocked until
# update_or_create() has performed its save.
obj, created = self.select_for_update().get_or_create(
create_defaults, **kwargs
)
if created:
return obj, created
if update_fields := self._update_fields(obj, update_defaults):
concrete_field_names = self.model._meta._non_pk_concrete_field_names
# update_fields does not support non-concrete fields.
if concrete_field_names.issuperset(update_fields):
# Add fields which are set on pre_save(), e.g. auto_now fields.
# This is to maintain backward compatibility as these fields
# are not updated unless explicitly specified in the
# update_fields list.
for field in self.model._meta.local_concrete_fields:
if not (
field.primary_key
or field.__class__.pre_save is Field.pre_save
):
update_fields.add(field.name)
if field.name != field.attname:
update_fields.add(field.attname)
obj.save(using=self.db, update_fields=update_fields)
else:
obj.save(using=self.db)
return obj, False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment