Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64
User : palandch ( 1163)
PHP Version : 7.1.33
Disable Function : NONE
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/whmcs.py
import json
import os
import urllib.parse

import defence360agent.subsys.panels.hosting_panel as hp

from logging import getLogger
from defence360agent.contracts import config
from defence360agent.utils.config import update_config
from defence360agent.myimunify.model import update_users_protection, MyImunify
from defence360agent.utils.wordpress_mu_plugin import (
    MU_PLUGIN_INSTALLATION,
    ADVICE_EMAIL_NOTIFICATION,
    WordPressMuPlugin,
)


# Module-level logger, named after this module.
logger = getLogger(__name__)

# Request keys that update_configs() stores under the "CONTROL_PANEL" config
# section (as opposed to the MyImunify section).
MU_PLUGIN_KEYS = [MU_PLUGIN_INSTALLATION, ADVICE_EMAIL_NOTIFICATION]


class WhmcsConf:
    """
    Read/write the data passed by WHMCS.

    Internal use, for commands called from WHMCS only.
    It deliberately saves ALL data that came from WHMCS without any
    validation in order to simplify compatibility with the currently
    installed WHMCS plugin.
    """

    # JSON file in which the data received from WHMCS is persisted
    path = "/var/imunify360/whmcs_data.json"

    def read(self):
        """
        Return the stored WHMCS data as a dict.

        An empty dict is returned when the file does not exist, cannot be
        read, is not valid JSON, or holds JSON that is not an object.
        """
        try:
            with open(self.path, "r") as f:
                raw_data = f.read()
        except FileNotFoundError:
            # nothing has been saved yet, this is not an error
            return {}
        except IOError as e:
            logger.error("Failed to read whmcs data file: %s", str(e))
            return {}

        try:
            data = json.loads(raw_data)
        except (json.JSONDecodeError, ValueError):
            logger.error("Malformed file with whmcs data: %s", raw_data)
            return {}

        if not isinstance(data, dict):
            # valid JSON, but not an object (e.g. a list or null):
            # save() merges new values into the result with dict.update()
            logger.error("Malformed file with whmcs data: %s", raw_data)
            return {}

        return data

    def save(self, data):
        """
        Merge ALL data passed by WHMCS into the stored data and write it back.

        It deliberately has no validation, to be as compatible as possible
        with the currently installed WHMCS plugin.
        I/O errors while writing are logged, not raised.
        """
        current_data = self.read()
        # no validation needed
        current_data.update(data)

        # Serialize BEFORE opening the file: mode "w" truncates it, so a
        # serialization error raised afterwards would wipe the stored data.
        serialized = json.dumps(current_data, indent=4)

        try:
            with open(self.path, "w") as file:
                file.write(serialized)
        except IOError as e:
            logger.error("Failed to write whmcs data to file: %s", str(e))


async def sync_billing_data(sink, data):
    """
    Apply the MyImunify part of the billing data.

    Picks the value stored under config.MY_IMUNIFY_KEY in *data* and hands it
    to mi_update(); returns whatever mi_update() returns.
    """
    return await mi_update(sink, data.get(config.MY_IMUNIFY_KEY))


def convert_to_config_key_value(key, value):
    """
    Translate a WHMCS key/value pair into its config counterpart.

    "status", "protection" and "mu_plugin_installation" are translated,
    any other key is accepted and returned together with its value as is.
    An unknown value for "status" or "protection" raises KeyError.
    """
    status_flags = {"active": True, "inactive": False}
    protection_flags = {"enabled": True, "disabled": False}

    if key == "status":
        return "enable", status_flags[value]
    if key == "protection":
        return "protection", protection_flags[value]
    if key == "mu_plugin_installation":
        return "smart_advice_allowed", value
    return key, value


def convert_from_config_key_value(key, value):
    """
    Translate a config key/value pair into its WHMCS counterpart.

    "enable", "protection" and "smart_advice_allowed" are translated,
    any other key is accepted and returned together with its value as is.
    """
    if key == "enable":
        if value:
            return "status", "active"
        return "status", "inactive"
    if key == "protection":
        if value:
            return "protection", "enabled"
        return "protection", "disabled"
    if key == "smart_advice_allowed":
        return "mu_plugin_installation", value
    return key, value


async def get_users():
    """Return the users reported by the hosting panel."""
    panel = hp.HostingPanel()
    return await panel.get_users()


