Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/imunify360/venv/lib64/python3.11/site-packages/peewee_migrate/ |
"""Automatically create migrations."""

from __future__ import annotations

from collections.abc import Hashable
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Final,
    Iterable,
    List,
    Optional,
    Type,
    Union,
    cast,
)

import peewee as pw
from playhouse.reflection import Column as VanilaColumn

if TYPE_CHECKING:
    from .migrator import Migrator
    from .types import TModelType, TParams

# NOTE(review): this module was recovered from a listing whose whitespace had
# been collapsed, so INDENT may originally have held more than one space --
# confirm against the installed package before relying on the exact layout of
# the generated code.
INDENT: Final = " "
NEWLINE: Final = "\n" + INDENT

# Field class names that are rendered with the ``pw_pext`` module prefix
# instead of the default ``pw`` prefix (see ``Column.get_field``).
FIELD_MODULES_MAP: Final = {
    "ArrayField": "pw_pext",
    "BinaryJSONField": "pw_pext",
    "DateTimeTZField": "pw_pext",
    "HStoreField": "pw_pext",
    "IntervalField": "pw_pext",
    "JSONField": "pw_pext",
    "TSVectorField": "pw_pext",
}

# Modules whose field classes are used as-is by ``find_field_type``.
PW_MODULES: Final = "playhouse.postgres_ext", "playhouse.fields", "peewee"


def fk_to_params(field: pw.ForeignKeyField) -> TParams:
    """Get params from the given fk.

    Only ``on_delete`` / ``on_update`` are reported, and only when they are
    set. The values are wrapped in single quotes so they can be pasted into
    generated source code as string literals.
    """
    params = {}
    if field.on_delete is not None:
        params["on_delete"] = f"'{field.on_delete}'"

    if field.on_update is not None:
        params["on_update"] = f"'{field.on_update}'"

    return params


def dtf_to_params(field: pw.DateTimeField) -> TParams:
    """Get params from the given datetime field.

    ``formats`` is reported only when it is not a ``list``.
    """
    params = {}
    if not isinstance(field.formats, list):
        params["formats"] = field.formats

    return params


# Per-field-class extractors of the extra constructor parameters that matter
# for migrations. Lookups are by exact class (see ``find_field_type``).
FIELD_TO_PARAMS: Dict[Type[pw.Field], Callable[[Any], TParams]] = {
    pw.CharField: lambda f: {"max_length": f.max_length},
    pw.DecimalField: lambda f: {
        "max_digits": f.max_digits,
        "decimal_places": f.decimal_places,
        "auto_round": f.auto_round,
        "rounding": f.rounding,
    },
    pw.ForeignKeyField: fk_to_params,
    pw.DateTimeField: dtf_to_params,
}


class Column(VanilaColumn):
    """Get field's migration parameters."""

    field_class: Type[pw.Field]

    def __init__(self, field: pw.Field, **kwargs):
        """Build the column description from a peewee field.

        :param field: the field to describe.
        :param kwargs: accepted for call compatibility; not used here.
        """
        super().__init__(
            field.name,
            find_field_type(field),
            field.field_type,
            field.null,
            primary_key=field.primary_key,
            column_name=field.column_name,
            index=field.index,
            unique=field.unique,
            extra_parameters=None,
        )
        # Callable defaults cannot be rendered as a literal, so they are skipped.
        if field.default is not None and not callable(field.default):
            self.default = repr(field.default)

        if self.field_class in FIELD_TO_PARAMS:
            if self.extra_parameters is None:  # type: ignore[has-type]
                self.extra_parameters = {}

            self.extra_parameters.update(FIELD_TO_PARAMS[self.field_class](field))

        self.rel_model = None
        self.to_field = None

        if isinstance(field, pw.ForeignKeyField):
            self.to_field = field.rel_field.name
            # Self-referential keys are written as 'self'; other targets are
            # looked up by table name through ``migrator.orm``.
            self.rel_model = (
                "'self'"
                if field.rel_model == field.model
                else "migrator.orm['%s']" % field.rel_model._meta.table_name
            )

    def get_field(self, space: str = " ") -> str:
        """Generate the field definition for this column.

        :param space: text placed on both sides of the ``=`` sign.
        :return: ``<name><space>=<space><module>.<definition>`` where module
            comes from ``FIELD_MODULES_MAP`` and defaults to ``pw``.
        """
        # The parent output is split on the first "="; presumably it has the
        # form "name = FieldClass(...)" -- confirm against playhouse.reflection.
        raw = super().get_field()
        module = FIELD_MODULES_MAP.get(self.field_class.__name__, "pw")
        name, _, field = [s and s.strip() for s in raw.partition("=")]
        return f"{name}{space}={space}{module}.{field}"

    def get_field_parameters(self) -> TParams:
        """Generate parameters for self field.

        Extends the parent's parameters with ``default`` when one is set.
        """
        params = super().get_field_parameters()
        if self.default is not None:
            params["default"] = self.default
        return params


