Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64
User : palandch ( 1163)
PHP Version : 7.1.33
Disable Function : NONE
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/internal/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/internal/utils.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

"""
This module contains helpful utility functions for X-Ray Manager
"""
import dbm
import errno
from getpass import getuser

import fcntl
import logging
import os
import shelve
import shutil
import subprocess
import platform
import time
import xml.etree.ElementTree as ET
from contextlib import contextmanager
from datetime import date, timedelta
from functools import wraps
from glob import glob
from socket import (socket, fromfd, AF_UNIX, SOCK_STREAM, SOCK_DGRAM,
                    AF_INET, AF_INET6)
from typing import Callable, List, Optional

import sentry_sdk
from sentry_sdk.integrations.atexit import AtexitIntegration
from sentry_sdk.integrations.logging import LoggingIntegration

from clcommon.const import Feature
from clcommon.cpapi import is_panel_feature_supported, get_cp_description, getCPName, is_wp2_environment
from clcommon.lib.cledition import get_cl_edition_readable

from clcommon.ui_config import UIConfig
from clcommon.clpwd import drop_privileges
from clcommon.utils import get_rhn_systemid_value
from clcommon.lib.network import get_hostname
from clwpos.papi import php_get_vhost_versions_user

from xray import gettext as _

from .constants import (
    sentry_dsn,
    local_tasks_storage,
    agent_file,
    logging_level,
    jwt_token_location,
    user_agent_sock
)
from .exceptions import XRayError, XRayManagerExit


logger = logging.getLogger('utils')
subprocess_errors = (
    OSError, ValueError, subprocess.SubprocessError
)

# --------- DECORATORS ---------


def skeleton_update(func: Callable) -> Callable:
    """
    Decorator aimed to update ini file in cagefs-skeleton
    Applies to task.add nd task.remove
    """

    def update(*args):
        """
        Copy ini file to cagefs-skeleton
        Action takes place for cPanel ea-php only
        """
        original_ini = os.path.join(args[0].ini_location, 'xray.ini')
        if original_ini.startswith('/opt/cpanel') and glob(
                '/usr/share/cagefs'):
            skeleton_ini = os.path.join('/usr/share/cagefs/.cpanel.multiphp',
                                        original_ini[1:])
        elif original_ini.startswith('/usr/local') and glob(
                '/usr/share/cagefs-skeleton'):
            skeleton_ini = os.path.join('/usr/share/cagefs-skeleton',
                                        original_ini[1:])
            if not os.path.exists(os.path.dirname(skeleton_ini)):
                os.mkdir(os.path.dirname(skeleton_ini))
        else:
            return

        if not os.path.exists(original_ini):
            if os.path.exists(skeleton_ini):
                try:
                    os.unlink(skeleton_ini)
                except OSError as e:
                    logger.warning('Failed to unlink ini in cagefs-skeleton',
                                   extra={'xray_ini': skeleton_ini,
                                          'err': str(e)})
        else:
            try:
                shutil.copy(original_ini, skeleton_ini)
            except OSError as e:
                logger.warning('Failed to copy ini into cagefs-skeleton',
                               extra={'xray_ini': original_ini,
                                      'err': str(e)})

    @wraps(func)
    def wrapper(*args, **kwargs):
        """
        Wraps func
        """
        func(*args, **kwargs)
        update(*args)

    return wrapper


def dbm_storage_update(func: Callable) -> Callable:
    """
    Decorator aimed to update DBM storage with fake_id:real_id mapping
    Applies to task.add nd task.remove
    """

    def update(*args):
        """
        Update DBM storage contents
        """
        task_instance = args[0]
        with dbm_storage(local_tasks_storage) as task_storage:
            task_storage[task_instance.fake_id] = task_instance.task_id

    def remove(*args):
        """
        Remove task from DBM storage
        """
        with dbm_storage(local_tasks_storage) as task_storage:
            try:
                del task_storage[args[0].fake_id.encode()]
            except KeyError:
                # ignore absence of item during removal
                pass

    @wraps(func)
    def wrapper(*args, **kwargs):
        """
        Wraps func
        """
        # add task id into DBM storage as early as possible
        try:
            if func.__name__ == 'add':
                update(*args)
        except RuntimeError as e:
            raise XRayError(str(e))

        try:
            func(*args, **kwargs)
        except Exception:
            # cleanup recently added task id from DBM storage in case of
            # any accidental fails during add procedure
            if func.__name__ == 'add':
                remove(*args)
            raise

        # during task removal cleanup task from DBM storage as late as possible
        try:
            if func.__name__ == 'remove':
                remove(*args)
        except RuntimeError as e:
            raise XRayError(str(e))

    return wrapper


