Source code for dexray_intercept.appProfiling

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import frida
import subprocess
import os
import signal
import shutil
import threading
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict, Any

from .services.instrumentation import InstrumentationService, FridaBasedException, setup_frida_device
from .services.profile_collector import ProfileCollector
from .services.hook_manager import HookManager
from .models.profile import ProfileData


class AppProfiler:
    """Main application profiler class.

    This class orchestrates the profiling process by coordinating between:

    - InstrumentationService: Manages Frida script loading and communication
    - ProfileCollector: Handles event collection and processing
    - HookManager: Manages hook configuration

    When ``enable_fritap`` is set, a ``fritap`` subprocess is additionally
    started before the Frida script is loaded and stopped again in
    :meth:`stop_profiling`.
    """

    def __init__(self, process, verbose_mode: bool = False, output_format: str = "CMD",
                 base_path: Optional[str] = None, deactivate_unlink: bool = False,
                 path_filters: Optional[List[str]] = None,
                 hook_config: Optional[Dict[str, bool]] = None,
                 enable_stacktrace: bool = False, enable_fritap: bool = False,
                 fritap_output_dir: str = "./fritap_output",
                 target_name: Optional[str] = None, spawn_mode: bool = False,
                 custom_scripts: Optional[List[str]] = None):
        """
        Initialize the AppProfiler.

        Args:
            process: Frida process object (may be None in fritap spawn mode;
                see :meth:`set_process_session`)
            verbose_mode: Enable verbose output
            output_format: Output format ("CMD" or "JSON")
            base_path: Base path for file dumps
            deactivate_unlink: Disable file unlinking
            path_filters: Path filters for file system events
            hook_config: Hook configuration dictionary
            enable_stacktrace: Enable stack traces
            enable_fritap: Enable fritap for TLS key extraction
            fritap_output_dir: Directory for fritap output files
            target_name: Target application name or package identifier
            spawn_mode: Whether the target was spawned (True) or attached to (False)
            custom_scripts: List of paths to custom Frida scripts to load
        """
        self.process = process

        # Handle fritap spawn mode where process might be None initially
        if process is None and enable_fritap and spawn_mode:
            if verbose_mode:
                print("[*] fritap spawn mode - process session will be set after fritap spawns target")

        self.verbose_mode = verbose_mode
        self.output_format = output_format
        self.deactivate_unlink = deactivate_unlink
        self.enable_stacktrace = enable_stacktrace

        # Target and mode information
        self.target_name = target_name
        self.spawn_mode = spawn_mode

        # Custom scripts configuration
        self.custom_scripts = custom_scripts or []
        self.custom_script_instances = []

        # Fritap configuration
        self.enable_fritap = enable_fritap
        self.fritap_output_dir = fritap_output_dir
        self.fritap_process = None
        self.fritap_keylog_file = None
        self.fritap_pcap_file = None
        self.fritap_pcap_filename = None
        self.fritap_finished = threading.Event()
        self.fritap_monitor_thread = None
        self._shutdown_requested = False

        # Check fritap availability early if enabled
        if self.enable_fritap:
            if not self._check_fritap_availability():
                print("[-] fritap not available, disabling fritap functionality")
                self.enable_fritap = False

        # Initialize services (no instrumentation service without a process)
        self.instrumentation = (
            InstrumentationService(process, custom_scripts=self.custom_scripts)
            if process else None
        )
        self.profile_collector = ProfileCollector(
            output_format=output_format,
            verbose_mode=verbose_mode,
            enable_stacktrace=enable_stacktrace,
            path_filters=path_filters,
            base_path=base_path
        )
        self.hook_manager = HookManager(hook_config)

        # Set up message handling (only if instrumentation service exists)
        if self.instrumentation:
            self.instrumentation.set_message_handler(self._message_handler)

        # State tracking
        self.startup = True
        self.startup_unlink = True
        self.path_filters_sent = False

    def _check_fritap_availability(self) -> bool:
        """Check if fritap is installed and available on PATH"""
        return shutil.which('fritap') is not None

    def _script_loaded(self) -> bool:
        """Return True only if an instrumentation service exists and reports a loaded script"""
        return self.instrumentation is not None and self.instrumentation.is_script_loaded()

    def set_process_session(self, process_session):
        """Update the process session after fritap spawns the target

        Creates the instrumentation service (and registers the message
        handler) if none exists yet; otherwise only swaps its process.
        """
        self.process = process_session
        if self.instrumentation is None:
            self.instrumentation = InstrumentationService(process_session, custom_scripts=self.custom_scripts)
            self.instrumentation.set_message_handler(self._message_handler)
        else:
            self.instrumentation.process = process_session

        if self.verbose_mode:
            print("[*] process session updated successfully")

    def _generate_fritap_filenames(self, app_name: str) -> tuple:
        """Generate timestamped fritap output filenames

        Returns:
            Tuple ``(keylog_filename, pcap_filename)`` (bare names, no directory)
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        keylog_file = f"dexray_tlskeys_{app_name}_{timestamp}.log"
        pcap_file = f"dexray_unfiltered_traffic_{app_name}_{timestamp}.pcap"
        return keylog_file, pcap_file

    def _start_fritap(self, app_name: str) -> Optional[int]:
        """Start fritap subprocess for TLS key extraction

        On failure fritap is disabled (``self.enable_fritap = False``) and a
        message is printed; no exception propagates.

        Returns:
            Always None in the current implementation. The PID of the fritap
            process itself is available as ``self.fritap_process.pid``.
        """
        if not self.enable_fritap:
            return None

        try:
            # Create output directory if it doesn't exist
            os.makedirs(self.fritap_output_dir, exist_ok=True)

            # Generate filenames
            keylog_filename, self.fritap_pcap_filename = self._generate_fritap_filenames(app_name)
            self.fritap_keylog_file = os.path.join(self.fritap_output_dir, keylog_filename)
            self.fritap_pcap_file = os.path.join(self.fritap_output_dir, self.fritap_pcap_filename)

            # Build fritap command
            fritap_cmd = [
                'fritap',
                '-m',  # Mobile app flag for Android
                '-k', self.fritap_keylog_file,
                '-p', self.fritap_pcap_file,
                '-f'  # enable full packet capture
            ]

            # Add verbose flag if enabled
            if self.verbose_mode:
                fritap_cmd.append('-v')

            # Add spawn flag if in spawn mode
            if self.spawn_mode:
                fritap_cmd.append('--spawn')

            # Add target
            target = self.target_name or app_name
            fritap_cmd.append(target)

            if self.verbose_mode:
                print(f"[*] starting fritap: {' '.join(fritap_cmd)}")
                print(f"[*] mode: {'spawn' if self.spawn_mode else 'attach'}")
                print(f"[*] target: {target}")
                print(f"[*] keylog file: {self.fritap_keylog_file}")
                print(f"[*] pcap file: {self.fritap_pcap_file}")

            # Spawn mode captures output in pipes; attach mode inherits the
            # parent's streams when verbose and discards them otherwise.
            # NOTE(review): nothing in this class reads the spawn-mode pipes;
            # a chatty fritap could block once the OS pipe buffer fills -
            # confirm whether an external caller drains them.
            if self.spawn_mode:
                output_target = subprocess.PIPE
            elif self.verbose_mode:
                output_target = None
            else:
                output_target = subprocess.DEVNULL

            self.fritap_process = subprocess.Popen(
                fritap_cmd,
                stdout=output_target,
                stderr=output_target,
                # New session => new process group, so the signal helpers can
                # target the whole group; replaces preexec_fn=os.setsid.
                start_new_session=True,
                text=bool(self.spawn_mode)
            )

            if self.verbose_mode:
                print(f"[*] fritap started with PID: {self.fritap_process.pid}")

            # If fritap is spawning, we need to wait for it to be ready
            if self.spawn_mode:
                import time
                time.sleep(3)  # Give fritap time to spawn and attach
                if self.verbose_mode:
                    print("[*] fritap spawn mode - waiting for target to be ready")

            # Monitor the fritap process in a separate daemon thread
            self.fritap_monitor_thread = threading.Thread(
                target=self._monitor_fritap_process,
                daemon=True
            )
            self.fritap_monitor_thread.start()
            return None

        except FileNotFoundError:
            print("[-] fritap not found. Please install fritap first.")
            print("[-] fritap can be installed from: https://github.com/fkie-cad/friTap")
            self.enable_fritap = False
            return None
        except Exception as e:
            print(f"[-] failed to start fritap: {e}")
            self.enable_fritap = False
            return None

    def _monitor_fritap_process(self):
        """Wait for the fritap process to exit, then set ``fritap_finished``"""
        if not self.fritap_process:
            return

        try:
            # Wait for fritap to finish
            self.fritap_process.wait()
            if self.verbose_mode:
                print("[*] fritap process finished")
        except Exception as e:
            if self.verbose_mode:
                print(f"[-] error monitoring fritap process: {e}")
        finally:
            # Signal that fritap has finished (also after a monitoring error)
            self.fritap_finished.set()

    def _send_signal_to_fritap(self, sig: int) -> bool:
        """Send a signal to fritap's process group

        Returns:
            True if the signal was sent, False if fritap is not running or
            sending failed
        """
        if not self.fritap_process or self.fritap_process.poll() is not None:
            return False

        try:
            # Send signal to the process group
            os.killpg(os.getpgid(self.fritap_process.pid), sig)
            return True
        except Exception as e:
            if self.verbose_mode:
                print(f"[-] error sending signal {sig} to fritap: {e}")
            return False

    def send_interrupt_to_fritap(self):
        """Send SIGINT (Ctrl+C) to fritap process"""
        if self.verbose_mode:
            print("[*] sending interrupt signal to fritap")
        return self._send_signal_to_fritap(signal.SIGINT)

    def wait_for_fritap(self, timeout: Optional[float] = None):
        """Wait for fritap to finish

        Args:
            timeout: Maximum time to wait in seconds (None for indefinite)

        Returns:
            True if fritap finished (or was never started), False if timeout occurred
        """
        if not self.fritap_process:
            return True
        return self.fritap_finished.wait(timeout)

    def _stop_fritap(self):
        """Stop fritap subprocess, escalating SIGINT -> SIGTERM -> SIGKILL

        Does nothing besides recording the shutdown request when fritap is
        not running. Otherwise ``fritap_finished`` is set on every exit path.
        """
        self._shutdown_requested = True

        proc = self.fritap_process
        if not proc or proc.poll() is not None:
            return

        try:
            if self.verbose_mode:
                print("[*] stopping fritap gracefully")

            # First try SIGINT (Ctrl+C equivalent) and wait a bit
            self._send_signal_to_fritap(signal.SIGINT)
            try:
                proc.wait(timeout=3)
            except subprocess.TimeoutExpired:
                pass
            else:
                if self.verbose_mode:
                    print("[*] fritap stopped gracefully")
                return

            # If still running, try SIGTERM
            if self.verbose_mode:
                print("[*] sending SIGTERM to fritap")
            self._send_signal_to_fritap(signal.SIGTERM)

            try:
                proc.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # Force kill if graceful termination fails
                if self.verbose_mode:
                    print("[*] force killing fritap")
                self._send_signal_to_fritap(signal.SIGKILL)
                proc.wait(timeout=2)
                if self.verbose_mode:
                    print("[*] fritap force stopped")
            else:
                if self.verbose_mode:
                    print("[*] fritap stopped")

        except Exception as e:
            if self.verbose_mode:
                print(f"[-] error stopping fritap: {e}")
        finally:
            self.fritap_finished.set()

    def start_profiling(self, app_name: Optional[str] = None) -> frida.core.Script:
        """Start the profiling process

        Starts fritap first (if enabled and not already running), then loads
        the Frida script through the instrumentation service.

        Args:
            app_name: Name used for fritap output files and as fallback
                target; taken from ``self.process.name`` when omitted

        Returns:
            The script object returned by ``InstrumentationService.load_script()``

        Raises:
            FridaBasedException: If anything fails; fritap is stopped first
                and the original exception is chained as the cause
        """
        try:
            # Get app name from process if not provided
            if app_name is None:
                try:
                    app_name = getattr(self.process, 'name', 'unknown_app')
                except Exception:
                    app_name = 'unknown_app'

            fritap_running = bool(self.fritap_process) and self.fritap_process.poll() is None

            # Start fritap FIRST if enabled (fritap-first initialization)
            # Skip if fritap is already running (e.g., started manually in spawn mode)
            if self.enable_fritap and not fritap_running:
                if self.verbose_mode:
                    print("[*] starting fritap before dexray-intercept hooks")
                self._start_fritap(app_name)

                # In spawn mode, give fritap extra time to fully initialize the spawned process
                if self.spawn_mode:
                    import time
                    if self.verbose_mode:
                        print("[*] waiting for fritap to fully initialize spawned process")
                    time.sleep(2)  # Additional wait for spawn mode
            elif self.enable_fritap and fritap_running:
                if self.verbose_mode:
                    print("[*] fritap already running, proceeding with dexray-intercept hooks")

            # Now start dexray-intercept's Frida script
            if self.verbose_mode and self.enable_fritap:
                print("[*] starting dexray-intercept hooks")

            if self.instrumentation is None:
                raise FridaBasedException("No process session available - cannot load Frida script")

            script = self.instrumentation.load_script()
            return script

        except Exception as e:
            # Clean up fritap if script loading fails
            if self.enable_fritap:
                self._stop_fritap()
            raise FridaBasedException(f"Failed to start profiling: {str(e)}") from e

    def stop_profiling(self):
        """Stop the profiling process

        Unloads the Frida script, then (if fritap is enabled) stops fritap,
        waits up to 10 seconds for it to finish and reports its output files.
        """
        if self.verbose_mode:
            print("[*] stopping profiling")

        # Stop Frida instrumentation first
        if self.instrumentation:
            self.instrumentation.unload_script()

        # Stop fritap and wait for it to finish
        if self.enable_fritap:
            self._stop_fritap()

            # Wait for fritap to finish writing its files
            if self.verbose_mode:
                print("[*] waiting for fritap to finish...")

            # Wait up to 10 seconds for fritap to finish gracefully
            if not self.wait_for_fritap(timeout=10):
                if self.verbose_mode:
                    print("[-] fritap did not finish within timeout")
            else:
                print("[*] fritap finished successfully")
                self._report_fritap_outputs()

    def _report_fritap_outputs(self):
        """Print where fritap's keylog and capture ended up, relocating the capture if needed"""
        if self.fritap_keylog_file and os.path.exists(self.fritap_keylog_file):
            print(f"[*] TLS keys saved to: {self.fritap_keylog_file}")

        # Output paths are only generated once fritap was actually started
        if not self.fritap_pcap_file or not self.fritap_pcap_filename:
            return

        if os.path.exists(self.fritap_pcap_file):
            print(f"[*] Traffic capture saved to: {self.fritap_pcap_file}")
            return

        # Fallback: look for the capture under an underscore-prefixed name in
        # the current working directory and move it to the intended location.
        src = Path("_" + self.fritap_pcap_filename)
        dst = Path(self.fritap_pcap_file)
        if not src.exists():
            print(f"[-] traffic capture not found: {self.fritap_pcap_file}")
            return

        try:
            src.replace(dst)
        except OSError as e:
            print(f"[-] failed to move traffic capture {src} to {dst}: {e}")
            return
        print(f"[*] Traffic capture saved to: {self.fritap_pcap_file}")

    def _message_handler(self, message: Dict[str, Any], data: Any = None):
        """Handle messages from Frida script

        Startup/configuration requests are answered directly; everything
        else is forwarded to the profile collector. Errors are swallowed
        (and printed in verbose mode).
        """
        try:
            # Handle initial startup messages
            if self._handle_startup_messages(message):
                return

            # Process regular profile messages
            self.profile_collector.process_frida_message(message, data)

        except Exception as e:
            if self.verbose_mode:
                print(f"[-] Error in message handler: {e}")

    def _handle_startup_messages(self, message: Dict[str, Any]) -> bool:
        """Handle startup configuration messages

        Returns:
            True if the message was consumed here, False if it should be
            processed as a regular profile message
        """
        payload = message.get('payload')

        # Send verbose mode configuration (answered once)
        if self.startup and payload == 'verbose_mode':
            self.instrumentation.send_message({
                'type': 'verbose_mode',
                'payload': self.verbose_mode
            })
            self.startup = False
            return True

        # Send unlink configuration (answered once)
        if self.startup_unlink and payload == 'deactivate_unlink':
            self.instrumentation.send_message({
                'type': 'deactivate_unlink',
                'payload': self.deactivate_unlink
            })
            self.startup_unlink = False
            return True

        # Send hook configuration
        if payload == 'hook_config':
            self.instrumentation.send_message({
                'type': 'hook_config',
                'payload': self.hook_manager.get_hook_config()
            })
            return True

        # Send stacktrace configuration
        if payload == 'enable_stacktrace':
            self.instrumentation.send_message({
                'type': 'enable_stacktrace',
                'payload': self.enable_stacktrace
            })
            return True

        # Send path filters (once)
        # NOTE(review): this branch fires on the first message that matched
        # none of the requests above and consumes it - confirm that message
        # is never a regular profile event.
        if not self.path_filters_sent and self.profile_collector.path_filters:
            filters = self.profile_collector.path_filters
            if not isinstance(filters, list):
                filters = [filters]
            self.instrumentation.send_message({
                'type': 'path_filters',
                'payload': filters
            })
            self.path_filters_sent = True
            return True

        return False

    # Hook management methods (delegated to HookManager)

    def enable_hook(self, hook_name: str, enabled: bool = True):
        """Enable or disable a specific hook at runtime

        The change is always recorded in the HookManager; it is pushed to
        the Frida script only when a script is currently loaded.
        """
        self.hook_manager.enable_hook(hook_name, enabled)
        if self._script_loaded():
            self.instrumentation.send_message({
                'type': 'hook_config',
                'payload': {hook_name: enabled}
            })

    def get_enabled_hooks(self) -> List[str]:
        """Return list of currently enabled hooks"""
        return self.hook_manager.get_enabled_hooks()

    def enable_all_hooks(self):
        """Enable all available hooks and push the full config to a loaded script"""
        self.hook_manager.enable_all_hooks()
        if self._script_loaded():
            self.instrumentation.send_message({
                'type': 'hook_config',
                'payload': self.hook_manager.get_hook_config()
            })

    def enable_hook_group(self, group_name: str):
        """Enable a group of related hooks and push the full config to a loaded script"""
        self.hook_manager.enable_hook_group(group_name)
        if self._script_loaded():
            self.instrumentation.send_message({
                'type': 'hook_config',
                'payload': self.hook_manager.get_hook_config()
            })

    # Profile data methods (delegated to ProfileCollector)

    def get_profile_data(self) -> ProfileData:
        """Get the collected profile data"""
        return self.profile_collector.get_profile_data()

    def get_profiling_log_as_json(self) -> str:
        """Get profile data as JSON string"""
        return self.profile_collector.get_profile_json()

    def write_profiling_log(self, filename: str = "profile.json") -> str:
        """Write profile data to file"""
        return self.profile_collector.write_profile_to_file(filename)

    def get_event_count(self, category: Optional[str] = None) -> int:
        """Get event count for category or total"""
        return self.profile_collector.get_event_count(category)

    def get_categories(self) -> List[str]:
        """Get all categories with events"""
        return self.profile_collector.get_categories()

    # Legacy compatibility methods

    def instrument(self) -> frida.core.Script:
        """Legacy method - use start_profiling() instead"""
        return self.start_profiling()

    def finish_app_profiling(self):
        """Legacy method - use stop_profiling() instead"""
        self.stop_profiling()

    def get_frida_script(self) -> str:
        """Get the path to the Frida script (requires an instrumentation service)"""
        return self.instrumentation.get_script_path()

    def update_script(self, script):
        """Update script reference (for compatibility)"""
        # This is handled internally now
        pass

    # Utility methods

    def get_stats(self) -> Dict[str, Any]:
        """Get profiling statistics

        Returns:
            Dict with hook statistics, profile summary, whether a script is
            loaded (False when no instrumentation service exists), output
            format and verbose flag
        """
        hook_stats = self.hook_manager.get_hook_stats()
        profile_summary = self.profile_collector.get_profile_data().get_summary()

        return {
            'hook_stats': hook_stats,
            'profile_summary': profile_summary,
            'script_loaded': self._script_loaded(),
            'output_format': self.output_format,
            'verbose_mode': self.verbose_mode
        }
# Backwards-compatible alias: older callers import the exception from this module
class FridaBasedException(FridaBasedException):
    """Legacy exception type kept for compatibility.

    Subclasses the FridaBasedException imported from the instrumentation
    service without adding any behaviour.
    """


# Backwards-compatible wrapper around setup_frida_device()
def setup_frida_handler(host: str = "", enable_spawn_gating: bool = False):
    """Legacy entry point - prefer setup_frida_device().

    Forwards ``host`` and ``enable_spawn_gating`` unchanged and returns
    whatever ``setup_frida_device()`` returns.
    """
    device = setup_frida_device(host, enable_spawn_gating)
    return device