Source code for dexray_intercept.services.instrumentation

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import frida
import os
from typing import Optional, Callable, List
from datetime import datetime


[docs] class FridaBasedException(Exception): """Custom exception for Frida-related errors""" pass
[docs] class InstrumentationService: """Service for managing Frida instrumentation""" def __init__(self, process, frida_agent_script: str = "profiling.js", custom_scripts: Optional[List[str]] = None): self.process = process self.frida_agent_script = frida_agent_script self.script: Optional[frida.core.Script] = None self.message_handler: Optional[Callable] = None self.custom_scripts = custom_scripts or [] self.custom_script_instances: List[frida.core.Script] = []
[docs] def load_script(self) -> frida.core.Script: """Load and create the Frida script""" try: runtime = "qjs" script_path = self._get_script_path() with open(script_path, encoding='utf8', newline='\n') as f: script_string = f.read() self.script = self.process.create_script(script_string, runtime=runtime) if self.message_handler: self.script.on("message", self.message_handler) self.script.load() # Load custom scripts self._load_custom_scripts() return self.script except frida.ProcessNotFoundError: raise FridaBasedException("Unable to find target process") except frida.InvalidOperationError: raise FridaBasedException("Invalid operation! Please run Dexray Intercept in debug mode in order to understand the source of this error and report it.") except frida.TransportError: raise FridaBasedException("Timeout error due to some internal frida error's. Try to restart frida-server again.") except frida.ProtocolError: raise FridaBasedException("Connection is closed. Probably the target app crashed") except FileNotFoundError: raise FridaBasedException(f"Frida script not found: {script_path}") except Exception as e: raise FridaBasedException(f"Failed to load Frida script: {str(e)}")
def _load_custom_scripts(self): """Load custom Frida scripts""" for script_path in self.custom_scripts: try: if not os.path.exists(script_path): print(f"[-] Custom script not found: {script_path}") continue with open(script_path, 'r', encoding='utf-8') as f: script_content = f.read() # Create custom message handler that adds script identification script_name = os.path.basename(script_path) custom_handler = self._create_custom_script_handler(script_name) # Create and load the custom script custom_script = self.process.create_script(script_content, runtime="qjs") custom_script.on("message", custom_handler) custom_script.load() self.custom_script_instances.append(custom_script) print(f"[*] Loaded custom script: {script_name}") except Exception as e: print(f"[-] Failed to load custom script {script_path}: {e}") def _create_custom_script_handler(self, script_name: str): """Create a message handler for custom scripts that adds identification""" def custom_handler(message, data): # Modify the message to identify it as coming from a custom script if message.get('type') == 'send' and 'payload' in message: payload = message['payload'] # Check if it's already structured as a profile message if isinstance(payload, dict) and 'profileType' in payload: # It's already structured - mark as custom payload['profileType'] = 'CUSTOM_SCRIPT' if 'profileContent' not in payload: payload['profileContent'] = {} if isinstance(payload['profileContent'], dict): payload['profileContent']['script_name'] = script_name else: # Wrap unstructured messages message['payload'] = { 'profileType': 'CUSTOM_SCRIPT', 'profileContent': { 'script_name': script_name, 'message': payload }, 'timestamp': datetime.now().isoformat() } # Forward to main message handler if self.message_handler: self.message_handler(message, data) return custom_handler
[docs] def set_message_handler(self, handler: Callable): """Set the message handler for script communication""" self.message_handler = handler if self.script: self.script.on("message", handler)
[docs] def send_message(self, message: dict): """Send a message to the Frida script""" if self.script: self.script.post(message) else: raise FridaBasedException("Script not loaded. Call load_script() first.")
[docs] def unload_script(self): """Unload the Frida script and all custom scripts""" # Unload custom scripts for custom_script in self.custom_script_instances: try: custom_script.unload() except Exception: # Ignore errors during unload pass self.custom_script_instances.clear() # Unload main script if self.script: try: self.script.unload() except Exception: # Ignore errors during unload pass finally: self.script = None
[docs] def is_script_loaded(self) -> bool: """Check if script is loaded""" return self.script is not None
def _get_script_path(self) -> str: """Get the full path to the Frida script""" # Assuming the script is in the same directory as this module current_dir = os.path.dirname(os.path.dirname(__file__)) return os.path.join(current_dir, self.frida_agent_script)
[docs] def get_script_path(self) -> str: """Get the script path (public method for compatibility)""" return self._get_script_path()
[docs] def restart_script(self): """Restart the Frida script""" self.unload_script() return self.load_script()
def setup_frida_device(host: str = "", enable_spawn_gating: bool = False): """Setup and return a Frida device connection""" try: if len(host) > 4: # Use IP address of the target machine instead of USB device = frida.get_device_manager().add_remote_device(host) else: device = frida.get_usb_device() # Handle child processes def on_child_added(child): print(f"[*] Attached to child process with pid {child.pid}") device.resume(child.pid) # Handle spawned processes def on_spawn_added(spawn): print(f"[*] Process spawned with pid {spawn.pid}. Name: {spawn.identifier}") device.resume(spawn.pid) device.on("child_added", on_child_added) if enable_spawn_gating: device.enable_spawn_gating() device.on("spawn_added", on_spawn_added) return device except frida.InvalidArgumentError: raise FridaBasedException("Unable to find device") except frida.ServerNotRunningError: raise FridaBasedException("Frida server not running. Start frida-server and try it again.")