""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ import asyncio import logging import os import shutil import tempfile import time from functools import cached_property from io import BytesIO from pathlib import Path from peewee import DoesNotExist from defence360agent.utils import safe_fileops from imav.contracts.config import Malware as Config from imav.malwarelib.config import MalwareHitStatus, MalwareScanResourceType from imav.malwarelib.cleanup.cleaner import MalwareCleaner from imav.malwarelib.cleanup.storage import CleanupStorage from imav.malwarelib.model import MalwareHit from imav.utils import get_files_diff logger = logging.getLogger(__name__) IMUNIFY_USER = "_imunify" IMUNIFY_GROUP = "_imunify" class DiffError(Exception): pass class SafeFilePath(os.PathLike): def __init__(self, path, user=None, missing_ok=False): self._path = Path(path) self._user = user self._missing_ok = missing_ok def __str__(self): return str(self._path) def __fspath__(self): return self.__str__() def __getattr__(self, attr): return getattr(self._path, attr) def check_readability(self) -> bool: """ Return True if the file is readable by the user or raise UnsafeFileOperation otherwise """ with self.safe_open(): return True def safe_open(self, mode="rb"): if self._missing_ok and not self._path.exists(): return BytesIO(b"") if self._user: return safe_fileops.safe_open_file( self._path, mode=mode, user=self._user, respect_homedir=False, ) else: return self.open(mode) class MalwareHitDiff: """ Used to compare infected and cleaned versions of a malicious file. """ def __init__(self, id: int, user: str = None): self._id = id self._user = user self._cleaner = MalwareCleaner( loop=None, sink=None, watch_progress=False ) @cached_property def hit(self): try: return MalwareHit.get( MalwareHit.id == self._id, MalwareHit.resource_type == MalwareScanResourceType.FILE.value, MalwareHit.malicious == True, # noqa: E712 *([MalwareHit.user == self._user] * bool(self._user)), ) except DoesNotExist: raise DiffError( f"No malware file hit found (id={self._id}," f" user={self._user})." ) async def get_unified_diff_for_cleaned_file(self) -> bytes: diff = b"" # compare the current cleaned version with the original file if self.hit.status in MalwareHitStatus.CLEANED: cleaned_file_path = SafeFilePath( self.hit.orig_file_path, user=self._user, missing_ok=True, ) infected_file_path = SafeFilePath( CleanupStorage.get_hit_store_path(self.hit), user=None, ) diff = await self._get_diff( infected_file_path, cleaned_file_path, cleaned_at=self.hit.cleaned_at, ) else: logger.warning( "Malware hit has unexpected status=%s. Use the empty diff.", self.hit.status, ) return diff async def clean_and_get_unified_diff(self) -> bytes: diff = b"" if self.hit.status == MalwareHitStatus.FOUND: # infected # clean copy of file and compare with the original file infected_file_path = SafeFilePath( self.hit.orig_file_path, user=self._user ) # do not attempt any of the following actions # if the user does not have read permissions infected_file_path.check_readability() with tempfile.NamedTemporaryFile( mode="w+", dir=Config.TEMP_CLEANUP_DIR ) as temp_file: cleaned_file_path = SafeFilePath( temp_file.name, user=None, missing_ok=True ) await safe_fileops.safe_move( self.hit.orig_file, cleaned_file_path, src_unlink=False, dst_overwrite=True, safe_src=False, safe_dst=True, ) # so that procu2.php has access to the file shutil.chown( cleaned_file_path, user=IMUNIFY_USER, group=IMUNIFY_GROUP ) result, error, cmd = await self._cleaner.start( IMUNIFY_USER, [str(cleaned_file_path)] ) hit_result = result.get(str(cleaned_file_path)) if hit_result and ( hit_result.is_cleaned() or hit_result.is_removed() ): diff = await self._get_diff( infected_file_path, cleaned_file_path, cleaned_at=time.time(), ) else: logger.warning( "File %s was not cleaned to check diff: %s, %s, %s", self.hit.orig_file, result, error, cmd, ) else: logger.warning( "Malware hit has unexpected status=%s. Use the empty diff.", self.hit.status, ) return diff async def _get_diff( self, infected_file_path: SafeFilePath, cleaned_file_path: SafeFilePath, *, cleaned_at: float, ): if not infected_file_path.exists(): raise FileNotFoundError( f"Original file not found for hit(id={self.hit.id})." ) if ( cleaned_file_path.exists() and cleaned_file_path.stat().st_ctime > cleaned_at ): raise DiffError( "The file was modified after cleaning, diff is not valid." ) with infected_file_path.safe_open() as infected_file, cleaned_file_path.safe_open() as cleaned_file: # don't block the whole loop while reading files loop = asyncio.get_event_loop() return await loop.run_in_executor( None, get_files_diff, infected_file, cleaned_file )