"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import logging
import os
import shlex
import subprocess
from collections import defaultdict
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Optional
from pathlib import Path
from defence360agent.contracts.config import (
choose_value_from_config,
MalwareScanSchedule,
MalwareScanScheduleInterval as Interval,
)
from defence360agent.contracts.license import LicenseCLN
from defence360agent.subsys.panels.hosting_panel import HostingPanel
from defence360agent.utils import async_lru_cache
from imav.malwarelib.model import MalwareHit
from imav.malwarelib.scan.queue_supervisor_sync import (
QueueSupervisorSync as ScanQueue,
)
from imav.malwarelib.utils import user_list
from imav.model.wordpress import WPSite
from imav.wordpress import (
WP_CLI_WRAPPER_PATH,
)
from imav.wordpress.exception import PHPError
# Absolute paths of the CageFS command-line tools invoked by this module:
# CAGEFS_ENTER_PATH is used as a command prefix to run a command as a user,
# CAGEFS_CTL_PATH is queried with "--list-enabled".
CAGEFS_ENTER_PATH = "/usr/sbin/cagefs_enter_user"
CAGEFS_CTL_PATH = "/usr/sbin/cagefsctl"
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
@async_lru_cache(ttl=60)
async def get_domain_paths() -> dict[str, list[str]]:
    """
    Return an inverted index: document root -> domains served from it.

    The hosting panel reports ``domain -> docroots``; this function flips that
    relation so callers can look up every domain sharing a docroot. The result
    is memoized by ``async_lru_cache(ttl=60)``.
    """
    domains_by_docroot = defaultdict(list)
    domain_docroots = await HostingPanel().get_domain_paths()
    for domain_name, roots in domain_docroots.items():
        for root in roots:
            domains_by_docroot[root].append(domain_name)
    return domains_by_docroot
def wp_wrapper(php_path: str, docroot: str) -> list:
    """Return the argv prefix ``[wrapper, php_path, docroot]`` for WP-CLI calls."""
    wrapper_executable = str(WP_CLI_WRAPPER_PATH)
    return [wrapper_executable, php_path, docroot]
def get_data_dir(site: WPSite):
    """Return ``<site.docroot>/wp-content/imunify-security`` as a ``Path``."""
    return Path(site.docroot).joinpath("wp-content", "imunify-security")
@lru_cache(maxsize=1)
def get_cagefs_enabled_users() -> set:
    """
    Return the set of usernames reported by ``cagefsctl --list-enabled``.

    An empty set is returned when the cagefsctl binary is missing or not
    executable, or when the command exits with a non-zero status. The result
    is memoized by ``lru_cache``.
    """
    ctl_is_runnable = os.path.isfile(CAGEFS_CTL_PATH) and os.access(
        CAGEFS_CTL_PATH, os.X_OK
    )
    if not ctl_is_runnable:
        return set()

    completed = subprocess.run(
        [CAGEFS_CTL_PATH, "--list-enabled"],
        capture_output=True,
        text=True,
    )
    if completed.returncode != 0:
        return set()

    # The first output line is a summary, not a username; discard it.
    _summary, *usernames = completed.stdout.strip().split("\n")
    return set(usernames)
def clear_get_cagefs_enabled_users_cache():
    """Discard the memoized result of ``get_cagefs_enabled_users``.

    The next call to ``get_cagefs_enabled_users`` will query cagefsctl again.
    """
    get_cagefs_enabled_users.cache_clear()
def build_command_for_user(username: str, args: list) -> list:
    """
    Wrap ``args`` in a command line that executes them as ``username``.

    When the user is in the CageFS-enabled set and the cagefs_enter_user
    binary exists and is executable, the command is prefixed with that binary.
    In every other case the arguments are shell-quoted into a single string
    and run through ``su -s /bin/bash <username> -c``.
    """
    cagefs_usable = (
        username in get_cagefs_enabled_users()
        and os.path.isfile(CAGEFS_ENTER_PATH)
        and os.access(CAGEFS_ENTER_PATH, os.X_OK)
    )
    if cagefs_usable:
        return [CAGEFS_ENTER_PATH, "--no-io-and-memory-limit", username, *args]

    quoted_cmdline = shlex.join(args)
    return ["su", "-s", "/bin/bash", username, "-c", quoted_cmdline]