def diff_one(model1: TModelType, model2: TModelType, **kwargs) -> List[str]:  # noqa:
    """Find difference between given peewee models.

    Fields and indexes are walked in the order the models declare them, not
    through intermediate sets, so the generated statements come out in the
    same order on every run instead of depending on string hash randomization.

    :param model1: the model in its target state.
    :param model2: the model in its previous state.
    :param kwargs: forwarded to ``create_fields`` and ``change_fields``.
    :return: migration statements, as source-code strings.
    """
    changes = []

    meta1, meta2 = model1._meta, model2._meta  # type: ignore[]
    field_names1 = meta1.fields
    field_names2 = meta2.fields

    # Add fields
    added = [field for name, field in field_names1.items() if name not in field_names2]
    if added:
        changes.append(create_fields(model1, *added, **kwargs))

    # Drop fields
    dropped = [name for name in field_names2 if name not in field_names1]
    if dropped:
        changes.append(drop_fields(model1, *dropped))

    # Change fields
    fields_ = []
    nulls_ = []
    indexes_ = []
    for name, field1 in field_names1.items():
        if name not in field_names2:
            continue

        diff = compare_fields(field1, field_names2[name])
        # Nullability and index changes get dedicated statements; whatever is
        # left in ``diff`` requires a full field change.
        null = diff.pop("null", None)
        index = diff.pop("index", None)

        if diff:
            fields_.append(field1)

        if null is not None:
            nulls_.append((name, null))

        if index is not None:
            indexes_.append((name, index[0], index[1]))

    if fields_:
        changes.append(change_fields(model1, *fields_, **kwargs))

    for name, null in nulls_:
        changes.append(change_not_null(model1, name, null=null))

    for name, index, unique in indexes_:
        if index is True or unique is True:
            # Replace an index that existed on the previous field.
            if field_names2[name].unique or field_names2[name].index:
                changes.append(drop_index(model1, name))
            changes.append(add_index(model1, name, unique=unique))
        else:
            changes.append(drop_index(model1, name))

    # Check additional compound indexes
    indexes1 = meta1.indexes
    indexes2 = meta2.indexes
    current_indexes = set(indexes1)
    previous_indexes = set(indexes2)

    # Drop compound indexes (dict.fromkeys de-duplicates while keeping order)
    for index in dict.fromkeys(indexes2):
        if index in current_indexes:
            continue
        if isinstance(index[0], (list, tuple)) and len(index[0]) > 1:
            changes.append(drop_index(model1, name=index[0]))

    # Add compound indexes
    for index in dict.fromkeys(indexes1):
        if index in previous_indexes:
            continue
        if isinstance(index[0], (list, tuple)) and len(index[0]) > 1:
            changes.append(add_index(model1, name=index[0], unique=index[1]))

    return changes


