Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64
User : palandch ( 1163)
PHP Version : 7.1.33
Disable Function : NONE
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/clwpos/user/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/cloudlinux/venv/lib/python3.11/site-packages/clwpos/user/config.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import absolute_import

import datetime
import json
import logging
import os
import pwd
import traceback
import warnings
from copy import deepcopy
from enum import IntEnum, auto
from typing import Iterable, Optional

from clwpos.optimization_features import ALL_OPTIMIZATION_FEATURES, Feature
from clwpos.logsetup import setup_logging
from clwpos.utils import (
    get_relative_docroot,
    create_clwpos_dir_if_not_exists,
    is_run_under_user
)
from clcommon.clwpos_lib import is_wp_path
from clwpos import constants
from clwpos.cl_wpos_exceptions import WposError
from clwpos import gettext as _


class ConfigError(WposError):
    """
    Raised by UserConfig methods for any failure
    that happens while handling the clwpos user config.
    """


class LicenseApproveStatus(IntEnum):
    """
    State of the license terms approval for a feature,
    as reported by UserConfig.get_license_approve_status.
    """
    # feature does not require approve to work
    NOT_REQUIRED = auto()
    # feature required approve, but it was not given yet
    NOT_APPROVED = auto()
    # feature required approve and it was given
    APPROVED = auto()

    # feature required approve, it was given,
    # but license changed and we need another approve
    # TODO: currently unused
    # UPDATE_REQUIRED = auto()


