Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/imunify360/venv/lib64/python3.11/site-packages/imav/migrations/ |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import json import logging import os from contextlib import suppress from functools import partial from itertools import chain from typing import Callable from peewee import SqliteDatabase, TextField from defence360agent.contracts.config import LocalConfig from imav.migration_utils.other import im360_present from imav.migration_utils.plesk_sdk import PleskSdk from imav.migration_utils.revisium import find_revisium_db logger = logging.getLogger(__name__) db = SqliteDatabase(None) class KeyValue(db.Model): """ex-Revisium settings database model""" key = TextField() value = TextField() class Meta: primary_key = False class SettingsExtractor: """Base class for ex-Revisium and Plesk settings extractors""" def __init__(self, key, default): self.key = key self.default = default def _get(self): raise NotImplementedError def get(self): try: value = self._get() except Exception as e: logger.warning("Fail to get %r value: %r", self.key, e) return self.default with suppress(Exception): value = json.loads(value) return value class Revisium(SettingsExtractor): """ex-Revisium database settings extractor""" def _get(self): return KeyValue.get(KeyValue.key == self.key).value class Plesk(SettingsExtractor): """Plesk settings extractor using PHP wrapper""" def _get(self): return PleskSdk.settings__get(self.key, self.default) class ConfigMapping: """ Transform a single or multiple dependent ex-Revisium settings parameters to a single ImunifyAV/360 config value and map it to a single or multiple config keys """ def __init__( self, source: SettingsExtractor | tuple[SettingsExtractor, ...], target: str | tuple[str, ...], *, converter: Callable, ): """ :param source: settings extractor(s) for ex-Revisium / Plesk :param target: name(s) of target config parameter(s) for ImunifyAV/360 :param converter: a callable to convert source value(s) to a target one """ if not isinstance(source, tuple): source = (source,) if not isinstance(target, tuple): target = (target,) self.source = source self.target = target self.converter = converter def convert(self): """Get target value from source value(s) and assign it to target(s)""" source_values = [source.get() for source in self.source] try: target_value = self.converter(*source_values) except Exception as e: logger.warning( "Fail to convert %r value(s) (%r): %r", self.source, source_values, e, ) return () return tuple(zip(self.target, (target_value,) * len(self.target))) def clamp(minimum: int, maximum: int, value: int) -> int: """ Ensure that a value is within limits """ return max(minimum, min(value, maximum)) def intensity_cpu(value, max_value) -> int: """ Calculate ImunifyAV/360 CPU intensity based on a value in a range """ intensity = 4 # the middle point of the CPU intensity with suppress(ZeroDivisionError): intensity = intensity * value // max_value return clamp(2, 7, intensity) schedule_interval_mapping = { "never": "none", "daily": "day", "weekly": "week", "monthly": "month", } def get_max_possible_cpu() -> int: """A half of available CPUs/cores (at least one)""" return max((os.cpu_count() or 1) // 2, 1) # we agreed to have all default settings in IM360 except for scanning time scan_time_mapping_only = ( ConfigMapping( Plesk("ra_auto_scan_period", "monthly"), "MALWARE_SCAN_SCHEDULE.interval", converter=schedule_interval_mapping.__getitem__, ), ConfigMapping( Plesk("ra_hour_auto_scan", 4), "MALWARE_SCAN_SCHEDULE.hour", converter=partial(clamp, 0, 23), ), ) all_mappings = ( ConfigMapping( ( Plesk("ra_max_worker_count", 2), Plesk("ra_max_possible_worker_count", get_max_possible_cpu()), ), ( "MALWARE_SCAN_INTENSITY.cpu", "MALWARE_SCAN_INTENSITY.user_scan_cpu", ), converter=intensity_cpu, ), ConfigMapping( Plesk("ra_keep_backups_days", 7), "MALWARE_CLEANUP.keep_original_files_days", converter=partial(max, 1), ), ConfigMapping( Plesk("ra_trim_files", True), "MALWARE_CLEANUP.trim_file_instead_of_removal", converter=bool, ), ConfigMapping( Revisium("ra_use_ignore_list_by_user", True), "PERMISSIONS.user_ignore_list", converter=bool, ), ) + scan_time_mapping_only def migrate_imav4plesk_settings(database=None): db.init(database) # convert strings like `MALWARE_SCAN_INTENSITY.cpu` to dicts and join them config = {} mappings = scan_time_mapping_only if im360_present() else all_mappings for target, value in chain.from_iterable(m.convert() for m in mappings): d = config *path, param = target.split(".") for key in path: d = d.setdefault(key, {}) d[param] = value return config def migrate(migrator, database, fake=False, **kwargs): if fake: return revisium_db_path = find_revisium_db() if revisium_db_path is None: logger.info("No legacy ImunifyAV database found. Skipping...") return try: if migrated_config := migrate_imav4plesk_settings(revisium_db_path): LocalConfig().dict_to_config(migrated_config, normalize=False) except Exception as e: logger.warning("Failed to migrate ImunifyAV for Plesk settings: %r", e) def rollback(migrator, database, fake=False, **kwargs): pass if __name__ == "__main__": print(migrate_imav4plesk_settings(find_revisium_db()))