Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/plugins/ |
""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <https://www.gnu.org/licenses/>. Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see <https://www.imunify360.com/legal/eula> """ import asyncio import json import logging import os import re from contextlib import suppress from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import MessageSource from imav.malwarelib.config import MalwareScanType from imav.malwarelib.scan.ai_bolit.report import parse_report_json from imav.malwarelib.scan.scan_result import ScanResult from imav.malwarelib.subsys.ainotify import Inotify, Watcher from defence360agent.utils import create_task_and_log_exceptions, Scope logger = logging.getLogger(__name__) class AibolitResultsScan(MessageSource): """ Plugin to handle generated ai-bolit scan reports. Checks the contents of the *RESULT_SCAN_DIR* for the presence ai-bolit report files that match the *REPORT_FILE_MASK* pattern processes and deletes them. """ SCOPE = Scope.IM360 RESULT_SCAN_DIR = "/var/imunify360/aibolit/resident/out" REPORT_FILE_MASK = re.compile(r"^(?P<uuid>[0-9a-f-]{36})\.report$") def __init__(self): self._watcher = None self._init_task = None async def create_source(self, loop, sink): self._loop = loop self._sink = sink self._init_task = create_task_and_log_exceptions( self._loop, self._init_handling_and_setup_watcher ) async def shutdown(self): if self._init_task is not None: self._init_task.cancel() await self._init_task self._shutdown_watcher() def _setup_watcher(self): # target directory must exist os.makedirs(self.RESULT_SCAN_DIR, mode=0o700, exist_ok=True) self._watcher = Watcher( self._loop, coro_callback=self._handle_incoming_report ) # path in bytes self._watcher.watch( path=self.RESULT_SCAN_DIR.encode(), mask=Inotify.MOVED_TO ) def _shutdown_watcher(self): if self._watcher is not None: self._watcher.close() def _get_scan_result_from_report(self, report: dict) -> ScanResult: report_summary = report["summary"] path = [hit["file_name"] for hit in parse_report_json(report)] scan_id = report_summary.get("scan_id") scan_result = ScanResult( path, scan_id=scan_id, scan_type=MalwareScanType.REALTIME ) scan_result.total_files = report_summary.get("total_files") scan_result.errors = report_summary.get("errors", []) end_time = report_summary.get("report_time") scan_time = report_summary.get("scan_time") start_time = end_time - scan_time scan_result.set_start_stop(start_time, end_time) scan_result.scans = [parse_report_json(report)] return scan_result async def _handle_report(self, report: dict): try: scan_result = self._get_scan_result_from_report(report) result = await scan_result.get() await self._sink.process_message( MessageType.MalwareScan(**result.to_dict()) ) except asyncio.CancelledError: raise except Exception as exc: logger.exception( "Error '{!r}' occurred while processing " "ai-bolit report: {}".format(exc, report) ) async def handle_report_file(self, path: str): match_file = self.REPORT_FILE_MASK.match(os.path.basename(path)) if match_file and os.path.isfile(path): try: with open(path) as f: report = json.load(f) except json.JSONDecodeError as exc: logger.warning( "Problem with parsing %s aibolit report: %s", path, exc ) else: report["summary"].setdefault( "scan_id", match_file.group("uuid") ) await self._handle_report(report) with suppress(FileNotFoundError): os.unlink(path) async def _handle_incoming_report(self, event): logger.info("Inotify event: %s", event) report_file = os.path.join( os.fsdecode(event.path), os.fsdecode(event.name) ) await self.handle_report_file(report_file) async def _handle_existing_reports(self): if os.path.exists(self.RESULT_SCAN_DIR): with os.scandir(self.RESULT_SCAN_DIR) as it: for entry in it: await self.handle_report_file(entry.path) async def _init_handling_and_setup_watcher(self): await self._handle_existing_reports() self._setup_watcher()