Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/feature_management/ |
import logging from itertools import chain from typing import List, Any from defence360agent.feature_management import hooks from defence360agent.feature_management.model import FeatureManagementPerms from defence360agent.model import instance from defence360agent.utils import execute_iterable_expression from .lookup import features logger = logging.getLogger(__name__) async def set_feature(user: str, feature: str, value: str) -> bool: """Sets a `feature` to `value` for a given `user`. Calls appropriate hook and returns its (bool) result. Logs the result of setting change. If hook fails rollbacks changes to database. """ with instance.db.atomic() as trx: perm = FeatureManagementPerms.get_perm(user) perm.set_feature(feature, value) hook = hooks.get_hook(feature) ok = hook(user, value) if ok: logger.info( "Applied setting %s=%s for user %s", feature, value, user ) else: logger.error( "Failed to apply setting %s=%s for user %s", feature, value, user, ) trx.rollback() return ok async def update_users( feature: str, users: List[str], value: Any, existing_users: List[str] ): result = {"succeeded": [], "failed": []} for user in users: if user not in existing_users: logger.warning("No such user: %s", user) continue if await set_feature(user, feature, value): result["succeeded"].append(user) else: result["failed"].append(user) return result async def update_default(feature: str, value: Any): perm = FeatureManagementPerms.get_default() perm.set_feature(feature, value) hook = hooks.get_hook(feature) return hook(None, value) async def sync_users(users: List[str]) -> bool: """Synchronize existing permissions with panel users""" panel_users = set(users) perm_users = FeatureManagementPerms.select(FeatureManagementPerms.user) perm_users = set(chain(*perm_users.tuples())) perms_to_remove = perm_users - panel_users perms_to_remove.remove(FeatureManagementPerms.DEFAULT) if perms_to_remove: logger.info("Remove permissions of users %s", perms_to_remove) def expression(perms_to_remove): return FeatureManagementPerms.delete().where( FeatureManagementPerms.user.in_(perms_to_remove) ) execute_iterable_expression(expression, list(perms_to_remove)) perms_to_add = panel_users - perm_users if perms_to_add: logger.info("Add permissions to users %s", perms_to_add) for user in perms_to_add: perm = FeatureManagementPerms.get_perm(user) for feature in features: value = perm.get_feature(feature) callback = hooks.get_hook(feature) callback(user, value) return bool(perms_to_add) or bool(perms_to_remove) async def reset_features(**features): """Sets feature values for all existing users in feature management database to given values in `features`.""" users = list( chain( *FeatureManagementPerms.select(FeatureManagementPerms.user) .where( FeatureManagementPerms.user != FeatureManagementPerms.DEFAULT ) .tuples() ) ) for user in users: for feature, value in features.items(): await set_feature(user, feature, value)