Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/ |
import json
import os
import urllib.parse

import defence360agent.subsys.panels.hosting_panel as hp
from logging import getLogger
from defence360agent.contracts import config
from defence360agent.utils.config import update_config
from defence360agent.myimunify.model import update_users_protection, MyImunify
from defence360agent.utils.wordpress_mu_plugin import (
    MU_PLUGIN_INSTALLATION,
    ADVICE_EMAIL_NOTIFICATION,
    WordPressMuPlugin,
)

logger = getLogger(__name__)

# Keys coming from WHMCS that are stored in the CONTROL_PANEL config section.
MU_PLUGIN_KEYS = [MU_PLUGIN_INSTALLATION, ADVICE_EMAIL_NOTIFICATION]


class WhmcsConf:
    """
    Read/write data passed by WHMCS.

    Internal use, for commands called from WHMCS only.
    It saves ALL data that came from WHMCS without any validation,
    deliberately, in order to simplify compatibility with the currently
    installed WHMCS plugin.
    """

    # JSON file holding the accumulated data received from WHMCS.
    path = "/var/imunify360/whmcs_data.json"

    def read(self):
        """
        Return the stored WHMCS data as a dict.

        Returns an empty dict when the file is missing, unreadable,
        malformed, or does not contain a JSON object at top level.
        """
        if not os.path.exists(self.path):
            return {}

        try:
            with open(self.path, "r") as f:
                raw_data = f.read()
        except IOError as e:
            logger.error("Failed to read whmcs data file: %s", e)
            return {}

        try:
            data = json.loads(raw_data)
        except (json.JSONDecodeError, ValueError):
            logger.error("Malformed file with whmcs data: %s", raw_data)
            return {}

        # save() merges new values with dict.update(); anything other than
        # a JSON object here would make it crash, so treat it as malformed.
        if not isinstance(data, dict):
            logger.error("Malformed file with whmcs data: %s", raw_data)
            return {}
        return data

    def save(self, data):
        """
        Merge *data* into the stored WHMCS data and write it back.

        Saves ALL data passed by WHMCS. It should not have any validation,
        deliberately, to be as compatible as possible with the currently
        installed WHMCS plugin. Write failures are logged, not raised.
        """
        current_data = self.read()
        # no validation needed
        current_data.update(data)
        try:
            with open(self.path, "w") as file:
                json.dump(current_data, file, indent=4)
        except IOError as e:
            logger.error("Failed to write whmcs data to file: %s", e)


async def sync_billing_data(sink, data):
    """
    Apply the MyImunify part of *data* (looked up by config.MY_IMUNIFY_KEY).

    Returns whatever mi_update() returns.
    """
    my_imunify_updates = data.get(config.MY_IMUNIFY_KEY)
    return await mi_update(sink, my_imunify_updates)


def convert_to_config_key_value(key, value):
    """
    Convert several keys to config keys, otherwise just return the same key.

    Any key is acceptable. For "status" and "protection" the value must be
    one of the known strings ("active"/"inactive", "enabled"/"disabled");
    any other value raises KeyError.
    """
    if key == "status":
        return (
            "enable",
            {
                "active": True,
                "inactive": False,
            }[value],
        )
    elif key == "protection":
        return (
            "protection",
            {
                "enabled": True,
                "disabled": False,
            }[value],
        )
    elif key == "mu_plugin_installation":
        return "smart_advice_allowed", value
    return key, value


def convert_from_config_key_value(key, value):
    """
    Convert several keys from config format, otherwise just return the
    same key.

    Any key is acceptable. This is the inverse of
    convert_to_config_key_value(); booleans are mapped by truthiness.
    """
    if key == "enable":
        return "status", ("active" if value else "inactive")
    elif key == "protection":
        return "protection", ("enabled" if value else "disabled")
    elif key == "smart_advice_allowed":
        return "mu_plugin_installation", value
    return key, value


async def get_users():
    """Return the users reported by the hosting panel."""
    return await hp.HostingPanel().get_users()


