import logging
import operator
from datetime import datetime
from django.conf import settings
from django.db.backends.ddl_references import (
Columns,
Expressions,
ForeignKeyName,
IndexName,
Statement,
Table,
)
from django.db.backends.utils import names_digest, split_identifier, truncate_name
from django.db.models import NOT_PROVIDED, Deferrable, Index
from django.db.models.sql import Query
from django.db.transaction import TransactionManagementError, atomic
from django.utils import timezone
logger = logging.getLogger("django.db.backends.schema")
def _is_relevant_relation(relation, altered_field):
"""
When altering the given field, must constraints on its model from the given
relation be temporarily dropped?
"""
field = relation.field
if field.many_to_many:
# M2M reverse field
return False
if altered_field.primary_key and field.to_fields == [None]:
# Foreign key constraint on the primary key, which is being altered.
return True
# Is the constraint targeting the field being altered?
return altered_field.name in field.to_fields
def _all_related_fields(model):
# Related fields must be returned in a deterministic order.
return sorted(
model._meta._get_fields(
forward=False,
reverse=True,
include_hidden=True,
include_parents=False,
),
key=operator.attrgetter("name"),
)
def _related_non_m2m_objects(old_field, new_field):
    """
    Yield (old_relation, new_relation) pairs for the relevant reverse
    relations of both fields, followed recursively by the pairs reachable
    through each relation's remote field. Relations rejected by
    _is_relevant_relation() (e.g. many-to-many ones) are left out.
    """
    relevant_old = (
        rel
        for rel in _all_related_fields(old_field.model)
        if _is_relevant_relation(rel, old_field)
    )
    relevant_new = (
        rel
        for rel in _all_related_fields(new_field.model)
        if _is_relevant_relation(rel, new_field)
    )
    # Both sequences are sorted the same way, so they are paired positionally.
    for pair in zip(relevant_old, relevant_new):
        yield pair
        old_rel, new_rel = pair
        yield from _related_non_m2m_objects(
            old_rel.remote_field, new_rel.remote_field
        )
[docs]
class BaseDatabaseSchemaEditor:
    """
    This class and its subclasses are responsible for emitting schema-changing
    statements to the databases - model creation/removal/alteration, field
    renaming, index fiddling, and so on.

    The ``sql_*`` class attributes below are %-style templates; the methods of
    this class fill their ``%(name)s`` placeholders with a dict before passing
    the result to execute().
    """

    # Overrideable SQL templates

    # Table-level statements.
    sql_create_table = "CREATE TABLE %(table)s (%(definition)s)"
    sql_rename_table = "ALTER TABLE %(old_table)s RENAME TO %(new_table)s"
    sql_retablespace_table = "ALTER TABLE %(table)s SET TABLESPACE %(new_tablespace)s"
    sql_delete_table = "DROP TABLE %(table)s CASCADE"

    # Column-level statements. sql_alter_column wraps one or more of the
    # "ALTER COLUMN ..." fragments through its %(changes)s placeholder.
    sql_create_column = "ALTER TABLE %(table)s ADD COLUMN %(column)s %(definition)s"
    sql_alter_column = "ALTER TABLE %(table)s %(changes)s"
    sql_alter_column_type = "ALTER COLUMN %(column)s TYPE %(type)s%(collation)s"
    sql_alter_column_null = "ALTER COLUMN %(column)s DROP NOT NULL"
    sql_alter_column_not_null = "ALTER COLUMN %(column)s SET NOT NULL"
    sql_alter_column_default = "ALTER COLUMN %(column)s SET DEFAULT %(default)s"
    sql_alter_column_no_default = "ALTER COLUMN %(column)s DROP DEFAULT"
    # Same statement as sql_alter_column_no_default unless a subclass
    # overrides it.
    sql_alter_column_no_default_null = sql_alter_column_no_default
    sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s CASCADE"
    sql_rename_column = (
        "ALTER TABLE %(table)s RENAME COLUMN %(old_column)s TO %(new_column)s"
    )
    sql_update_with_default = (
        "UPDATE %(table)s SET %(column)s = %(default)s WHERE %(column)s IS NULL"
    )

    # Constraint fragments and statements. The sql_delete_* names for checks,
    # uniques, foreign keys and primary keys all alias the generic
    # sql_delete_constraint statement by default.
    sql_unique_constraint = "UNIQUE (%(columns)s)%(deferrable)s"
    sql_check_constraint = "CHECK (%(check)s)"
    sql_delete_constraint = "ALTER TABLE %(table)s DROP CONSTRAINT %(name)s"
    sql_constraint = "CONSTRAINT %(name)s %(constraint)s"

    sql_create_check = "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s CHECK (%(check)s)"
    sql_delete_check = sql_delete_constraint

    sql_create_unique = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s "
        "UNIQUE%(nulls_distinct)s (%(columns)s)%(deferrable)s"
    )
    sql_delete_unique = sql_delete_constraint

    sql_create_fk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s FOREIGN KEY (%(column)s) "
        "REFERENCES %(to_table)s (%(to_column)s)%(deferrable)s"
    )
    # Inline foreign key templates. While these are None (falsy), table_sql()
    # and add_field() do not emit an inline FK clause and may defer a
    # separate FK statement instead.
    sql_create_inline_fk = None
    sql_create_column_inline_fk = None
    sql_delete_fk = sql_delete_constraint

    # Index statements.
    sql_create_index = (
        "CREATE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(extra)s%(condition)s"
    )
    sql_create_unique_index = (
        "CREATE UNIQUE INDEX %(name)s ON %(table)s "
        "(%(columns)s)%(include)s%(condition)s%(nulls_distinct)s"
    )
    sql_rename_index = "ALTER INDEX %(old_name)s RENAME TO %(new_name)s"
    sql_delete_index = "DROP INDEX %(name)s"

    # Primary key statements.
    sql_create_pk = (
        "ALTER TABLE %(table)s ADD CONSTRAINT %(name)s PRIMARY KEY (%(columns)s)"
    )
    sql_delete_pk = sql_delete_constraint

    sql_delete_procedure = "DROP PROCEDURE %(procedure)s"

    # Comment statements; alter_db_table_comment() skips its work when
    # sql_alter_table_comment is falsy.
    sql_alter_table_comment = "COMMENT ON TABLE %(table)s IS %(comment)s"
    sql_alter_column_comment = "COMMENT ON COLUMN %(table)s.%(column)s IS %(comment)s"
def __init__(self, connection, collect_sql=False, atomic=True):
self.connection = connection
self.collect_sql = collect_sql
if self.collect_sql:
self.collected_sql = []
self.atomic_migration = self.connection.features.can_rollback_ddl and atomic
# State-managing methods
def __enter__(self):
self.deferred_sql = []
if self.atomic_migration:
self.atomic = atomic(self.connection.alias)
self.atomic.__enter__()
return self
def __exit__(self, exc_type, exc_value, traceback):
if exc_type is None:
for sql in self.deferred_sql:
self.execute(sql)
if self.atomic_migration:
self.atomic.__exit__(exc_type, exc_value, traceback)
# Core utility functions
[docs]
def execute(self, sql, params=()):
    """
    Run `sql` with optional `params`, or record it when collecting SQL.

    Raise TransactionManagementError when asked to really execute a
    statement inside an atomic block on a backend that can't roll back DDL.
    """
    collecting = self.collect_sql
    # The transactional DDL check is pointless while collecting, since
    # nothing is going to be executed.
    if (
        not collecting
        and self.connection.in_atomic_block
        and not self.connection.features.can_rollback_ddl
    ):
        raise TransactionManagementError(
            "Executing DDL statements while in a transaction on databases "
            "that can't perform a rollback is prohibited."
        )
    # `sql` may be a non-string statement object.
    sql = str(sql)
    # Log before running.
    logger.debug(
        "%s; (params %r)", sql, params, extra={"params": params, "sql": sql}
    )
    if not collecting:
        with self.connection.cursor() as cursor:
            cursor.execute(sql, params)
        return
    # Collected statements always end with a semicolon.
    terminator = "" if sql.rstrip().endswith(";") else ";"
    if params is None:
        statement = sql
    else:
        # Interpolate quoted parameter values straight into the statement.
        statement = sql % tuple(map(self.quote_value, params))
    self.collected_sql.append(statement + terminator)
