Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64
User : palandch ( 1163)
PHP Version : 7.1.33
Disable Function : NONE
Directory :  /opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/feature_management/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/feature_management/lookup.py
import inspect
from typing import Any, Callable, List

from ..contracts.config import MyImunifyConfig
from ..rpc_tools.lookup import wraps
from .checkers import check_feature
from .exceptions import UserArgumentNotFound

# Registry of feature names; the ``feature`` decorator below adds the name it
# was given every time it is applied to an object.
features: set = set()


def _wrapper(
    name: str, permissions: List[str], func: Callable[..., Any], user_key: str
) -> Callable[..., Any]:
    """
    Build a replacement for func that runs a feature check before each call

    The check only looks at keyword arguments: the user is read from
    ``kwargs[user_key]`` and falls back to the parameter's declared default
    when the keyword is absent. When ``MyImunifyConfig.ENABLED`` is truthy
    the call to ``check_feature`` is skipped entirely.

    NOTE(review): a user passed positionally is not seen by the check; for a
    parameter with a default the default is checked instead -- confirm that
    all callers pass the user by keyword.

    :param name: feature name
    :param permissions: permission values forwarded to ``check_feature``
    :param func: function/method to wrap
    :param user_key: parameter name which contains user name
    :raises UserArgumentNotFound: if func has no parameter named user_key;
        the returned callable raises it as well when that parameter has no
        default and was not passed as a keyword argument
    :return: async callable if func is a coroutine function, otherwise a
        plain callable
    """
    user_param = inspect.signature(func).parameters.get(user_key)
    if user_param is None:
        raise UserArgumentNotFound(
            "Expecting argument '%s' for %s", user_key, func
        )

    # A parameter without a default must be supplied by the caller.
    user_key_required = user_param.default is inspect.Parameter.empty

    def _enforce(**kwargs):
        user_missing = user_key not in kwargs
        if user_missing and user_key_required:
            raise UserArgumentNotFound(
                "Argument '%s' for '%s' must be specified explicitly",
                user_key,
                func,
            )

        # Ignore the decorator if MyImunify is enabled
        if not MyImunifyConfig.ENABLED:
            if user_missing:
                user = user_param.default
            else:
                user = kwargs[user_key]
            check_feature(name, permissions, user)

    @wraps(func)
    def wrapper(*args, **kwargs):
        _enforce(**kwargs)
        return func(*args, **kwargs)

    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        _enforce(**kwargs)
        return await func(*args, **kwargs)

    return async_wrapper if inspect.iscoroutinefunction(func) else wrapper


def feature(
    name: str, permissions: List[str], user_key="user"
) -> Callable[[Any], Any]:
    """
    Create a decorator that puts a function or class under feature management

    Applied to a plain function, the decorator returns the wrapped function.
    Applied to a class, it replaces in place every function found directly
    in the class ``__dict__`` whose name does not start with an underscore,
    and returns the same class. Any other object is returned unchanged.
    In all cases the feature name is added to ``features``.

    :param name: feature name
    :param permissions: list of permission values, with which user can
    access specific endpoint
    :param user_key: parameter name which contains user name
    :return: decorator
    """

    def _guard(func):
        return _wrapper(name, permissions, func, user_key)

    def decorator(obj):
        if inspect.isfunction(obj):
            obj = _guard(obj)
        elif inspect.isclass(obj):
            # Snapshot the public plain functions first, then patch them one
            # by one so the class namespace is not written to while it is
            # being iterated.
            public_functions = [
                (attr, member)
                for attr, member in vars(obj).items()
                if not attr.startswith("_") and inspect.isfunction(member)
            ]
            for attr, member in public_functions:
                setattr(obj, attr, _guard(member))

        features.add(name)

        return obj

    return decorator

Spamworldpro Mini