""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ import asyncio import json import logging import os import pwd import shutil import time from collections import defaultdict from pathlib import Path import sentry_sdk from peewee import SqliteDatabase from defence360agent.api import inactivity from defence360agent.contracts.config import ( MalwareScanSchedule, MalwareScanScheduleInterval as Interval, ) from defence360agent.utils import atomic_rewrite, check_run from imav.model.wordpress import WordpressSite, WPSite from imav.wordpress import cli, PLUGIN_SLUG, telemetry from imav.wordpress.utils import ( build_command_for_user, calculate_next_scan_timestamp, clear_get_cagefs_enabled_users_cache, get_last_scan, get_malware_history, ) logger = logging.getLogger(__name__) COMPONENTS_DB_PATH = Path( "/var/lib/cloudlinux-app-version-detector/components_versions.sqlite3" ) def get_data_dir(site: WPSite): return Path(site.docroot) / "wp-content" / "imunify-security" async def _get_scan_data_for_user(sink, username: str, uid: int): # Get the last scan data last_scan = await get_last_scan(sink, username) # Extract the last scan date last_scan_time = last_scan.get("scan_date", None) next_scan_time = None if MalwareScanSchedule.INTERVAL != Interval.NONE: next_scan_time = calculate_next_scan_timestamp() # Get all WordPress sites for the user (the main site is always last) all_users_sites = get_sites_for_user(uid) # Get the malware history for the user malware_history = get_malware_history(username) # Split malware history by site. This part relies on the main site being the last one in the list. # Without this all malware could be attributed to the main site. malware_by_site = defaultdict(list) for item in malware_history: if item["resource_type"] == "file": for site_path in all_users_sites: if item["file"].startswith(site_path): malware_by_site[site_path].append(item) break return last_scan_time, next_scan_time, malware_by_site async def _send_telemetry_task(coro, semaphore: asyncio.Semaphore): async with semaphore: try: await coro except Exception as e: logger.error(f"Telemetry task failed: {e}") async def process_telemetry_tasks(coroutines: list, concurrency=10): semaphore = asyncio.Semaphore(concurrency) tasks = [ asyncio.create_task(_send_telemetry_task(coro, semaphore)) for coro in coroutines ] try: await asyncio.gather(*tasks) except Exception as e: logger.error(f"Some telemetry tasks failed: {e}") async def install_for_users(users: set[str], sink): """Install the imunify-security plugin for all sites where it is not installed.""" logger.info("Installing imunify-security wp plugin") # Keep track of the installed sites installed = set() telemetry_coros = [] with inactivity.track.task("wp-plugin-installation"): try: clear_get_cagefs_enabled_users_cache() to_install = _get_sites_without_plugin() - set( WPSite(r.docroot, r.domain, r.uid) for r in WordpressSite.select() ) if not to_install: return # Group sites by user id sites_by_user = defaultdict(list) for site in to_install: sites_by_user[site.uid].append(site) # Now iterate over the grouped sites for uid, sites in sites_by_user.items(): try: username = pwd.getpwuid(uid).pw_name except Exception as error: sentry_sdk.capture_message( "Skipping installation of WordPress plugin on" " {count} site(s) because they belong to user" " {user} and it is not possible to retrieve" " username for this user. Reason: {reason}".format( count=len(sites), user=uid, reason=error, ), level="warning", ) continue if username not in users: # Skip the user if it's not in the list of users to install the plugin for continue ( last_scan_time, next_scan_time, malware_by_site, ) = await _get_scan_data_for_user(sink, username, uid) for site in sites: try: # Check if site is correctly installed and accessible using WP CLI is_wordpress_installed = ( await cli.is_wordpress_installed(site) ) if not is_wordpress_installed: sentry_sdk.capture_message( "WordPress site is not accessible using WP" " CLI. site={site}".format(site=site), level="warning", ) continue # Prepare the JSON data json_data = { "lastScanTimestamp": last_scan_time, "nextScanTimestamp": next_scan_time, "malware": malware_by_site.get(site.docroot, []), } # Create the scan data file await update_scan_data_file(site, json_data) # Install the plugin await cli.plugin_install(site) installed.add(site) # Prepare telemetry telemetry_coros.append( telemetry.send_event( sink=sink, event="installed_by_imunify", site=site, ) ) except Exception as error: logger.error( "Failed to install plugin to site=%s error=%s", site, error, ) logger.info( "Installed imunify-security wp plugin on %d sites", len(installed), ) except asyncio.CancelledError: logger.info( "Installation imunify-security wp plugin was cancelled. Plugin" " was installed for %d sites", len(installed), ) except Exception as error: logger.error( "Error occurred during plugin installation. error=%s", error ) raise finally: WordpressSite.insert_many( [ { "domain": site.domain, "docroot": site.docroot, "uid": site.uid, "manually_deleted_at": None, } for site in installed ] ).execute() # Send telemetry await process_telemetry_tasks(telemetry_coros) async def delete_plugin_files(site: WPSite): data_dir = get_data_dir(site) if data_dir.exists(): await asyncio.to_thread(shutil.rmtree, data_dir) async def remove_all_installed(sink): """Remove the imunify-security plugin from all sites where it is installed.""" logger.info("Deleting imunify-security wp plugin") telemetry_coros = [] affected = 0 with inactivity.track.task("wp-plugin-removal"): try: to_remove = WordpressSite.select().where( WordpressSite.manually_deleted_at.is_null(True) ) for site in to_remove: try: # Uninstall the plugin from WordPress site. await cli.plugin_deactivate(site) # Delete the data files from the site. await delete_plugin_files(site) # Delete the site from database. affected += ( WordpressSite.delete() .where(WordpressSite.docroot == site.docroot) .execute() ) # Send telemetry telemetry_coros.append( telemetry.send_event( sink=sink, event="uninstalled_by_imunify", site=site, ) ) except Exception as error: logger.error( "Failed to remove plugin from %s %s", site, error ) except asyncio.CancelledError: logger.info( "Deleting imunify-security wp plugin was cancelled. Plugin was" " deleted from %d sites", len(to_remove), ) except Exception as error: logger.error("Error occurred during plugin deleting. %s", error) raise finally: logger.info( "Removed imunify-security wp plugin from %s sites", affected, ) if affected > 0: # send telemetry await process_telemetry_tasks(telemetry_coros) async def mark_site_as_manually_deleted(site, now): logger.info( "Mark site %s as manually deleted at %s (WP-Plugin removed)", site, now ) ( WordpressSite.update(manually_deleted_at=now) .where(WordpressSite.docroot == site.docroot) .execute() ) async def tidy_up_manually_deleted(sink): telemetry_coros = [] try: to_mark_as_manually_removed = _get_sites_without_plugin() & set( WPSite(r.docroot, r.domain, r.uid) for r in WordpressSite.select().where( WordpressSite.manually_deleted_at.is_null() ) ) if to_mark_as_manually_removed: now = time.time() for site in to_mark_as_manually_removed: await mark_site_as_manually_deleted(site, now) # Prepare telemetry telemetry_coros.append( telemetry.send_event( sink=sink, event="removed_by_user", site=site, ) ) except Exception as error: logger.error("Error occurred during site tidy up. %s", error) finally: if telemetry_coros: await process_telemetry_tasks(telemetry_coros) async def update_data_on_sites(sink, sites: list[WPSite]): if not sites: return # Group sites by user id sites_by_user = defaultdict(list) for site in sites: sites_by_user[site.uid].append(site) # Now iterate over the grouped sites for uid, sites in sites_by_user.items(): try: username = pwd.getpwuid(uid).pw_name except Exception as error: logger.error( "Failed to get username for uid=%d. error=%s", uid, error, ) continue ( last_scan_time, next_scan_time, malware_by_site, ) = await _get_scan_data_for_user(sink, username, uid) for site in sites: try: # Prepare the JSON data json_data = { "lastScanTimestamp": last_scan_time, "nextScanTimestamp": next_scan_time, "malware": malware_by_site.get(site.docroot, []), } # Update the scan data file await update_scan_data_file(site, json_data) except Exception as error: logger.error( "Failed to update scan data on site=%s error=%s", site, error, ) async def update_scan_data_file(site: WPSite, json_data: dict): # Get the gid for the given user user_info = pwd.getpwuid(site.uid) gid = user_info.pw_gid # Create data directory data_dir = get_data_dir(site) if os.path.islink(data_dir): # If the data directory is a symlink, interrupt the process. raise Exception( "Data directory %s is a symlink, skipping.", str(data_dir) ) if not data_dir.exists(): command = build_command_for_user( user_info.pw_name, [ "mkdir", "-p", str(data_dir), ], ) await check_run(command) if not data_dir.exists(): # Directory creation failed. Interrupt the process. raise Exception( "Failed to create directory %s for user %s", str(data_dir), user_info.pw_name, ) # we can safely change the permissions of the directory because we just created it data_dir.chmod(0o750) scan_data_path = data_dir / "scan_data.php" # Format the PHP file content php_content = ( " set[WPSite]: """ Get a set of wp sites where imunify-security plugin is not installed. The data is pulled from the app-version-detector database. """ if not COMPONENTS_DB_PATH.exists(): logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return set() cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT id, uid, domain FROM report WHERE id IN ( SELECT MAX(id) FROM report WHERE domain IS NOT NULL AND domain != '' GROUP BY dir ) ) SELECT wp.real_path, lr.domain, lr.uid FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL AND NOT EXISTS ( SELECT 1 FROM apps AS plugin WHERE plugin.parent_id = wp.id AND plugin.title = 'wp_plugin_{PLUGIN_SLUG.replace("-", "_")}' ) """ ) return { WPSite(docroot=row[0], domain=row[1], uid=int(row[2])) for row in cursor.fetchall() } def get_sites_for_user(uid: int) -> list[str]: """ Get a set of paths to WordPress sites belonging to a particular user. Paths are sorted by their length to make sure that the main site is the last one in the list. The data is pulled from the app-version-detector database. """ if not COMPONENTS_DB_PATH.exists(): logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return list() cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT MAX(id) as id FROM report WHERE uid = {uid} GROUP BY dir ) SELECT wp.real_path FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL GROUP BY wp.real_path ORDER BY length(wp.real_path) DESC """ ) return [row[0] for row in cursor.fetchall()] def get_sites_by_path(path: str) -> list[WPSite]: """ Get a set of wp sites by given path. The data is pulled from the app-version-detector database. """ if not COMPONENTS_DB_PATH.exists(): logger.error( "App detector database '%s' couldn't be found.", str(COMPONENTS_DB_PATH), ) return list() # Append * to the path to get all sites that start with the given path. Only if the path doesn't already end with *. if not path.endswith("*"): path += "/*" cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql( f""" WITH latest_reports AS ( SELECT id, uid, domain FROM report WHERE id IN ( SELECT MAX(id) FROM report WHERE domain IS NOT NULL AND domain != '' GROUP BY dir ) ) SELECT wp.real_path, lr.domain, lr.uid FROM apps AS wp INNER JOIN latest_reports AS lr ON wp.report_id = lr.id WHERE wp.title = 'wp_core' AND wp.parent_id IS NULL AND wp.real_path GLOB '{path}' """ ) return [ WPSite(docroot=row[0], domain=row[1], uid=int(row[2])) for row in cursor.fetchall() ]