def check_jwt(func: Callable) -> Callable:
    """
    Decorator aimed to validate given JWT token
    """

    def check():
        """
        Check if retrieved JWT token is valid
        """
        is_xray_supported()

    @wraps(func)
    def wrapper(*args, **kwargs):
        """
        Wraps func
        """
        token = func(*args, **kwargs)
        check()
        return token

    return wrapper


# --------- FUNCTIONS ---------


def timestamp() -> int:
    """
    Get current epoch timestamp as int
    :return: timestamp as int
    """
    return int(time.time())


def prev_date() -> date:
    """
    Pick a yesterday date
    :return: a datetime.date object
    """
    return date.today() - timedelta(days=1)


def date_of_timestamp(ts: int) -> date:
    """
    Get the datetime.date object for given int timestamp
    :param ts: timestamp
    :return: datetime.date object
    """
    return date.fromtimestamp(ts)


def get_formatted_date() -> str:
    """
    Get a formatted representation of yesterday date
    :return: str date in the form of dd/mm/YYYY
    """
    return prev_date().strftime("%d/%m/%Y")


def get_html_formatted_links(links: List[dict]) -> str:
    """
    HTML formatted links
    """
    html_item = '<p>{num}) <a href={link}>{domain}</a></p>'
    return '\n'.join([html_item.format(num=i, link=v, domain=k) for i, l in
                      enumerate(links, 1) for k, v in l.items()])


def get_text_formatted_links(links: List[dict]) -> str:
    """
    Formatted links
    """
    text_item = '{num}) {dom}: {link}'
    return '\n'.join([text_item.format(num=i, dom=k, link=v) for i, l in
                      enumerate(links, 1) for k, v in l.items()])


def read_sys_id() -> str:
    """
    Obtain system ID from /etc/sysconfig/rhn/systemid
    :return: system ID without ID- prefix
    """
    try:
        tree = ET.parse('/etc/sysconfig/rhn/systemid')
        root = tree.getroot()
        whole_id = root.find(".//member[name='system_id']/value/string").text
        with sentry_sdk.configure_scope() as scope:
            scope.set_tag("system_id", whole_id)
        return whole_id.lstrip('ID-')
    except (OSError, ET.ParseError) as e:
        raise XRayError(_('Failed to retrieve system_id')) from e


def write_sys_id(sys_id: str, agent_system_id_path: str = agent_file) -> None:
    """
    Write system_id into file /usr/share/alt-php-xray/agent_sys_id
    """
    with open(agent_system_id_path, 'w') as out:
        out.write(sys_id)


def read_agent_sys_id(agent_system_id_path: str = agent_file) -> str:
    """
    Read system_id saved by agent during its initialization
    """
    try:
        with open(agent_system_id_path) as agent_sysid_file:
            return agent_sysid_file.read().strip()
    except OSError as e:
        logger.info(
            "Failed to retrieve agent's system_id, returning real one",
            extra={'err': str(e)})
        return read_sys_id()


def read_agent_sys_id() -> str:
    """
    Read system_id saved by agent during its initialization
    """
    try:
        with open(agent_file) as agent_sysid_file:
            return agent_sysid_file.read().strip()
    except OSError as e:
        logger.info(
            "Failed to retrieve agent's system_id, returning real one",
            extra={'err': str(e)})
        return read_sys_id()
        # raise XRayError("Failed to retrieve agent's system_id") from e


def is_xray_supported() -> Optional[bool]:
    """Raise XRayError in case of detected non-supported edition"""
    is_supported = is_panel_feature_supported(Feature.XRAY)
    if not is_supported:
        current_edition = get_cl_edition_readable()
        current_panel = getCPName()
        logger.info('Current CloudLinux edition: %s or Control Panel: %s is not supported by X-Ray',
                    str(current_edition), str(current_panel))
        raise XRayManagerExit(_('Current CloudLinux edition: {} or '
                                'Control Panel: {} is not supported by X-Ray'.format(current_edition,
                                                                                     current_panel)))
    return True


