"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
from logging import getLogger
from typing import TYPE_CHECKING, Any
from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import (
MessageSink,
MessageSource,
expect,
)
from defence360agent.utils import Scope
from imav.contracts.imunify_patch_id import (
get_imunify_patch_id,
)
from imav.contracts.messages import VulnerabilityPatchTask
from imav.malwarelib.api.imunify_patch_subscription import (
ImunifyPatchSubscriptionAPI,
)
from imav.malwarelib.config import VulnerabilityHitStatus
from imav.malwarelib.model import ImunifyPatchSubscription, VulnerabilityHit
from imav.malwarelib.utils import user_list
if TYPE_CHECKING:
from imav.contracts.imunify_patch_id import ImunifyPatchUserId
from imav.contracts.messages import RefreshImunifyPatchSubscription
logger = getLogger(__name__)
class ImunifyPatchSubscriptionPlugin(MessageSink, MessageSource):
    """Synchronise ImunifyPatchSubscription records with the subscription API.

    On every RefreshImunifyPatchSubscription message the plugin asks
    ImunifyPatchSubscriptionAPI which panel users are subscribed, creates
    a record for each of them, removes the records of the remaining panel
    users, and sends a VulnerabilityPatchTask for every user whose record
    was created during this refresh and who has vulnerable files.
    """

    SCOPE = Scope.AV

    async def create_sink(self, loop):
        """Do nothing: no sink-side setup is performed here."""
        pass

    async def create_source(self, loop, sink):
        """Remember *sink* so that patch tasks can be sent through it."""
        self._sink = sink

    def __init__(self) -> None:
        # Serialises refreshes so that overlapping messages cannot
        # interleave their set/unset steps.
        self._lock = asyncio.Lock()

    async def _get_users_map(self) -> dict["ImunifyPatchUserId", Any]:
        """Return a mapping of Imunify patch id to panel user name.

        Users for which get_imunify_patch_id() returns a falsy value are
        left out of the mapping.
        """
        users_map = {}
        for user in await user_list.panel_users():
            username = user["user"]
            patch_id = await get_imunify_patch_id(username)
            if patch_id:
                users_map[patch_id] = username
        return users_map

    @expect(MessageType.RefreshImunifyPatchSubscription)
    async def refresh_subscription(
        self, _: "RefreshImunifyPatchSubscription"
    ) -> None:
        """Bring the stored subscriptions in line with the API response.

        Nothing is changed when there are no panel users with a patch id
        or when the API returns None.
        """
        async with self._lock:  # handle concurrent updates
            users = await self._get_users_map()
            if not users:
                logger.warning("No users found while refreshing subscription")
                return
            subscribed_users_ids = (
                await ImunifyPatchSubscriptionAPI.get_subscriptions(
                    list(users.keys())
                )
            )
            # do not update subscription if something's wrong with API
            if subscribed_users_ids is None:
                return
            # Build the lookup set once: the membership test below runs for
            # every panel user and would otherwise rescan the API result
            # each time. The ids are already used as dict keys, so they
            # are hashable.
            subscribed_ids = set(subscribed_users_ids)
            for user_id in subscribed_users_ids:
                # Ignore ids returned by the API that do not belong to a
                # known panel user.
                if user_id in users:
                    await self._set_subscription(users[user_id])
            unsubscribed_users = [
                username
                for user_id, username in users.items()
                if user_id not in subscribed_ids
            ]
            self._unset_subscription(unsubscribed_users)

    async def _set_subscription(self, user: str) -> None:
        """Ensure a subscription record exists for *user*.

        When the record is created by this call, the original paths of the
        user's hits in VULNERABLE status are collected and, if there are
        any, sent to the sink as a single VulnerabilityPatchTask.
        """
        _, created = ImunifyPatchSubscription.get_or_create(user_id=user)
        if not created:
            # Already subscribed before this refresh: nothing to schedule.
            return
        vulnerable_hits = VulnerabilityHit.select(
            VulnerabilityHit.orig_file
        ).where(
            VulnerabilityHit.user == user,
            VulnerabilityHit.status == VulnerabilityHitStatus.VULNERABLE,
        )
        paths = [hit.orig_file for hit in vulnerable_hits]
        if paths:
            await self._sink.process_message(
                VulnerabilityPatchTask(filelist=paths, user=user)
            )

    def _unset_subscription(self, users: list[str]) -> None:
        """Delete the subscription records of the given *users*."""
        if not users:
            # Nothing to delete; skip the database round trip.
            return
        ImunifyPatchSubscription.delete().where(
            ImunifyPatchSubscription.user_id.in_(users)
        ).execute()