async def mi_update(sink, requested_myimunify_data):
    """
    Update supported parameters if passed, otherwise do nothing.

    - stores "status" in the WHMCS data file (if specified)
    - updates config parameters (see update_configs())
    - prepares MU plugin installation
    - updates protection status for users (if "protection" is specified)

    Returns None when nothing was passed, otherwise the result of
    get_current_whmcs_data().
    """
    if not requested_myimunify_data:
        logger.info("Nothing to update for Myimunify")
        return

    whmcs_activation_status = requested_myimunify_data.get("status")
    if whmcs_activation_status:
        # no validation needed
        WhmcsConf().save({"status": whmcs_activation_status})

    await update_configs(sink, requested_myimunify_data)
    WordPressMuPlugin().prepare_for_mu_plugin_installation(
        whmcs_activation_status,
        requested_myimunify_data.get(MU_PLUGIN_INSTALLATION),
    )

    if not requested_myimunify_data.get("protection"):
        return await get_current_whmcs_data([])

    all_users = await get_users()
    # An empty/missing "users" list means "apply to every panel user".
    target_users = requested_myimunify_data.get("users", []) or all_users
    # Silently drop requested users that the panel does not know about.
    filtered_passed_users = [
        user for user in target_users if user in all_users
    ]

    if filtered_passed_users:
        logger.info(
            "Updating protection status for users=%s",
            str(filtered_passed_users),
        )
        await update_users_protection(
            sink,
            filtered_passed_users,
            convert_to_config_key_value(
                "protection", requested_myimunify_data["protection"]
            )[1],
        )
    else:
        logger.warning("No users to update protection for")

    # NOTE: with an empty list get_users_info() reports ALL users.
    return await get_current_whmcs_data(filtered_passed_users)


async def update_configs(sink, requested_myimunify_data):
    """
    Write the config-backed parameters from the request to the config.

    "purchase_page_url" (and "status", unless the license is MyImunify
    freemium) go to the MY_IMUNIFY section; keys listed in MU_PLUGIN_KEYS
    go to the CONTROL_PANEL section. Does nothing when none are present.
    """
    # those params are stored in config
    mi_config_parameters = (
        ["purchase_page_url"]
        if config.is_mi_freemium_license()
        else ["purchase_page_url", "status"]
    )
    mi_config_data = dict(
        convert_to_config_key_value(param, value)
        for param, value in requested_myimunify_data.items()
        if param in mi_config_parameters
    )
    mu_plugin_data = dict(
        convert_to_config_key_value(param, value)
        for param, value in requested_myimunify_data.items()
        if param in MU_PLUGIN_KEYS
    )

    config_dict = {}
    if mi_config_data:
        config_dict[config.MY_IMUNIFY_KEY] = mi_config_data
    if mu_plugin_data:
        config_dict["CONTROL_PANEL"] = mu_plugin_data

    if config_dict:
        logger.info("Updating config with data: %s", str(config_dict))
        # only the supported keys collected above are written
        await update_config(sink, config_dict)


async def get_users_info(users):
    """
    Return protection information from the database for the passed users.

    If no users are passed, returns information for all users. Each item
    is {"user": <user>, "protection": "enabled" | "disabled"}.
    """
    result = (
        MyImunify.select().where(MyImunify.user.in_(users)).dicts()
        if users
        else MyImunify.select().dicts()
    )
    return [
        {
            "user": item["user"],
            "protection": convert_from_config_key_value(
                "protection", item["protection"]
            )[1],
        }
        for item in result
    ]


async def get_current_whmcs_data(users):
    """
    Return the current configuration and user protection status.

    The result is a flat dict built from the MY_IMUNIFY config section
    (keys converted back to WHMCS naming), plus the MU plugin settings
    from the CONTROL_PANEL section and a "protection" list, e.g.:

    {'status': 'active/inactive',
     'purchase_page_url': 'SOMEURL',
     ...,
     'protection': [{'user': ..., 'protection': 'enabled/disabled'}]}
    """
    conf_data = config.ConfigFile().config_to_dict()
    current_config = dict(
        convert_from_config_key_value(param, value)
        for param, value in conf_data.get(config.MY_IMUNIFY_KEY, {}).items()
    )
    # A missing (or empty) CONTROL_PANEL section must not crash the report;
    # the MU plugin values are then reported as None.
    cp_data = conf_data.get("CONTROL_PANEL") or {}
    current_config[MU_PLUGIN_INSTALLATION] = convert_from_config_key_value(
        "smart_advice_allowed", cp_data.get("smart_advice_allowed")
    )[1]
    current_config[ADVICE_EMAIL_NOTIFICATION] = cp_data.get(
        ADVICE_EMAIL_NOTIFICATION
    )
    current_config["protection"] = await get_users_info(users)
    return current_config


def get_upgrade_url_link(username, domain):
    """
    Build the purchase page URL for upgrading *username* on *domain*.

    The configured purchase page URL (trailing slashes stripped) is
    extended with a URL-encoded query string that carries the user, the
    domain and the server IP reported by the hosting panel.
    """
    purchase_url_link = (
        config.MyImunifyConfig.PURCHASE_PAGE_URL.rstrip("/")
        + "/?"
        + urllib.parse.urlencode(
            {
                "m": "cloudlinux_advantage",
                "action": "provisioning",
                "suite": "my_imunify_account_protection",
                "username": username,
                "domain": domain,
                "server_ip": hp.HostingPanel().get_server_ip(),
            }
        )
    )
    return purchase_url_link