@check_jwt
def read_jwt_token() -> str:
    """
    Obtain jwt token from /etc/sysconfig/rhn/jwt.token
    :return: token read
    """
    try:
        with open(jwt_token_location) as token_file:
            return token_file.read().strip()
    except (OSError, IOError):
        raise XRayError(_('JWT file %s read error') % str(jwt_token_location))


def pkg_version(filepath: str) -> Optional[str]:
    """Get version of package from file. alt-php-xray supported"""
    try:
        with open(filepath) as v_file:
            version = v_file.read().strip()
    except OSError:
        return
    # remove dist suffix
    return '.'.join(version.split('.')[:2]) or '0.0-0'

def xray_version() -> Optional[str]:
    """Get version of alt-php-xray package"""
    return pkg_version('/usr/share/alt-php-xray/version')

def sentry_init() -> None:
    """
    Initialize Sentry client
    shutdown_timeout=0 disables Atexit integration as stated in docs:
    'it’s easier to disable it by setting the shutdown_timeout to 0'
    https://docs.sentry.io/platforms/python/default-integrations/#atexit
    On the other hand, docs say, that
    'Setting this value too low will most likely cause problems
    for sending events from command line applications'
    https://docs.sentry.io/error-reporting/configuration/?platform=python#shutdown-timeout
    """

    def add_info(event: dict, hint: dict) -> dict:
        """
        Add extra data into sentry event
        :param event: original event
        :param hint: additional data caught
        :return: updated event
        """
        event['extra'].update({'xray.version': '0.6-29.el7'})

        extra_data = event.get('extra', {})
        fingerprint = extra_data.get('fingerprint', None)

        if fingerprint:
            event['fingerprint'] = [fingerprint]

        return event

    def set_tags(sentry_scope):
        cp_description = get_cp_description()
        cp_version = cp_description.get('version') if cp_description else None
        cp_name = cp_description.get('name') if cp_description else None
        cp_product = 'WP2' if is_wp2_environment() else None

        tags = (('Control Panel Name', cp_name),
                ('Control Panel Version', cp_version),
                ('Control Panel Product', cp_product),
                ('kernel', platform.release()),
                ('CloudLinux version', get_rhn_systemid_value("os_release")),
                ('Cloudlinux edition', get_cl_edition_readable()),
                ('Architecture', get_rhn_systemid_value("architecture")),
                ('ip_address', ip_addr()),
                ('username', getuser())
                )
        # set_tags does not work in current version of sentry_sdk
        # https://github.com/getsentry/sentry-python/issues/1344
        for tag in tags:
            sentry_scope.set_tag(*tag)

    def nope(pending, timeout) -> None:
        pass

    def try_get_ip(address_family, private_ip):
        """
        address_family - we can choose constants represent the address
                           (and protocol) families
                           (AF_INET for ipv4 and AF_INET6 for ipv6)
        private_ip - specify some private ip address. For instance:
                     ipv4 -> 10.255.255.255 or ipv6 -> fc00::
        """
        with socket(address_family, SOCK_DGRAM) as s:
            try:
                s.connect((private_ip, 1))
                IP = s.getsockname()[0]
            except Exception:
                IP = None
        return IP

    def ip_addr() -> str:
        """
        Retrieve server's IP
        """
        ipversions = (AF_INET, '10.255.255.255'), (AF_INET6, 'fc00::')
        for addr_fam, priv_ip in ipversions:
            ip = try_get_ip(addr_fam, priv_ip)
            if ip:
                return ip
        return '127.0.0.1'

    sentry_logging = LoggingIntegration(level=logging.INFO,
                                        event_level=logging.WARNING)
    xray_ver = xray_version() or 'alt-php-xray@0.6-29.el7'
    silent_atexit = AtexitIntegration(callback=nope)
    sentry_sdk.init(dsn=sentry_dsn, before_send=add_info,
                    release=xray_ver,
                    max_value_length=10000,
                    integrations=[sentry_logging, silent_atexit])
    with sentry_sdk.configure_scope() as scope:
        scope.user = {
            "id": get_rhn_systemid_value("system_id") or ip_addr() or get_hostname() or getuser()
        }
        try:
            set_tags(scope)
        except Exception:
            pass


