Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64
User : palandch ( 1163)
PHP Version : 7.1.33
Disable Function : NONE
Directory :  /opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/imunify_manager.py
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2024 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT


import os
import logging
import subprocess
import json
import hashlib

from clcommon.clwpos_lib import find_wp_paths
from xray.internal.constants import advice_list_im360_cache
from xray.adviser.advice_helpers import filter_by_non_existence, ThirdPartyAdvice

# Advice type assigned to every advice entry built by ImunifyManager; also
# used as the prefix filter on item['advice']['type'] in im360_advice().
ADV_TYPE = "IMUNIFY_PROTECTION"


class ImunifyManager:
    """
    Wrapper around the ``imunify360-agent`` command line tool.

    Queries the agent for settings, per-user protection status and
    smart-advice notifications, converts the notifications into advice
    entries (via ``ThirdPartyAdvice``) and keeps a JSON cache of the
    resulting list in ``advice_list_im360_cache``.
    """

    # Location of the agent binary; its presence is how the class decides
    # whether Imunify360 is available on the server
    imunify360_agent = "/usr/bin/imunify360-agent"

    def __init__(self):
        self.logger = logging.getLogger('imunify_manager')

    @staticmethod
    def _filter_advice_list(advice_list):
        """
        Filter by non-exist user or website
        add more filters if needed
        """
        return filter_by_non_existence(advice_list)

    def call_im360_command(self, command):
        """
        Run an agent command and return its JSON output.

        :param command: argv list handed to ``subprocess.run``
        :return: decoded JSON object as dict; an empty dict when the command
                 cannot be started, exits non-zero, exceeds the 60 seconds
                 timeout, prints invalid JSON or prints JSON which is not
                 an object
        """
        try:
            result = subprocess.run(command,
                                    text=True,
                                    capture_output=True,
                                    timeout=60,
                                    check=True)
            response = json.loads(result.stdout)
        except (subprocess.CalledProcessError, OSError) as e:
            self.logger.exception('Unable to run IM360 command: %s', e)
        except subprocess.TimeoutExpired as e:
            self.logger.exception('Timeout expired while running IM360 command: %s', e)
        except json.JSONDecodeError as e:
            self.logger.exception('Unable to parse json of IM360 response: %s', e)
        else:
            # every caller uses dict.get() on the result, so anything that
            # is not a JSON object (null, list, string ...) is unusable
            if isinstance(response, dict):
                return response
            self.logger.error('Unexpected type of IM360 response: %s',
                              type(response).__name__)
        return {}

    def is_im360_present(self):
        """
        :return: True when the agent binary exists on the filesystem
        """
        return os.path.exists(self.imunify360_agent)

    def is_im360_mu_plugin_disabled_server_wide(self):
        """
        :return: True when the agent is absent or its smart-advice options
                 have no truthy ``mu_plugin_installation`` value
        """
        if not self.is_im360_present():
            return True
        settings = self.get_im360_settings()

        return not settings.get('mu_plugin_installation', False)

    def get_im360_settings(self):
        """
        :return: dict printed by ``smart-advice get-options --json``;
                 empty dict when the agent is absent or the call failed
        """
        if not self.is_im360_present():
            return {}

        return self.call_im360_command([self.imunify360_agent, 'smart-advice', 'get-options', '--json'])

    def get_im360_protection_status(self, username):
        """
        :param username: user to query ``myimunify status`` for
        :return: ``protection`` value of the first returned item;
                 False when the agent is absent, the call failed or
                 no items were returned
        """
        if not self.is_im360_present():
            return False

        mi_status = self.call_im360_command([self.imunify360_agent, 'myimunify', 'status', username, '--json'])
        if not mi_status:
            return False

        # "items": [{"username": "onemore", "protection": false}]
        items = mi_status.get('items', [])
        if items:
            return items[0].get('protection', False)
        return False

    def _make_advice(self, imunify_advice):
        """
        Convert one imunify notification item into advice entries,
        one entry per WordPress path found by ``find_wp_paths`` under
        the item's ``smartadvice_docroot``.

        Keys read from the item (a missing one raises KeyError):
        smartadvice_user, smartadvice_domain, smartadvice_docroot,
        upgrade_url, smartadvice_title, smartadvice_description, iaid.
        NOTE: the sample item below does not show ``upgrade_url``
        although the code requires it.

        imunify advice item (shortened):
        {"id": 123,
        "type": "malware_found_myimun_2",
        "severity": 1,
        "smartadvice": true,
        "smartadvice_title": "Web hosting user account is infected",
        "smartadvice_description": "Imunify detected live malware on the user account hosting this website: ...",
        "smartadvice_user": "isuser",
        "smartadvice_domain": "isuser.com",
        "smartadvice_docroot": "/",
        "iaid": "agent-iaid-123",
        ...}
        ->
        illustrative shape of a resulting entry (the real ``id`` is
        "<iaid>_<md5 of username-domain-website>"):
        {'created_at': '2024-06-22T00:29:23.898986+00:00',
        'updated_at': '2024-06-22T00:29:23.898986+00:00',
        'metadata': {'username': 'tkcpanel', 'domain': 'tk-cpanel.com', 'website': '/'},
        'advice': {'id': 1, 'type': 'IMUNIFY_PROTECTION', 'status': 'review',
        'description': 'Turn on MyImunify Account protection',
        'detailed_description': 'To improve site security, enable the Imunify protection feature.',
        'is_premium': False, 'module_name': 'imunify', 'license_status': 'NOT_REQUIRED',
        'subscription': {'status': 'no', 'upgrade_url': 'https://...'},
        'total_stages': 0, 'completed_stages': 0}}

        :param imunify_advice: notification item (dict) from the agent
        :return: list of results of ``ThirdPartyAdvice.to_advice()``
        """
        advices_by_docroot = []
        username = imunify_advice['smartadvice_user']
        domain = imunify_advice['smartadvice_domain']
        infected_docroot = imunify_advice['smartadvice_docroot']
        upgrade_url = imunify_advice['upgrade_url']
        description = imunify_advice['smartadvice_title']
        detailed_description = imunify_advice['smartadvice_description']
        iaid = imunify_advice["iaid"]

        for site in find_wp_paths(infected_docroot):
            website = f'/{site}'
            # for analytics: iaid-hash(username-domain-website)
            hashed_udw = hashlib.md5(f"{username}-{domain}-{website}".encode('utf-8')).hexdigest()
            adv_id = f'{iaid}_{hashed_udw}'

            im360_protection_advice = ThirdPartyAdvice(username=username,
                                                       domain=domain,
                                                       website=website,
                                                       id=adv_id,
                                                       type=ADV_TYPE,
                                                       status='review',
                                                       description=description,
                                                       detailed_description=detailed_description,
                                                       is_premium=False,
                                                       module_name='imunify',
                                                       license_status='NOT_REQUIRED',
                                                       subscription_status='no',
                                                       upgrade_url=upgrade_url,
                                                       total_stages=0,
                                                       completed_stages=0)
            advices_by_docroot.append(im360_protection_advice.to_advice())
        return advices_by_docroot

    def im360_advice(self, cache_only=False) -> list:
        """
        Collect advice based on imunify smart-advice notifications.

        :param cache_only: when True, return the cached list and do not
                           call the agent
        :return: list of advice entries; empty list when the agent is absent.
                 When the agent is queried, the cache is rewritten with
                 the returned list (also when that list is empty)
        """
        advices = []

        if not self.is_im360_present():
            return advices

        # Return only cache
        if cache_only:
            return self._im360_read_advice_cache()

        advice_list = self.call_im360_command([self.imunify360_agent, 'smart-advice', 'notifications', '--json'])
        self.logger.info('IM360 advice list: %s', advice_list)
        if advice_list:
            # "items": null must behave like a missing key
            items = advice_list.get('items') or []
            for item in items:
                try:
                    advice_item = self._make_advice(item)
                except KeyError as e:
                    # item lacks one of the required keys - skip it
                    self.logger.error('Unable to make advice based on item: %s, malformed error: %s',
                                      item,
                                      e)
                    continue
                advices.extend(advice_item)
            advices = self._filter_advice_list([item for item in advices if item['advice']['type'].startswith(ADV_TYPE)])

        self._im360_update_advice_cache(advices)

        return advices

    def _im360_read_advice_cache(self) -> list:
        """
        :return: filtered advice list loaded from the cache file;
                 empty list when the file is missing, unreadable or
                 does not contain valid JSON
        """
        advice = []

        if not os.path.exists(advice_list_im360_cache):
            return advice

        try:
            with open(advice_list_im360_cache) as f:
                advice = self._filter_advice_list(json.load(f))
        except OSError as e:
            self.logger.exception('An OS error occurred while reading the cache for IM360: %s', e)
        except json.JSONDecodeError as e:
            self.logger.exception('Unable to read IM360 json advice cache: %s', e)

        return advice

    def _im360_update_advice_cache(self, advice):
        """
        Overwrite the cache file with the given advice list (best effort:
        failures are logged, never raised).

        :param advice: JSON-serializable list of advice entries
        """
        # Serialize before opening the file: encoding errors are
        # TypeError/ValueError (never JSONDecodeError), and failing here
        # keeps the previous cache content instead of truncating it
        try:
            payload = json.dumps(advice, indent=2)
        except (TypeError, ValueError) as e:
            self.logger.exception('Unable to write IM360 json advice cache: %s', e)
            return

        try:
            with open(advice_list_im360_cache, 'w') as f:
                f.write(payload)
        except OSError as e:
            self.logger.exception('An OS error occurred while updating the cache for IM360: %s', e)

Spamworldpro Mini