Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64
User : palandch ( 1163)
PHP Version : 7.1.33
Disable Function : NONE
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/clwizard/modules/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/cloudlinux/venv/lib64/python3.11/site-packages/clwizard/modules/governor.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#
import os
from typing import Dict, List  # NOQA

from clcommon.const import Feature
from clcommon.lib.mysql_governor_lib import MySQLGovernor
from clcommon.utils import ExternalProgramFailed, run_command
from clconfig.db_info_lib import MysqlInfo
from cldetectlib import getCPName
from clwizard.constants import MODULES_LOGS_DIR
from clwizard.exceptions import InstallationFailedException, UserInterventionNeededError

from .base import WizardInstaller


class GovernorInstaller(WizardInstaller):
    LOG_FILE = os.path.join(MODULES_LOGS_DIR, 'governor.log')
    UNKNOWN = 'unknown'
    UTILITY_PATH = '/usr/share/lve/dbgovernor/mysqlgovernor.py'
    _REQUIRED_CL_COMPONENT_SUPPORT = Feature.GOVERNOR

    def __init__(self):
        super().__init__()

        # dict(
        #  'vendor': MySQL|MariaDB|Percona,
        #  'version': server version in the form of {major}.{minor}
        #             or {major}.{minor}-{release} for percona,
        #  'cll-lve': patches from CL applied or not (True|False)
        # )
        self.db_info = MysqlInfo().get()

    def _install_governor_package(self):
        if not self._is_package_installed('governor-mysql'):
            try:
                out = self._install_package('governor-mysql')
            except ExternalProgramFailed as err:
                self.app_logger.error("Package installation failed with error: %s", str(err))
                raise InstallationFailedException() from err
            self.app_logger.info("Package was installed successfully: %s", out)
        else:
            self.app_logger.info("Skip the governor-mysql installation, it is already installed")

    def _prepare_db_options(self):
        if GovernorInstaller.UNKNOWN in (self.db_info["vendor"], self.db_info["version"]):
            return None
        try:
            vendor = self.db_info['vendor'].lower()
            # split(-)[0] drop release
            # split[.][0-2] use only major and minor versions
            version = ''.join(self.db_info['version'].split('-')[0].split('.')[0:2])
            return vendor + version
        except IndexError:
            return None

    def _initialize_governor(self):
        """
        Trying to install governor with --wizard key, it detects the DB
        automatically (on DA and cPanel), for other panels we will have blockers
        for the governor module.
        """
        try:
            self.app_logger.info("Install MySQL Governor")
            # for governor there are errors that could be handled only manually by user
            # these errors have exit code 3
            self._run_cmd_and_check_special_status(
                [GovernorInstaller.UTILITY_PATH, '--install', '--wizard'], exit_status=3
            )
        except ExternalProgramFailed as err:
            raise InstallationFailedException() from err

    def _run_cmd_and_check_special_status(self, cmd, exit_status):
        """
        There are cases when some commands can't be executed without the user's
        intervention. Such scripts/commands return a special exit code that must
        be checked for.
        """
        retcode, out, _ = run_command(cmd, return_full_output=True)
        if retcode == exit_status:
            self.app_logger.warning('Can`t install governor automatically')
            self.app_logger.warning('Reason: %s', out)
            raise UserInterventionNeededError()
        if retcode != 0:
            self.app_logger.error('Error occurred during running "%s"\nReason is: "%s"', cmd, out)
            raise ExternalProgramFailed(out)

    def run_installation(self, options):
        if self.db_info["vendor"] is None:
            raise InstallationFailedException("Please, install a MySQL server first.")
        self._install_governor_package()
        self._initialize_governor()

    def _get_warnings(self):
        # type: () -> List
        """
        Get list of warnings that should be shown in wizard
        before module installation
        """
        warnings = [
            {
                "message": (
                    "Please create a full database backup (including system tables)."
                    " MySQL/MariaDB/Percona server will be updated from CloudLinux repositories."
                )
            }
        ]  # type: List[Dict]
        if self.db_info["vendor"] is None:
            warnings.append(
                {
                    "message": (
                        "Could not detect a MySQL server."
                        " For a list of compatible options please see %(url)s."
                    ),
                    "context": {
                        "url": "https://docs.cloudlinux.com/mysql_governor_installation.html"
                    },
                }
            )
        return warnings

    def _get_blockers(self):
        # type: () -> List
        """
        Get a list of possible blockers to disable Governor module in Wizard UI.
        """
        blockers = []
        if getCPName() not in ['cPanel', 'DirectAdmin']:
            blockers.append(
                {
                    'message': 'MySQL Governor can\'t be automatically installed.'
                    ' Please install it through the command-line interface.'
                    ' Learn more here: %(url)s',
                    'context': {
                        'url': 'https://docs.cloudlinux.com/installation-wizard.html',
                    },
                }
            )
        return blockers

    def _is_already_configured(self):
        """
        Check if the governor is ready for work.
        """
        mysql_gov = MySQLGovernor()
        return mysql_gov.is_governor_present() and self.db_info['cll-lve']

    def initial_status(self):
        result = {
            'already_configured': self._is_already_configured(),
            'db_version': self.db_info['version'],
        }
        warnings = self._get_warnings()
        if warnings:
            result.update({'warnings': warnings})
        blockers = self._get_blockers()
        if blockers:
            result.update({'blockers': blockers})
        return result

Spamworldpro Mini