def quote_name(self, name):
return self.connection.ops.quote_name(name)
def table_sql(self, model):
    """
    Take a model and return its table definition.

    Return a ``(sql, params)`` pair: the CREATE TABLE statement built from
    ``sql_create_table`` and the parameters collected from the column
    definitions. As a side effect, statements that are not part of the
    CREATE TABLE itself (unique_together constraints, non-inline foreign
    keys and autoincrement SQL) are appended to ``self.deferred_sql``.
    """
    # Add any unique_togethers (always deferred, as some fields might be
    # created afterward, like geometry fields with some backends).
    for field_names in model._meta.unique_together:
        fields = [model._meta.get_field(field) for field in field_names]
        self.deferred_sql.append(self._create_unique_sql(model, fields))
    # Create column SQL, add FK deferreds if needed.
    column_sqls = []
    params = []
    for field in model._meta.local_fields:
        # SQL.
        definition, extra_params = self.column_sql(model, field)
        # column_sql() returns None for fields without a column type.
        if definition is None:
            continue
        # Check constraints can go on the column SQL here.
        db_params = field.db_parameters(connection=self.connection)
        if db_params["check"]:
            definition += " " + self.sql_check_constraint % db_params
        # Autoincrement SQL (for backends with inline variant).
        col_type_suffix = field.db_type_suffix(connection=self.connection)
        if col_type_suffix:
            definition += " %s" % col_type_suffix
        params.extend(extra_params)
        # FK.
        if field.remote_field and field.db_constraint:
            to_table = field.remote_field.model._meta.db_table
            to_column = field.remote_field.model._meta.get_field(
                field.remote_field.field_name
            ).column
            # Prefer an inline FK clause when the backend defines a
            # template for it; otherwise defer a separate FK statement.
            if self.sql_create_inline_fk:
                definition += " " + self.sql_create_inline_fk % {
                    "to_table": self.quote_name(to_table),
                    "to_column": self.quote_name(to_column),
                }
            elif self.connection.features.supports_foreign_keys:
                self.deferred_sql.append(
                    self._create_fk_sql(
                        model, field, "_fk_%(to_table)s_%(to_column)s"
                    )
                )
        # Add the SQL to our big list.
        column_sqls.append(
            "%s %s"
            % (
                self.quote_name(field.column),
                definition,
            )
        )
        # Autoincrement SQL (for backends with post table definition
        # variant).
        if field.get_internal_type() in (
            "AutoField",
            "BigAutoField",
            "SmallAutoField",
        ):
            autoinc_sql = self.connection.ops.autoinc_sql(
                model._meta.db_table, field.column
            )
            if autoinc_sql:
                self.deferred_sql.extend(autoinc_sql)
    # Constraints declared on the model are rendered inline, after the
    # columns; falsy entries are dropped when joining below.
    constraints = [
        constraint.constraint_sql(model, self)
        for constraint in model._meta.constraints
    ]
    sql = self.sql_create_table % {
        "table": self.quote_name(model._meta.db_table),
        "definition": ", ".join(
            str(constraint)
            for constraint in (*column_sqls, *constraints)
            if constraint
        ),
    }
    # Append the tablespace clause when the model names a tablespace and
    # the backend returns SQL for it.
    if model._meta.db_tablespace:
        tablespace_sql = self.connection.ops.tablespace_sql(
            model._meta.db_tablespace
        )
        if tablespace_sql:
            sql += " " + tablespace_sql
    return sql, params
# Field <-> database mapping functions
def _iter_column_sql(
    self, column_db_type, params, model, field, field_db_params, include_default
):
    """
    Yield, in order, the SQL fragments that make up a column definition
    (column_sql() joins them with spaces).

    `params` is mutated in place: parameters for a database default or a
    Python-level default are appended to it as the matching fragment is
    yielded.
    """
    yield column_db_type
    if collation := field_db_params.get("collation"):
        yield self._collate_sql(collation)
    if self.connection.features.supports_comments_inline and field.db_comment:
        yield self._comment_sql(field.db_comment)
    # Work out nullability.
    null = field.null
    # Add database default.
    if field.db_default is not NOT_PROVIDED:
        default_sql, default_params = self.db_default_sql(field)
        yield f"DEFAULT {default_sql}"
        params.extend(default_params)
        # A database default replaces any Python-level default below.
        include_default = False
    # Include a default value, if requested.
    include_default = (
        include_default
        and not self.skip_default(field)
        and
        # Don't include a default value if it's a nullable field and the
        # default cannot be dropped in the ALTER COLUMN statement (e.g.
        # MySQL longtext and longblob).
        not (null and self.skip_default_on_alter(field))
    )
    if include_default:
        default_value = self.effective_default(field)
        if default_value is not None:
            column_default = "DEFAULT " + self._column_default_sql(field)
            if self.connection.features.requires_literal_defaults:
                # Some databases can't take defaults as a parameter (Oracle).
                # If this is the case, the individual schema backend should
                # implement prepare_default().
                yield column_default % self.prepare_default(default_value)
            else:
                yield column_default
                params.append(default_value)
    # Oracle treats the empty string ('') as null, so coerce the null
    # option whenever '' is a possible value.
    if (
        field.empty_strings_allowed
        and not field.primary_key
        and self.connection.features.interprets_empty_strings_as_nulls
    ):
        null = True
    # Generated columns get their GENERATED clause instead of an explicit
    # NULL / NOT NULL marker.
    if field.generated:
        yield self._column_generated_sql(field)
    elif not null:
        yield "NOT NULL"
    elif not self.connection.features.implied_column_null:
        yield "NULL"
    if field.primary_key:
        yield "PRIMARY KEY"
    elif field.unique:
        yield "UNIQUE"
    # Optionally add the tablespace if it's an implicitly indexed column.
    tablespace = field.db_tablespace or model._meta.db_tablespace
    if (
        tablespace
        and self.connection.features.supports_tablespaces
        and field.unique
    ):
        yield self.connection.ops.tablespace_sql(tablespace, inline=True)
def column_sql(self, model, field, include_default=False):
"""
Return the column definition for a field. The field must already have
had set_attributes_from_name() called.
"""
# Get the column's type and use that as the basis of the SQL.
field_db_params = field.db_parameters(connection=self.connection)
column_db_type = field_db_params["type"]
# Check for fields that aren't actually columns (e.g. M2M).
if column_db_type is None:
return None, None
params = []
return (
" ".join(
# This appends to the params being returned.
self._iter_column_sql(
column_db_type,
params,
model,
field,
field_db_params,
include_default,
)
),
params,
)
def skip_default(self, field):
"""
Some backends don't accept default values for certain columns types
(i.e. MySQL longtext and longblob).
"""
return False
def skip_default_on_alter(self, field):
"""
Some backends don't accept default values for certain columns types
(i.e. MySQL longtext and longblob) in the ALTER COLUMN statement.
"""
return False
def prepare_default(self, value):
"""
Only used for backends which have requires_literal_defaults feature
"""
raise NotImplementedError(
"subclasses of BaseDatabaseSchemaEditor for backends which have "
"requires_literal_defaults must provide a prepare_default() method"
)
def _column_default_sql(self, field):
"""
Return the SQL to use in a DEFAULT clause. The resulting string should
contain a '%s' placeholder for a default value.
"""
return "%s"
def db_default_sql(self, field):
    """
    Compile the field's db_default expression and return ``(sql, params)``.

    The SQL is wrapped in parentheses unless db_default is a plain Value.
    """
    from django.db.models.expressions import Value

    if isinstance(field.db_default, Value):
        wrapper = "%s"
    else:
        wrapper = "(%s)"
    compiler = Query(model=field.model).get_compiler(connection=self.connection)
    default_sql, params = compiler.compile(field.db_default)
    if self.connection.features.requires_literal_defaults:
        # Some databases don't support parameterized defaults (Oracle,
        # SQLite). If this is the case, the individual schema backend
        # should implement prepare_default().
        literals = tuple(self.prepare_default(param) for param in params)
        default_sql = default_sql % literals
        params = []
    return wrapper % default_sql, params
