Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/imunify360/venv/lib64/python3.11/site-packages/defence360agent/feature_management/ |
import inspect from typing import Any, Callable, List from ..contracts.config import MyImunifyConfig from ..rpc_tools.lookup import wraps from .checkers import check_feature from .exceptions import UserArgumentNotFound features = set() # feature storage def _wrapper( name: str, permissions: List[str], func: Callable[..., Any], user_key: str ) -> Callable[..., Any]: """ Wrapper to enable feature management for func :param name: feature name :param func: function/method to wrap :param user_key: parameter name which contains user name :return: new callable object """ signature = inspect.signature(func) if user_key not in signature.parameters: raise UserArgumentNotFound( "Expecting argument '%s' for %s", user_key, func ) user_param = signature.parameters[user_key] user_key_required = user_param.default is user_param.empty def checker(**kwargs): if user_key_required and user_key not in kwargs: raise UserArgumentNotFound( "Argument '%s' for '%s' must be specified explicitly", user_key, func, ) if MyImunifyConfig.ENABLED: """Ignore the decorator if MyImunify is enabled""" return user = kwargs.get(user_key, user_param.default) check_feature(name, permissions, user) @wraps(func) def wrapper(*args, **kwargs): checker(**kwargs) return func(*args, **kwargs) @wraps(func) async def async_wrapper(*args, **kwargs): checker(**kwargs) return await func(*args, **kwargs) if inspect.iscoroutinefunction(func): return async_wrapper return wrapper def feature( name: str, permissions: List[str], user_key="user" ) -> Callable[[Any], Any]: """ Get decorator to manage function/method with feature management :param name: feature name :param user_key: parameter name which contains user name :param permissions: list of permission values, with which user can access specifig endpoint :return: decorator """ def decorator(obj): if inspect.isclass(obj): for m_name, m_obj in getattr(obj, "__dict__", {}).items(): if not m_name.startswith("_") and inspect.isfunction(m_obj): wrapper = _wrapper(name, permissions, m_obj, user_key) setattr(obj, m_name, wrapper) elif inspect.isfunction(obj): obj = _wrapper(name, permissions, obj, user_key) features.add(name) return obj return decorator