""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ import logging import os import shlex import subprocess from datetime import datetime, timedelta from functools import lru_cache from typing import Optional from clcommon.cpapi import ( get_domains_php_info, get_installed_php_versions, ) from defence360agent.contracts.config import ( MalwareScanSchedule, MalwareScanScheduleInterval as Interval, ) from imav.malwarelib.model import MalwareHit from imav.malwarelib.scan.queue_supervisor_sync import ( QueueSupervisorSync as ScanQueue, ) from imav.malwarelib.utils import user_list from imav.wordpress import ( WP_CLI_WRAPPER_PATH, ) from imav.wordpress.exception import PHPError CAGEFS_ENTER_PATH = "/usr/sbin/cagefs_enter_user" CAGEFS_CTL_PATH = "/usr/sbin/cagefsctl" logger = logging.getLogger(__name__) def wp_wrapper(php_path: str, docroot: str) -> list: """Get wp cli common command list""" return [str(WP_CLI_WRAPPER_PATH), php_path, docroot] @lru_cache(maxsize=1) def get_cagefs_enabled_users() -> set: """Get the list of users enabled for CageFS.""" if not os.path.isfile(CAGEFS_CTL_PATH) or not os.access( CAGEFS_CTL_PATH, os.X_OK ): return set() result = subprocess.run( [CAGEFS_CTL_PATH, "--list-enabled"], capture_output=True, text=True ) if result.returncode != 0: return set() lines = result.stdout.strip().split("\n") return set(lines[1:]) # Skip the first line which is a summary def clear_get_cagefs_enabled_users_cache(): get_cagefs_enabled_users.cache_clear() def build_command_for_user(username: str, args: list) -> list: """Build the necessary command to run the given cmdline args with specified user.""" if username in get_cagefs_enabled_users(): if os.path.isfile(CAGEFS_ENTER_PATH) and os.access( CAGEFS_ENTER_PATH, os.X_OK ): return [ CAGEFS_ENTER_PATH, "--no-io-and-memory-limit", username, *args, ] return [ "su", "-s", "/bin/bash", username, "-c", shlex.join(args), ] def get_php_binary_path(domain: str, username: str) -> Optional[str]: """Determine PHP binary path for the given domain and username.""" logger.debug( "Get php binary path for domain: %s, username: %s", domain, username ) # Get the PHP version identifier for the given domain and username domains_php_info = get_domains_php_info() domain_info = domains_php_info.get(domain) if not domain_info or domain_info.get("username") != username: raise PHPError( "Unable to get PHP version for domain: {domain}, username:" " {username}".format(domain=domain, username=username) ) php_display_version = domain_info.get("display_version") if not php_display_version: raise PHPError( "PHP binary was not identified for domain: {domain}, username:" " {username}".format(domain=domain, username=username) ) # Get the PHP binary path from the installed PHP versions installed_php_versions = get_installed_php_versions() php_binary_path = None for php_version in installed_php_versions: if php_version.get("identifier") == php_display_version: php_binary_path = php_version.get("bin") break if not php_binary_path: raise PHPError( "PHP binary was not identified for domain: {domain}, username:" " {username}".format(domain=domain, username=username) ) return php_binary_path def get_malware_history(username: str) -> list: """ Get malware history for the specified user. This is an equivalent of calling `imunify360-agent malware history list --user {username}`. `` """ (max_count, hits) = MalwareHit.malicious_list(user=username) return hits async def get_last_scan(sink, username: str) -> dict: """ Get the last scan for the specified user. This is an equivalent of calling `imunify360-agent malware user list --user {username}`. """ queue = ScanQueue(sink) _, users = await user_list.fetch_user_list( queue.get_scans_from_paths, match={username} ) if not users: return {} users = user_list.sort(users, "scan_date", desc=True) return users[0] def calculate_next_scan_timestamp(): today = datetime.utcnow() if MalwareScanSchedule.INTERVAL == Interval.DAY: next_scan = today.replace( hour=MalwareScanSchedule.HOUR, minute=0, second=0, microsecond=0, ) if today >= next_scan: next_scan += timedelta(days=1) return next_scan.timestamp() if MalwareScanSchedule.INTERVAL == Interval.WEEK: # today.weekday() returns 0 for Monday, 6 for Sunday, but MalwareScanSchedule.DAY_OF_WEEK uses 0 for Sunday, # 1 for Monday, ..., 6 for Saturday. So we need to adjust the calculation. days_ahead = ( MalwareScanSchedule.DAY_OF_WEEK - (today.weekday() + 1) % 7 + 7 ) % 7 if days_ahead == 0 and today.hour >= MalwareScanSchedule.HOUR: days_ahead = 7 next_scan_date = today + timedelta(days=days_ahead) return next_scan_date.replace( hour=MalwareScanSchedule.HOUR, minute=0, second=0, microsecond=0 ).timestamp() if MalwareScanSchedule.INTERVAL == Interval.MONTH: next_scan_date = today.replace( day=MalwareScanSchedule.DAY_OF_MONTH, hour=MalwareScanSchedule.HOUR, minute=0, second=0, microsecond=0, ) if today.day > MalwareScanSchedule.DAY_OF_MONTH or ( today.day == MalwareScanSchedule.DAY_OF_MONTH and today.hour >= MalwareScanSchedule.HOUR ): next_month = today.month + 1 if today.month < 12 else 1 next_scan_date = next_scan_date.replace(month=next_month) if next_month == 1: # Handle year change next_scan_date = next_scan_date.replace(year=today.year + 1) return next_scan_date.timestamp()