def diff_many(
    models1: List[TModelType],
    models2: List[TModelType],
    migrator: Optional[Migrator] = None,
    *,
    reverse: bool = False,
) -> List[str]:
    """Calculate changes for migrations from models2 to models1.

    Models are matched by table name. Statements are emitted in this order:
    changes to models present on both sides, then created models, then
    removed models.

    :param models1: models in their target state.
    :param models2: models in their previous state.
    :param migrator: forwarded to ``diff_one`` and ``create_model``.
    :param reverse: process both model lists in reversed sorted order.
    """
    models1 = cast(List["TModelType"], pw.sort_models(models1))  # type: ignore[]
    models2 = cast(List["TModelType"], pw.sort_models(models2))  # type: ignore[]

    if reverse:
        models1 = list(reversed(models1))
        models2 = list(reversed(models2))

    models_map1 = {cast(str, m._meta.table_name): m for m in models1}  # type: ignore[]
    models_map2 = {cast(str, m._meta.table_name): m for m in models2}  # type: ignore[]

    changes: List[str] = []

    for name, model1 in models_map1.items():
        if name not in models_map2:
            continue
        changes.extend(diff_one(model1, models_map2[name], migrator=migrator))

    # Add models
    for name in [m for m in models_map1 if m not in models_map2]:
        changes.append(create_model(models_map1[name], migrator=migrator))

    # Remove models
    for name in [m for m in models_map2 if m not in models_map1]:
        changes.append(remove_model(models_map2[name]))

    return changes


def model_to_code(model_cls: TModelType, **kwargs) -> str:
    """Generate migrations for the given model.

    :param model_cls: the model to render.
    :param kwargs: forwarded to ``field_to_code``.
    :return: source of a ``pw.Model`` subclass with its fields and a ``Meta``
        class (table name, plus schema, composite primary key and indexes
        when they are set).
    """
    template = "class {classname}(pw.Model):\n{fields}\n\n{meta}"
    meta = model_cls._meta  # type: ignore[]
    fields = INDENT + NEWLINE.join(
        [
            field_to_code(field, **kwargs)
            for field in meta.sorted_fields
            if not (isinstance(field, pw.PrimaryKeyField) and field.name == "id")
        ]
    )
    # Empty strings mark options that do not apply; ``filter(None, ...)``
    # drops them before joining.
    meta_code = INDENT + NEWLINE.join(
        filter(
            None,
            [
                "class Meta:",
                f'{INDENT}table_name = "{meta.table_name}"',
                f'{INDENT}schema = "{meta.schema}"' if meta.schema else "",
                (
                    f"{INDENT}primary_key = pw.CompositeKey{meta.primary_key.field_names!r}"
                    if isinstance(meta.primary_key, pw.CompositeKey)
                    else ""
                ),
                f"{INDENT}indexes = {meta.indexes!r}" if meta.indexes else "",
            ],
        )
    )

    return template.format(classname=model_cls.__name__, fields=fields, meta=meta_code)


def create_model(model_cls: TModelType, **kwargs) -> str:
    """Generate migrations to create model.

    The model source from ``model_to_code`` is prefixed with the
    ``@migrator.create_model`` decorator line.
    """
    return "@migrator.create_model\n" + model_to_code(model_cls, **kwargs)


def remove_model(model_cls: TModelType, **kwargs) -> str:
    """Generate migrations to remove model."""
    meta = model_cls._meta  # type: ignore[]
    return "migrator.remove_model('%s')" % meta.table_name


def create_fields(model_cls: TModelType, *fields: pw.Field, **kwargs) -> str:
    """Generate migrations to add fields.

    :param model_cls: model whose table receives the fields.
    :param fields: fields to add; each is rendered as a keyword argument.
    :param kwargs: forwarded to ``field_to_code``.
    """
    meta = model_cls._meta  # type: ignore[]
    return "migrator.add_fields(%s'%s', %s)" % (
        NEWLINE,
        meta.table_name,
        NEWLINE
        + ("," + NEWLINE).join(
            [field_to_code(field, space=False, **kwargs) for field in fields]
        ),
    )


def drop_fields(model_cls: TModelType, *fields: str, **kwargs) -> str:
    """Generate migrations to remove fields.

    :param model_cls: model whose table loses the fields.
    :param fields: names of the fields to remove; each is rendered with
        ``repr``.
    """
    meta = model_cls._meta  # type: ignore[]
    return "migrator.remove_fields('%s', %s)" % (
        meta.table_name,
        ", ".join(map(repr, fields)),
    )


def field_to_code(field: pw.Field, *, space: bool = True, **kwargs) -> str:
    """Generate field description.

    :param field: the field to render.
    :param space: put a single space on both sides of ``=`` when true, none
        otherwise (the latter suits keyword-argument position).
    :param kwargs: forwarded to ``Column``.
    """
    col = Column(field, **kwargs)
    return col.get_field(" " if space else "")