def _column_generated_sql(self, field):
"""Return the SQL to use in a GENERATED ALWAYS clause."""
expression_sql, params = field.generated_sql(self.connection)
persistency_sql = "STORED" if field.db_persist else "VIRTUAL"
if params:
expression_sql = expression_sql % tuple(self.quote_value(p) for p in params)
return f"GENERATED ALWAYS AS ({expression_sql}) {persistency_sql}"
@staticmethod
def _effective_default(field):
# This method allows testing its logic without a connection.
if field.has_default():
default = field.get_default()
elif field.generated:
default = None
elif not field.null and field.blank and field.empty_strings_allowed:
if field.get_internal_type() == "BinaryField":
default = b""
else:
default = ""
elif getattr(field, "auto_now", False) or getattr(field, "auto_now_add", False):
internal_type = field.get_internal_type()
if internal_type == "DateTimeField":
default = timezone.now()
else:
default = datetime.now()
if internal_type == "DateField":
default = default.date()
elif internal_type == "TimeField":
default = default.time()
else:
default = None
return default
def effective_default(self, field):
"""Return a field's effective database default value."""
return field.get_db_prep_save(self._effective_default(field), self.connection)
def quote_value(self, value):
"""
Return a quoted version of the value so it's safe to use in an SQL
string. This is not safe against injection from user code; it is
intended only for use in making SQL scripts or preparing default values
for particularly tricky backends (defaults are not user-defined, though,
so this is safe).
"""
raise NotImplementedError()
# Actions
[docs]
def create_model(self, model):
"""
Create a table and any accompanying indexes or unique constraints for
the given `model`.
"""
sql, params = self.table_sql(model)
# Prevent using [] as params, in the case a literal '%' is used in the
# definition.
self.execute(sql, params or None)
if self.connection.features.supports_comments:
# Add table comment.
if model._meta.db_table_comment:
self.alter_db_table_comment(model, None, model._meta.db_table_comment)
# Add column comments.
if not self.connection.features.supports_comments_inline:
for field in model._meta.local_fields:
if field.db_comment:
field_db_params = field.db_parameters(
connection=self.connection
)
field_type = field_db_params["type"]
self.execute(
*self._alter_column_comment_sql(
model, field, field_type, field.db_comment
)
)
# Add any field index (deferred as SQLite _remake_table needs it).
self.deferred_sql.extend(self._model_indexes_sql(model))
# Make M2M tables
for field in model._meta.local_many_to_many:
if field.remote_field.through._meta.auto_created:
self.create_model(field.remote_field.through)
[docs]
def delete_model(self, model):
"""Delete a model from the database."""
# Handle auto-created intermediary models
for field in model._meta.local_many_to_many:
if field.remote_field.through._meta.auto_created:
self.delete_model(field.remote_field.through)
# Delete the table
self.execute(
self.sql_delete_table
% {
"table": self.quote_name(model._meta.db_table),
}
)
# Remove all deferred statements referencing the deleted table.
for sql in list(self.deferred_sql):
if isinstance(sql, Statement) and sql.references_table(
model._meta.db_table
):
self.deferred_sql.remove(sql)
[docs]
def add_index(self, model, index):
"""Add an index on a model."""
if (
index.contains_expressions
and not self.connection.features.supports_expression_indexes
):
return None
# Index.create_sql returns interpolated SQL which makes params=None a
# necessity to avoid escaping attempts on execution.
self.execute(index.create_sql(model, self), params=None)
[docs]
def remove_index(self, model, index):
"""Remove an index from a model."""
if (
index.contains_expressions
and not self.connection.features.supports_expression_indexes
):
return None
self.execute(index.remove_sql(model, self))
[docs]
def rename_index(self, model, old_index, new_index):
if self.connection.features.can_rename_index:
self.execute(
self._rename_index_sql(model, old_index.name, new_index.name),
params=None,
)
else:
self.remove_index(model, old_index)
self.add_index(model, new_index)
[docs]
def add_constraint(self, model, constraint):
"""Add a constraint to a model."""
sql = constraint.create_sql(model, self)
if sql:
# Constraint.create_sql returns interpolated SQL which makes
# params=None a necessity to avoid escaping attempts on execution.
self.execute(sql, params=None)
[docs]
def remove_constraint(self, model, constraint):
"""Remove a constraint from a model."""
sql = constraint.remove_sql(model, self)
if sql:
self.execute(sql)
[docs]
def alter_unique_together(self, model, old_unique_together, new_unique_together):
"""
Deal with a model changing its unique_together. The input
unique_togethers must be doubly-nested, not the single-nested
["foo", "bar"] format.
"""
olds = {tuple(fields) for fields in old_unique_together}
news = {tuple(fields) for fields in new_unique_together}
# Deleted uniques
for fields in olds.difference(news):
self._delete_composed_index(
model,
fields,
{"unique": True, "primary_key": False},
self.sql_delete_unique,
)
# Created uniques
for field_names in news.difference(olds):
fields = [model._meta.get_field(field) for field in field_names]
self.execute(self._create_unique_sql(model, fields))
[docs]
def alter_index_together(self, model, old_index_together, new_index_together):
"""
Deal with a model changing its index_together. The input
index_togethers must be doubly-nested, not the single-nested
["foo", "bar"] format.
"""
olds = {tuple(fields) for fields in old_index_together}
news = {tuple(fields) for fields in new_index_together}
# Deleted indexes
for fields in olds.difference(news):
self._delete_composed_index(
model,
fields,
{"index": True, "unique": False},
self.sql_delete_index,
)
# Created indexes
for field_names in news.difference(olds):
fields = [model._meta.get_field(field) for field in field_names]
self.execute(self._create_index_sql(model, fields=fields, suffix="_idx"))
def _delete_composed_index(self, model, fields, constraint_kwargs, sql):
meta_constraint_names = {
constraint.name for constraint in model._meta.constraints
}
meta_index_names = {constraint.name for constraint in model._meta.indexes}
columns = [model._meta.get_field(field).column for field in fields]
constraint_names = self._constraint_names(
model,
columns,
exclude=meta_constraint_names | meta_index_names,
**constraint_kwargs,
)
if (
constraint_kwargs.get("unique") is True
and constraint_names
and self.connection.features.allows_multiple_constraints_on_same_fields
):
# Constraint matching the unique_together name.
default_name = str(
self._unique_constraint_name(model._meta.db_table, columns, quote=False)
)
if default_name in constraint_names:
constraint_names = [default_name]
if len(constraint_names) != 1:
raise ValueError(
"Found wrong number (%s) of constraints for %s(%s)"
% (
len(constraint_names),
model._meta.db_table,
", ".join(columns),
)
)
self.execute(self._delete_constraint_sql(sql, model, constraint_names[0]))
[docs]
def alter_db_table(self, model, old_db_table, new_db_table):
"""Rename the table a model points to."""
if old_db_table == new_db_table or (
self.connection.features.ignores_table_name_case
and old_db_table.lower() == new_db_table.lower()
):
return
self.execute(
self.sql_rename_table
% {
"old_table": self.quote_name(old_db_table),
"new_table": self.quote_name(new_db_table),
}
)
# Rename all references to the old table name.
for sql in self.deferred_sql:
if isinstance(sql, Statement):
sql.rename_table_references(old_db_table, new_db_table)
[docs]
def alter_db_table_comment(self, model, old_db_table_comment, new_db_table_comment):
if self.sql_alter_table_comment and self.connection.features.supports_comments:
self.execute(
self.sql_alter_table_comment
% {
"table": self.quote_name(model._meta.db_table),
"comment": self.quote_value(new_db_table_comment or ""),
}
)
[docs]
def alter_db_tablespace(self, model, old_db_tablespace, new_db_tablespace):
"""Move a model's table between tablespaces."""
self.execute(
self.sql_retablespace_table
% {
"table": self.quote_name(model._meta.db_table),
"old_tablespace": self.quote_name(old_db_tablespace),
"new_tablespace": self.quote_name(new_db_tablespace),
}
)
[docs]
def add_field(self, model, field):
    """
    Create a field on a model. Usually involves adding a column, but may
    involve adding a table instead (for M2M fields).

    For column-backed fields the column is added with its effective
    default included in the definition; that default is then dropped again
    unless the field has a database default (``db_default``). Foreign key
    constraints that can't be written inline and the field's indexes are
    appended to ``self.deferred_sql`` rather than executed here.
    """
    # Special-case implicit M2M tables
    if field.many_to_many and field.remote_field.through._meta.auto_created:
        return self.create_model(field.remote_field.through)
    # Get the column's definition
    definition, params = self.column_sql(model, field, include_default=True)
    # It might not actually have a column behind it
    if definition is None:
        return
    if col_type_suffix := field.db_type_suffix(connection=self.connection):
        definition += f" {col_type_suffix}"
    # Check constraints can go on the column SQL here
    db_params = field.db_parameters(connection=self.connection)
    if db_params["check"]:
        definition += " " + self.sql_check_constraint % db_params
    if (
        field.remote_field
        and self.connection.features.supports_foreign_keys
        and field.db_constraint
    ):
        constraint_suffix = "_fk_%(to_table)s_%(to_column)s"
        # Add FK constraint inline, if supported.
        if self.sql_create_column_inline_fk:
            to_table = field.remote_field.model._meta.db_table
            to_column = field.remote_field.model._meta.get_field(
                field.remote_field.field_name
            ).column
            # Only the namespace part of the (possibly qualified) table
            # name is needed for the inline template.
            namespace, _ = split_identifier(model._meta.db_table)
            definition += " " + self.sql_create_column_inline_fk % {
                "name": self._fk_constraint_name(model, field, constraint_suffix),
                "namespace": "%s." % self.quote_name(namespace)
                if namespace
                else "",
                "column": self.quote_name(field.column),
                "to_table": self.quote_name(to_table),
                "to_column": self.quote_name(to_column),
                "deferrable": self.connection.ops.deferrable_sql(),
            }
        # Otherwise, add FK constraints later.
        else:
            self.deferred_sql.append(
                self._create_fk_sql(model, field, constraint_suffix)
            )
    # Build the SQL and run it
    sql = self.sql_create_column % {
        "table": self.quote_name(model._meta.db_table),
        "column": self.quote_name(field.column),
        "definition": definition,
    }
    self.execute(sql, params)
    # Drop the default if we need to
    if (
        field.db_default is NOT_PROVIDED
        and not self.skip_default_on_alter(field)
        and self.effective_default(field) is not None
    ):
        changes_sql, params = self._alter_column_default_sql(
            model, None, field, drop=True
        )
        sql = self.sql_alter_column % {
            "table": self.quote_name(model._meta.db_table),
            "changes": changes_sql,
        }
        self.execute(sql, params)
    # Add field comment, if required.
    if (
        field.db_comment
        and self.connection.features.supports_comments
        and not self.connection.features.supports_comments_inline
    ):
        field_type = db_params["type"]
        self.execute(
            *self._alter_column_comment_sql(
                model, field, field_type, field.db_comment
            )
        )
    # Add an index, if required
    self.deferred_sql.extend(self._field_indexes_sql(model, field))
    # Reset connection if required
    if self.connection.features.connection_persists_old_columns:
        self.connection.close()
