Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/contracts/ |
import pwd
import uuid
from pathlib import Path
from typing import Dict, List, Optional

from defence360agent.contracts.permissions import logger
from defence360agent.model import instance
from defence360agent.myimunify.model import MyImunify, update_users_protection
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.utils import safe_fileops

# Name of the per-user file (placed directly in the user's home directory)
# that stores the MyImunify id.
MYIMUNIFY_ID_FILE_NAME = ".myimunify_id"


class MyImunifyIdError(Exception):
    """Exception representing issues related to MyImunify id"""


async def add_myimunify_user(
    sink, user: str, protection: bool
) -> Optional[str]:
    """
    Save subscription type to the DB and generate id file.

    Ensures a MyImunify record exists for *user*, applies *protection* via
    update_users_protection() and then reads (or creates) the user's id.

    Returns the MyImunify id, or None when the id could not be obtained
    (MyImunifyIdError was raised while locating, reading or writing the
    id file).
    """
    myimunify, _ = MyImunify.get_or_create(user=user)
    myimunify.save()
    await update_users_protection(sink, [user], protection)
    logger.info("Applied setting MyImunify=%s for user %s", protection, user)

    try:
        return await _get_or_generate_id(user)
    except MyImunifyIdError:
        # User no longer exists
        return None


async def get_myimunify_users() -> List[Dict]:
    """
    Get a list of MyImunify users, their subscription types and unique ids.

    Each item is a dict with the keys "email", "username", "myimunify_id",
    "protection" and "locale". Users are listed in sorted username order;
    users for which no id could be obtained are omitted.
    """
    users = []
    user_details = await HostingPanel().get_user_details()
    myimunify_user_to_id = await _myimunify_user_to_id()

    with instance.db.transaction():
        for user, myimunify_uid in sorted(myimunify_user_to_id.items()):
            record, _ = MyImunify.get_or_create(user=user)
            # Panel details may be missing for a user: fall back to "".
            details = user_details.get(user, {})
            users.append(
                {
                    "email": details.get("email", ""),
                    "username": user,
                    "myimunify_id": myimunify_uid,
                    "protection": record.protection,
                    "locale": details.get("locale", ""),
                }
            )
    return users


async def _myimunify_user_to_id() -> Dict[str, str]:
    """
    Get a mapping of panel users to their MyImunify ids.

    Users whose id cannot be read or generated are skipped, so a single
    problematic user does not prevent the others from being listed.
    """
    user_to_id = {}
    for user in await HostingPanel().get_users():
        try:
            user_to_id[user] = await _get_or_generate_id(user)
        except MyImunifyIdError:
            # User does not exist
            continue
        except safe_fileops.UnsafeFileOperation as e:
            logger.error(
                "Unable to generate id for user=%s, error=%s", user, str(e)
            )
            continue
    return user_to_id


async def _get_or_generate_id(user: str) -> str:
    """
    Read MyImunify id if exists or generate a new one and write into the file.

    Raises MyImunifyIdError when the id file cannot be located, created
    or written.
    """
    id_file = await _get_myimunify_id_file(user)
    try:
        return _read_id(id_file)
    except (FileNotFoundError, MyImunifyIdError):
        # No usable id in the file (missing, empty, comments only or
        # undecodable): generate a fresh one and persist it.
        myimunify_id = uuid.uuid1().hex
        return await _write_id(myimunify_id, id_file)


async def _write_id(myimunify_id: str, id_file: Path) -> str:
    """
    Write MyImunify id to file and return the id.

    Raises MyImunifyIdError when the write fails with an OSError.
    """
    text = (
        "# DO NOT EDIT\n"
        "# This file contains MyImunify id unique to this user\n"
        "\n"
        f"{myimunify_id}\n"
    )
    try:
        await safe_fileops.write_text(str(id_file), text)
    except (OSError, PermissionError) as e:
        logger.error("Unable to write myimunify_id in user home dir: %s", e)
        raise MyImunifyIdError from e
    return myimunify_id


def _read_id(id_file: Path) -> str:
    """
    Read MyImunify id from file.

    The id is the last line that is neither blank nor starts with "#".

    Raises FileNotFoundError when the file does not exist and
    MyImunifyIdError when the file holds no id or cannot be decoded as text.
    """
    try:
        with id_file.open("r") as f:
            lines = f.readlines()
    except UnicodeDecodeError as e:
        # The file lives in the user's home directory, so its content is
        # not guaranteed to be valid text. Report it as "no id" so callers
        # regenerate the id instead of failing with an unexpected error.
        logger.error("Unable to decode myimunify_id file %s: %s", id_file, e)
        raise MyImunifyIdError from e

    # Scan from the end: the id is written after the comment header.
    for line in reversed(lines):
        if line.startswith("#"):
            continue
        if myimunify_id := line.strip():
            return myimunify_id
    raise MyImunifyIdError


async def _get_myimunify_id_file(user: str) -> Path:
    """
    Get a file with MyImunify id and create it if does not exist.

    Raises MyImunifyIdError when *user* is not in the password database,
    the home directory does not exist, or the file cannot be created.
    """
    try:
        user_pwd = pwd.getpwnam(user)
    except KeyError as e:
        logger.error("No such user: %s", user)
        raise MyImunifyIdError from e

    id_file = Path(user_pwd.pw_dir) / MYIMUNIFY_ID_FILE_NAME
    if id_file.exists():
        return id_file

    if not id_file.parent.exists():
        logger.error("No such user homedir: %s", user)
        raise MyImunifyIdError
    try:
        await safe_fileops.touch(str(id_file))
    except (PermissionError, OSError) as e:
        logger.error("Unable to put myimunify_id in user home dir: %s", e)
        raise MyImunifyIdError from e
    return id_file