Server : Apache System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64 User : palandch ( 1163) PHP Version : 7.1.33 Disable Function : NONE Directory : /opt/imunify360/venv/lib/python3.11/site-packages/imav/malwarelib/scan/mds/ |
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.


This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
from __future__ import annotations

import pwd
from dataclasses import dataclass
from pathlib import Path
from typing import Collection, Iterable, Optional, Set, Tuple

from defence360agent.subsys.panels import hosting_panel
from defence360agent.utils.threads import to_thread
from imav.malwarelib.utils.cloudways import CloudwaysUser


@dataclass(eq=True, frozen=True)
class MalwareDatabaseHitError:
    """One entry of a report's ``error_list`` (``code`` + ``message``)."""

    code: int
    message: str


@dataclass(eq=True, frozen=True)
class MalwareDatabaseHitInfo:
    """Immutable, hashable description of one malware hit in a database.

    The first ten fields identify the scan, the application and its
    database; the optional ``table_*`` fields and ``snippet`` are only set
    for hits that were resolved down to a concrete table row.
    """

    scan_id: str
    path: str
    signature: str
    app_name: str
    db_host: str
    db_name: str
    db_port: int
    errors: Tuple[MalwareDatabaseHitError, ...]
    owner: int
    user: int
    # Row-level details; left as None by `from_report`.
    table_name: Optional[str] = None
    table_field: Optional[str] = None
    table_row_inf: Optional[int] = None
    snippet: Optional[str] = None

    def _get_db_info(self):
        """Return the tuple used to group hits that share one database."""
        return (
            self.path,
            self.app_name,
            self.db_host,
            self.db_port,
            self.db_name,
        )

    @classmethod
    def get_hits_per_db(
        cls, hits: Iterable[MalwareDatabaseHitInfo]
    ) -> set[MalwareDatabaseHitInfo]:
        """Return one hit per distinct database.

        For every distinct `_get_db_info()` key the first hit encountered
        in *hits* is kept; later hits for the same database are dropped.
        """
        db_hits = {}
        for hit in hits:
            # setdefault only stores the hit when the key is not there yet,
            # so the first hit per database wins.
            db_hits.setdefault(hit._get_db_info(), hit)
        return set(db_hits.values())

    @staticmethod
    def _common_fields(report, users_from_panel, pw_all, scan_id) -> dict:
        """Build the constructor kwargs shared by every hit of *report*.

        Raises `KeyError` if *report* lacks one of the mandatory keys
        (``path``, ``app_owner_uid``, ``app``, ``database_host``,
        ``database_name``, ``error_list``, ``database_port``).
        """
        path: str = report["path"]
        owner: int = report["app_owner_uid"]
        # NOTE(review): presumably maps the owner uid to the effective
        # user for Cloudways-style layouts; semantics live in CloudwaysUser.
        user: int = CloudwaysUser.override_uid_by_path(
            Path(path), owner, users_from_panel, pw_all
        )
        return {
            "scan_id": scan_id,
            "path": path,
            "app_name": report["app"],
            "db_host": report["database_host"],
            "db_name": report["database_name"],
            "errors": tuple(
                MalwareDatabaseHitError(err["code"], err["message"])
                for err in report["error_list"]
            ),
            "db_port": report["database_port"],
            "owner": owner,
            "user": user,
        }

    @classmethod
    def from_report(
        cls, report, users_from_panel, pw_all, scan_id
    ) -> MalwareDatabaseHitInfo:
        """Build a single database-level hit from *report*.

        The signature and snippet are taken from the first entry of
        ``detailed_reports`` (or, if that is empty or missing, of
        ``detailed_urls_reports``). When neither is available the
        signature falls back to ``"BAD URL"`` and the snippet to ``None``.
        """
        signature = "BAD URL"
        snippet = None
        detailed_reports = report.get("detailed_reports") or report.get(
            "detailed_urls_reports"
        )
        if detailed_reports:
            first_report = detailed_reports[0]
            signature = first_report["sigid"]
            snippet = first_report.get("snpt")
        return cls(
            signature=signature,
            snippet=snippet,
            **cls._common_fields(report, users_from_panel, pw_all, scan_id),
        )

    @classmethod
    def _get_hits_from_report(
        cls, data: dict, **kwargs
    ) -> Iterable[MalwareDatabaseHitInfo]:
        """Yield one hit per row id listed in the detailed report *data*.

        *data* is walked as ``tables -> fields -> row_ids``; a level that
        is missing or explicitly ``None`` is treated as empty. *kwargs*
        carries the fields common to all hits of the parent report.
        """
        for table in data.get("tables") or ():
            for field in table.get("fields") or ():
                for row_id in field.get("row_ids") or ():
                    # "sigid" is read per hit on purpose: a detailed
                    # report without rows never needs a signature.
                    yield cls(
                        signature=data["sigid"],
                        table_name=table["table"],
                        table_field=field["field"],
                        table_row_inf=int(row_id),
                        snippet=data.get("snpt"),
                        **kwargs,
                    )

    @classmethod
    def iter_from_scan_report(
        cls, report, users_from_panel, pw_all, scan_id
    ) -> Iterable[MalwareDatabaseHitInfo]:
        """Yield a separate hit for each scanned row of *report*.

        Uses ``detailed_reports`` when it is non-empty and otherwise
        ``detailed_urls_reports``, the same selection as `from_report`.
        A missing or ``None`` value for both yields nothing.
        """
        kwargs = cls._common_fields(report, users_from_panel, pw_all, scan_id)
        detailed_reports = (
            report.get("detailed_reports")
            or report.get("detailed_urls_reports")
            or ()
        )
        for detailed_report in detailed_reports:
            yield from cls._get_hits_from_report(detailed_report, **kwargs)


