"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see .
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see
"""
from collections import defaultdict
from itertools import chain
from peewee import Case, fn
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.permissions import (
MS_IMUNIFY_PATCH_ENABLED,
has_permission,
)
from defence360agent.rpc_tools.lookup import (
CommonEndpoints,
RootEndpoints,
bind,
)
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from imav.contracts.imunify_patch_id import (
get_imunify_patch_id,
get_imunify_patch_purchase_url,
)
from imav.contracts.messages import RefreshImunifyPatchSubscription
from imav.contracts.permissions import is_imunify_patch_enabled
from imav.malwarelib.api.vulnerability import VulnerabilityAPI
from imav.malwarelib.config import VulnerabilityHitStatus
from imav.malwarelib.model import VulnerabilityHit
from imav.malwarelib.vulnerabilities.storage import restore_hits
from imav.malwarelib.utils import user_list
class VulnerabilitiesAdminEndpoints(RootEndpoints):
@bind("vulnerabilities", "user", "list")
async def vulnerabilities_user_list(
self, offset, limit, user=None, search=None
):
"""
Return list of users with summary vulnerabilities info
"""
if user:
# user endpoint
_, users = await user_list.get_matched_users(match={user})
max_count = len(users)
elif search:
# search
_, users = await user_list.get_matched_users(match=search)
max_count = len(users)
else:
# all users
max_count, users = await user_list.get_matched_users(match=None)
users_list = sorted([u["user"] for u in users])
vulnerable_case = Case(
None,
[((VulnerabilityHit.status != VulnerabilityHitStatus.PATCHED), 1)],
0,
)
query = (
VulnerabilityHit.select(
VulnerabilityHit.user,
fn.SUM(vulnerable_case).alias("vulnerable"),
)
.where(VulnerabilityHit.user.in_(users_list))
.group_by(VulnerabilityHit.user)
)
users_vulnerabilities = {row.user: row.vulnerable for row in query}
users_domains = await HostingPanel().get_domains_per_user()
results = []
for user in users_list:
subscribed = is_imunify_patch_enabled(user)
results.append(
{
"username": user,
"domains": users_domains.get(user, []),
"vulnerable_file_count": users_vulnerabilities.get(
user, 0
),
"imunify_patch_user_id": await get_imunify_patch_id(user),
"subscribed": subscribed,
"purchase_url": (
await get_imunify_patch_purchase_url(user)
if not subscribed
else None
),
}
)
start = offset
end = offset + limit
return max_count, results[start:end]
class VulnerabilitiesEndpoints(CommonEndpoints):
@bind("vulnerabilities", "file", "list")
async def vulnerabilities_file_list(self, user=None, **kwargs):
"""
Return list vulnerable/patched files
"""
await self._sink.process_message(RefreshImunifyPatchSubscription())
max_count, hits = VulnerabilityHit.list(user=user, **kwargs)
vuln_info = await VulnerabilityAPI.get_details(
VulnerabilityHit.get_vulnerabilities_ids(hits)
)
results = []
for hit in hits:
username = hit["username"]
subscribed = is_imunify_patch_enabled(username)
record = {
"id": hit["id"],
"username": username,
"file_path": hit["file_path"],
"status": hit["status"],
"app_name": "",
"imunify_patch_user_id": await get_imunify_patch_id(username),
"subscribed": subscribed,
"purchase_url": (
await get_imunify_patch_purchase_url(username)
if not subscribed
else None
),
"vulnerabilities": [],
}
for vuln_id in VulnerabilityHit.get_vulnerability_ids(hit["type"]):
record["vulnerabilities"].append(
{
"cve_id": vuln_info[vuln_id]["cveId"],
"vulnerability_type": vuln_info[vuln_id]["type"],
"vulnerability_description": vuln_info[vuln_id][
"name"
],
}
)
if not record["app_name"]: # set it once
record["app_name"] = vuln_info[vuln_id]["app"]
results.append(record)
return max_count, results
@bind("vulnerabilities", "file", "patch")
async def vulnerabilities_file_patch(self, paths, user=None):
query = VulnerabilityHit.select().where(
VulnerabilityHit.orig_file.in_(paths)
)
if user is not None:
query = query.where(VulnerabilityHit.user == user)
# make sure all associated users have patch permission
user_paths = defaultdict(list)
for vulnerability in query:
user_paths[vulnerability.user].append(vulnerability.orig_file)
for panel_user in user_paths.keys():
_check_imunify_patch_permission(panel_user)
if filelist := list(chain.from_iterable(user_paths.values())):
await self._sink.process_message(
MessageType.VulnerabilityPatchTask(
filelist=filelist, initiator=user, manual=True
)
)
@bind("vulnerabilities", "file", "revert")
async def vulnerabilities_file_revert(self, paths, user=None):
query = VulnerabilityHit.select().where(
VulnerabilityHit.orig_file.in_(paths),
VulnerabilityHit.status.in_([VulnerabilityHitStatus.PATCHED]),
)
if user is not None:
query = query.where(VulnerabilityHit.user == user)
hits = list(query)
succeeded, failed = await restore_hits(hits)
return {
"succeeded": [hit.orig_file for hit in succeeded],
"failed": [hit.orig_file for hit in failed],
}
def _check_imunify_patch_permission(user: str | None) -> None:
if not has_permission(MS_IMUNIFY_PATCH_ENABLED, user):
raise PermissionError(
"Unable to perform the command. "
f"User '{user}' does not have required permissions.",
)