[docs]
def remove_field(self, model, field):
"""
Remove a field from a model. Usually involves deleting a column,
but for M2Ms may involve deleting a table.
"""
# Special-case implicit M2M tables
if field.many_to_many and field.remote_field.through._meta.auto_created:
return self.delete_model(field.remote_field.through)
# It might not actually have a column behind it
if field.db_parameters(connection=self.connection)["type"] is None:
return
# Drop any FK constraints, MySQL requires explicit deletion
if field.remote_field:
fk_names = self._constraint_names(model, [field.column], foreign_key=True)
for fk_name in fk_names:
self.execute(self._delete_fk_sql(model, fk_name))
# Delete the column
sql = self.sql_delete_column % {
"table": self.quote_name(model._meta.db_table),
"column": self.quote_name(field.column),
}
self.execute(sql)
# Reset connection if required
if self.connection.features.connection_persists_old_columns:
self.connection.close()
# Remove all deferred statements referencing the deleted column.
for sql in list(self.deferred_sql):
if isinstance(sql, Statement) and sql.references_column(
model._meta.db_table, field.column
):
self.deferred_sql.remove(sql)
[docs]
def alter_field(self, model, old_field, new_field, strict=False):
    """
    Allow a field's type, uniqueness, nullability, default, column,
    constraints, etc. to be modified.
    `old_field` is required to compute the necessary changes.
    If `strict` is True, raise errors if the old column does not match
    `old_field` precisely.

    Raise ValueError when either field has no database type and no
    relation, when the change would convert to or from an M2M field (or add
    or remove ``through=``), or when a generated field's definition would
    change.
    """
    # Bail out early when nothing relevant to the schema changed.
    if not self._field_should_be_altered(old_field, new_field):
        return
    # Ensure this field is even column-based
    old_db_params = old_field.db_parameters(connection=self.connection)
    old_type = old_db_params["type"]
    new_db_params = new_field.db_parameters(connection=self.connection)
    new_type = new_db_params["type"]
    if (old_type is None and old_field.remote_field is None) or (
        new_type is None and new_field.remote_field is None
    ):
        raise ValueError(
            "Cannot alter field %s into %s - they do not properly define "
            "db_type (are you using a badly-written custom field?)"
            % (old_field, new_field),
        )
    elif (
        old_type is None
        and new_type is None
        and (
            old_field.remote_field.through
            and new_field.remote_field.through
            and old_field.remote_field.through._meta.auto_created
            and new_field.remote_field.through._meta.auto_created
        )
    ):
        # Both sides are M2M fields with auto-created through tables.
        return self._alter_many_to_many(model, old_field, new_field, strict)
    elif (
        old_type is None
        and new_type is None
        and (
            old_field.remote_field.through
            and new_field.remote_field.through
            and not old_field.remote_field.through._meta.auto_created
            and not new_field.remote_field.through._meta.auto_created
        )
    ):
        # Both sides have through models; this is a no-op.
        return
    elif old_type is None or new_type is None:
        raise ValueError(
            "Cannot alter field %s into %s - they are not compatible types "
            "(you cannot alter to or from M2M fields, or add or remove "
            "through= on M2M fields)" % (old_field, new_field)
        )
    elif old_field.generated != new_field.generated or (
        new_field.generated
        and (
            old_field.db_persist != new_field.db_persist
            or old_field.generated_sql(self.connection)
            != new_field.generated_sql(self.connection)
        )
    ):
        raise ValueError(
            f"Modifying GeneratedFields is not supported - the field {new_field} "
            "must be removed and re-added with the new definition."
        )

    # Regular column-to-column change: delegate to the physical update.
    self._alter_field(
        model,
        old_field,
        new_field,
        old_type,
        new_type,
        old_db_params,
        new_db_params,
        strict,
    )
