"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import logging
import os
import shlex
import subprocess
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Optional
from clcommon.cpapi import (
get_domains_php_info,
get_installed_php_versions,
)
from defence360agent.contracts.config import (
MalwareScanSchedule,
MalwareScanScheduleInterval as Interval,
)
from imav.malwarelib.model import MalwareHit
from imav.malwarelib.scan.queue_supervisor_sync import (
QueueSupervisorSync as ScanQueue,
)
from imav.malwarelib.utils import user_list
from imav.wordpress import (
WP_CLI_WRAPPER_PATH,
)
from imav.wordpress.exception import PHPError
# Executable used by build_command_for_user() to run a command as a user that
# is listed as CageFS-enabled.
CAGEFS_ENTER_PATH = "/usr/sbin/cagefs_enter_user"
# CageFS control utility; invoked with `--list-enabled` to enumerate the
# users for which CageFS is enabled.
CAGEFS_CTL_PATH = "/usr/sbin/cagefsctl"
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def wp_wrapper(php_path: str, docroot: str) -> list:
    """Return the argv prefix shared by all WP-CLI wrapper invocations.

    The list consists of the wrapper script path (stringified), the PHP
    binary to use and the document root of the site, in that order.
    """
    wrapper = str(WP_CLI_WRAPPER_PATH)
    command = [wrapper]
    command.extend((php_path, docroot))
    return command
@lru_cache(maxsize=1)
def get_cagefs_enabled_users() -> set:
    """Return the set of usernames that have CageFS enabled.

    The result of ``cagefsctl --list-enabled`` is parsed and memoized by
    ``lru_cache``. An empty set is returned when the control utility is
    missing, is not executable, or exits with a non-zero status.
    """
    ctl_is_runnable = os.path.isfile(CAGEFS_CTL_PATH) and os.access(
        CAGEFS_CTL_PATH, os.X_OK
    )
    if not ctl_is_runnable:
        return set()

    completed = subprocess.run(
        [CAGEFS_CTL_PATH, "--list-enabled"],
        capture_output=True,
        text=True,
    )
    if completed.returncode != 0:
        return set()

    # The first output line is a summary, not a username; drop it.
    _summary, *usernames = completed.stdout.strip().split("\n")
    return set(usernames)
def clear_get_cagefs_enabled_users_cache() -> None:
    """Drop the memoized result of ``get_cagefs_enabled_users``.

    The next call to ``get_cagefs_enabled_users`` will query CageFS again.
    """
    get_cagefs_enabled_users.cache_clear()
def build_command_for_user(username: str, args: list) -> list:
    """Wrap *args* so that the command is executed as *username*.

    When the user is CageFS-enabled and the ``cagefs_enter_user`` executable
    is present and executable, the command is run through it. In every other
    case the command is shell-quoted and run via ``su`` with ``/bin/bash``.
    """
    use_cagefs_enter = (
        username in get_cagefs_enabled_users()
        and os.path.isfile(CAGEFS_ENTER_PATH)
        and os.access(CAGEFS_ENTER_PATH, os.X_OK)
    )
    if use_cagefs_enter:
        prefix = [CAGEFS_ENTER_PATH, "--no-io-and-memory-limit", username]
        return [*prefix, *args]

    # `su -c` takes a single string, so the argv has to be quoted and joined.
    quoted_cmdline = shlex.join(args)
    return ["su", "-s", "/bin/bash", username, "-c", quoted_cmdline]
def get_php_binary_path(domain: str, username: str) -> Optional[str]:
    """Resolve the PHP binary configured for *domain* owned by *username*.

    The domain's ``display_version`` (from ``get_domains_php_info``) is
    matched against the ``identifier`` of each installed PHP version, and the
    ``bin`` value of the first match is returned.

    Raises:
        PHPError: if the domain is unknown or belongs to another user, if it
            has no ``display_version``, or if no installed PHP version with a
            usable ``bin`` matches it.
    """
    logger.debug(
        "Get php binary path for domain: %s, username: %s", domain, username
    )

    info = get_domains_php_info().get(domain)
    if not (info and info.get("username") == username):
        raise PHPError(
            "Unable to get PHP version for domain: {domain}, username:"
            " {username}".format(domain=domain, username=username)
        )

    wanted_identifier = info.get("display_version")
    binary = None
    if wanted_identifier:
        # Only consult the installed versions when there is something to match.
        binary = next(
            (
                version.get("bin")
                for version in get_installed_php_versions()
                if version.get("identifier") == wanted_identifier
            ),
            None,
        )

    if not binary:
        raise PHPError(
            "PHP binary was not identified for domain: {domain}, username:"
            " {username}".format(domain=domain, username=username)
        )
    return binary
