"""
This program is free software: you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License,
or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Copyright © 2019 Cloud Linux Software Inc.
This software is also available under ImunifyAV commercial license,
see <https://www.imunify360.com/legal/eula>
"""
import asyncio
import json
import logging
import os
import pwd
import shutil
import time
from collections import defaultdict
from pathlib import Path
import sentry_sdk
from peewee import SqliteDatabase
from defence360agent.api import inactivity
from defence360agent.contracts.config import (
MalwareScanSchedule,
MalwareScanScheduleInterval as Interval,
)
from defence360agent.utils import atomic_rewrite, check_run
from imav.model.wordpress import WordpressSite, WPSite
from imav.wordpress import cli, PLUGIN_SLUG, telemetry
from imav.wordpress.utils import (
build_command_for_user,
calculate_next_scan_timestamp,
clear_get_cagefs_enabled_users_cache,
get_last_scan,
get_malware_history,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# SQLite database file of the app-version-detector; the site lookup queries
# in this module read the detected WordPress installations from it.
COMPONENTS_DB_PATH = Path(
    "/var/lib/cloudlinux-app-version-detector/components_versions.sqlite3"
)
def get_data_dir(site: WPSite):
    """Return the ``wp-content/imunify-security`` directory under the site's docroot."""
    wp_content = Path(site.docroot, "wp-content")
    return wp_content.joinpath("imunify-security")
async def _get_scan_data_for_user(sink, username: str, uid: int):
    """
    Collect the scan data shared by all WordPress sites of one user.

    Returns a tuple ``(last_scan_time, next_scan_time, malware_by_site)``:

    * ``last_scan_time`` - the "scan_date" value of the user's last scan,
      or None when the last scan has no such key;
    * ``next_scan_time`` - result of calculate_next_scan_timestamp(), or None
      when MalwareScanSchedule.INTERVAL is Interval.NONE;
    * ``malware_by_site`` - mapping of site path to the list of "file"
      malware history items located inside that site.
    """
    # Get the last scan data and extract the last scan date
    last_scan = await get_last_scan(sink, username)
    last_scan_time = last_scan.get("scan_date")

    next_scan_time = None
    if MalwareScanSchedule.INTERVAL != Interval.NONE:
        next_scan_time = calculate_next_scan_timestamp()

    # Get all WordPress sites for the user. The paths come ordered from the
    # longest to the shortest one, so nested sites are checked before the
    # site that contains them (the main site is expected to be last).
    all_users_sites = get_sites_for_user(uid)

    # Compare against "<site path>/" rather than the bare site path, otherwise
    # a file in a sibling directory sharing the same name prefix
    # (e.g. "/home/u/site-old/x.php" vs site "/home/u/site") would be
    # attributed to the wrong site.
    site_prefixes = [
        (site_path, site_path.rstrip("/") + "/")
        for site_path in all_users_sites
    ]

    # Get the malware history for the user and split it by site. The first
    # matching site wins, which relies on the ordering described above;
    # without it all malware could be attributed to the main site.
    malware_by_site = defaultdict(list)
    for item in get_malware_history(username):
        if item["resource_type"] != "file":
            continue
        file_path = item["file"]
        for site_path, prefix in site_prefixes:
            if file_path.startswith(prefix):
                malware_by_site[site_path].append(item)
                break

    return last_scan_time, next_scan_time, malware_by_site
async def _send_telemetry_task(coro, semaphore: asyncio.Semaphore):
async with semaphore:
try:
await coro
except Exception as e:
logger.error(f"Telemetry task failed: {e}")
async def process_telemetry_tasks(coroutines: list, concurrency=10):
    """
    Run the given telemetry coroutines, at most *concurrency* at a time.

    Every coroutine is wrapped into its own task guarded by a shared
    semaphore; the function returns once all tasks have finished.
    """
    semaphore = asyncio.Semaphore(concurrency)
    tasks = [
        asyncio.create_task(_send_telemetry_task(coro, semaphore))
        for coro in coroutines
    ]
    try:
        await asyncio.gather(*tasks)
    except Exception as error:
        # Lazy %-style arguments, as in the rest of this module.
        logger.error("Some telemetry tasks failed: %s", error)
async def install_for_users(users: set[str], sink):
    """
    Install the imunify-security plugin for all sites where it is not installed.

    Only sites owned by one of *users* are processed, and sites already
    present in the WordpressSite table are skipped. Every successfully
    installed site is stored in the WordpressSite table and reported with an
    "installed_by_imunify" telemetry event.

    A failure on a single site is logged and does not stop the run.
    asyncio.CancelledError is logged and not re-raised; any other error
    escaping the main loop is logged and re-raised. In every case the sites
    installed so far are recorded and their telemetry is sent.
    """
    logger.info("Installing imunify-security wp plugin")

    # Keep track of the installed sites
    installed = set()
    telemetry_coros = []

    with inactivity.track.task("wp-plugin-installation"):
        try:
            clear_get_cagefs_enabled_users_cache()

            # Every site already stored in the database is excluded, whether
            # or not it has been marked as manually deleted.
            known_sites = {
                WPSite(r.docroot, r.domain, r.uid)
                for r in WordpressSite.select()
            }
            to_install = _get_sites_without_plugin() - known_sites
            if not to_install:
                return

            # Group sites by user id
            sites_by_user = defaultdict(list)
            for site in to_install:
                sites_by_user[site.uid].append(site)

            # Now iterate over the grouped sites
            for uid, sites in sites_by_user.items():
                try:
                    username = pwd.getpwuid(uid).pw_name
                except Exception as error:
                    sentry_sdk.capture_message(
                        "Skipping installation of WordPress plugin on"
                        " {count} site(s) because they belong to user"
                        " {user} and it is not possible to retrieve"
                        " username for this user. Reason: {reason}".format(
                            count=len(sites),
                            user=uid,
                            reason=error,
                        ),
                        level="warning",
                    )
                    continue

                if username not in users:
                    # Skip the user if it's not in the list of users to
                    # install the plugin for
                    continue

                (
                    last_scan_time,
                    next_scan_time,
                    malware_by_site,
                ) = await _get_scan_data_for_user(sink, username, uid)

                for site in sites:
                    try:
                        # Check if site is correctly installed and accessible
                        # using WP CLI
                        is_wordpress_installed = (
                            await cli.is_wordpress_installed(site)
                        )
                        if not is_wordpress_installed:
                            sentry_sdk.capture_message(
                                "WordPress site is not accessible using WP"
                                " CLI. site={site}".format(site=site),
                                level="warning",
                            )
                            continue

                        # Prepare the JSON data
                        json_data = {
                            "lastScanTimestamp": last_scan_time,
                            "nextScanTimestamp": next_scan_time,
                            "malware": malware_by_site.get(site.docroot, []),
                        }

                        # Create the scan data file
                        await update_scan_data_file(site, json_data)

                        # Install the plugin
                        await cli.plugin_install(site)
                        installed.add(site)

                        # Prepare telemetry
                        telemetry_coros.append(
                            telemetry.send_event(
                                sink=sink,
                                event="installed_by_imunify",
                                site=site,
                            )
                        )
                    except Exception as error:
                        logger.error(
                            "Failed to install plugin to site=%s error=%s",
                            site,
                            error,
                        )

            logger.info(
                "Installed imunify-security wp plugin on %d sites",
                len(installed),
            )
        except asyncio.CancelledError:
            logger.info(
                "Installation imunify-security wp plugin was cancelled. Plugin"
                " was installed for %d sites",
                len(installed),
            )
        except Exception as error:
            logger.error(
                "Error occurred during plugin installation. error=%s", error
            )
            raise
        finally:
            # Record what has been installed so far. Skip the query entirely
            # when nothing was installed (e.g. the early return above) rather
            # than issuing an INSERT with no rows.
            if installed:
                WordpressSite.insert_many(
                    [
                        {
                            "domain": site.domain,
                            "docroot": site.docroot,
                            "uid": site.uid,
                            "manually_deleted_at": None,
                        }
                        for site in installed
                    ]
                ).execute()

            # Send telemetry
            await process_telemetry_tasks(telemetry_coros)
async def delete_plugin_files(site: WPSite):
    """Delete the data directory returned by get_data_dir() for *site*, if it exists."""
    target = get_data_dir(site)
    if not target.exists():
        return
    # shutil.rmtree is blocking, so it is run in a worker thread.
    await asyncio.to_thread(shutil.rmtree, target)
async def remove_all_installed(sink):
    """
    Remove the imunify-security plugin from all sites where it is installed.

    Processes every WordpressSite row that is not marked as manually deleted:
    the plugin is deactivated through WP CLI, the plugin data directory is
    deleted, the row is removed from the database and an
    "uninstalled_by_imunify" telemetry event is queued.

    A failure on a single site is logged and does not stop the run.
    asyncio.CancelledError is logged and not re-raised; any other error
    escaping the loop is logged and re-raised.
    """
    logger.info("Deleting imunify-security wp plugin")
    telemetry_coros = []
    affected = 0

    with inactivity.track.task("wp-plugin-removal"):
        try:
            to_remove = WordpressSite.select().where(
                WordpressSite.manually_deleted_at.is_null(True)
            )
            for site in to_remove:
                try:
                    # Uninstall the plugin from WordPress site.
                    await cli.plugin_deactivate(site)

                    # Delete the data files from the site.
                    await delete_plugin_files(site)

                    # Delete the site from database.
                    affected += (
                        WordpressSite.delete()
                        .where(WordpressSite.docroot == site.docroot)
                        .execute()
                    )

                    # Send telemetry
                    telemetry_coros.append(
                        telemetry.send_event(
                            sink=sink,
                            event="uninstalled_by_imunify",
                            site=site,
                        )
                    )
                except Exception as error:
                    logger.error(
                        "Failed to remove plugin from %s %s", site, error
                    )
        except asyncio.CancelledError:
            # Report how many sites were actually processed before the
            # cancellation, not how many were selected for removal.
            logger.info(
                "Deleting imunify-security wp plugin was cancelled. Plugin was"
                " deleted from %d sites",
                affected,
            )
        except Exception as error:
            logger.error("Error occurred during plugin deleting. %s", error)
            raise
        finally:
            logger.info(
                "Removed imunify-security wp plugin from %s sites",
                affected,
            )
            if affected > 0:
                # send telemetry
                await process_telemetry_tasks(telemetry_coros)
async def mark_site_as_manually_deleted(site, now):
    """Set ``manually_deleted_at`` to *now* on the WordpressSite rows with the site's docroot."""
    logger.info(
        "Mark site %s as manually deleted at %s (WP-Plugin removed)", site, now
    )
    same_docroot = WordpressSite.docroot == site.docroot
    update_query = WordpressSite.update(manually_deleted_at=now)
    update_query.where(same_docroot).execute()
async def tidy_up_manually_deleted(sink):
    """
    Detect sites where the plugin is no longer present and mark them.

    A site qualifies when it is reported by _get_sites_without_plugin() and
    its WordpressSite row has no ``manually_deleted_at`` value yet. Such
    sites are marked as manually deleted and a "removed_by_user" telemetry
    event is sent for each of them. Errors are logged, not raised.
    """
    pending_events = []
    try:
        without_plugin = _get_sites_without_plugin()
        still_tracked = {
            WPSite(row.docroot, row.domain, row.uid)
            for row in WordpressSite.select().where(
                WordpressSite.manually_deleted_at.is_null()
            )
        }
        removed_by_user = without_plugin & still_tracked

        if removed_by_user:
            # One timestamp is shared by all sites handled in this run.
            timestamp = time.time()
            for wp_site in removed_by_user:
                await mark_site_as_manually_deleted(wp_site, timestamp)

                # Prepare telemetry
                event_coro = telemetry.send_event(
                    sink=sink,
                    event="removed_by_user",
                    site=wp_site,
                )
                pending_events.append(event_coro)
    except Exception as error:
        logger.error("Error occurred during site tidy up. %s", error)
    finally:
        # Events queued before a failure are still delivered.
        if pending_events:
            await process_telemetry_tasks(pending_events)
async def update_data_on_sites(sink, sites: list[WPSite]):
    """
    Rewrite the scan data file on each of the given sites.

    Sites are grouped by owner so the scan data is fetched once per user.
    Users whose name cannot be resolved are skipped, and a failure on one
    site is logged without interrupting the remaining ones.
    """
    if not sites:
        return

    # Group sites by user id
    grouped = defaultdict(list)
    for wp_site in sites:
        grouped[wp_site.uid].append(wp_site)

    # Now iterate over the grouped sites
    for uid, user_sites in grouped.items():
        try:
            username = pwd.getpwuid(uid).pw_name
        except Exception as error:
            logger.error(
                "Failed to get username for uid=%d. error=%s",
                uid,
                error,
            )
            continue

        scan_data = await _get_scan_data_for_user(sink, username, uid)
        last_scan_time, next_scan_time, malware_by_site = scan_data

        for wp_site in user_sites:
            try:
                # Prepare the JSON data and update the scan data file
                payload = {
                    "lastScanTimestamp": last_scan_time,
                    "nextScanTimestamp": next_scan_time,
                    "malware": malware_by_site.get(wp_site.docroot, []),
                }
                await update_scan_data_file(wp_site, payload)
            except Exception as error:
                logger.error(
                    "Failed to update scan data on site=%s error=%s",
                    wp_site,
                    error,
                )
async def update_scan_data_file(site: WPSite, json_data: dict):
# Get the gid for the given user
user_info = pwd.getpwuid(site.uid)
gid = user_info.pw_gid
# Create data directory
data_dir = get_data_dir(site)
if os.path.islink(data_dir):
# If the data directory is a symlink, interrupt the process.
raise Exception(
"Data directory %s is a symlink, skipping.", str(data_dir)
)
if not data_dir.exists():
command = build_command_for_user(
user_info.pw_name,
[
"mkdir",
"-p",
str(data_dir),
],
)
await check_run(command)
if not data_dir.exists():
# Directory creation failed. Interrupt the process.
raise Exception(
"Failed to create directory %s for user %s",
str(data_dir),
user_info.pw_name,
)
# we can safely change the permissions of the directory because we just created it
data_dir.chmod(0o750)
scan_data_path = data_dir / "scan_data.php"
# Format the PHP file content
php_content = (
" set[WPSite]:
"""
Get a set of wp sites where imunify-security plugin is not installed.
The data is pulled from the app-version-detector database.
"""
if not COMPONENTS_DB_PATH.exists():
logger.error(
"App detector database '%s' couldn't be found.",
str(COMPONENTS_DB_PATH),
)
return set()
cursor = SqliteDatabase(COMPONENTS_DB_PATH).execute_sql(
f"""
WITH latest_reports AS (
SELECT id, uid, domain
FROM report
WHERE id IN (
SELECT MAX(id)
FROM report
WHERE domain IS NOT NULL
AND domain != ''
GROUP BY dir
)
)
SELECT wp.real_path, lr.domain, lr.uid
FROM apps AS wp
INNER JOIN latest_reports AS lr
ON wp.report_id = lr.id
WHERE wp.title = 'wp_core'
AND wp.parent_id IS NULL
AND NOT EXISTS (
SELECT 1
FROM apps AS plugin
WHERE plugin.parent_id = wp.id
AND plugin.title = 'wp_plugin_{PLUGIN_SLUG.replace("-", "_")}'
)
"""
)
return {
WPSite(docroot=row[0], domain=row[1], uid=int(row[2]))
for row in cursor.fetchall()
}
def get_sites_for_user(uid: int) -> list[str]:
    """
    Get a list of paths to WordPress sites belonging to a particular user.

    Paths are ordered from the longest to the shortest one, so a site nested
    inside another one comes before it and the main site is expected to be
    the last one in the list.

    The data is pulled from the app-version-detector database. An empty list
    is returned (and an error is logged) when the database file is missing.
    """
    if not COMPONENTS_DB_PATH.exists():
        logger.error(
            "App detector database '%s' couldn't be found.",
            str(COMPONENTS_DB_PATH),
        )
        return []

    db = SqliteDatabase(COMPONENTS_DB_PATH)
    try:
        # The uid is passed as a bound parameter instead of being formatted
        # into the SQL text.
        cursor = db.execute_sql(
            """
            WITH latest_reports AS (
                SELECT MAX(id) as id
                FROM report
                WHERE uid = ?
                GROUP BY dir
            )
            SELECT wp.real_path
            FROM apps AS wp
            INNER JOIN latest_reports AS lr
            ON wp.report_id = lr.id
            WHERE wp.title = 'wp_core'
            AND wp.parent_id IS NULL
            GROUP BY wp.real_path
            ORDER BY length(wp.real_path) DESC
            """,
            (uid,),
        )
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Every call opens its own connection; close it deterministically
        # instead of leaving that to garbage collection.
        db.close()
def get_sites_by_path(path: str) -> list[WPSite]:
    """
    Get a list of wp sites located under the given path.

    *path* is used as an SQLite GLOB pattern. Unless it already ends with
    "*", "/*" is appended (after dropping trailing slashes) so that all
    sites below the given directory are matched.

    The data is pulled from the app-version-detector database. An empty list
    is returned (and an error is logged) when the database file is missing.
    """
    if not COMPONENTS_DB_PATH.exists():
        logger.error(
            "App detector database '%s' couldn't be found.",
            str(COMPONENTS_DB_PATH),
        )
        return []

    # Append * to the path to get all sites that start with the given path.
    # Only if the path doesn't already end with *. Trailing slashes are
    # dropped first: "dir/" would otherwise become "dir//*" and match nothing.
    if not path.endswith("*"):
        path = path.rstrip("/") + "/*"

    db = SqliteDatabase(COMPONENTS_DB_PATH)
    try:
        # The pattern is passed as a bound parameter: formatting it into the
        # SQL text breaks (or injects SQL) as soon as the path contains a
        # single quote.
        cursor = db.execute_sql(
            """
            WITH latest_reports AS (
                SELECT id, uid, domain
                FROM report
                WHERE id IN (
                    SELECT MAX(id)
                    FROM report
                    WHERE domain IS NOT NULL
                    AND domain != ''
                    GROUP BY dir
                )
            )
            SELECT wp.real_path, lr.domain, lr.uid
            FROM apps AS wp
            INNER JOIN latest_reports AS lr
            ON wp.report_id = lr.id
            WHERE wp.title = 'wp_core'
            AND wp.parent_id IS NULL
            AND wp.real_path GLOB ?
            """,
            (path,),
        )
        return [
            WPSite(docroot=row[0], domain=row[1], uid=int(row[2]))
            for row in cursor.fetchall()
        ]
    finally:
        # Every call opens its own connection; close it deterministically
        # instead of leaving that to garbage collection.
        db.close()