def configure_logging(logname: str, level=logging_level) -> Optional[str]:
    """
    Configure logging and Sentry
    :param logname: path to log
    :return: logpath
    """
    levels = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'critical': logging.CRITICAL
    }

    sentry_init()
    try:
        handlers = [
            logging.FileHandler(filename=logname)
        ]
        if level == 'debug':
            handlers.append(logging.StreamHandler())
        logging.basicConfig(level=levels.get(level, logging.INFO),
                            format='%(asctime)s [%(threadName)s:%(name)s] %(message)s',
                            datefmt='%m/%d/%Y %I:%M:%S %p',
                            handlers=handlers)
    except OSError:
        # dummy logging
        logging.basicConfig(handlers=[logging.NullHandler()])
        return

    try:
        os.chmod(logname, 0o600)
    except PermissionError:
        pass
    return logname


def safe_move(src: str, dst: str) -> None:
    """
    Move file with error catching
    :param src: source
    :param dst: destination
    """
    try:
        shutil.move(src, dst)
    except OSError as e:
        raise XRayError(_('Failed to move file {} to {}: {}'.format(src, dst, str(e)))) from e


def create_socket(sock_location: str) -> 'socket object':
    """
    Create world-writable socket in given sock_location
    or reuse existing one
    :param sock_location: socket address
    :return: socket object
    """
    listen_fds = int(os.environ.get("LISTEN_FDS", 0))
    if listen_fds == 0:
        with umask_0():
            try:
                # sock.close does not remove the file
                os.unlink(sock_location)
            except FileNotFoundError:
                pass
            sockobj = socket(AF_UNIX)
            sockobj.bind(sock_location)
            sockobj.listen()
    else:
        sockobj = fromfd(3, AF_UNIX, SOCK_STREAM)
        sockobj.listen()
    return sockobj


def get_current_cpu_throttling_time(lve_id: int) -> int:
    """
    Retrieve current value of CPU throttled time.
    Return 0 in case of failures
    """
    if not is_panel_feature_supported(Feature.LVE):
        return 0
    marker = 'throttled_time'
    stat_file = f'/sys/fs/cgroup/cpu,cpuacct/lve{lve_id}/cpu.stat'
    try:
        with open(stat_file) as stat_values:
            for value in stat_values:
                if value.startswith(marker):
                    logger.debug('%s', value)
                    return int(value.strip().split(marker)[-1].strip())
    except OSError as e:
        logger.error('Failed to open %s: %s', stat_file, str(e))
    return 0


def _selectorctl_get_version(username: str) -> Optional[tuple]:
    """
    'selectorctl -u username --user-current' command
    :param username: name of user
    :return: tuple(stdout, stderr) or None if command fails
    """
    _selectorctl = '/usr/bin/selectorctl'
    if not os.path.isfile(_selectorctl):
        return None
    try:
        result = subprocess.run([_selectorctl,
                                 '-u',
                                 username,
                                 '--user-current'],
                                capture_output=True, text=True, check=True)
        return result.stdout.strip(), result.stderr.strip()
    except subprocess.CalledProcessError as e:
        logger.warning('Failed to get selectorctl user-current',
                       extra={'err': str(e)})
    except subprocess_errors as e:
        logger.error('selectorctl --user-current failed: %s',
                     str(e))


def cagefsctl_get_prefix(username: str) -> Optional[str]:
    """
    'cagefsctl --get-prefix username' command
    :param username: name of user
    :return: cagefsctl prefix for given username
            or None if command fails
    """
    _cagefsctl = '/usr/sbin/cagefsctl'
    if not os.path.isfile(_cagefsctl):
        return None
    try:
        result = subprocess.run([_cagefsctl,
                                 '--getprefix',
                                 username],
                                capture_output=True, text=True, check=True)
        return result.stdout.strip()
    except subprocess.CalledProcessError as e:
        logger.warning('Failed to get cagefsctl prefix',
                       extra={'err': str(e)})
    except subprocess_errors as e:
        logger.error('cagefsctl --getprefix failed: %s',
                     str(e))