def get_malware_history(username: str) -> list:
    """
    Return the malicious hits recorded for the given user.

    This is an equivalent of calling
    `imunify360-agent malware history list --user {username}`.
    """
    # malicious_list() yields a (count, hits) pair; only the hits are needed.
    _, user_hits = MalwareHit.malicious_list(user=username)
    return user_hits
async def get_last_scan(sink, username: str) -> dict:
    """
    Return the most recent scan entry for the given user.

    This is an equivalent of calling
    `imunify360-agent malware user list --user {username}`.
    An empty dict is returned when no entry is found for the user.
    """
    scan_queue = ScanQueue(sink)
    _, matched_users = await user_list.fetch_user_list(
        scan_queue.get_scans_from_paths, match={username}
    )
    if matched_users:
        # Newest scan first; the head of the sorted list is the last scan.
        newest_first = user_list.sort(matched_users, "scan_date", desc=True)
        return newest_first[0]
    return {}
def _next_monthly_scan(
    today: datetime, day_of_month: int, hour: int
) -> datetime:
    """Return the first ``day_of_month`` at ``hour``:00 strictly after *today*.

    Months that do not contain ``day_of_month`` (e.g. the 31st in April or
    the 30th in February) are skipped instead of raising.

    Raises:
        ValueError: if no month within the next year can hold the requested
            day/hour combination (i.e. the values are out of range).
    """
    year, month = today.year, today.month
    # Every valid day-of-month recurs within a couple of months; scanning a
    # bit more than one year keeps the loop bounded for invalid input.
    for _ in range(13):
        try:
            candidate = today.replace(
                year=year,
                month=month,
                day=day_of_month,
                hour=hour,
                minute=0,
                second=0,
                microsecond=0,
            )
        except ValueError:
            # This month has no such day; try the following one.
            candidate = None
        if candidate is not None and candidate > today:
            return candidate
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    raise ValueError(
        "no valid scan date for day_of_month={}, hour={}".format(
            day_of_month, hour
        )
    )


def calculate_next_scan_timestamp(
    now: Optional[datetime] = None,
) -> Optional[float]:
    """Compute the POSIX timestamp of the next scheduled malware scan.

    The schedule is read from ``MalwareScanSchedule`` (``INTERVAL``, ``HOUR``,
    ``DAY_OF_WEEK``, ``DAY_OF_MONTH``). A scan slot that is exactly "now" or
    already passed is rolled over to the next day/week/month.

    Args:
        now: reference moment; defaults to ``datetime.utcnow()``. Mainly
            useful for deterministic calculations.

    Returns:
        The timestamp of the next scan, or ``None`` when the configured
        interval is not one of DAY, WEEK or MONTH.

    NOTE(review): the default reference time is a naive UTC datetime, but
    ``datetime.timestamp()`` interprets naive values as local time, so the
    result is shifted on hosts whose timezone is not UTC. Confirm which
    timezone the schedule is meant to be expressed in before changing this.
    """
    today = datetime.utcnow() if now is None else now
    interval = MalwareScanSchedule.INTERVAL
    hour = MalwareScanSchedule.HOUR

    if interval == Interval.DAY:
        next_scan = today.replace(hour=hour, minute=0, second=0, microsecond=0)
        if today >= next_scan:
            next_scan += timedelta(days=1)
        return next_scan.timestamp()

    if interval == Interval.WEEK:
        # today.weekday() returns 0 for Monday .. 6 for Sunday, whereas
        # MalwareScanSchedule.DAY_OF_WEEK uses 0 for Sunday, 1 for Monday,
        # ..., 6 for Saturday, so shift before taking the difference.
        today_dow = (today.weekday() + 1) % 7
        days_ahead = (MalwareScanSchedule.DAY_OF_WEEK - today_dow + 7) % 7
        if days_ahead == 0 and today.hour >= hour:
            # Today's slot has already started/passed: go to next week.
            days_ahead = 7
        next_scan = (today + timedelta(days=days_ahead)).replace(
            hour=hour, minute=0, second=0, microsecond=0
        )
        return next_scan.timestamp()

    if interval == Interval.MONTH:
        # Plain `replace(day=..., month=...)` raises ValueError when the
        # configured day does not exist in the target month, so search
        # forward for the next month that actually has that day.
        return _next_monthly_scan(
            today, MalwareScanSchedule.DAY_OF_MONTH, hour
        ).timestamp()

    return None