Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64
User : palandch ( 1163)
PHP Version : 7.1.33
Disable Function : NONE
Directory :  /opt/cloudlinux/venv/lib/python3.11/site-packages/clwpos/hooks/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/cloudlinux/venv/lib/python3.11/site-packages/clwpos/hooks/lib/user_disable_caching.py
#!/opt/cloudlinux/venv/bin/python3 -sbb
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

from __future__ import absolute_import

import itertools
import json
import logging
import os
from typing import Dict, Set, Union

from clwpos.cl_wpos_exceptions import WposError
from clwpos.data_collector_utils import php_info as get_php_info
from clwpos.optimization_features import ALL_OPTIMIZATION_FEATURES
from clwpos.user.config import UserConfig
from clwpos.utils import WposUser, is_run_under_user, run_in_cagefs_if_needed, ExtendedJSONEncoder


def disable_module(domain: str, wp_path: str, module: str) -> None:
    """
    Disable optimization feature for specified domain and wp_path
    """
    disable_cmd = ['/usr/bin/clwpos-user', 'disable', f'--domain={domain}',
                   f'--wp-path={wp_path}', f'--feature={module}']
    result = run_in_cagefs_if_needed(disable_cmd)
    logging.debug('CLWPOS-USER DISABLE --> %s', result)


def check_has_incompatibilities(
        wordpress_info: Dict[str, Union[str, dict]], module: str) -> bool:
    """
    Check for module's incompatibility issues for specified wp.
    """
    module_info = wordpress_info.get('features', {}).get(module, {})
    is_enabled = module_info.get('enabled', False)
    issues = module_info.get('issues', [])
    return is_enabled and any(issue['type'] == 'incompatibility' for issue in issues)


def create_php_info_file(user: WposUser) -> None:
    """
    Create .php_info-<id> file user's .clwpos directory
    and write data about user's php versions into it.
    """
    if not is_run_under_user():
        raise WposError('Internal Error. Contact CloudLinux support')
    _pid = os.environ.get('CLWPOS_PHP_FILE_ID')
    if not os.path.isdir(user.wpos_dir) or _pid is None:
        return
    php_info = get_php_info()
    file_path = user.php_info.format(file_id=_pid)
    with open(file_path, 'w') as f:
        json.dump(php_info, f, cls=ExtendedJSONEncoder)


def remove_php_info_file(user: WposUser) -> None:
    """
    Remove .php_info-<id> file from user's .clwpos directory if it exists.
    """
    if not is_run_under_user():
        raise WposError('Internal Error. Contact CloudLinux support')
    _pid = os.environ.get('CLWPOS_PHP_FILE_ID')
    file_path = user.php_info.format(file_id=_pid)
    if os.path.isfile(file_path):
        os.unlink(file_path)


def is_docroot_with_wpos_enabled_affected(user: WposUser, docroots_affected: Set[str]) -> bool:
    """
    Check if any WPOS optimization feature is enabled
    on docroots that are affected by php version or handler change.
    """
    if not os.path.exists(user.wpos_dir):
        return False

    for docroot, _, _ in UserConfig(user.name).enabled_modules():
        abs_docroot_path = os.path.join(user.home_dir, docroot)
        if abs_docroot_path.rstrip('/') in docroots_affected:
            return True

    return False


def check_domains_wpos_info_and_disable_module(domains_affected: Set[str]) -> None:
    """
    Call clwpos-user get command to obtain WPOS data about user's sites.
    If site has incompatibilities, disable object caching on it.
    """
    get_cmd = ['/usr/bin/clwpos-user', 'get']
    result = run_in_cagefs_if_needed(get_cmd)
    logging.debug('CLWPOS-USER GET --> %s', result)

    for docroot_info in json.loads(result.stdout).get('docroots', []):
        docroot_domains = docroot_info['domains']
        if any(domain in domains_affected for domain in docroot_domains):
            wordpresses = docroot_info['wps']
            for wordpress_info, module in itertools.product(wordpresses,
                                                            [feature.to_interface_name() for
                                                             feature in ALL_OPTIMIZATION_FEATURES]):
                if check_has_incompatibilities(wordpress_info, module):
                    disable_module(docroot_domains[0], wordpress_info['path'], module)


def _run_pre_hook(user: WposUser):
    """
    Create file with data about domains' php versions in user's .clwpos directory.
    """
    create_php_info_file(user)


def _run_post_hook(user: WposUser, domains_affected: Set[str]):
    """
    Check incompatibilies for user's domains and disable object caching if needed.
    """
    check_domains_wpos_info_and_disable_module(domains_affected)
    remove_php_info_file(user)

Spamworldpro Mini