Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/cloudlinux/venv/lib64/python3.11/site-packages/xray/internal/ |
# -*- coding: utf-8 -*-

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from dataclasses import dataclass
from typing import Optional

from xray import gettext as _
from .constants import fpm_stat_storage, fpm_reload_timeout
from .exceptions import XRayError
from .utils import dbm_storage, timestamp


@dataclass
class FPMReloadController:
    """
    Class to control the frequency of FPM service reloading.

    The timestamp of the latest reload is kept in the storage opened by
    ``dbm_storage(fpm_stat_storage)``, keyed by the service name.
    """
    # key under which this service's reload timestamp is stored
    fpm_service: str

    @property
    def latest_reload(self) -> Optional[int]:
        """
        Get the timestamp of latest reload of FPM service.

        :return: the stored timestamp as an integer, or None when the storage
                 holds no record for ``fpm_service``
        :raises XRayError: (flag='warning') when the storage access raises
                           RuntimeError
        :raises ValueError: when the stored record cannot be parsed as an
                            integer
        """
        try:
            with dbm_storage(fpm_stat_storage) as storage:
                try:
                    # the record is read back as bytes, hence decode() first
                    return int(storage[self.fpm_service].decode())
                except KeyError:
                    # nothing has been recorded for this service yet
                    return None
        except RuntimeError as e:
            # chain explicitly so the storage failure stays visible as the
            # direct cause of the XRayError
            raise XRayError(
                _('Failed to get the timestamp of latest FPM reload: %s') % str(e),
                flag='warning') from e

    def save_latest_reload(self) -> None:
        """
        Save the timestamp of latest reload of FPM service.

        :raises XRayError: (flag='warning') when the storage access raises
                           RuntimeError
        """
        try:
            # save timestamp of latest reload of corresponding service
            with dbm_storage(fpm_stat_storage) as storage:
                storage[self.fpm_service] = str(timestamp())
        except RuntimeError as e:
            raise XRayError(
                _('Failed to save the timestamp of latest FPM reload %s, but main operation executed') % str(e),
                flag='warning') from e

    def delta(self) -> float:
        """
        Get the time delta between current time and saved timestamp.

        :return: the delta in minutes, or 60.0 when the saved timestamp is
                 missing, unreadable or unparsable
        """
        try:
            # we store timestamp in seconds,
            # but timeout -- fpm_reload_timeout -- in minutes
            return (timestamp() - self.latest_reload) / 60
        except (TypeError, ValueError, XRayError):
            # TypeError: no record yet (latest_reload is None);
            # ValueError: the stored record is not a valid integer;
            # XRayError: the storage could not be read.
            # 1 hour,
            # big enough for non-blocking flow in case of unexpected errors
            return 60.0

    def restrict(self) -> bool:
        """
        Check if FPM reload should be blocked.

        :return: True when the delta (in minutes) since the latest reload
                 does not exceed ``fpm_reload_timeout``
        """
        return self.delta() <= fpm_reload_timeout