Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64
User : palandch ( 1163)
PHP Version : 7.1.33
Disable Function : NONE
Directory :  /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/contracts/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/contracts/myimunify_id.py
import pwd
import uuid
from pathlib import Path
from typing import Dict, List, Optional

from defence360agent.contracts.permissions import logger
from defence360agent.model import instance
from defence360agent.myimunify.model import MyImunify, update_users_protection
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.utils import safe_fileops

MYIMUNIFY_ID_FILE_NAME = ".myimunify_id"


class MyImunifyIdError(Exception):
    """
    Raised when a MyImunify id cannot be located, read, or stored
    for a user.
    """


async def add_myimunify_user(
    sink, user: str, protection: bool
) -> Optional[str]:
    """
    Register *user* as a MyImunify user and return the user's id.

    Ensures a ``MyImunify`` record exists for the user, applies the
    requested protection setting through ``update_users_protection`` and
    then reads (or generates) the user's MyImunify id.

    Returns the id, or ``None`` when obtaining it raised
    ``MyImunifyIdError`` (e.g. the user no longer exists).
    """
    record, _created = MyImunify.get_or_create(user=user)
    record.save()

    await update_users_protection(sink, [user], protection)
    logger.info("Applied setting MyImunify=%s for user %s", protection, user)

    try:
        return await _get_or_generate_id(user)
    except MyImunifyIdError:
        # The id could not be obtained (e.g. user no longer exists)
        return None


async def get_myimunify_users() -> List[Dict]:
    """
    Build a description of every MyImunify user.

    Each entry is a dict with the keys ``email``, ``username``,
    ``myimunify_id``, ``protection`` and ``locale``.  Entries are ordered
    by username.  ``email`` and ``locale`` fall back to an empty string
    when the hosting panel reports no details for the user.
    """
    panel_details = await HostingPanel().get_user_details()
    ids_by_user = await _myimunify_user_to_id()

    result = []
    # A single transaction wraps all per-user get_or_create calls
    with instance.db.transaction():
        for username in sorted(ids_by_user):
            myimunify_uid = ids_by_user[username]
            db_row, _created = MyImunify.get_or_create(user=username)
            details = panel_details.get(username, {})
            entry = {
                "email": details.get("email", ""),
                "username": username,
                "myimunify_id": myimunify_uid,
                "protection": db_row.protection,
                "locale": details.get("locale", ""),
            }
            result.append(entry)
    return result


async def _myimunify_user_to_id() -> Dict[str, str]:
    """
    Map every hosting panel user to their MyImunify id.

    Users for which the id cannot be read or generated are left out of
    the mapping.
    """
    panel_users = await HostingPanel().get_users()

    mapping = {}
    for username in panel_users:
        try:
            myimunify_id = await _get_or_generate_id(username)
        except MyImunifyIdError:
            # No id available for this user (e.g. user does not exist)
            continue
        except safe_fileops.UnsafeFileOperation as exc:
            logger.error(
                "Unable to generate id for user=%s, error=%s",
                username,
                str(exc),
            )
            continue
        mapping[username] = myimunify_id
    return mapping


async def _get_or_generate_id(user: str) -> str:
    """
    Return the MyImunify id stored for *user*, creating one if needed.

    When the id file is missing or holds no id, a new id
    (``uuid.uuid1().hex``) is generated, written to the file and returned.
    ``MyImunifyIdError`` raised while locating or writing the file
    propagates to the caller.
    """
    path = await _get_myimunify_id_file(user)
    try:
        stored_id = _read_id(path)
    except (FileNotFoundError, MyImunifyIdError):
        # Nothing usable on disk yet: generate a fresh id and persist it
        new_id = uuid.uuid1().hex
        return await _write_id(new_id, path)
    return stored_id


async def _write_id(myimunify_id: str, id_file: Path) -> str:
    """
    Store *myimunify_id* in *id_file* and return the id.

    The file content is a two-line comment header, a blank line and the
    id on its own line.  An ``OSError`` from the write is logged and
    re-raised as ``MyImunifyIdError``.
    """
    content = "".join(
        (
            "# DO NOT EDIT\n",
            "# This file contains MyImunify id unique to this user\n",
            "\n",
            f"{myimunify_id}\n",
        )
    )
    try:
        await safe_fileops.write_text(str(id_file), content)
    except OSError as exc:  # PermissionError is a subclass of OSError
        logger.error("Unable to write myimunify_id in user home dir: %s", exc)
        raise MyImunifyIdError from exc
    return myimunify_id


def _read_id(id_file: Path) -> str:
    """
    Read MyImunify id from file.

    The file is scanned from the last line to the first; the first line
    that is neither blank nor a ``#`` comment (after stripping surrounding
    whitespace) is returned as the id.

    Raises ``MyImunifyIdError`` when the file holds no such line.  Errors
    from opening the file (e.g. ``FileNotFoundError``) propagate unchanged.
    """
    with id_file.open("r") as f:
        lines = f.readlines()

    for raw_line in reversed(lines):
        # Strip before the comment check so that an indented comment
        # ("   # note") is skipped instead of being returned as the id.
        candidate = raw_line.strip()
        if candidate and not candidate.startswith("#"):
            return candidate

    raise MyImunifyIdError


async def _get_myimunify_id_file(user: str) -> Path:
    """
    Return the path of the MyImunify id file in *user*'s home directory.

    The file is created (via ``safe_fileops.touch``) when it is missing.

    Raises ``MyImunifyIdError`` when the user is unknown to ``pwd``, when
    the home directory does not exist, or when creating the file fails
    with an ``OSError``.
    """
    try:
        pw_entry = pwd.getpwnam(user)
    except KeyError as exc:
        logger.error("No such user: %s", user)
        raise MyImunifyIdError from exc

    id_path = Path(pw_entry.pw_dir) / MYIMUNIFY_ID_FILE_NAME
    if id_path.exists():
        return id_path

    if not id_path.parent.exists():
        logger.error("No such user homedir: %s", user)
        raise MyImunifyIdError

    try:
        await safe_fileops.touch(str(id_path))
    except OSError as exc:  # PermissionError is a subclass of OSError
        logger.error("Unable to put myimunify_id in user home dir: %s", exc)
        raise MyImunifyIdError from exc
    return id_path

Spamworldpro Mini