def _field_db_check(self, field, field_db_params):
# Always check constraints with the same mocked column name to avoid
# recreating constrains when the column is renamed.
check_constraints = self.connection.data_type_check_constraints
data = field.db_type_parameters(self.connection)
data["column"] = "__column_name__"
try:
return check_constraints[field.get_internal_type()] % data
except KeyError:
return None
def _alter_field(
self,
model,
old_field,
new_field,
old_type,
new_type,
old_db_params,
new_db_params,
strict=False,
):
"""Perform a "physical" (non-ManyToMany) field update."""
# Overview of the order of operations, as laid out below:
#   1. Drop this column's own FK, unique, index and check constraints
#      that will no longer apply, plus incoming FKs from related models
#      when the type or collation of a PK/unique target changes.
#   2. Rename the column and fix up references held in deferred SQL.
#   3. Collect and run ALTER COLUMN actions (type, database default,
#      temporary default, nullability), backfilling existing rows when
#      going from NULL to NOT NULL with a default.
#   4. Recreate/add PK, unique, index, FK and check constraints, retype
#      the referencing columns, and drop any temporary default.
# With strict=True, finding anything other than exactly one matching
# constraint to drop raises ValueError.
# Drop any FK constraints, we'll remake them later
# fks_dropped is only tested for truthiness further down, when deciding
# whether the FK on this column has to be created again.
fks_dropped = set()
if (
self.connection.features.supports_foreign_keys
and old_field.remote_field
and old_field.db_constraint
and self._field_should_be_altered(
old_field,
new_field,
ignore={"db_comment"},
)
):
fk_names = self._constraint_names(
model, [old_field.column], foreign_key=True
)
if strict and len(fk_names) != 1:
raise ValueError(
"Found wrong number (%s) of foreign key constraints for %s.%s"
% (
len(fk_names),
model._meta.db_table,
old_field.column,
)
)
for fk_name in fk_names:
fks_dropped.add((old_field.column,))
self.execute(self._delete_fk_sql(model, fk_name))
# Has unique been removed?
if old_field.unique and (
not new_field.unique or self._field_became_primary_key(old_field, new_field)
):
# Find the unique constraint for this field
# Constraints declared in Meta.constraints are excluded by name so only
# the field-level unique constraint is dropped.
meta_constraint_names = {
constraint.name for constraint in model._meta.constraints
}
constraint_names = self._constraint_names(
model,
[old_field.column],
unique=True,
primary_key=False,
exclude=meta_constraint_names,
)
if strict and len(constraint_names) != 1:
raise ValueError(
"Found wrong number (%s) of unique constraints for %s.%s"
% (
len(constraint_names),
model._meta.db_table,
old_field.column,
)
)
for constraint_name in constraint_names:
self.execute(self._delete_unique_sql(model, constraint_name))
# Drop incoming FK constraints if the field is a primary key or unique,
# which might be a to_field target, and things are going to change.
old_collation = old_db_params.get("collation")
new_collation = new_db_params.get("collation")
drop_foreign_keys = (
self.connection.features.supports_foreign_keys
and (
(old_field.primary_key and new_field.primary_key)
or (old_field.unique and new_field.unique)
)
and ((old_type != new_type) or (old_collation != new_collation))
)
if drop_foreign_keys:
# '_meta.related_field' also contains M2M reverse fields, these
# will be filtered out
for _old_rel, new_rel in _related_non_m2m_objects(old_field, new_field):
rel_fk_names = self._constraint_names(
new_rel.related_model, [new_rel.field.column], foreign_key=True
)
for fk_name in rel_fk_names:
self.execute(self._delete_fk_sql(new_rel.related_model, fk_name))
# Removed an index? (no strict check, as multiple indexes are possible)
# Remove indexes if db_index switched to False or a unique constraint
# will now be used in lieu of an index. The following lines from the
# truth table show all True cases; the rest are False:
#
# old_field.db_index | old_field.unique | new_field.db_index | new_field.unique
# ------------------------------------------------------------------------------
# True | False | False | False
# True | False | False | True
# True | False | True | True
if (
old_field.db_index
and not old_field.unique
and (not new_field.db_index or new_field.unique)
):
# Find the index for this field
meta_index_names = {index.name for index in model._meta.indexes}
# Retrieve only BTREE indexes since this is what's created with
# db_index=True.
index_names = self._constraint_names(
model,
[old_field.column],
index=True,
type_=Index.suffix,
exclude=meta_index_names,
)
for index_name in index_names:
# The only way to check if an index was created with
# db_index=True or with Index(['field'], name='foo')
# is to look at its name (refs #28053).
self.execute(self._delete_index_sql(model, index_name))
# Change check constraints?
# Both checks are rendered against the same placeholder column name by
# _field_db_check(), so a plain rename does not make them differ.
old_db_check = self._field_db_check(old_field, old_db_params)
new_db_check = self._field_db_check(new_field, new_db_params)
if old_db_check != new_db_check and old_db_check:
meta_constraint_names = {
constraint.name for constraint in model._meta.constraints
}
constraint_names = self._constraint_names(
model,
[old_field.column],
check=True,
exclude=meta_constraint_names,
)
if strict and len(constraint_names) != 1:
raise ValueError(
"Found wrong number (%s) of check constraints for %s.%s"
% (
len(constraint_names),
model._meta.db_table,
old_field.column,
)
)
for constraint_name in constraint_names:
self.execute(self._delete_check_sql(model, constraint_name))
# Have they renamed the column?
if old_field.column != new_field.column:
self.execute(
self._rename_field_sql(
model._meta.db_table, old_field, new_field, new_type
)
)
# Rename all references to the renamed column.
for sql in self.deferred_sql:
if isinstance(sql, Statement):
sql.rename_column_references(
model._meta.db_table, old_field.column, new_field.column
)
# Next, start accumulating actions to do
# actions: ALTER COLUMN fragments run first; null_actions: (NOT) NULL
# fragments kept apart so they can run after a backfill; post_actions:
# complete statements executed after the ALTERs.
actions = []
null_actions = []
post_actions = []
# Type suffix change? (e.g. auto increment).
old_type_suffix = old_field.db_type_suffix(connection=self.connection)
new_type_suffix = new_field.db_type_suffix(connection=self.connection)
# Type, collation, or comment change?
if (
old_type != new_type
or old_type_suffix != new_type_suffix
or old_collation != new_collation
or (
self.connection.features.supports_comments
and old_field.db_comment != new_field.db_comment
)
):
fragment, other_actions = self._alter_column_type_sql(
model, old_field, new_field, new_type, old_collation, new_collation
)
actions.append(fragment)
post_actions.extend(other_actions)
# Database-level default (db_default) added, changed or removed?
if new_field.db_default is not NOT_PROVIDED:
if (
old_field.db_default is NOT_PROVIDED
or new_field.db_default != old_field.db_default
):
actions.append(
self._alter_column_database_default_sql(model, old_field, new_field)
)
elif old_field.db_default is not NOT_PROVIDED:
actions.append(
self._alter_column_database_default_sql(
model, old_field, new_field, drop=True
)
)
# When changing a column NULL constraint to NOT NULL with a given
# default value, we need to perform 4 steps:
# 1. Add a default for new incoming writes
# 2. Update existing NULL rows with new default
# 3. Replace NULL constraint with NOT NULL
# 4. Drop the default again.
# Default change?
needs_database_default = False
if (
old_field.null
and not new_field.null
and new_field.db_default is NOT_PROVIDED
):
old_default = self.effective_default(old_field)
new_default = self.effective_default(new_field)
if (
not self.skip_default_on_alter(new_field)
and old_default != new_default
and new_default is not None
):
needs_database_default = True
actions.append(
self._alter_column_default_sql(model, old_field, new_field)
)
# Nullability change?
if old_field.null != new_field.null:
fragment = self._alter_column_null_sql(model, old_field, new_field)
if fragment:
null_actions.append(fragment)
# Only if we have a default and there is a change from NULL to NOT NULL
four_way_default_alteration = (
new_field.has_default() or new_field.db_default is not NOT_PROVIDED
) and (old_field.null and not new_field.null)
if actions or null_actions:
if not four_way_default_alteration:
# If we don't have to do a 4-way default alteration we can
# directly run a (NOT) NULL alteration
actions += null_actions
# Combine actions together if we can (e.g. postgres)
if self.connection.features.supports_combined_alters and actions:
# zip(*actions) splits the (sql, params) pairs; sum(params, [])
# flattens the per-action parameter lists into a single list.
sql, params = tuple(zip(*actions))
actions = [(", ".join(sql), sum(params, []))]
# Apply those actions
for sql, params in actions:
self.execute(
self.sql_alter_column
% {
"table": self.quote_name(model._meta.db_table),
"changes": sql,
},
params,
)
if four_way_default_alteration:
# NOTE(review): new_default is only assigned in the NULL -> NOT NULL
# branch above; its condition is implied here by
# four_way_default_alteration plus db_default being NOT_PROVIDED.
if new_field.db_default is NOT_PROVIDED:
default_sql = "%s"
params = [new_default]
else:
default_sql, params = self.db_default_sql(new_field)
# Update existing rows with default value
self.execute(
self.sql_update_with_default
% {
"table": self.quote_name(model._meta.db_table),
"column": self.quote_name(new_field.column),
"default": default_sql,
},
params,
)
# Since we didn't run a NOT NULL change before we need to do it
# now
for sql, params in null_actions:
self.execute(
self.sql_alter_column
% {
"table": self.quote_name(model._meta.db_table),
"changes": sql,
},
params,
)
if post_actions:
for sql, params in post_actions:
self.execute(sql, params)
# If primary_key changed to False, delete the primary key constraint.
if old_field.primary_key and not new_field.primary_key:
self._delete_primary_key(model, strict)
# Added a unique?
if self._unique_should_be_added(old_field, new_field):
self.execute(self._create_unique_sql(model, [new_field]))
# Added an index? Add an index if db_index switched to True or a unique
# constraint will no longer be used in lieu of an index. The following
# lines from the truth table show all True cases; the rest are False:
#
# old_field.db_index | old_field.unique | new_field.db_index | new_field.unique
# ------------------------------------------------------------------------------
# False | False | True | False
# False | True | True | False
# True | True | True | False
if (
(not old_field.db_index or old_field.unique)
and new_field.db_index
and not new_field.unique
):
self.execute(self._create_index_sql(model, fields=[new_field]))
# Type alteration on primary key? Then we need to alter the column
# referring to us.
rels_to_update = []
if drop_foreign_keys:
rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
# Changed to become primary key?
if self._field_became_primary_key(old_field, new_field):
# Make the new one
self.execute(self._create_primary_key_sql(model, new_field))
# Update all referencing columns
rels_to_update.extend(_related_non_m2m_objects(old_field, new_field))
# Handle our type alters on the other end of rels from the PK stuff above
for old_rel, new_rel in rels_to_update:
rel_db_params = new_rel.field.db_parameters(connection=self.connection)
rel_type = rel_db_params["type"]
rel_collation = rel_db_params.get("collation")
old_rel_db_params = old_rel.field.db_parameters(connection=self.connection)
old_rel_collation = old_rel_db_params.get("collation")
fragment, other_actions = self._alter_column_type_sql(
new_rel.related_model,
old_rel.field,
new_rel.field,
rel_type,
old_rel_collation,
rel_collation,
)
# fragment is a (sql, params) pair.
self.execute(
self.sql_alter_column
% {
"table": self.quote_name(new_rel.related_model._meta.db_table),
"changes": fragment[0],
},
fragment[1],
)
for sql, params in other_actions:
self.execute(sql, params)
# Does it have a foreign key?
# Create it when the old FK was dropped above, or when the old field had
# no relation / no database constraint to begin with.
if (
self.connection.features.supports_foreign_keys
and new_field.remote_field
and (
fks_dropped or not old_field.remote_field or not old_field.db_constraint
)
and new_field.db_constraint
):
self.execute(
self._create_fk_sql(model, new_field, "_fk_%(to_table)s_%(to_column)s")
)
# Rebuild FKs that pointed to us if we previously had to drop them
if drop_foreign_keys:
for _, rel in rels_to_update:
if rel.field.db_constraint:
self.execute(
self._create_fk_sql(rel.related_model, rel.field, "_fk")
)
# Does it have check constraints we need to add?
if old_db_check != new_db_check and new_db_check:
constraint_name = self._create_index_name(
model._meta.db_table, [new_field.column], suffix="_check"
)
self.execute(
self._create_check_sql(model, constraint_name, new_db_params["check"])
)
# Drop the default if we need to
# (Django usually does not use in-database defaults)
if needs_database_default:
changes_sql, params = self._alter_column_default_sql(
model, old_field, new_field, drop=True
)
sql = self.sql_alter_column % {
"table": self.quote_name(model._meta.db_table),
"changes": changes_sql,
}
self.execute(sql, params)
# Reset connection if required
if self.connection.features.connection_persists_old_columns:
self.connection.close()
def _alter_column_null_sql(self, model, old_field, new_field):
"""
Hook to specialize column null alteration.
Return a (sql, params) fragment to set a column to null or non-null
as required by new_field, or None if no changes are required.
"""
if (
self.connection.features.interprets_empty_strings_as_nulls
and new_field.empty_strings_allowed
):
# The field is nullable in the database anyway, leave it alone.
return
else:
new_db_params = new_field.db_parameters(connection=self.connection)
sql = (
self.sql_alter_column_null
if new_field.null
else self.sql_alter_column_not_null
)
return (
sql
% {
"column": self.quote_name(new_field.column),
"type": new_db_params["type"],
},
[],
)
def _alter_column_default_sql(self, model, old_field, new_field, drop=False):
"""
Hook to specialize column default alteration.
Return a (sql, params) fragment to add or drop (depending on the drop
argument) a default to new_field's column.
"""
new_default = self.effective_default(new_field)
default = self._column_default_sql(new_field)
params = [new_default]
if drop:
params = []
elif self.connection.features.requires_literal_defaults:
# Some databases (Oracle) can't take defaults as a parameter
# If this is the case, the SchemaEditor for that database should
# implement prepare_default().
default = self.prepare_default(new_default)
params = []
new_db_params = new_field.db_parameters(connection=self.connection)
if drop:
if new_field.null:
sql = self.sql_alter_column_no_default_null
else:
sql = self.sql_alter_column_no_default
else:
sql = self.sql_alter_column_default
return (
sql
% {
"column": self.quote_name(new_field.column),
"type": new_db_params["type"],
"default": default,
},
params,
)
def _alter_column_database_default_sql(
self, model, old_field, new_field, drop=False
):
"""
Hook to specialize column database default alteration.
Return a (sql, params) fragment to add or drop (depending on the drop
argument) a default to new_field's column.
"""
if drop:
sql = self.sql_alter_column_no_default
default_sql = ""
params = []
else:
sql = self.sql_alter_column_default
default_sql, params = self.db_default_sql(new_field)
new_db_params = new_field.db_parameters(connection=self.connection)
return (
sql
% {
"column": self.quote_name(new_field.column),
"type": new_db_params["type"],
"default": default_sql,
},
params,
)
def _alter_column_type_sql(
self, model, old_field, new_field, new_type, old_collation, new_collation
):
"""
Hook to specialize column type alteration for different backends,
for cases when a creation type is different to an alteration type
(e.g. SERIAL in PostgreSQL, PostGIS fields).
Return a 2-tuple of: an SQL fragment of (sql, params) to insert into
an ALTER TABLE statement and a list of extra (sql, params) tuples to
run once the field is altered.
"""
other_actions = []
if collate_sql := self._collate_sql(
new_collation, old_collation, model._meta.db_table
):
collate_sql = f" {collate_sql}"
else:
collate_sql = ""
# Comment change?
comment_sql = ""
if self.connection.features.supports_comments and not new_field.many_to_many:
if old_field.db_comment != new_field.db_comment:
# PostgreSQL and Oracle can't execute 'ALTER COLUMN ...' and
# 'COMMENT ON ...' at the same time.
sql, params = self._alter_column_comment_sql(
model, new_field, new_type, new_field.db_comment
)
if sql:
other_actions.append((sql, params))
if new_field.db_comment:
comment_sql = self._comment_sql(new_field.db_comment)
return (
(
self.sql_alter_column_type
% {
"column": self.quote_name(new_field.column),
"type": new_type,
"collation": collate_sql,
"comment": comment_sql,
},
[],
),
other_actions,
)
def _alter_column_comment_sql(self, model, new_field, new_type, new_db_comment):
return (
self.sql_alter_column_comment
% {
"table": self.quote_name(model._meta.db_table),
"column": self.quote_name(new_field.column),
"comment": self._comment_sql(new_db_comment),
},
[],
)
def _comment_sql(self, comment):
return self.quote_value(comment or "")
def _alter_many_to_many(self, model, old_field, new_field, strict):
"""Alter M2Ms to repoint their to= endpoints."""
# Rename the through table
if (
old_field.remote_field.through._meta.db_table
!= new_field.remote_field.through._meta.db_table
):
self.alter_db_table(
old_field.remote_field.through,
old_field.remote_field.through._meta.db_table,
new_field.remote_field.through._meta.db_table,
)
# Repoint the FK to the other side
self.alter_field(
new_field.remote_field.through,
# The field that points to the target model is needed, so we can
# tell alter_field to change it - this is m2m_reverse_field_name()
# (as opposed to m2m_field_name(), which points to our model).
old_field.remote_field.through._meta.get_field(
old_field.m2m_reverse_field_name()
),
new_field.remote_field.through._meta.get_field(
new_field.m2m_reverse_field_name()
),
)
self.alter_field(
new_field.remote_field.through,
# for self-referential models we need to alter field from the other end too
old_field.remote_field.through._meta.get_field(old_field.m2m_field_name()),
new_field.remote_field.through._meta.get_field(new_field.m2m_field_name()),
)
def _create_index_name(self, table_name, column_names, suffix=""):
    """
    Generate a unique name for an index/unique constraint.

    The name is divided into 3 parts: the table name, the column names,
    and a unique digest and suffix. When the full name exceeds the
    backend's maximum name length (200 if the backend reports none), the
    parts are truncated to fit.
    """
    _, table_name = split_identifier(table_name)
    digest = names_digest(table_name, *column_names, length=8)
    hash_suffix_part = "%s%s" % (digest, suffix)
    max_length = self.connection.ops.max_name_length() or 200
    joined_columns = "_".join(column_names)
    # If everything fits into max_length, use that name.
    full_name = "%s_%s_%s" % (table_name, joined_columns, hash_suffix_part)
    if len(full_name) <= max_length:
        return full_name
    # Shorten a long suffix.
    if len(hash_suffix_part) > max_length / 3:
        hash_suffix_part = hash_suffix_part[: max_length // 3]
    other_length = (max_length - len(hash_suffix_part)) // 2 - 1
    short_name = "%s_%s_%s" % (
        table_name[:other_length],
        joined_columns[:other_length],
        hash_suffix_part,
    )
    # Prepend D if needed to prevent the name from starting with an
    # underscore or a number (not permitted on Oracle).
    first_char = short_name[0]
    if first_char == "_" or first_char.isdigit():
        short_name = "D%s" % short_name[:-1]
    return short_name
def _get_index_tablespace_sql(self, model, fields, db_tablespace=None):
if db_tablespace is None:
if len(fields) == 1 and fields[0].db_tablespace:
db_tablespace = fields[0].db_tablespace
elif settings.DEFAULT_INDEX_TABLESPACE:
db_tablespace = settings.DEFAULT_INDEX_TABLESPACE
elif model._meta.db_tablespace:
db_tablespace = model._meta.db_tablespace
if db_tablespace is not None:
return " " + self.connection.ops.tablespace_sql(db_tablespace)
return ""
def _index_condition_sql(self, condition):
if condition:
return " WHERE " + condition
return ""
def _index_include_sql(self, model, columns):
if not columns or not self.connection.features.supports_covering_indexes:
return ""
return Statement(
" INCLUDE (%(columns)s)",
columns=Columns(model._meta.db_table, columns, self.quote_name),
)
def _create_index_sql(
    self,
    model,
    *,
    fields=None,
    name=None,
    suffix="",
    using="",
    db_tablespace=None,
    col_suffixes=(),
    sql=None,
    opclasses=(),
    condition=None,
    include=None,
    expressions=None,
):
    """
    Return the SQL statement to create the index for one or several fields
    or expressions. `sql` can be specified if the syntax differs from the
    standard (GIS indexes, ...).
    """
    index_fields = fields or []
    index_expressions = expressions or []
    compiler = Query(model, alias_cols=False).get_compiler(
        connection=self.connection,
    )
    tablespace_sql = self._get_index_tablespace_sql(
        model, index_fields, db_tablespace=db_tablespace
    )
    column_names = [field.column for field in index_fields]
    template = sql or self.sql_create_index
    table_name = model._meta.db_table

    def lazy_index_name(*args, **kwargs):
        # Generate a name only when the caller didn't supply one, and keep
        # it so later calls reuse the same name.
        nonlocal name
        if name is None:
            name = self._create_index_name(*args, **kwargs)
        return self.quote_name(name)

    table_ref = Table(table_name, self.quote_name)
    name_ref = IndexName(table_name, column_names, suffix, lazy_index_name)
    # Index the columns when fields were given, otherwise the expressions.
    if column_names:
        indexed = self._index_columns(
            table_name, column_names, col_suffixes, opclasses
        )
    else:
        indexed = Expressions(
            table_name, index_expressions, compiler, self.quote_value
        )
    return Statement(
        template,
        table=table_ref,
        name=name_ref,
        using=using,
        columns=indexed,
        extra=tablespace_sql,
        condition=self._index_condition_sql(condition),
        include=self._index_include_sql(model, include),
    )
def _delete_index_sql(self, model, name, sql=None):
    """
    Return the statement that drops index ``name`` on the model's table,
    using ``sql`` as the template when given, else sql_delete_index.
    """
    template = sql or self.sql_delete_index
    table_ref = Table(model._meta.db_table, self.quote_name)
    return Statement(template, table=table_ref, name=self.quote_name(name))
def _rename_index_sql(self, model, old_name, new_name):
    """Return the statement renaming an index on the model's table."""
    table_ref = Table(model._meta.db_table, self.quote_name)
    quoted_old = self.quote_name(old_name)
    quoted_new = self.quote_name(new_name)
    return Statement(
        self.sql_rename_index,
        table=table_ref,
        old_name=quoted_old,
        new_name=quoted_new,
    )
def _index_columns(self, table, columns, col_suffixes, opclasses):
    """
    Return the Columns reference used in an index definition.

    ``opclasses`` is accepted but not used by this implementation.
    """
    quote = self.quote_name
    return Columns(table, columns, quote, col_suffixes=col_suffixes)
def _model_indexes_sql(self, model):
"""
Return a list of all index SQL statements (field indexes, Meta.indexes)
for the specified model.
"""
if not model._meta.managed or model._meta.proxy or model._meta.swapped:
return []
output = []
for field in model._meta.local_fields:
output.extend(self._field_indexes_sql(model, field))
for index in model._meta.indexes:
if (
not index.contains_expressions
or self.connection.features.supports_expression_indexes
):
output.append(index.create_sql(model, self))
return output
def _field_indexes_sql(self, model, field):
"""
Return a list of all index SQL statements for the specified field.
"""
output = []
if self._field_should_be_indexed(model, field):
output.append(self._create_index_sql(model, fields=[field]))
return output
def _field_should_be_altered(self, old_field, new_field, ignore=None):
if not old_field.concrete and not new_field.concrete:
return False
ignore = ignore or set()
_, old_path, old_args, old_kwargs = old_field.deconstruct()
_, new_path, new_args, new_kwargs = new_field.deconstruct()
# Don't alter when:
# - changing only a field name
# - changing an attribute that doesn't affect the schema
# - changing an attribute in the provided set of ignored attributes
# - adding only a db_column and the column name is not changed
# - db_table does not change for model referenced by foreign keys
for attr in ignore.union(old_field.non_db_attrs):
old_kwargs.pop(attr, None)
for attr in ignore.union(new_field.non_db_attrs):
new_kwargs.pop(attr, None)
if (
not new_field.many_to_many
and old_field.remote_field
and new_field.remote_field
and old_field.remote_field.model._meta.db_table
== new_field.remote_field.model._meta.db_table
):
old_kwargs.pop("to", None)
new_kwargs.pop("to", None)
return self.quote_name(old_field.column) != self.quote_name(
new_field.column
) or (old_path, old_args, old_kwargs) != (new_path, new_args, new_kwargs)
def _field_should_be_indexed(self, model, field):
return field.db_index and not field.unique
def _field_became_primary_key(self, old_field, new_field):
return not old_field.primary_key and new_field.primary_key
def _unique_should_be_added(self, old_field, new_field):
return (
not new_field.primary_key
and new_field.unique
and (not old_field.unique or old_field.primary_key)
)
def _rename_field_sql(self, table, old_field, new_field, new_type):
return self.sql_rename_column % {
"table": self.quote_name(table),
"old_column": self.quote_name(old_field.column),
"new_column": self.quote_name(new_field.column),
"type": new_type,
}
def _create_fk_sql(self, model, field, suffix):
    """
    Return the statement adding a foreign key constraint from ``field``'s
    column to the column of its target field.
    """
    source_table = model._meta.db_table
    table_ref = Table(source_table, self.quote_name)
    constraint_name = self._fk_constraint_name(model, field, suffix)
    column_ref = Columns(source_table, [field.column], self.quote_name)
    target_table_ref = Table(
        field.target_field.model._meta.db_table, self.quote_name
    )
    target_column_ref = Columns(
        field.target_field.model._meta.db_table,
        [field.target_field.column],
        self.quote_name,
    )
    deferrable_sql = self.connection.ops.deferrable_sql()
    return Statement(
        self.sql_create_fk,
        table=table_ref,
        name=constraint_name,
        column=column_ref,
        to_table=target_table_ref,
        to_column=target_column_ref,
        deferrable=deferrable_sql,
    )
def _fk_constraint_name(self, model, field, suffix):
    """
    Return a ForeignKeyName reference for the FK constraint on ``field``,
    whose name is produced (quoted) by _create_index_name().
    """

    def quoted_fk_name(*args, **kwargs):
        return self.quote_name(self._create_index_name(*args, **kwargs))

    target = field.target_field
    # Only the table part of a possibly namespaced identifier is used.
    _, target_table = split_identifier(target.model._meta.db_table)
    return ForeignKeyName(
        model._meta.db_table,
        [field.column],
        target_table,
        [target.column],
        suffix,
        quoted_fk_name,
    )
def _delete_fk_sql(self, model, name):
return self._delete_constraint_sql(self.sql_delete_fk, model, name)
def _deferrable_constraint_sql(self, deferrable):
if deferrable is None:
return ""
if deferrable == Deferrable.DEFERRED:
return " DEFERRABLE INITIALLY DEFERRED"
if deferrable == Deferrable.IMMEDIATE:
return " DEFERRABLE INITIALLY IMMEDIATE"
def _unique_index_nulls_distinct_sql(self, nulls_distinct):
if nulls_distinct is False:
return " NULLS NOT DISTINCT"
elif nulls_distinct is True:
return " NULLS DISTINCT"
return ""
def _unique_supported(
self,
condition=None,
deferrable=None,
include=None,
expressions=None,
nulls_distinct=None,
):
return (
(not condition or self.connection.features.supports_partial_indexes)
and (
not deferrable
or self.connection.features.supports_deferrable_unique_constraints
)
and (not include or self.connection.features.supports_covering_indexes)
and (
not expressions or self.connection.features.supports_expression_indexes
)
and (
nulls_distinct is None
or self.connection.features.supports_nulls_distinct_unique_constraints
)
)
def _unique_sql(
self,
model,
fields,
name,
condition=None,
deferrable=None,
include=None,
opclasses=None,
expressions=None,
nulls_distinct=None,
):
if not self._unique_supported(
condition=condition,
deferrable=deferrable,
include=include,
expressions=expressions,
nulls_distinct=nulls_distinct,
):
return None
if (
condition
or include
or opclasses
or expressions
or nulls_distinct is not None
):
# Databases support conditional, covering, functional unique,
# and nulls distinct constraints via a unique index.
sql = self._create_unique_sql(
model,
fields,
name=name,
condition=condition,
include=include,
opclasses=opclasses,
expressions=expressions,
nulls_distinct=nulls_distinct,
)
if sql:
self.deferred_sql.append(sql)
return None
constraint = self.sql_unique_constraint % {
"columns": ", ".join([self.quote_name(field.column) for field in fields]),
"deferrable": self._deferrable_constraint_sql(deferrable),
}
return self.sql_constraint % {
"name": self.quote_name(name),
"constraint": constraint,
}
def _create_unique_sql(
self,
model,
fields,
name=None,
condition=None,
deferrable=None,
include=None,
opclasses=None,
expressions=None,
nulls_distinct=None,
):
if not self._unique_supported(
condition=condition,
deferrable=deferrable,
include=include,
expressions=expressions,
nulls_distinct=nulls_distinct,
):
return None
compiler = Query(model, alias_cols=False).get_compiler(
connection=self.connection
)
table = model._meta.db_table
columns = [field.column for field in fields]
if name is None:
name = self._unique_constraint_name(table, columns, quote=True)
else:
name = self.quote_name(name)
if condition or include or opclasses or expressions:
sql = self.sql_create_unique_index
else:
sql = self.sql_create_unique
if columns:
columns = self._index_columns(
table, columns, col_suffixes=(), opclasses=opclasses
)
else:
columns = Expressions(table, expressions, compiler, self.quote_value)
return Statement(
sql,
table=Table(table, self.quote_name),
name=name,
columns=columns,
condition=self._index_condition_sql(condition),
deferrable=self._deferrable_constraint_sql(deferrable),
include=self._index_include_sql(model, include),
nulls_distinct=self._unique_index_nulls_distinct_sql(nulls_distinct),
)
def _unique_constraint_name(self, table, columns, quote=True):
    """
    Return an IndexName reference for a "_uniq" constraint on ``columns``.
    With ``quote`` true the generated name is passed through quote_name().
    """
    if not quote:
        return IndexName(table, columns, "_uniq", self._create_index_name)

    def quoted_unique_name(*args, **kwargs):
        return self.quote_name(self._create_index_name(*args, **kwargs))

    return IndexName(table, columns, "_uniq", quoted_unique_name)
def _delete_unique_sql(
self,
model,
name,
condition=None,
deferrable=None,
include=None,
opclasses=None,
expressions=None,
nulls_distinct=None,
):
if not self._unique_supported(
condition=condition,
deferrable=deferrable,
include=include,
expressions=expressions,
nulls_distinct=nulls_distinct,
):
return None
if condition or include or opclasses or expressions:
sql = self.sql_delete_index
else:
sql = self.sql_delete_unique
return self._delete_constraint_sql(sql, model, name)
def _check_sql(self, name, check):
return self.sql_constraint % {
"name": self.quote_name(name),
"constraint": self.sql_check_constraint % {"check": check},
}
def _create_check_sql(self, model, name, check):
if not self.connection.features.supports_table_check_constraints:
return None
return Statement(
self.sql_create_check,
table=Table(model._meta.db_table, self.quote_name),
name=self.quote_name(name),
check=check,
)
def _delete_check_sql(self, model, name):
if not self.connection.features.supports_table_check_constraints:
return None
return self._delete_constraint_sql(self.sql_delete_check, model, name)
def _delete_constraint_sql(self, template, model, name):
    """
    Return a Statement built from ``template`` that targets constraint
    ``name`` on the model's table.
    """
    table_ref = Table(model._meta.db_table, self.quote_name)
    quoted_name = self.quote_name(name)
    return Statement(template, table=table_ref, name=quoted_name)
def _constraint_names(
self,
model,
column_names=None,
unique=None,
primary_key=None,
index=None,
foreign_key=None,
check=None,
type_=None,
exclude=None,
):
"""Return all constraint names matching the columns and conditions."""
if column_names is not None:
column_names = [
self.connection.introspection.identifier_converter(
truncate_name(name, self.connection.ops.max_name_length())
)
if self.connection.features.truncates_names
else self.connection.introspection.identifier_converter(name)
for name in column_names
]
with self.connection.cursor() as cursor:
constraints = self.connection.introspection.get_constraints(
cursor, model._meta.db_table
)
result = []
for name, infodict in constraints.items():
if column_names is None or column_names == infodict["columns"]:
if unique is not None and infodict["unique"] != unique:
continue
if primary_key is not None and infodict["primary_key"] != primary_key:
continue
if index is not None and infodict["index"] != index:
continue
if check is not None and infodict["check"] != check:
continue
if foreign_key is not None and not infodict["foreign_key"]:
continue
if type_ is not None and infodict["type"] != type_:
continue
if not exclude or name not in exclude:
result.append(name)
return result
def _delete_primary_key(self, model, strict=False):
constraint_names = self._constraint_names(model, primary_key=True)
if strict and len(constraint_names) != 1:
raise ValueError(
"Found wrong number (%s) of PK constraints for %s"
% (
len(constraint_names),
model._meta.db_table,
)
)
for constraint_name in constraint_names:
self.execute(self._delete_primary_key_sql(model, constraint_name))
def _create_primary_key_sql(self, model, field):
    """
    Return the statement making ``field``'s column the primary key of the
    model's table, with a generated "_pk"-suffixed constraint name.
    """
    table_name = model._meta.db_table
    table_ref = Table(table_name, self.quote_name)
    pk_name = self._create_index_name(table_name, [field.column], suffix="_pk")
    quoted_pk_name = self.quote_name(pk_name)
    column_ref = Columns(table_name, [field.column], self.quote_name)
    return Statement(
        self.sql_create_pk,
        table=table_ref,
        name=quoted_pk_name,
        columns=column_ref,
    )
def _delete_primary_key_sql(self, model, name):
return self._delete_constraint_sql(self.sql_delete_pk, model, name)
def _collate_sql(self, collation, old_collation=None, table_name=None):
return "COLLATE " + self.quote_name(collation) if collation else ""
def remove_procedure(self, procedure_name, param_types=()):
    """
    Execute the statement that drops ``procedure_name``, with
    ``param_types`` joined by commas as its parameter type list.
    """
    context = {
        "procedure": self.quote_name(procedure_name),
        "param_types": ",".join(param_types),
    }
    self.execute(self.sql_delete_procedure % context)
Jan 15, 2024