def compare_fields(field1: pw.Field, field2: pw.Field, **kwargs) -> Dict:
    """Find diffs between the given fields.

    :return: ``{"cls": True}`` when the field types differ; otherwise the
        parameters of ``field1`` (including ``null``) whose value differs
        from, or is absent on, ``field2``.
    """
    ftype1, ftype2 = find_field_type(field1), find_field_type(field2)
    if ftype1 != ftype2:
        return {"cls": True}

    params1 = field_to_params(field1)
    params1["null"] = field1.null
    params2 = field_to_params(field2)
    params2["null"] = field2.null

    # NOTE(review): the difference is one-directional, so a parameter present
    # only on field2 (e.g. a default that was removed) is not reported --
    # confirm this is intended.
    return dict(set(params1.items()) - set(params2.items()))


def field_to_params(field: pw.Field, **kwargs) -> TParams:
    """Generate params for the given field.

    The result holds the class-specific parameters from ``FIELD_TO_PARAMS``,
    the ``default`` when it is a hashable non-callable value, and ``index`` as
    a ``(plain_index, unique)`` pair. ``backref`` is never included.
    """
    ftype = find_field_type(field)
    params = FIELD_TO_PARAMS.get(ftype, lambda f: {})(field)
    # Only hashable defaults are kept: ``compare_fields`` puts the items in a set.
    if (
        field.default is not None
        and not callable(field.default)
        and isinstance(field.default, Hashable)
    ):
        params["default"] = field.default

    params["index"] = (field.index and not field.unique, field.unique)

    params.pop("backref", None)  # Ignore backref
    return params


def change_fields(model_cls: TModelType, *fields: pw.Field, **kwargs) -> str:
    """Generate migrations to change fields.

    :param model_cls: model whose table is altered.
    :param fields: fields in their new state; each is rendered as a keyword
        argument.
    """
    meta = model_cls._meta  # type: ignore[]
    return "migrator.change_fields('%s', %s)" % (
        meta.table_name,
        ("," + NEWLINE).join([field_to_code(f, space=False) for f in fields]),
    )


def change_not_null(model_cls: TModelType, name: str, *, null: bool) -> str:
    """Generate migrations.

    Emits ``drop_not_null`` when ``null`` is true and ``add_not_null``
    otherwise, for the field called ``name``.
    """
    meta = model_cls._meta  # type: ignore[]
    operation = "drop_not_null" if null else "add_not_null"
    return "migrator.%s('%s', %s)" % (operation, meta.table_name, repr(name))


def _index_columns(name: Union[str, Iterable[str]]) -> str:
    """Render one or several column names as positional call arguments."""
    if isinstance(name, str):
        return repr(name)
    # Joining the items avoids the trailing comma that the repr of a
    # one-element tuple carries, which would produce invalid generated code.
    return ", ".join(repr(column) for column in name)


def add_index(model_cls: TModelType, name: Union[str, Iterable[str]], *, unique: bool) -> str:
    """Generate migrations.

    :param model_cls: model whose table gets the index.
    :param name: a single column name, or the column names of a compound index.
    :param unique: rendered as the ``unique`` keyword argument.
    """
    meta = model_cls._meta  # type: ignore[]
    columns = _index_columns(name)
    return f"migrator.add_index('{meta.table_name}', {columns}, unique={unique})"


def drop_index(model_cls: TModelType, name: Union[str, Iterable[str]]) -> str:
    """Generate migrations.

    :param model_cls: model whose table loses the index.
    :param name: a single column name, or the column names of a compound index.
    """
    meta = model_cls._meta  # type: ignore[]
    columns = _index_columns(name)
    return f"migrator.drop_index('{meta.table_name}', {columns})"


def find_field_type(field: pw.Field) -> Type[pw.Field]:
    """Return the class used to describe ``field`` in migrations.

    A class defined in one of ``PW_MODULES`` is returned as-is. For any other
    class the first entry of its MRO that comes from ``PW_MODULES`` is
    returned, falling back to the class itself when there is none.
    """
    ftype = type(field)
    if ftype.__module__ not in PW_MODULES:
        for cls in ftype.mro():
            if cls.__module__ in PW_MODULES:
                return cls

    return ftype