"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.


This program is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
details.


You should have received a copy of the GNU General Public License along with
this program.  If not, see <https://www.gnu.org/licenses/>.

Copyright © 2019 Cloud Linux Software Inc.

This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""

from collections import defaultdict
from itertools import chain

from peewee import Case, fn

from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.permissions import (
    MS_IMUNIFY_PATCH_ENABLED,
    has_permission,
)
from defence360agent.rpc_tools.lookup import (
    CommonEndpoints,
    RootEndpoints,
    bind,
)
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from imav.contracts.imunify_patch_id import (
    get_imunify_patch_id,
    get_imunify_patch_purchase_url,
)
from imav.contracts.messages import RefreshImunifyPatchSubscription
from imav.contracts.permissions import is_imunify_patch_enabled
from imav.malwarelib.api.vulnerability import VulnerabilityAPI
from imav.malwarelib.config import VulnerabilityHitStatus
from imav.malwarelib.model import VulnerabilityHit
from imav.malwarelib.vulnerabilities.storage import restore_hits
from imav.malwarelib.utils import user_list


class VulnerabilitiesAdminEndpoints(RootEndpoints):
    """Endpoints registered via ``bind`` for per-user vulnerability summaries."""

    @bind("vulnerabilities", "user", "list")
    async def vulnerabilities_user_list(
        self, offset, limit, user=None, search=None
    ):
        """
        Return list of users with summary vulnerabilities info

        :param offset: index of the first user (in username order) to return
        :param limit: maximum number of users to return
        :param user: when truthy, restrict the listing to this single user
        :param search: when truthy (and *user* is not), passed as the match
            argument to ``user_list.get_matched_users``
        :return: ``(max_count, records)``; ``max_count`` is the number of
            matched users for the *user*/*search* cases, otherwise the count
            reported by ``get_matched_users``; ``records`` is the requested
            page of per-user dicts
        """
        if user:  # user endpoint
            _, users = await user_list.get_matched_users(match={user})
            max_count = len(users)
        elif search:  # search
            _, users = await user_list.get_matched_users(match=search)
            max_count = len(users)
        else:  # all users
            max_count, users = await user_list.get_matched_users(match=None)

        users_list = sorted(u["user"] for u in users)

        # Paginate *before* doing any per-user work: every record needs
        # awaited lookups (patch id, purchase url), so building records for
        # users outside the requested page is wasted effort. The slice is the
        # same one that used to be applied to the finished result list.
        page_users = users_list[offset : offset + limit]
        if not page_users:
            return max_count, []

        # Count hits that are not yet patched, per user.
        vulnerable_case = Case(
            None,
            [((VulnerabilityHit.status != VulnerabilityHitStatus.PATCHED), 1)],
            0,
        )
        query = (
            VulnerabilityHit.select(
                VulnerabilityHit.user,
                fn.SUM(vulnerable_case).alias("vulnerable"),
            )
            .where(VulnerabilityHit.user.in_(page_users))
            .group_by(VulnerabilityHit.user)
        )
        users_vulnerabilities = {row.user: row.vulnerable for row in query}
        users_domains = await HostingPanel().get_domains_per_user()

        results = []
        # ``username`` (not ``user``) so the request parameter is not shadowed.
        for username in page_users:
            subscribed = is_imunify_patch_enabled(username)
            results.append(
                {
                    "username": username,
                    "domains": users_domains.get(username, []),
                    "vulnerable_file_count": users_vulnerabilities.get(
                        username, 0
                    ),
                    "imunify_patch_user_id": await get_imunify_patch_id(
                        username
                    ),
                    "subscribed": subscribed,
                    # a purchase link is only offered to unsubscribed users
                    "purchase_url": (
                        await get_imunify_patch_purchase_url(username)
                        if not subscribed
                        else None
                    ),
                }
            )
        return max_count, results