def _cagefsctl_remount(username: str = None) -> None:
    """
    'cagefsctl --remount username' or 'cagefsctl --remount-all' command
    :param username: name of user or None (for remount-all)
    """
    _cagefsctl = '/usr/sbin/cagefsctl'
    if not os.path.isfile(_cagefsctl):
        return
    if username is None:
        args = [_cagefsctl, '--wait-lock', '--remount-all']
    else:
        args = [_cagefsctl, '--remount', username]
    try:
        subprocess.run(args, check=True, capture_output=True)
        logger.info('Remounted %s', username)
    except subprocess.CalledProcessError as e:
        logger.warning('Failed to remount cagefs',
                       extra={'err': str(e)})
    except subprocess_errors as e:
        logger.error('cagefsctl --remount failed: %s',
                     str(e))


def _is_cagefs_enabled(username: str) -> bool:
    """
    'cagefsctl --user-status username' command
    :param username: name of user
    :return: True if user has Enabled status, False otherwise
    """
    _cagefsctl = '/usr/sbin/cagefsctl'
    if not os.path.isfile(_cagefsctl):
        return False
    try:
        result = subprocess.run([_cagefsctl,
                                 '--user-status',
                                 username],
                                capture_output=True, text=True)
        return 'Enabled' in result.stdout.strip()
    except subprocess_errors as e:
        logger.error('cagefsctl --user-status failed: %s',
                     str(e))


def _is_selector_phpd_location_set() -> bool:
    """
    Check if there is php.d.location = selector
    set in /etc/cl.selector/symlinks.rules
    """
    try:
        with open('/etc/cl.selector/symlinks.rules') as rules_file:
            contents = rules_file.read()
    except OSError:
        return False
    return 'selector' in contents


def no_active_tasks() -> bool:
    """Check if there are no active tasks (== empty task storage)"""
    with dbm_storage(local_tasks_storage) as task_storage:
        return len(task_storage.keys()) == 0


def switch_schedstats(enabled: bool) -> None:
    """
    Switch on/off throttle statistics gathering by kmodlve
    :param enabled: True or False
    """
    if not is_panel_feature_supported(Feature.LVE):
        # do nothing if there is no LVE feature
        return

    try:
        with open('/proc/sys/kernel/sched_schedstats', mode='wb',
                  buffering=0) as f:
            f.write(b'1' if enabled else b'0')
    except OSError as e:
        logger.info('Failed to set sched_schedstats to %s: %s',
                    enabled, str(e))


def is_xray_app_available() -> bool:
    """
    Check if end-users have access to X-Ray UI of End-User plugin
    """
    return UIConfig().get_param('hideXrayApp', 'uiSettings') is False


def is_xray_user_agent_active() -> bool:
    """Check if User Agent is listening"""
    with socket(AF_UNIX, SOCK_STREAM) as s:
        try:
            s.connect(user_agent_sock)
        except (ConnectionError, OSError):
            return False
    return True


def ssa_disabled() -> bool:
    """Check if SSA is disabled by its internal flag-file"""
    return not os.path.isfile('/usr/share/clos_ssa/ssa_enabled')


def is_file_recently_modified(filepath: str) -> bool:
    """Check is file was modified during the last day"""
    # 86400sec == 1day
    try:
        return timestamp() - os.stat(filepath).st_mtime < 86400
    except OSError:
        return False


def get_user_php_version(user):
    with drop_privileges(user):
        result = php_get_vhost_versions_user()
    return result

# --------- CONTEXT MANAGERS ---------


@contextmanager
def filelock(fd: 'file object providing a fileno() method') -> None:
    """
    Context manager for locking given file object
    :param fd: а file object providing a fileno() method
    """
    # try to lock file with waiting for lock to be released
    # will be executed as __enter__
    for _ in range(120):
        try:
            fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            logger.info('File %s locked', fd)
            break
        except OSError as e:
            logger.info('Failed to lock: %s', str(e))
            # raise on unrelated IOErrors
            if e.errno not in (errno.EAGAIN, errno.EACCES):
                raise
            time.sleep(0.5)
    else:
        raise XRayError(_('Failed to lock at all. Exiting thread'),
                        flag='warning')
    try:
        yield
    finally:
        # release lock
        # will be executed as __exit__
        fcntl.flock(fd, fcntl.LOCK_UN)
        logger.info('File %s unlocked', fd)