async def get_domains_for_docroot(
    docroot: str, domain_to_exclude: str
) -> list[str]:
    """
    List the domains mapped to ``docroot``, leaving out ``domain_to_exclude``.

    The lookup goes through the cached ``get_domain_paths`` mapping, so it does
    not depend on a particular hosting panel. An unknown docroot yields an
    empty list.
    """
    domains_by_docroot = await get_domain_paths()
    return [
        name
        for name in domains_by_docroot.get(docroot, [])
        if name != domain_to_exclude
    ]
async def get_php_binary_path(site: WPSite, username: str) -> Optional[str]:
    """
    Resolve the PHP binary configured for the given WordPress site.

    The site's own domain is tried first; if that yields nothing, every other
    domain sharing the site's docroot is tried in turn. A domain only counts
    when its PHP info record belongs to ``username``.

    Raises:
        PHPError: no domain of the site resolved to a PHP binary.
    """
    from clcommon.cpapi import (
        get_domains_php_info,
        get_installed_php_versions,
    )

    php_info_by_domain = get_domains_php_info()
    php_installations = get_installed_php_versions()

    def resolve(domain_name: str) -> Optional[str]:
        # Map a domain to the "bin" of the installed PHP whose identifier
        # equals the domain's display_version; None when any step is missing.
        info = php_info_by_domain.get(domain_name)
        if info and info.get("username") == username:
            wanted_version = info.get("display_version")
            if wanted_version:
                return next(
                    (
                        installation.get("bin")
                        for installation in php_installations
                        if installation.get("identifier") == wanted_version
                    ),
                    None,
                )
        return None

    # Primary domain first.
    binary = resolve(site.domain)
    if binary:
        return binary

    # Fall back to the other domains pointing at the same docroot.
    sibling_domains = await get_domains_for_docroot(
        site.docroot, domain_to_exclude=site.domain
    )
    for sibling in sibling_domains:
        binary = resolve(sibling)
        if binary:
            return binary

    raise PHPError(
        f"PHP binary was not identified for docroot: {site.docroot}, username:"
        f" {username}"
    )
def get_malware_history(username: str) -> list:
    """
    Return the malicious hits recorded for the given user.

    Equivalent to running
    `imunify360-agent malware history list --user {username}`.
    """
    # malicious_list returns a (count, hits) pair; only the hits are needed.
    _count, malicious_hits = MalwareHit.malicious_list(user=username)
    return malicious_hits
async def get_last_scan(sink, username: str) -> dict:
    """
    Return the most recent scan entry for the given user.

    Equivalent to running
    `imunify360-agent malware user list --user {username}`.
    An empty dict is returned when no entry matches the user.
    """
    scan_queue = ScanQueue(sink)
    _, matched_users = await user_list.fetch_user_list(
        scan_queue.get_scans_from_paths, match={username}
    )
    if matched_users:
        # Newest scan_date first; the head of the list is the last scan.
        newest_first = user_list.sort(matched_users, "scan_date", desc=True)
        return newest_first[0]
    return {}
def _next_monthly_scan(now: datetime, day_of_month: int, hour: int) -> datetime:
    """Return the first ``day_of_month`` at ``hour``:00:00 strictly after ``now``.

    Months that do not contain ``day_of_month`` (e.g. day 31 in April) are
    skipped instead of raising ``ValueError``. This matches how a cron
    day-of-month field behaves; NOTE(review): confirm the real scheduler
    agrees.

    Raises:
        ValueError: no valid date was found within twelve months, i.e. the
            configured day or hour is out of range.
    """
    year, month = now.year, now.month
    # Every day number 1..31 occurs at least once in any 12 consecutive
    # months, so a bounded search is sufficient.
    for _ in range(12):
        try:
            candidate = now.replace(
                year=year,
                month=month,
                day=day_of_month,
                hour=hour,
                minute=0,
                second=0,
                microsecond=0,
            )
        except ValueError:
            # This month has no such day (or the config is invalid).
            candidate = None
        if candidate is not None and candidate > now:
            return candidate
        if month == 12:
            year, month = year + 1, 1
        else:
            month += 1
    raise ValueError(
        f"cannot schedule a scan for day {day_of_month} at hour {hour}"
    )


