Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64
User : palandch ( 1163)
PHP Version : 7.1.33
Disable Function : NONE
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/clcommon/lib/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/cloudlinux/venv/lib64/python3.11/site-packages/clcommon/lib/cmt_utils.py
# coding=utf-8
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2020 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENCE.TXT
#

import json
import os
import requests
import subprocess
import shutil
from clcommon.lib.consts import (
    DEFAULT_JWT_ES_TOKEN_PATH,
    PUSHGATEWAY_ADDRESS,
    DISABLE_CMT_FILE,
)

def client_activation_data(raise_exception=False) -> dict:
    """
    Fetch the backend activation record of this client from CM.

    The JWT token is read from DEFAULT_JWT_ES_TOKEN_PATH and sent in the
    ``X-Auth`` header of a GET request to the ``get_backend_activation``
    endpoint under PUSHGATEWAY_ADDRESS (25 second timeout).

    A missing JWT token is not necessarily a fault: the client may have a
    volume license (like GoDaddy). That case, like the other handled
    errors below, yields an empty dict:
    - the token file cannot be read
    - the request to CM fails or returns an HTTP error status
    - the answer from CM is not valid JSON

    :param raise_exception: when True, re-raise the handled error instead
                            of returning an empty dict
    :return: decoded JSON answer from CM, or ``{}`` on a handled error
    """
    # OSError also covers IOError, which is merely its alias on Python 3.
    handled_errors = (
        json.JSONDecodeError,
        requests.exceptions.RequestException,
        OSError,
        KeyError,
    )
    try:
        with open(DEFAULT_JWT_ES_TOKEN_PATH, "r", encoding="utf-8") as token_file:
            jwt_token = token_file.read().strip()
        response = requests.get(
            f'{PUSHGATEWAY_ADDRESS}/api/cm/get_backend_activation',
            headers={"X-Auth": jwt_token},
            timeout=25,
        )
        response.raise_for_status()
        activation_info = response.json()
    except handled_errors:
        if not raise_exception:
            # we do not want to raise for cldiag
            return {}
        raise
    else:
        return activation_info


def is_client_enabled(raise_exception=False) -> bool:
    """
    Tell whether CM reports this client as activated.

    Looks up the ``activate`` key of the activation data; a missing key
    (including the empty dict returned on handled errors) gives False.

    :param raise_exception: forwarded to client_activation_data()
    :return: value stored under ``activate``, False when it is absent
    """
    activation_info = client_activation_data(raise_exception)
    return activation_info.get('activate', False)


def collect_cmt_request_debug(cmt_domain):
    """
    Run connectivity checks for CM and return their output as text.

    Checks performed (each one only if its binary is found in PATH):
     - ping (5 packets) of cmt_domain, google.com and cloudlinux.com
     - TCP traceroute (``-T``) to cmt_domain

    Non-zero exit codes are not treated as errors; they are simply
    included in the report.

    :param cmt_domain: host passed to ping and traceroute
    :return: one section per executed command with its arguments, return
             code, stdout and stderr; empty string when nothing was run
    """
    def _run(command):
        # check=False: a failed probe is still useful debug output.
        return subprocess.run(command, check=False, capture_output=True, text=True)

    completed = []

    ping_path = shutil.which('ping')
    if ping_path:
        # Commands are executed in this order: target, google.com, cloudlinux.com.
        target_ping, google_ping, cloudlinux_ping = (
            _run([ping_path, '-c', '5', host])
            for host in (cmt_domain, 'google.com', 'cloudlinux.com')
        )
        # The report lists cloudlinux.com before google.com.
        completed += [target_ping, cloudlinux_ping, google_ping]

    traceroute_path = shutil.which('traceroute')
    if traceroute_path:
        completed.append(_run([traceroute_path, '-T', cmt_domain]))

    report_sections = [
        f'Command: {result.args}\nRetcode: {result.returncode}\n'
        f'Stdout: {result.stdout}\nStderr: {result.stderr}\n'
        for result in completed
    ]
    return '\n'.join(report_sections)


def log_request_error_debug_info(component, logger, error, url, with_debug=False):
    """
    Log a request failure together with DNS and connectivity details.

    :param component: name of the component that made the request; put
                      at the start of the message and in the fingerprint
    :param logger: logger whose ``error`` method receives the record
    :param error: the exception that was caught
    :param url: address that was requested; also handed to
                collect_cmt_request_debug() when with_debug is set
    :param with_debug: when True, run the ping/traceroute checks and
                       include their output, otherwise log 'N/A'
    """
    # Connectivity checks are only collected on request.
    debug_info = collect_cmt_request_debug(url) if with_debug else 'N/A'

    resolv_conf_info = get_resolv_conf_info()
    if not resolv_conf_info:
        # Missing or empty resolv.conf
        resolv_conf_info = 'N/A'

    message = ('%s error while requesting %s: %s \n\n'
               'DNS info: %s \n'
               'Debug info for request: \n %s')
    # NOTE(review): presumably consumed by an error-grouping log handler
    # (Sentry-style fingerprint) - confirm against the logging setup.
    fingerprint = ['{{ default }}', component, error.__class__.__name__, url]
    logger.error(
        message,
        component,
        url,
        str(error),
        str(resolv_conf_info),
        debug_info,
        extra={'fingerprint': fingerprint},
    )


def get_resolv_conf_info():
    """
    Read the content of /etc/resolv.conf.

    :return: file content as a string, or None when the file does not exist
    """
    resolv_conf_path = '/etc/resolv.conf'
    if os.path.exists(resolv_conf_path):
        with open(resolv_conf_path, encoding='utf-8') as resolv_file:
            content = resolv_file.read()
        return content
    return None


def is_cmt_disabled() -> bool:
    """
    Check whether CM is disabled locally.

    :return: True when the path DISABLE_CMT_FILE exists, False otherwise
    """
    marker_exists = os.path.exists(DISABLE_CMT_FILE)
    return marker_exists

Spamworldpro Mini