class VulnerabilitiesEndpoints(CommonEndpoints):
    """Endpoints registered via ``bind`` for vulnerable file operations."""

    @bind("vulnerabilities", "file", "list")
    async def vulnerabilities_file_list(self, user=None, **kwargs):
        """
        Return list vulnerable/patched files

        :param user: forwarded to ``VulnerabilityHit.list`` as ``user``
        :param kwargs: forwarded unchanged to ``VulnerabilityHit.list``
        :return: ``(max_count, records)`` where each record describes one hit
            together with the details of its vulnerabilities
        """
        await self._sink.process_message(RefreshImunifyPatchSubscription())
        max_count, hits = VulnerabilityHit.list(user=user, **kwargs)
        vuln_info = await VulnerabilityAPI.get_details(
            VulnerabilityHit.get_vulnerabilities_ids(hits)
        )
        results = []
        for hit in hits:
            username = hit["username"]
            subscribed = is_imunify_patch_enabled(username)
            record = {
                "id": hit["id"],
                "username": username,
                "file_path": hit["file_path"],
                "status": hit["status"],
                "app_name": "",
                "imunify_patch_user_id": await get_imunify_patch_id(username),
                "subscribed": subscribed,
                "purchase_url": (
                    await get_imunify_patch_purchase_url(username)
                    if not subscribed
                    else None
                ),
                "vulnerabilities": [],
            }
            for vuln_id in VulnerabilityHit.get_vulnerability_ids(hit["type"]):
                # NOTE(review): this lookup fails if ``get_details`` returned
                # no entry for ``vuln_id`` -- confirm the API guarantees one.
                details = vuln_info[vuln_id]
                record["vulnerabilities"].append(
                    {
                        "cve_id": details["cveId"],
                        "vulnerability_type": details["type"],
                        "vulnerability_description": details["name"],
                    }
                )
                if not record["app_name"]:  # set it once
                    record["app_name"] = details["app"]
            results.append(record)
        return max_count, results

    @bind("vulnerabilities", "file", "patch")
    async def vulnerabilities_file_patch(self, paths, user=None):
        """
        Request patching of the known vulnerable files among *paths*

        :param paths: file paths matched against ``VulnerabilityHit.orig_file``
        :param user: when not None, only hits of this user are considered;
            also passed as the task initiator
        :raises PermissionError: if any user owning a matched hit lacks the
            ``MS_IMUNIFY_PATCH_ENABLED`` permission
        """
        query = VulnerabilityHit.select().where(
            VulnerabilityHit.orig_file.in_(paths)
        )
        if user is not None:
            query = query.where(VulnerabilityHit.user == user)
        # make sure all associated users have patch permission
        user_paths = defaultdict(list)
        for vulnerability in query:
            user_paths[vulnerability.user].append(vulnerability.orig_file)
        # every user is checked before the task message is sent
        for panel_user in user_paths:
            _check_imunify_patch_permission(panel_user)
        if filelist := list(chain.from_iterable(user_paths.values())):
            await self._sink.process_message(
                MessageType.VulnerabilityPatchTask(
                    filelist=filelist, initiator=user, manual=True
                )
            )

    @bind("vulnerabilities", "file", "revert")
    async def vulnerabilities_file_revert(self, paths, user=None):
        """
        Restore the patched files among *paths*

        :param paths: file paths matched against ``VulnerabilityHit.orig_file``
        :param user: when not None, only hits of this user are considered
        :return: dict with ``succeeded`` and ``failed`` lists of file paths,
            as reported by ``restore_hits``
        """
        query = VulnerabilityHit.select().where(
            VulnerabilityHit.orig_file.in_(paths),
            VulnerabilityHit.status.in_([VulnerabilityHitStatus.PATCHED]),
        )
        if user is not None:
            query = query.where(VulnerabilityHit.user == user)
        hits = list(query)
        succeeded, failed = await restore_hits(hits)
        return {
            "succeeded": [hit.orig_file for hit in succeeded],
            "failed": [hit.orig_file for hit in failed],
        }


def _check_imunify_patch_permission(user: str | None) -> None:
    """
    Ensure *user* holds the ``MS_IMUNIFY_PATCH_ENABLED`` permission

    :raises PermissionError: if ``has_permission`` reports it is missing
    """
    if not has_permission(MS_IMUNIFY_PATCH_ENABLED, user):
        raise PermissionError(
            "Unable to perform the command. "
            f"User '{user}' does not have required permissions.",
        )