def calculate_next_scan_timestamp() -> Optional[float]:
    """
    Compute the timestamp of the next scheduled malware scan.

    The schedule is read from ``MalwareScanSchedule`` (INTERVAL, HOUR,
    DAY_OF_WEEK, DAY_OF_MONTH). Daily, weekly and monthly intervals are
    supported.

    Returns:
        The POSIX timestamp of the next scan, or ``None`` when the configured
        interval is not one of DAY, WEEK or MONTH.
    """
    # NOTE(review): utcnow() yields a naive datetime, while .timestamp() on a
    # naive datetime interprets it as local time. On hosts whose timezone is
    # not UTC the result is shifted by the UTC offset. Left unchanged because
    # the intended timezone of MalwareScanSchedule.HOUR is not visible here.
    today = datetime.utcnow()
    interval = MalwareScanSchedule.INTERVAL
    hour = MalwareScanSchedule.HOUR

    if interval == Interval.DAY:
        next_scan = today.replace(hour=hour, minute=0, second=0, microsecond=0)
        if today >= next_scan:
            # Today's slot has already passed.
            next_scan += timedelta(days=1)
        return next_scan.timestamp()

    if interval == Interval.WEEK:
        # today.weekday() is 0 for Monday .. 6 for Sunday, whereas
        # MalwareScanSchedule.DAY_OF_WEEK is 0 for Sunday .. 6 for Saturday,
        # so shift the current weekday into the schedule's numbering first.
        today_dow = (today.weekday() + 1) % 7
        days_ahead = (MalwareScanSchedule.DAY_OF_WEEK - today_dow + 7) % 7
        if days_ahead == 0 and today.hour >= hour:
            # Scheduled day is today but the hour has passed: next week.
            days_ahead = 7
        next_scan = (today + timedelta(days=days_ahead)).replace(
            hour=hour, minute=0, second=0, microsecond=0
        )
        return next_scan.timestamp()

    if interval == Interval.MONTH:
        next_scan = _next_monthly_scan(
            today, MalwareScanSchedule.DAY_OF_MONTH, hour
        )
        return next_scan.timestamp()

    return None
def prepare_scan_data(
    last_scan_time: float,
    next_scan_time: float,
    username: str,
    site: WPSite,
    malware_by_site: dict,
) -> dict:
    """
    Assemble the scan data payload for one WordPress site.

    Args:
        last_scan_time: Timestamp of the last scan
        next_scan_time: Timestamp of the next scheduled scan
        username: Username of the site owner
        site: WordPress site object
        malware_by_site: Dictionary mapping site docroots to their malware hits

    Returns:
        dict: JSON data ready to be written to scan_data.php, with the keys:
            - lastScanTimestamp: Timestamp of the last scan
            - nextScanTimestamp: Timestamp of the next scheduled scan
            - username: Username of the site owner
            - malware: List of malware hits for the site (empty if none)
            - config: Selected configuration values, grouped by section
            - license: Result of ``LicenseCLN.license_info()``
    """
    # (section, option) pairs exported to the site.
    wanted_options = (
        ("MALWARE_SCANNING", "enable_scan_cpanel"),
        ("MALWARE_SCANNING", "default_action"),
        ("PROACTIVE_DEFENCE", "blamer"),
    )

    config = {}
    for section_name, option_name in wanted_options:
        section_values = config.setdefault(section_name, {})
        try:
            resolved, _ = choose_value_from_config(
                section_name,
                option_name,
                username=username,
            )
        except KeyError:
            # A KeyError from the lookup is exported as null.
            resolved = None
        section_values[option_name] = resolved

    site_malware = malware_by_site.get(site.docroot, [])
    return {
        "lastScanTimestamp": last_scan_time,
        "nextScanTimestamp": next_scan_time,
        "username": username,
        "malware": site_malware,
        "config": config,
        "license": LicenseCLN.license_info(),
    }