@dataclass(eq=True, frozen=True)
class MalwareDatabaseScanReport:
    """Aggregated result of a database scan as produced by `scan_report`."""

    hits: Set[MalwareDatabaseHitInfo]
    started: int
    completed: int
    total_resources: int
    total_malicious: int


def _last_completed_time(reports: Iterable[dict]) -> int:
    """Return the latest ``start_time + running_time``, or 0 if empty."""
    return int(
        max(
            (
                report["start_time"] + report["running_time"]
                for report in reports
            ),
            default=0,
        )
    )


def _first_started_time(reports: Iterable[dict]) -> int:
    """Return the earliest ``start_time`` among *reports*, or 0 if empty."""
    return int(min((report["start_time"] for report in reports), default=0))


def _found(reports: Iterable[dict]) -> Iterable[dict]:
    """Lazily yield the reports whose ``app`` value is not ``None``."""
    return (report for report in reports if report["app"] is not None)


def _malicious(reports: Iterable[dict]) -> Iterable[dict]:
    """Lazily yield the reports with at least one detected malicious entry."""
    return (
        report
        for report in reports
        if report["count_of_detected_malicious_entries"] > 0
    )


def _total_scanned_rows(reports: Iterable[dict]) -> int:
    """Sum ``rows_count`` over *reports*, counting a missing key as 0."""
    return sum(report.get("rows_count", 0) for report in reports)


async def scan_report(
    hit_report_list: Collection[dict], scan_id: str
) -> MalwareDatabaseScanReport:
    """Convert raw per-application reports into a `MalwareDatabaseScanReport`.

    Hits are collected only from reports that have an ``app`` and a
    positive ``count_of_detected_malicious_entries``. The start time,
    completion time and scanned-row total are computed over *all* reports
    in *hit_report_list*. ``total_malicious`` is the number of distinct
    hits.
    """
    users_from_panel = set(await hosting_panel.HostingPanel().get_users())
    # pwd.getpwall is a blocking call, so it is run through to_thread.
    pw_all = await to_thread(pwd.getpwall)

    hits: set[MalwareDatabaseHitInfo] = set()
    for report in _malicious(_found(hit_report_list)):
        hits.update(
            MalwareDatabaseHitInfo.iter_from_scan_report(
                report, users_from_panel, pw_all, scan_id
            )
        )

    started = _first_started_time(hit_report_list)
    completed = _last_completed_time(hit_report_list)
    total_resources = _total_scanned_rows(hit_report_list)
    total_malicious = len(hits)
    return MalwareDatabaseScanReport(
        hits, started, completed, total_resources, total_malicious
    )