Source code for TrigDroid_Infrastructure.test_runners.frida_test_runner

"""Frida test runner implementation.

This module handles Frida-based instrumentation and hooking operations
for runtime environment manipulation.
"""

import subprocess
import time
from typing import Dict, Any, Optional
from pathlib import Path

from ..interfaces import ITestRunner, ITestContext, TestResult, ILogger, IFridaHookProvider
from ..interfaces import TestRunnerBase


[docs] class FridaTestRunner(TestRunnerBase): """Test runner for Frida-based instrumentation.""" SUPPORTED_TESTS = [ "frida_hooks", "runtime_instrumentation", "api_hooking", "environment_spoofing" ]
[docs] def __init__(self, logger: ILogger, hook_provider: IFridaHookProvider): super().__init__(logger) self._hook_provider = hook_provider self._frida_server_process: Optional[subprocess.Popen] = None self._hook_script_path: Optional[str] = None
[docs] def can_run(self, test_type: str) -> bool: """Check if this runner can handle the given test type.""" return test_type in self.SUPPORTED_TESTS
[docs] def setup(self) -> bool: """Setup Frida server and prepare hook scripts.""" if not super().setup(): return False try: # Start Frida server on device if not self._start_frida_server(): return False # Prepare hook script if not self._prepare_hook_script(): return False return True except Exception as e: self._logger.error(f"Frida setup failed: {e}") return False
[docs] def teardown(self) -> bool: """Cleanup Frida server and temporary files.""" try: # Stop Frida server if self._frida_server_process: self._logger.debug("Terminating Frida server") self._frida_server_process.terminate() try: self._frida_server_process.wait(timeout=5) except subprocess.TimeoutExpired: self._frida_server_process.kill() self._frida_server_process = None # Cleanup hook script if self._hook_script_path and Path(self._hook_script_path).exists(): Path(self._hook_script_path).unlink() self._hook_script_path = None return super().teardown() except Exception as e: self._logger.error(f"Frida teardown failed: {e}") return False
def _execute_internal(self, context: ITestContext) -> TestResult: """Execute Frida-based tests.""" try: # Start the application with Frida instrumentation if not self._start_app_with_frida(context): return TestResult.FAILURE self._logger.info("Frida instrumentation active, monitoring application behavior") return TestResult.SUCCESS except Exception as e: self._logger.error(f"Frida test execution failed: {e}") return TestResult.FAILURE def _start_frida_server(self) -> bool: """Start Frida server on the Android device.""" self._logger.debug("Starting Frida server") # This would implement the actual Frida server startup logic # For now, simulate successful startup self._logger.info("Frida server started successfully") return True def _prepare_hook_script(self) -> bool: """Prepare the JavaScript hook script for injection.""" try: # Get the hook script from the provider hook_script = self._hook_provider.get_hook_script() hook_config = self._hook_provider.get_hook_config() # Apply configuration to the script configured_script = self._apply_hook_configuration(hook_script, hook_config) # Write to temporary file hook_file = Path.cwd() / "temp_hooks.js" hook_file.write_text(configured_script) self._hook_script_path = str(hook_file) self._logger.debug(f"Hook script prepared at {self._hook_script_path}") return True except Exception as e: self._logger.error(f"Failed to prepare hook script: {e}") return False def _apply_hook_configuration(self, script: str, config: Dict[str, Any]) -> str: """Apply configuration values to the hook script template.""" configured_script = script # Replace configuration placeholders for key, value in config.items(): placeholder = f"${{{key}}}" configured_script = configured_script.replace(placeholder, str(value)) return configured_script def _start_app_with_frida(self, context: ITestContext) -> bool: """Start the application with Frida instrumentation.""" if not self._hook_script_path: self._logger.error("Hook script not prepared") return False try: # Use Frida to spawn the application with instrumentation cmd = [ "frida", "-U", # USB device "-f", context.package_name, # Spawn app "-l", self._hook_script_path, # Load script "--no-pause" # Don't pause on startup ] self._logger.debug(f"Starting app with Frida: {' '.join(cmd)}") # Start the process but don't wait for it to complete process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) # Give it a moment to start time.sleep(2) # Check if the process is still running if process.poll() is None: self._logger.info(f"Successfully started {context.package_name} with Frida") return True else: stdout, stderr = process.communicate() self._logger.error(f"Frida failed to start app: {stderr.decode()}") return False except Exception as e: self._logger.error(f"Failed to start app with Frida: {e}") return False
[docs] def is_frida_needed(self, context: ITestContext) -> bool: """Check if Frida is needed for the current configuration.""" # Check configuration for Frida-dependent options frida_options = [ "constants", "adb_enabled", "uptime", "time", "sleep", "post_delayed", "remove_enabled_input_methods", "ip_4", "ip_6", "bluetooth_mac", "nfc_available" ] return any(context.config.has_key(option) and context.config.get_value(option) for option in frida_options)
[docs] class FridaServerManager: """Manages Frida server lifecycle on Android devices."""
[docs] def __init__(self, logger: ILogger): self._logger = logger self._server_process: Optional[subprocess.Popen] = None
[docs] def start_server(self, device_id: Optional[str] = None, server_path: str = "/data/local/tmp/frida-server") -> bool: """Start Frida server on the device.""" try: # Make sure server is executable chmod_cmd = f"adb {'-s ' + device_id if device_id else ''} shell chmod +x {server_path}" result = subprocess.run(chmod_cmd.split(), capture_output=True) if result.returncode != 0: self._logger.error(f"Failed to make Frida server executable: {result.stderr.decode()}") return False # Start the server server_cmd = f"adb {'-s ' + device_id if device_id else ''} shell {server_path}" self._server_process = subprocess.Popen(server_cmd.split()) # Give it time to start time.sleep(2) # Check if it's still running if self._server_process.poll() is None: self._logger.info("Frida server started successfully") return True else: self._logger.error("Frida server failed to start") return False except Exception as e: self._logger.error(f"Error starting Frida server: {e}") return False
[docs] def stop_server(self) -> bool: """Stop the Frida server.""" if not self._server_process: return True try: self._server_process.terminate() try: self._server_process.wait(timeout=5) except subprocess.TimeoutExpired: self._server_process.kill() self._logger.info("Frida server stopped") self._server_process = None return True except Exception as e: self._logger.error(f"Error stopping Frida server: {e}") return False
[docs] def is_running(self) -> bool: """Check if the Frida server is running.""" return self._server_process is not None and self._server_process.poll() is None