async def mi_update(sink, requested_myimunify_data):
    """
    Apply a MyImunify update requested by WHMCS and report the current state.

    Does nothing and returns None when no data is passed. Otherwise:
    - saves the passed "status" to the WHMCS data file when it is truthy
    - passes the request to update_configs()
    - passes the status and the MU plugin installation value to
      WordPressMuPlugin().prepare_for_mu_plugin_installation()
    - when "protection" is passed, updates the protection for the requested
      users (for all panel users when "users" is missing or empty); names
      not returned by get_users() are skipped

    Returns the result of get_current_whmcs_data(): called with an empty
    list when "protection" is not passed, otherwise with the users that
    passed the filter.
    """
    if not requested_myimunify_data:
        logger.info("Nothing to update for Myimunify")
        return None

    activation_status = requested_myimunify_data.get("status")
    if activation_status:
        # no validation needed
        WhmcsConf().save({"status": activation_status})

    await update_configs(sink, requested_myimunify_data)

    mu_plugin = WordPressMuPlugin()
    mu_plugin.prepare_for_mu_plugin_installation(
        activation_status,
        requested_myimunify_data.get(MU_PLUGIN_INSTALLATION),
    )

    requested_protection = requested_myimunify_data.get("protection")
    if not requested_protection:
        return await get_current_whmcs_data([])

    panel_users = await get_users()
    # an empty or missing "users" list means "every panel user"
    candidates = requested_myimunify_data.get("users", []) or panel_users
    users_to_update = [name for name in candidates if name in panel_users]

    if not users_to_update:
        logger.warning("No users to update protection for")
    else:
        logger.info(
            "Updating protection status for users=%s",
            str(users_to_update),
        )
        _, protection_enabled = convert_to_config_key_value(
            "protection", requested_protection
        )
        await update_users_protection(
            sink, users_to_update, protection_enabled
        )

    return await get_current_whmcs_data(users_to_update)


async def update_configs(sink, requested_myimunify_data):
    """
    Store the config-backed parameters of the request in the config.

    "purchase_page_url" (and "status", unless the license is MyImunify
    freemium) go to the config.MY_IMUNIFY_KEY section, the keys listed in
    MU_PLUGIN_KEYS go to the "CONTROL_PANEL" section. Keys and values are
    translated with convert_to_config_key_value(). Nothing is written when
    the request has none of those keys.
    """
    if config.is_mi_freemium_license():
        mi_config_parameters = ["purchase_page_url"]
    else:
        mi_config_parameters = ["purchase_page_url", "status"]

    def pick(allowed_keys):
        # translated subset of the request limited to the allowed keys
        selected = {}
        for name, raw_value in requested_myimunify_data.items():
            if name in allowed_keys:
                cfg_key, cfg_value = convert_to_config_key_value(
                    name, raw_value
                )
                selected[cfg_key] = cfg_value
        return selected

    my_imunify_section = pick(mi_config_parameters)
    control_panel_section = pick(MU_PLUGIN_KEYS)

    pending = {}
    if my_imunify_section:
        pending[config.MY_IMUNIFY_KEY] = my_imunify_section
    if control_panel_section:
        pending["CONTROL_PANEL"] = control_panel_section

    if not pending:
        return

    logger.info("Updating config with data: %s", str(pending))
    await update_config(sink, pending)


async def get_users_info(users):
    """
    Return the protection state stored in the database for the passed users.

    When no users are passed, the state of all stored users is returned.
    Each item has the form {"user": <user>, "protection": "enabled"/"disabled"}.
    """
    query = MyImunify.select()
    if users:
        query = query.where(MyImunify.user.in_(users))

    users_info = []
    for row in query.dicts():
        entry = {"user": row["user"]}
        entry["protection"] = convert_from_config_key_value(
            "protection", row["protection"]
        )[1]
        users_info.append(entry)
    return users_info


async def get_current_whmcs_data(users):
    """
    Returns the current configuration and user protection status.

    The MyImunify config section is translated to the WHMCS format with
    convert_from_config_key_value(), then the MU plugin values from the
    "CONTROL_PANEL" section and the protection state of *users*
    (see get_users_info()) are added:
    {'status': 'active/inactive', 'purchase_page_url': 'SOMEURL', ...,
     'protection': [{'user': 'USER', 'protection': 'enabled/disabled'}]}
    """
    conf_data = config.ConfigFile().config_to_dict()
    current_config = dict(
        convert_from_config_key_value(param, value)
        for param, value in (
            conf_data.get(config.MY_IMUNIFY_KEY) or {}
        ).items()
    )

    # Fall back to an empty mapping when the section is missing or empty,
    # the same way as for the MyImunify section above; without it the
    # .get() calls below fail with AttributeError on None.
    cp_data = conf_data.get("CONTROL_PANEL") or {}
    current_config[MU_PLUGIN_INSTALLATION] = convert_from_config_key_value(
        "smart_advice_allowed", cp_data.get("smart_advice_allowed")
    )[1]
    current_config[ADVICE_EMAIL_NOTIFICATION] = cp_data.get(
        ADVICE_EMAIL_NOTIFICATION
    )

    current_config["protection"] = await get_users_info(users)
    return current_config


def get_upgrade_url_link(username, domain):
    """
    Build the upgrade link for the given user and domain.

    The link is the configured purchase page URL (trailing slashes stripped)
    followed by "/?" and the URL-encoded provisioning parameters, including
    the server IP reported by the hosting panel.
    """
    base_url = config.MyImunifyConfig.PURCHASE_PAGE_URL.rstrip("/")
    query_params = {
        "m": "cloudlinux_advantage",
        "action": "provisioning",
        "suite": "my_imunify_account_protection",
        "username": username,
        "domain": domain,
        "server_ip": hp.HostingPanel().get_server_ip(),
    }
    query_string = urllib.parse.urlencode(query_params)
    return f"{base_url}/?{query_string}"

Spamworldpro Mini