@contextmanager
def dbm_storage(filename: str, is_shelve: bool = False):
    """
    Context manager for waiting for lock to be released for DBM file storage,
    either plain DBM or a Shelf object
    (desired return value is controlled by _shelve_instance flag)
    :param filename: a DBM file to open
    :param is_shelve: if a shelve file should be opened instead of plain DBM
    """
    _file = os.path.basename(filename)
    _err = None
    for _ in range(100):
        try:
            if is_shelve:
                storage = shelve.open(filename)
            else:
                storage = dbm.open(filename, 'c')
            logger.debug('Storage %s opened', _file)
            break
        except dbm.error as e:
            logger.info('[#%i] Failed to open storage %s: %s', _,
                        _file, e)
            _err = e
            time.sleep(0.3)
    else:
        raise RuntimeError(
            f'Failed to open {_file} storage: {_err}')
    try:
        yield storage
    finally:
        storage.close()
        logger.debug('Storage %s closed', _file)


@contextmanager
def umask_0(mask: int = 0) -> None:
    """
    Context manager for dropping umask
    """
    prev = os.umask(mask)
    yield
    os.umask(prev)


@contextmanager
def set_privileges(target_uid: int = None, target_gid: int = None,
                   target_path='.', mask: int = None, with_check=True) -> None:
    """
    Context manager to drop privileges during some operation
    and then restore them back.
    If target_uid or target_gid are given, use input values.
    Otherwise, stat target_uid and target_gid from given target_path.
    If no target_path given, use current directory.
    Use mask if given.
    :param target_uid: uid to set
    :param target_gid: gid to set
    :param target_path: directory or file to stat for privileges,
                       default -- current directory
    :param mask: umask to use
    :param with_check: check the result of switching privileges
    """
    prev_uid = os.getuid()
    prev_gid = os.getgid()

    permission_issue_message = _('Unable to execute required operation: permission issue')
    try:
        stat_info = os.stat(target_path)
    except OSError:
        stat_info = None

    if target_uid is None:
        if stat_info is None:
            target_uid = prev_uid
        else:
            target_uid = stat_info.st_uid
    if target_gid is None:
        if stat_info is None:
            target_gid = prev_gid
        else:
            target_gid = stat_info.st_gid

    if mask is not None:
        prev = os.umask(mask)

    if prev_gid != target_gid:
        os.setegid(target_gid)
        logger.debug('Dropped GID privs to %s', target_gid)
        if with_check and os.getegid() != target_gid:
            # break operation if privileges dropping failed
            raise XRayError(permission_issue_message)
    if prev_uid != target_uid:
        os.seteuid(target_uid)
        logger.debug('Dropped UID privs to %s', target_uid)
        if with_check and os.geteuid() != target_uid:
            if prev_gid != target_gid:
                # check if GID should be restored
                os.setegid(prev_gid)
            # break operation if privileges dropping failed
            raise XRayError(permission_issue_message)
    yield
    if prev_uid != target_uid:
        os.seteuid(prev_uid)
        logger.debug('Restored UID privs to %s', prev_uid)
    if prev_gid != target_gid:
        os.setegid(prev_gid)
        logger.debug('Restored GID privs to %s', prev_gid)

    if mask is not None:
        os.umask(prev)


@contextmanager
def user_context(uid, gid):
    """
    Dive into user context by dropping permissions
    to avoid most of the security issues.

    Does not cover cagefs case because it also requires nsenter,
    which is only available with execve() call in our system
    """
    try:
        os.setegid(gid)
        os.seteuid(uid)

        yield
    finally:
        os.seteuid(0)
        os.setegid(0)



def retry_on_exceptions(max_retries, exceptions_to_retry):
    """
    Decorator to retry method on specific exceptions
    """
    def decorator(func):
        def wrapper(*args, **kwargs):
            retries = 0
            exception = ValueError(_('Request to website failed even after %s retries.') % str(max_retries))
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except tuple(exceptions_to_retry) as e:
                    retries += 1
                    logging.warning('Retry to request website, exception: %s', str(e))
                    exception = e
                    time.sleep(1)  # Wait for 1 second before retrying
            raise exception
        return wrapper
    return decorator

Spamworldpro Mini