class UserConfig(object):
    """
    Class to manage clwpos user config - read, write, set params in config.

    The config is stored as JSON at CONFIG_PATH (formatted with the user's
    home directory); DEFAULT_CONFIG is what get_config returns while that
    file does not exist or cannot be used.
    """
    # Path template of the config file; "{homedir}" is substituted in __init__.
    CONFIG_PATH = os.path.join("{homedir}", constants.USER_WPOS_DIR, constants.USER_CLWPOS_CONFIG)
    # Value of constants.DEFAULT_MAX_CACHE_MEMORY with the "mb" suffix appended.
    DEFAULT_MAX_CACHE_MEMORY = f"{constants.DEFAULT_MAX_CACHE_MEMORY}mb"
    # Template of the default config; get_config hands out deep copies of it.
    DEFAULT_CONFIG = {"docroots": {}, "max_cache_memory": DEFAULT_MAX_CACHE_MEMORY}

    def __init__(self, username: str | pwd.struct_passwd, allow_root=False, setup_logs=True):
        if not allow_root:
            self._validate_permissions()

        if isinstance(username, str):
            # Outdated way of config instance initialization:
            # consider passing pwd struct instead of username
            self.username = username
            self.pw = pwd.getpwnam(username)
            self.homedir = self.pw.pw_dir
        else:
            self.pw = username
            self.username = username.pw_name
            self.homedir = username.pw_dir

        self.config_path = self.CONFIG_PATH.format(homedir=self.homedir)

        # FIXME: just logger = logging.getLogger(__name__)
        if setup_logs:
            self._logger = setup_logging(__name__)
        else:
            self._logger = logging.getLogger("UserConfig")

    def _validate_permissions(self):
        """
        Refuse to continue unless is_run_under_user() reports True.

        :raises ConfigError: when is_run_under_user() returns a falsy value
        """
        if is_run_under_user():
            return
        raise ConfigError(_("Trying to use UserConfig class as root"))

    def read_config(self):
        """
        Reads config from self.config_path

        DO NOT USE THIS DIRECTLY! USE get_config INSTEAD!
        """
        try:
            with open(self.config_path, "r") as f:
                return json.loads(f.read())
        except Exception:
            exc_string = traceback.format_exc()
            raise ConfigError(
                message=_("Error while reading config %(config_path)s: %(exception_string)s"),
                context={"config_path": self.config_path, "exception_string": exc_string}
            )

    def write_config(self, config: dict):
        """
        Dumps config as pretty-printed JSON (4-space indent, sorted keys)
        into self.config_path, replacing the previous content.

        :param config: config to store
        :raises ConfigError: when serialization or writing fails
        """
        # presumably makes sure the user's clwpos directory exists -- see clwpos.utils
        create_clwpos_dir_if_not_exists(self.pw)

        try:
            # Serialization is done before the file is opened for writing,
            # so a serialization failure leaves the existing file untouched.
            serialized = json.dumps(config, sort_keys=True, indent=4)
            with open(self.config_path, "w") as config_file:
                config_file.write(serialized)
        except Exception as write_error:
            raise ConfigError(
                message=_("Attempt of writing to config file failed due to error:\n%(exception)s"),
                context={"exception": write_error},
            )

    def is_default_config(self):
        """
        Checks if user customized his config already.
        """
        return not os.path.exists(self.config_path)

    def get_config(self):
        """
        Returns default config or config content from self.config_path
        """
        # if config file is not exists, returns DEFAULT CONFIG
        if self.is_default_config():
            return deepcopy(self.DEFAULT_CONFIG)

        # Otherwise, reads config from file
        # and returns it if it's not broken
        try:
            config = self.read_config()
        except ConfigError:
            return deepcopy(self.DEFAULT_CONFIG)
        return config if isinstance(config, dict) else deepcopy(self.DEFAULT_CONFIG)

    def set_params(self, params: dict):
        """
        Set outer (not "docroots") params in config.
        Example:
        Old config:
        {
            "docroots": ...,
            "max_cache_memory": "123mb",
        }
        Input params:
        {
            "max_cache_memory": "1024mb",
            "param": "value"
        }
        New config:
        {
            "docroots": ...,
            "max_cache_memory": "1024mb",
            "param": "value"
        }
        """
        config = self.get_config()
        for key, value in params.items():
            config[key] = value
        self.write_config(config)

    def is_module_enabled(
            self, domain: str, wp_path: str, module: str, config: Optional[dict] = None) -> bool:
        """
        Checks whether the module is listed as enabled for the given site.

        :param domain: domain whose docroot is looked up via get_relative_docroot
        :param wp_path: site path, joined to the docroot and used as the key
            inside the docroot section of the config
        :param module: feature name, must be one of ALL_OPTIMIZATION_FEATURES
        :param config: already loaded config; it is read with get_config
            only when None is passed
        :return: True if module is in config["docroots"][docroot][wp_path]
        :raises ConfigError: when the docroot lookup fails, the path is rejected
            by is_wp_path, the feature is unknown or the config has
            an unexpected structure
        """
        # Only a missing config triggers a read from disk: an explicitly
        # passed dict (even an empty one) must not be silently replaced.
        if config is None:
            config = self.get_config()

        try:
            docroot = get_relative_docroot(domain, self.homedir)
        except Exception as e:
            self._logger.warning(e, exc_info=True)
            raise ConfigError(
                message=_("Can't find docroot for domain '%(domain)s' and homedir '%(homedir)s'"),
                context={"domain": domain, "homedir": self.homedir}
            ) from e

        if not is_wp_path(os.path.join(self.homedir, docroot, wp_path)):
            raise ConfigError(
                message=_("Wrong wordpress path '%(wp_path)s' passed"),
                context={"wp_path": wp_path}
            )

        if module not in ALL_OPTIMIZATION_FEATURES:
            raise ConfigError(
                message=_("Invalid feature %(feature)s, available choices: %(choices)s"),
                context={"feature": module, "choices": ALL_OPTIMIZATION_FEATURES}
            )

        try:
            docroots = config["docroots"]
            # a docroot or wp_path that is absent simply means "nothing enabled"
            module_info = docroots.get(docroot, {}).get(wp_path, [])
            return module in module_info
        except (KeyError, AttributeError, TypeError) as e:
            self._logger.warning(f"config {self.config_path} is broken: {e}", exc_info=True)
            raise ConfigError(
                message=_("Config is broken.\nRepair %(config_path)s or restore from backup."),
                context={"config_path": self.config_path}
            )

    def get_license_approve_status(self, feature: Feature) -> LicenseApproveStatus:
        """
        Reports the state of the license terms approval for the feature.

        NOT_REQUIRED - feature.HAS_LICENSE_TERMS is falsy, nothing to approve.
        APPROVED     - feature.NAME is present in the "approved_licenses"
                       section of the config.
        NOT_APPROVED - the feature has license terms, but there is
                       no approval record for it in the config.
        """
        if feature.HAS_LICENSE_TERMS:
            approved_licenses = self.get_config().get('approved_licenses', {})
            if feature.NAME in approved_licenses:
                return LicenseApproveStatus.APPROVED
            return LicenseApproveStatus.NOT_APPROVED

        return LicenseApproveStatus.NOT_REQUIRED

    def approve_license_agreement(self, feature: Feature):
        """
        Writes information about approved license terms for given feature to config file.
        """
        config = self.get_config()
        approved_licenses = config.get('approved_licenses', {})
        approved_licenses[feature.NAME] = dict(
            approve_date=datetime.datetime.now().isoformat()
        )

        config['approved_licenses'] = approved_licenses
        self.write_config(config)

    def disable_module(self, domain: str, wp_path: str, module: str) -> None:
        """
        Removes the module from the list of enabled modules of the site
        and saves the config. Does nothing when the module is not enabled.

        Sections that become empty after the removal (the wp_path list and
        then the docroot dict) are dropped from the config as well.

        :raises ConfigError: when the docroot lookup fails, the path is rejected
            by is_wp_path, the feature is unknown, or is_module_enabled /
            write_config fail
        """
        try:
            docroot = get_relative_docroot(domain, self.homedir)
        except Exception as lookup_error:
            self._logger.exception(lookup_error)
            raise ConfigError(
                message=_("Docroot for domain '%(domain)s' is not found"),
                context={"domain": domain}
            )

        wordpress_dir = os.path.join(self.homedir, docroot, wp_path)
        if not is_wp_path(wordpress_dir):
            raise ConfigError(
                message=_("Wrong wordpress path '%(wp_path)s' passed"),
                context={"wp_path": wp_path}
            )

        if module not in ALL_OPTIMIZATION_FEATURES:
            raise ConfigError(
                message=_("Invalid feature %(feature)s, available choices: %(choices)s"),
                context={"feature": module, "choices": ALL_OPTIMIZATION_FEATURES}
            )

        config = self.get_config()
        # is_module_enabled also verifies that the config has the expected structure
        if not self.is_module_enabled(domain, wp_path, module, config):
            return

        docroot_sites = config["docroots"][docroot]
        enabled_modules = docroot_sites[wp_path]
        enabled_modules.remove(module)

        # drop the wp_path entry once its last module is disabled
        if not enabled_modules:
            del docroot_sites[wp_path]

        # drop the docroot entry once it has no WordPress sites left
        if not docroot_sites:
            del config["docroots"][docroot]

        self.write_config(config)

    def enable_module(self, domain: str, wp_path: str, feature: str) -> None:
        """
        Appends the feature to the list of enabled modules of the site
        and saves the config. Does nothing when the feature is already enabled.

        Missing "docroots", docroot and wp_path sections are created on the way.

        :raises ConfigError: when the docroot lookup fails, the path is rejected
            by is_wp_path, the feature is unknown, or is_module_enabled /
            write_config fail
        """
        try:
            docroot = get_relative_docroot(domain, self.homedir)
        except Exception as lookup_error:
            self._logger.exception(lookup_error)
            raise ConfigError(
                message=_("Docroot for domain '%(domain)s' is not found"),
                context={"domain": domain}
            )

        wordpress_dir = os.path.join(self.homedir, docroot, wp_path)
        if not is_wp_path(wordpress_dir):
            raise ConfigError(
                message=_("Wrong wordpress path '%(wp_path)s' passed"),
                context={"wp_path": wp_path}
            )

        if feature not in ALL_OPTIMIZATION_FEATURES:
            raise ConfigError(
                message=_("Invalid feature %(feature)s, available choices: %(choices)s"),
                context={"feature": feature, "choices": ALL_OPTIMIZATION_FEATURES}
            )

        config = self.get_config()
        # is_module_enabled also verifies that the config has the expected structure
        if self.is_module_enabled(domain, wp_path, feature, config):
            return

        # walk down to the site's module list, creating missing levels
        site_modules = (
            config.setdefault("docroots", {})
            .setdefault(docroot, {})
            .setdefault(wp_path, [])
        )
        site_modules.append(feature)
        self.write_config(config)

    def enabled_modules(self):
        for doc_root, doc_root_info in self.get_config()["docroots"].items():
            for wp_path, module_names in doc_root_info.items():
                for name in module_names:
                    yield doc_root, wp_path, name

    def wp_paths_with_enabled_module(self, module_name: str) -> Iterable[str]:
        """
        Return absolute WP paths with specified module enabled.
        """
        for doc_root, wp_path, name in self.enabled_modules():
            if name == module_name:
                yield os.path.join(self.homedir, doc_root, wp_path)

    def wp_paths_with_active_suite_features(self, features_set: set):
        """
        Unique set of sites with active features from feature set
        SET is used here, because one site may have several features activated from one set
        e.g: site1 with activated object_cache, shortcodes = 1 path
        """
        sites = set()
        for feature in features_set:
            sites_with_enabled_feature = self.wp_paths_with_enabled_module(feature)
            for site in sites_with_enabled_feature:
                sites.add(site)
        return sites

    def get_enabled_sites_count_by_modules(self, checked_module_names):
        """
        Returns count of sites with enabled module
        """
        sites_count = 0
        for _, doc_root_info in self.get_config().get('docroots', {}).items():
            for _, module_names in doc_root_info.items():
                sites_count += any(checked_module_name in module_names for checked_module_name in checked_module_names)
        return sites_count

Spamworldpro Mini