"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import logging
import os
import shutil
import tempfile
import time
from functools import cached_property
from io import BytesIO
from pathlib import Path
from peewee import DoesNotExist
from defence360agent.utils import safe_fileops
from imav.contracts.config import Malware as Config
from imav.malwarelib.config import MalwareHitStatus, MalwareScanResourceType
from imav.malwarelib.cleanup.cleaner import MalwareCleaner
from imav.malwarelib.cleanup.storage import CleanupStorage
from imav.malwarelib.model import MalwareHit
from imav.utils import get_files_diff
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Account name and group that the temporary copy of an infected file is
# chowned to before cleaning; the user name is also passed to the cleaner.
IMUNIFY_USER = "_imunify"
IMUNIFY_GROUP = "_imunify"
class DiffError(Exception):
    """
    Raised when a diff for a malware hit cannot be produced or would not
    be valid (e.g. the hit is not found, or the file changed after cleaning).
    """
class SafeFilePath(os.PathLike):
    """
    Path wrapper that can open the file on behalf of a given user.

    Attributes that are not defined on this class are looked up on the
    wrapped ``pathlib.Path`` (``exists()``, ``stat()``, ``open()``, ...).
    """

    def __init__(self, path, user=None, missing_ok=False):
        """
        :param path: location of the file; converted to ``pathlib.Path``
        :param user: when set, ``safe_open`` goes through
            ``safe_fileops.safe_open_file`` for this user; otherwise the
            file is opened directly via ``Path.open``
        :param missing_ok: when true, ``safe_open`` returns an empty
            in-memory stream if the file does not exist
        """
        self._path = Path(path)
        self._user = user
        self._missing_ok = missing_ok

    def __str__(self):
        return str(self._path)

    def __fspath__(self):
        return self.__str__()

    def __getattr__(self, attr):
        """
        Delegate attributes missing on the wrapper to the wrapped path.

        :raise AttributeError: the wrapped path is not set yet or does not
            have the attribute either
        """
        # __getattr__ only runs after normal lookup failed. Reading
        # ``self._path`` here while it is not set yet (copy, unpickling,
        # ``__new__`` without ``__init__``) would re-enter __getattr__
        # forever, so look it up in the instance dict directly.
        try:
            wrapped = self.__dict__["_path"]
        except KeyError:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {attr!r}"
            ) from None
        return getattr(wrapped, attr)

    def check_readability(self) -> bool:
        """
        Return True if the file can be opened via ``safe_open``.

        Any error raised while opening propagates to the caller
        (UnsafeFileOperation when the user is not allowed to read the
        file, per the original contract of this method).
        """
        with self.safe_open():
            return True

    def safe_open(self, mode="rb"):
        """
        Open the file and return the file object.

        :param mode: mode passed to the underlying open call
        :return: an empty ``BytesIO`` when ``missing_ok`` is set and the
            file does not exist; otherwise the object returned by
            ``safe_fileops.safe_open_file`` (if a user is set) or by
            ``Path.open``
        """
        if self._missing_ok and not self._path.exists():
            # a missing file is treated as an empty one
            return BytesIO(b"")
        if self._user:
            return safe_fileops.safe_open_file(
                self._path,
                mode=mode,
                user=self._user,
                respect_homedir=False,
            )
        return self._path.open(mode)
class MalwareHitDiff:
"""
Used to compare infected and cleaned versions of a malicious file.
"""
def __init__(self, id: int, user: str = None):
self._id = id
self._user = user
self._cleaner = MalwareCleaner(
loop=None, sink=None, watch_progress=False
)
@cached_property
def hit(self):
try:
return MalwareHit.get(
MalwareHit.id == self._id,
MalwareHit.resource_type == MalwareScanResourceType.FILE.value,
MalwareHit.malicious == True, # noqa: E712
*([MalwareHit.user == self._user] * bool(self._user)),
)
except DoesNotExist:
raise DiffError(
f"No malware file hit found (id={self._id},"
f" user={self._user})."
)
async def get_unified_diff_for_cleaned_file(self) -> bytes:
diff = b""
# compare the current cleaned version with the original file
if self.hit.status in MalwareHitStatus.CLEANED:
cleaned_file_path = SafeFilePath(
self.hit.orig_file_path,
user=self._user,
missing_ok=True,
)
infected_file_path = SafeFilePath(
CleanupStorage.get_hit_store_path(self.hit),
user=None,
)
diff = await self._get_diff(
infected_file_path,
cleaned_file_path,
cleaned_at=self.hit.cleaned_at,
)
else:
logger.warning(
"Malware hit has unexpected status=%s. Use the empty diff.",
self.hit.status,
)
return diff
async def clean_and_get_unified_diff(self) -> bytes:
diff = b""
if self.hit.status == MalwareHitStatus.FOUND: # infected
# clean copy of file and compare with the original file
infected_file_path = SafeFilePath(
self.hit.orig_file_path, user=self._user
)
# do not attempt any of the following actions
# if the user does not have read permissions
infected_file_path.check_readability()
with tempfile.NamedTemporaryFile(
mode="w+", dir=Config.TEMP_CLEANUP_DIR
) as temp_file:
cleaned_file_path = SafeFilePath(
temp_file.name, user=None, missing_ok=True
)
await safe_fileops.safe_move(
self.hit.orig_file,
cleaned_file_path,
src_unlink=False,
dst_overwrite=True,
safe_src=False,
safe_dst=True,
)
# so that procu2.php has access to the file
shutil.chown(
cleaned_file_path, user=IMUNIFY_USER, group=IMUNIFY_GROUP
)
result, error, cmd = await self._cleaner.start(
IMUNIFY_USER, [str(cleaned_file_path)]
)
hit_result = result.get(str(cleaned_file_path))
if hit_result and (
hit_result.is_cleaned() or hit_result.is_removed()
):
diff = await self._get_diff(
infected_file_path,
cleaned_file_path,
cleaned_at=time.time(),
)
else:
logger.warning(
"File %s was not cleaned to check diff: %s, %s, %s",
self.hit.orig_file,
result,
error,
cmd,
)
else:
logger.warning(
"Malware hit has unexpected status=%s. Use the empty diff.",
self.hit.status,
)
return diff
async def _get_diff(
self,
infected_file_path: SafeFilePath,
cleaned_file_path: SafeFilePath,
*,
cleaned_at: float,
):
if not infected_file_path.exists():
raise FileNotFoundError(
f"Original file not found for hit(id={self.hit.id})."
)
if (
cleaned_file_path.exists()
and cleaned_file_path.stat().st_ctime > cleaned_at
):
raise DiffError(
"The file was modified after cleaning, diff is not valid."
)
with infected_file_path.safe_open() as infected_file, cleaned_file_path.safe_open() as cleaned_file:
# don't block the whole loop while reading files
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, get_files_diff, infected_file, cleaned_file
)