Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/subsys/panels/generic/ |
import asyncio import functools import json import logging import os from collections import defaultdict from typing import Dict, List, Sequence, Set import cerberus import yaml from defence360agent.api.integration_conf import ( ClIntegrationConfig, IntegrationConfig, ) from defence360agent.api.jwt_issuer import JWTIssuer from defence360agent.application.determine_hosting_panel import ( is_generic_panel_installed, ) from defence360agent.rpc_tools.lookup import UserType from defence360agent.utils import ( CheckRunError, check_run, get_non_system_users, ) from .. import base logger = logging.getLogger(__name__) _SCHEMA_PATH_TMPL = ( os.path.dirname(__file__) + "/users_script_schemas/schema-{}.yaml" ) ADMIN_LIST_FILE_PATH = "/etc/sysconfig/imunify360/auth.admin" METADATA = "metadata" @functools.lru_cache(maxsize=2) def _get_validator(script: str) -> cerberus.Validator: """Returns a validator for given script.""" with open(_SCHEMA_PATH_TMPL.format(script)) as schema_file: schema = yaml.safe_load(schema_file) if script is not METADATA: schema[METADATA] = {"required": True} return cerberus.Validator(schema) def get_users_default_impl(): return [dict(name=pw.pw_name) for pw in get_non_system_users()] def _get_conf_path(cl, script): d = cl.to_dict() if "integration_scripts" in d and script in d["integration_scripts"]: return d["integration_scripts"][script] return None async def get_integration_data(script: str): path = _get_conf_path(IntegrationConfig(), script) if not path: path = _get_conf_path(ClIntegrationConfig(), script) if not path: raise IntegrationScriptError( "%s not found neither in " "/etc/sysconfig/imunify360/integration.conf " "nor in /opt/cpvendor/etc/integration.ini." % script ) return await _get_integration_data(script, path) async def _get_integration_data(script: str, path: str): try: stdout = await check_run(path, shell=True) except CheckRunError as e: raise IntegrationScriptError( "Integrations script {script} " "failed with exit code {e.returncode} \n" "{e.stderr}".format(script=script, e=e) ) try: data = json.loads(stdout.decode()) except (UnicodeDecodeError, json.JSONDecodeError) as e: raise IntegrationScriptError( "Cannot decode output of %s as JSON" % path ) from e if not isinstance(data, dict): raise IntegrationScriptError("%s should return dict" % path) metadata_validator = _get_validator(METADATA) if not metadata_validator.validate(data): raise IntegrationScriptError( "Validation error in metadata of %s script: %s" % (script, metadata_validator.errors) ) if data[METADATA]["result"] != "ok": metadata_error = data[METADATA]["result"] if "message" in data[METADATA]: metadata_error += ": %s" % data[METADATA]["message"] raise IntegrationScriptError(metadata_error) validator = _get_validator(script) if not validator.validate(data): raise IntegrationScriptError( "Validation error in %s script: %s" % (script, validator.errors) ) return data["data"] async def _get_client_data(): try: users = await get_users_integration_data() domains = await get_integration_data("domains") for k, v in domains.items(): if v["owner"]: user_domains = users.setdefault(v["owner"], []) user_domains.append(k) return [{"name": k, "domains": v} for k, v in users.items()] except IntegrationScriptError: logger.warning( "Applying default implementation of users and domains lists" ) return get_users_default_impl() async def get_users_integration_data(): users = await get_integration_data("users") users_dict = {} for user in users: if not user["username"]: logger.warning(f"Found user with an empty username: {user}") else: users_dict[user["username"]] = [] return users_dict async def get_domain_data(): try: return await get_integration_data("domains") except IntegrationScriptError: logger.warning("Could not parse domains lists") return {} async def get_admin_list() -> List[str]: script_name = "admins" admins_set = {"root"} admins_from_integration_scripts = await asyncio.gather( _get_integration_data( script_name, _get_conf_path( IntegrationConfig(), script_name, ), ), _get_integration_data( script_name, _get_conf_path( ClIntegrationConfig(), script_name, ), ), return_exceptions=True, ) custom_admins = { admin["name"] for admins in admins_from_integration_scripts if isinstance(admins, list) # skip exceptions for admin in admins } if not custom_admins: logger.warning( "Error occurred during extracting admins " "from integration configs: %s", admins_from_integration_scripts, ) admins_set |= custom_admins try: with open(ADMIN_LIST_FILE_PATH) as admin_list_file: admins_set.update(admin_list_file.read().splitlines()) except OSError: logger.warning( "Failed to retrieve admins list from %s", ADMIN_LIST_FILE_PATH ) return list(admins_set) class IntegrationScriptError(base.PanelException): def __init__(self, *args, **kwargs): super().__init__(*args) logger.warning(self) class GenericPanel(base.AbstractPanel): """ Panel, UI to which is provided by imunify{-antivirus,360-firewall}-generic.{rpm,deb} """ NAME = "generic panel" exception = IntegrationScriptError @classmethod def is_installed(cls): return is_generic_panel_installed() # pragma: no cover async def enable_imunify360_plugin(self, name=None): pass async def disable_imunify360_plugin(self, plugin_name=None): pass @classmethod async def version(cls): try: info = await get_integration_data("panel_info") return "{name} {version}".format(**info) except IntegrationScriptError: return "0" async def get_user_domains(self): users = await _get_client_data() result = [] for user in users: result.extend(user.get("domains", tuple())) return result async def get_users(self) -> List[str]: users = await _get_client_data() return [user["name"] for user in users] async def get_domain_to_owner(self) -> Dict[str, List[str]]: users = await _get_client_data() result = defaultdict(list) for user in users: for domain in user.get("domains", []): result[domain].append(user["name"]) return result async def get_domains_per_user(self) -> Dict[str, List[str]]: users = await _get_client_data() return {user["name"]: user.get("domains", []) for user in users} def authenticate(self, protocol, data: dict): if protocol._uid != 0 and data["command"] != ["login", "pam"]: token = data["params"].pop("jwt", None) parsed_token = JWTIssuer.parse_token(token) return parsed_token["user_type"], ( parsed_token["user_name"] if parsed_token["user_type"] == UserType.NON_ROOT else None ) else: return protocol.user, None def basedirs(self) -> Set[str]: conf = IntegrationConfig().to_dict() if "malware" in conf and "basedir" in conf["malware"]: return set(conf["malware"]["basedir"].split()) return set() async def list_docroots(self) -> Dict[str, str]: domains = await get_domain_data() return {v["document_root"]: domain for domain, v in domains.items()}