Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/cloudlinux/venv/lib64/python3.11/site-packages/clselect/clselectnodejsuser/ |
# -*- coding: utf-8 -*- # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENSE.TXT from __future__ import print_function from __future__ import division from __future__ import absolute_import import json import os import pwd from clselect.utils import check_call, list_dirs, run_command_full from .interpreters import Interpreter DEFAULT_PREFIX = 'nodevenv' NODEVENV_BIN = os.path.join(os.path.dirname(__file__), 'nodevenv.py') PYTHON_PATH = '/opt/cloudlinux/venv/bin/python3' APP_CONFIG = 'package.json' class Environment(object): def __init__(self, name, user=None, prefix=None): self.name = name if user: self.user = user else: self.user = pwd.getpwuid(os.getuid()).pw_name if prefix is None: self.prefix = DEFAULT_PREFIX else: self.prefix = prefix self.path = os.path.join(_abs_prefix(self.user, self.prefix), name) self._interpreter = None self._npm = None self.interpreter_name = 'node' + name def __repr__(self): return ("%s.%s(name='%s', user='%s', prefix='%s')" % ( self.__class__.__module__, self.__class__.__name__, self.name, self.user, self.prefix)) def _demote(self): user_pwd = pwd.getpwnam(self.user) def func(): os.setgid(user_pwd.pw_gid) os.setuid(user_pwd.pw_uid) os.environ['USER'] = self.user os.environ['HOME'] = user_pwd.pw_dir return func def as_dict(self, key=None): e = { 'name': self.name, 'interpreter': self.interpreter(), } if key: del e[key] return {getattr(self, key): e} return e def as_deepdict(self, key=None): e = { 'name': self.name, 'interpreter': self.interpreter().as_dict(), } if key: del e[key] return {getattr(self, key): e} return e def create(self, interpreter, destroy_first=False): args = [ PYTHON_PATH, NODEVENV_BIN, '--node', interpreter.binary, ] if destroy_first: args.append('--recreate-bin') args.append(self.path) kwargs = {'preexec_fn': self._demote()} if os.getuid() == 0: args = ['/bin/cagefs_enter'] + args check_call(*args, **kwargs) def destroy(self): check_call('/bin/rm', '-r', '--interactive=never', self.path, preexec_fn=self._demote()) def exists(self): return os.path.exists(self.path) def interpreter(self): if not self._interpreter: self._interpreter = Interpreter(prefix=self.path) return self._interpreter def npm(self): if not self._npm: self._npm = os.path.join(self.path, 'bin', 'npm') return self._npm def extension_install(self, extension='-', cwd=None): """ Install nodejs extension :param cwd: current working directory for npm (it's full path to app root by default) :param extension: name and version of extension :return: None """ # git is need for npm install from git repositories # we didn't add git to cagefs, because we use git from cpanel command = (self.npm(), 'install') # extension is defined if extension != '-': command += (extension,) # npm takes list of extensions from package.json if extension is '-' else: config_path = os.path.join(cwd, APP_CONFIG) if not os.path.isfile(config_path): # package.json is absent return 0 # command sample # npm install <module>@<version> <git_path_to_repository> ... check_call(args=command, preexec_fn=self._demote(), cwd=cwd) def extension_install_single_call(self, extensions_list): """ Install nodejs extension :param extensions_list: name and version of extension :return: None """ command = [self.npm(), 'install'] command.extend(extensions_list) # npm install <module1>@<version1> <module2>@<version2> ... ret_code, _, _ = run_command_full(command, preexec_fn=self._demote()) return ret_code def extension_update(self, extension): raise NotImplementedError("It's not used for nodejs selector") def extension_uninstall(self, extension): raise NotImplementedError("It's not used for nodejs selector") def extensions(self): """ Retrieves extensions dictionary for this virtual environment :return: Extensions list. Example: [u'xpath@0.0.27', u'https://github.com/visionmedia/express'] """ # get path to current virtual nodejs environment cwd = os.path.split(os.path.split(self.npm())[0])[0] # npm list --json _, output, _ = run_command_full([self.npm(), 'list', '--json'], preexec_fn=self._demote(), cwd=cwd) if output.strip(): data = json.loads(output) else: data = {} extensions = data.get('dependencies', {}) # extensions example: # {u'express': {u'version': u'4.16.3', u'from': u'express@^4.15.3', .... }, # u'express_new': {u'version': u'4.16.3', u'from': u'git+https://github.com/visionmedia/express.git', ....}, # u'request': {u'required': u'^2.81.0', u'missing': True}, # u'body-parser': {u'required': u'^1.17.2', u'missing': True} # } result = [] for name, info in extensions.items(): # module is missing or has no needed attributes - pass it if info.get('missing', False) or 'from' not in info or 'version' not in info: continue if info['from'].startswith('git+'): # extension from git result.append(info['from']) else: # extension from standard npm repo result.append('%s@%s' % (name, info['version'])) return result def _abs_prefix(user=None, prefix=None): if not prefix: prefix = DEFAULT_PREFIX if user: return os.path.join(pwd.getpwnam(user).pw_dir, prefix) else: return os.path.join(pwd.getpwuid(os.getuid()).pw_dir, prefix) def environments(user=None, prefix=None): venv_path = _abs_prefix(user, prefix) try: env_list = list_dirs(venv_path) except OSError: return [] envs = [] for env_name in env_list: envs.append(Environment(env_name, user, prefix)) return envs def environments_dict(key, user=None, prefix=None): return dict(list(e.as_dict(key=key).items()) for e in environments(user, prefix)) def environments_deepdict(key, user=None, prefix=None): return dict(list(e.as_deepdict(key=key).items()) for e in environments(user, prefix))