Spamworldpro Mini Shell
Spamworldpro


Server : Apache
System : Linux indy02.toastserver.com 3.10.0-962.3.2.lve1.5.85.el7.x86_64 #1 SMP Thu Apr 18 15:18:36 UTC 2024 x86_64
User : palandch ( 1163)
PHP Version : 7.1.33
Disable Function : NONE
Directory :  /opt/imunify360/venv/lib64/python3.11/site-packages/imav/malwarelib/scan/mds/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //opt/imunify360/venv/lib64/python3.11/site-packages/imav/malwarelib/scan/mds/report.py
"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.


This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
See the GNU General Public License for more details.


You should have received a copy of the GNU General Public License
 along with this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
from __future__ import annotations

import pwd
from dataclasses import dataclass
from pathlib import Path
from typing import Collection, Iterable, Set, Tuple

from defence360agent.subsys.panels import hosting_panel
from defence360agent.utils.threads import to_thread
from imav.malwarelib.utils.cloudways import CloudwaysUser


@dataclass(eq=True, frozen=True)
class MalwareDatabaseHitError:
    code: int
    message: str


@dataclass(eq=True, frozen=True)
class MalwareDatabaseHitInfo:
    scan_id: str
    path: str
    signature: str
    app_name: str
    db_host: str
    db_name: str
    db_port: int
    errors: Tuple[MalwareDatabaseHitError, ...]
    owner: int
    user: int
    table_name: str = None
    table_field: str = None
    table_row_inf: int = None
    snippet: str = None

    def _get_db_info(self):
        return (
            self.path,
            self.app_name,
            self.db_host,
            self.db_port,
            self.db_name,
        )

    @classmethod
    def get_hits_per_db(
        cls, hits: Iterable[MalwareDatabaseHitInfo]
    ) -> set[MalwareDatabaseHitInfo]:
        db_hits = {}
        for hit in hits:
            db_info = hit._get_db_info()
            if db_info not in db_hits:
                db_hits[db_info] = hit
        return set(db_hits.values())

    @classmethod
    def from_report(
        cls, report, users_from_panel, pw_all, scan_id
    ) -> MalwareDatabaseHitInfo:
        signature = "BAD URL"
        snippet = None
        detailed_reports = report.get("detailed_reports") or report.get(
            "detailed_urls_reports"
        )
        if detailed_reports:
            detailed_report = detailed_reports[0]
            signature = detailed_report["sigid"]
            snippet = detailed_report.get("snpt")

        path: str = report["path"]
        owner: int = report["app_owner_uid"]
        user: int = CloudwaysUser.override_uid_by_path(
            Path(path), owner, users_from_panel, pw_all
        )

        return cls(
            scan_id=scan_id,
            path=path,
            signature=signature,
            app_name=report["app"],
            db_host=report["database_host"],
            db_name=report["database_name"],
            errors=tuple(
                MalwareDatabaseHitError(err["code"], err["message"])
                for err in report["error_list"]
            ),
            db_port=report["database_port"],
            owner=owner,
            user=user,
            snippet=snippet,
        )

    @classmethod
    def _get_hits_from_report(
        cls, data: dict, **kwargs
    ) -> Iterable[MalwareDatabaseHitInfo]:
        for table in data.get("tables", []):
            for field in table.get("fields", []):
                for row_id in field.get("row_ids", []):
                    yield cls(
                        signature=data["sigid"],
                        table_name=table["table"],
                        table_field=field["field"],
                        table_row_inf=int(row_id),
                        snippet=data.get("snpt"),
                        **kwargs,
                    )

    @classmethod
    def iter_from_scan_report(
        cls, report, users_from_panel, pw_all, scan_id
    ) -> Iterable[MalwareDatabaseHitInfo]:
        # create a separate hit for each scanned row

        path: str = report["path"]
        owner: int = report["app_owner_uid"]
        user: int = CloudwaysUser.override_uid_by_path(
            Path(path), owner, users_from_panel, pw_all
        )

        kwargs = {
            "scan_id": scan_id,
            "path": path,
            "app_name": report["app"],
            "db_host": report["database_host"],
            "db_name": report["database_name"],
            "errors": tuple(
                MalwareDatabaseHitError(err["code"], err["message"])
                for err in report["error_list"]
            ),
            "db_port": report["database_port"],
            "owner": owner,
            "user": user,
        }
        if detailed_reports := report.get("detailed_reports"):
            for detailed_report in detailed_reports:
                yield from cls._get_hits_from_report(detailed_report, **kwargs)
        else:
            for detailed_report in report.get("detailed_urls_reports", []):
                yield from cls._get_hits_from_report(detailed_report, **kwargs)


@dataclass(eq=True, frozen=True)
class MalwareDatabaseScanReport:
    hits: Set[MalwareDatabaseHitInfo]
    started: int
    completed: int
    total_resources: int
    total_malicious: int


def _last_completed_time(reports: Iterable[dict]) -> int:
    return int(
        max(
            (
                report["start_time"] + report["running_time"]
                for report in reports
            ),
            default=0,
        )
    )


def _first_started_time(reports: Iterable[dict]) -> int:
    return int(min((report["start_time"] for report in reports), default=0))


def _found(reports: Iterable[dict]) -> Iterable[dict]:
    return iter(filter(lambda r: r["app"] is not None, reports))


def _malicious(reports: Iterable[dict]) -> Iterable[dict]:
    return iter(
        filter(
            lambda r: r["count_of_detected_malicious_entries"] > 0,
            reports,
        )
    )


def _total_scanned_rows(reports: Iterable[dict]) -> int:
    return sum(report.get("rows_count", 0) for report in reports)


async def scan_report(
    hit_report_list: Collection[dict], scan_id: str
) -> MalwareDatabaseScanReport:
    users_from_panel = set(await hosting_panel.HostingPanel().get_users())
    pw_all = await to_thread(pwd.getpwall)

    hits = set()
    for report in _malicious(_found(hit_report_list)):
        hits |= set(
            MalwareDatabaseHitInfo.iter_from_scan_report(
                report, users_from_panel, pw_all, scan_id
            )
        )

    started = _first_started_time(hit_report_list)
    completed = _last_completed_time(hit_report_list)
    total_resources = _total_scanned_rows(hit_report_list)
    total_malicious = len(hits)
    return MalwareDatabaseScanReport(
        hits, started, completed, total_resources, total_malicious
    )

Spamworldpro Mini