Python API Reference
This section documents the Python API for Dexray Intercept, enabling programmatic control and analysis capabilities.
Core Classes
AppProfiler
- class dexray_intercept.AppProfiler(process, verbose_mode: bool = False, output_format: str = 'CMD', base_path: str | None = None, deactivate_unlink: bool = False, path_filters: List[str] | None = None, hook_config: Dict[str, bool] | None = None, enable_stacktrace: bool = False, enable_fritap: bool = False, fritap_output_dir: str = './fritap_output', target_name: str | None = None, spawn_mode: bool = False, custom_scripts: List[str] | None = None)[source]
Main application profiler class.
This class orchestrates the profiling process by coordinating between: - InstrumentationService: Manages Frida script loading and communication - ProfileCollector: Handles event collection and processing - HookManager: Manages hook configuration
- set_process_session(process_session)[source]
Update the process session after fritap spawns the target
- wait_for_fritap(timeout: float | None = None)[source]
Wait for fritap to finish
- Parameters:
timeout – Maximum time to wait in seconds (None for indefinite)
- Returns:
True if fritap finished, False if timeout occurred
- enable_hook(hook_name: str, enabled: bool = True)[source]
Enable or disable a specific hook at runtime
- get_profile_data() ProfileData[source]
Get the collected profile data
- set_job_script(script: frida.core.Script)[source]
Set script reference when using external job manager (e.g., AndroidFridaManager).
This allows Sandroid to load the script via JobManager while still using AppProfiler for message handling, hook config, and profile collection.
- Parameters:
script – Frida script loaded by external job manager
The main orchestrator class that coordinates Frida instrumentation, event collection, and profile generation.
Constructor Parameters:
AppProfiler(
process, # Frida process session
verbose_mode=False, # Enable verbose output
output_format="CMD", # Output format ("CMD" or "JSON")
base_path=None, # Base path for file dumps
deactivate_unlink=False, # Disable file unlinking
path_filters=None, # Path filters for filesystem events
hook_config=None, # Hook configuration dict
enable_stacktrace=False, # Enable stack traces
enable_fritap=False, # Enable friTap integration
fritap_output_dir="./fritap_output", # friTap output directory
target_name=None, # Target app name
spawn_mode=False, # Whether target was spawned
custom_scripts=None # List of custom script paths
)
Basic Usage:
from dexray_intercept import AppProfiler
import frida
# Connect to device and attach to app
device = frida.get_usb_device()
session = device.attach("com.example.app")
# Create profiler with crypto and network monitoring
profiler = AppProfiler(
session,
hook_config={
'aes_hooks': True,
'web_hooks': True,
'bypass_hooks': True
},
verbose_mode=True
)
# Start profiling
script = profiler.start_profiling("com.example.app")
# Let app run and collect events...
input("Press Enter to stop...")
# Stop and get results
profiler.stop_profiling()
profile_data = profiler.get_profile_data()
Key Methods:
- start_profiling(app_name=None)
Start the profiling process and load Frida scripts.
- Parameters:
app_name – Name of target application
- Returns:
Loaded Frida script instance
- Return type:
frida.core.Script
script = profiler.start_profiling("com.banking.app")
- stop_profiling()
Stop profiling and cleanup resources including friTap processes.
profiler.stop_profiling()
- get_profile_data()
Get the collected profile data object.
- Returns:
Profile data containing all events
- Return type:
data = profiler.get_profile_data() crypto_events = data.get_events('CRYPTO_AES')
- write_profiling_log(filename='profile.json')
Write profile data to JSON file with timestamp.
- Parameters:
filename – Base filename for output
- Returns:
Generated filename with timestamp
- Return type:
output_file = profiler.write_profiling_log("banking_analysis") print(f"Profile saved to: {output_file}")
Hook Management:
- enable_hook(hook_name, enabled=True)
Enable or disable a specific hook at runtime.
- Parameters:
hook_name – Name of hook to control
enabled – Whether to enable (True) or disable (False)
# Enable AES hooks during runtime profiler.enable_hook('aes_hooks', True) # Disable web hooks profiler.enable_hook('web_hooks', False)
- get_enabled_hooks()
Get list of currently enabled hooks.
- Returns:
List of enabled hook names
- Return type:
List[str]
enabled = profiler.get_enabled_hooks() print(f"Active hooks: {enabled}")
- enable_all_hooks()
Enable all available hook categories.
profiler.enable_all_hooks()
ProfileData
- class dexray_intercept.models.profile.ProfileData[source]
Container for profile data with events organized by category
Container class for organizing collected events by category with metadata.
Basic Usage:
# Get profile data from profiler
profile_data = profiler.get_profile_data()
# Access events by category
crypto_events = profile_data.get_events('CRYPTO_AES')
network_events = profile_data.get_events('WEB')
# Get metadata
total_events = profile_data.get_event_count()
categories = profile_data.get_categories()
# Convert to JSON
json_string = profile_data.to_json()
# Save to file
filename = profile_data.write_to_file("analysis_results.json")
Key Methods:
- get_events(category)
Get all events for a specific category.
- Parameters:
category – Event category name (e.g., ‘CRYPTO_AES’, ‘WEB’)
- Returns:
List of events in category
- Return type:
List[Event]
- get_categories()
Get all available event categories.
- Returns:
List of category names
- Return type:
List[str]
- get_event_count(category=None)
Get event count for category or total.
- Parameters:
category – Specific category or None for total
- Returns:
Number of events
- Return type:
- to_json(indent=4)
Convert profile data to JSON string.
- Parameters:
indent – JSON indentation level
- Returns:
JSON representation
- Return type:
Event Classes
Base Event
- class dexray_intercept.models.events.Event(event_type: str, timestamp: str | None = None)[source]
Base class for all security events
Base class for all event types providing common functionality.
- Common Properties:
event_type- Specific event type identifiertimestamp- ISO 8601 timestampmetadata- Additional metadata dictionary
Usage:
for event in crypto_events:
print(f"Event: {event.event_type}")
print(f"Time: {event.timestamp}")
print(f"Data: {event.get_event_data()}")
CryptoEvent
- class dexray_intercept.models.events.CryptoEvent(event_type: str, algorithm: str | None = None, timestamp: str | None = None)[source]
Cryptographic operation event
Specialized event for cryptographic operations.
- Properties:
algorithm- Cryptographic algorithm usedoperation_mode- Encryption/decryption modekey_hex- Hexadecimal key representationiv_hex- Initialization vectorplaintext- Extracted plaintext (when available)
Usage:
crypto_events = profile_data.get_events('CRYPTO_AES')
for event in crypto_events:
if event.algorithm == 'AES':
print(f"AES operation: {event.operation_mode_desc}")
print(f"Key length: {event.key_length} bytes")
if event.plaintext:
print(f"Plaintext: {event.plaintext}")
NetworkEvent
- class dexray_intercept.models.events.NetworkEvent(event_type: str, timestamp: str | None = None)[source]
Network operation event
Event for network communications.
- Properties:
url- Request URLmethod- HTTP methodheaders- Request/response headersbody_preview- Preview of request/response bodylibrary- Network library used (OkHttp, Retrofit, etc.)
Usage:
network_events = profile_data.get_events('WEB')
for event in network_events:
if event.url:
print(f"{event.method} {event.url}")
if event.headers:
print(f"Headers: {event.headers}")
Parsing and Processing
Event Parsers
Parsers convert raw Frida messages into structured Event objects.
Parser Factory:
from dexray_intercept.parsers.factory import parser_factory
# Get parser for category
crypto_parser = parser_factory.get_parser('CRYPTO_AES')
# Parse raw event data
event = crypto_parser.parse(raw_json_string, timestamp)
Custom Parsers:
from dexray_intercept.parsers.base import BaseParser
from dexray_intercept.models.events import Event
class CustomParser(BaseParser):
def parse_json_data(self, data, timestamp):
# Create custom event from data
event = CustomEvent(data['event_type'], timestamp)
event.custom_field = data.get('custom_field')
return event
# Register custom parser
parser_factory.register_parser('CUSTOM_CATEGORY', CustomParser())
Profile Collection
- class dexray_intercept.services.profile_collector.ProfileCollector(output_format: str = 'CMD', verbose_mode: bool = False, enable_stacktrace: bool = False, path_filters: List[str] | None = None, base_path: str | None = None)[source]
Service for collecting and processing profile events
- process_frida_message(message: Dict[str, Any], data: Any | None = None) bool[source]
Process a message from Frida script
- get_profile_data() ProfileData[source]
Get the collected profile data
Handles event collection and processing from Frida messages.
Usage:
from dexray_intercept.services.profile_collector import ProfileCollector
collector = ProfileCollector(
output_format="JSON",
verbose_mode=True,
enable_stacktrace=True
)
# Process Frida message
success = collector.process_frida_message(message, data)
# Get collected data
profile_data = collector.get_profile_data()
Hook Management
HookManager
- class dexray_intercept.services.hook_manager.HookManager(initial_config: Dict[str, bool] | None = None)[source]
Manages hook configuration and runtime control
Manages hook configuration and state.
Usage:
from dexray_intercept.services.hook_manager import HookManager
# Create with initial config
hook_config = {
'aes_hooks': True,
'web_hooks': True,
'bypass_hooks': False
}
manager = HookManager(hook_config)
# Runtime management
manager.enable_hook('bypass_hooks', True)
enabled_hooks = manager.get_enabled_hooks()
# Get full configuration
config = manager.get_hook_config()
Frida Integration
InstrumentationService
- class dexray_intercept.services.instrumentation.InstrumentationService(process, frida_agent_script: str = 'profiling.js', custom_scripts: List[str] | None = None)[source]
Service for managing Frida instrumentation
Manages Frida script loading and communication.
Usage:
from dexray_intercept.services.instrumentation import InstrumentationService
service = InstrumentationService(
process_session,
custom_scripts=['./my_hooks.js']
)
# Set message handler
service.set_message_handler(message_callback)
# Load and start script
script = service.load_script()
# Send message to script
service.send_message({'type': 'config', 'data': config})
Utility Functions
Device Management
from dexray_intercept.services.instrumentation import setup_frida_device
# Connect to USB device
device = setup_frida_device()
# Connect to remote device
device = setup_frida_device("192.168.1.100:27042")
# Enable spawn gating
device = setup_frida_device(enable_spawn_gating=True)
Command Line Integration
from dexray_intercept.ammm import parse_hook_config
from argparse import Namespace
# Parse CLI arguments to hook config
args = Namespace()
args.hooks_crypto = True
args.enable_bypass = True
hook_config = parse_hook_config(args)
# Result: {'aes_hooks': True, 'encodings_hooks': True, 'keystore_hooks': True, 'bypass_hooks': True}
Advanced Usage Patterns
Real-time Event Processing
class RealTimeAnalyzer:
def __init__(self):
self.alerts = []
def message_handler(self, message, data):
payload = message.get('payload', {})
# Real-time analysis
if payload.get('profileType') == 'CRYPTO_AES':
self.analyze_crypto_event(payload)
elif payload.get('profileType') == 'BYPASS_DETECTION':
self.analyze_bypass_event(payload)
def analyze_crypto_event(self, payload):
# Check for weak encryption
content = payload.get('profileContent', {})
if content.get('key_length', 0) < 16:
self.alerts.append("Weak encryption detected")
def analyze_bypass_event(self, payload):
# Alert on evasion attempts
content = payload.get('profileContent', {})
if content.get('bypass_category') == 'root_detection':
self.alerts.append("Root detection evasion detected")
# Usage
analyzer = RealTimeAnalyzer()
profiler = AppProfiler(session)
profiler.instrumentation.set_message_handler(analyzer.message_handler)
Custom Event Types
from dexray_intercept.models.events import Event
class CustomEvent(Event):
def __init__(self, event_type, timestamp):
super().__init__(event_type, timestamp)
self.custom_data = {}
def get_event_data(self):
data = super().get_event_data()
data.update(self.custom_data)
return data
# Custom processor
class CustomProfileCollector(ProfileCollector):
def _handle_custom_events(self, content, timestamp):
event = CustomEvent("custom.event", timestamp)
event.custom_data = content
self.profile_data.add_event("CUSTOM", event)
return True
Batch Processing
import glob
import json
def analyze_profile_batch(profile_pattern):
results = {}
for profile_path in glob.glob(profile_pattern):
with open(profile_path, 'r') as f:
profile = json.load(f)
# Analyze profile
results[profile_path] = {
'total_events': profile.get('_metadata', {}).get('total_events', 0),
'crypto_events': len(profile.get('CRYPTO_AES', [])),
'network_events': len(profile.get('WEB', [])),
'bypass_events': len(profile.get('BYPASS_DETECTION', []))
}
return results
# Usage
batch_results = analyze_profile_batch("./analysis_*/profile_*.json")
Error Handling
Exception Classes
- class dexray_intercept.services.instrumentation.FridaBasedException[source]
Custom exception for Frida-related errors
Custom exception for Frida-related errors.
Common Error Scenarios:
from dexray_intercept import AppProfiler, FridaBasedException
import frida
try:
device = frida.get_usb_device()
session = device.attach("com.example.app")
profiler = AppProfiler(session)
profiler.start_profiling()
except frida.ProcessNotFoundError:
print("Target app not found or not running")
except frida.TransportError:
print("Connection to device lost")
except FridaBasedException as e:
print(f"Frida instrumentation error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
Best Practices
Resource Management:
# Always cleanup resources
try:
profiler = AppProfiler(session)
profiler.start_profiling()
# ... analysis code ...
finally:
profiler.stop_profiling()
Performance Optimization:
# Use selective hooks for better performance
hook_config = {
'aes_hooks': True, # Only what you need
'web_hooks': True,
# 'hooks_all': False # Avoid unless necessary
}
# Process events efficiently
def efficient_message_handler(message, data):
payload = message.get('payload', {})
# Quick filtering
if payload.get('profileType') not in ['CRYPTO_AES', 'WEB']:
return
# Process only relevant events
process_relevant_event(payload)
Threading Considerations:
import threading
from queue import Queue
class ThreadSafeAnalyzer:
def __init__(self):
self.event_queue = Queue()
self.processing_thread = threading.Thread(target=self.process_events)
self.processing_thread.daemon = True
self.processing_thread.start()
def message_handler(self, message, data):
# Add to queue for background processing
self.event_queue.put((message, data))
def process_events(self):
while True:
message, data = self.event_queue.get()
# Process in background thread
self.analyze_event(message, data)
self.event_queue.task_done()
Integration Examples
Threat Intelligence Integration:
class ThreatIntelIntegrator:
def __init__(self, ti_api_key):
self.ti_api = ThreatIntelAPI(ti_api_key)
def analyze_profile(self, profile_data):
# Extract IOCs
iocs = self.extract_iocs(profile_data)
# Check against threat intelligence
for ioc in iocs['domains']:
threat_info = self.ti_api.check_domain(ioc)
if threat_info.is_malicious:
print(f"Malicious domain detected: {ioc}")
def extract_iocs(self, profile_data):
iocs = {'domains': [], 'ips': [], 'urls': []}
for event in profile_data.get_events('WEB'):
if hasattr(event, 'url') and event.url:
iocs['urls'].append(event.url)
# Extract domain from URL
from urllib.parse import urlparse
domain = urlparse(event.url).netloc
iocs['domains'].append(domain)
return iocs
SIEM Integration:
import syslog
class SIEMIntegrator:
def __init__(self):
syslog.openlog("dexray-intercept")
def send_alert(self, event):
severity = self.get_severity(event)
message = self.format_siem_message(event)
syslog.syslog(severity, message)
def get_severity(self, event):
if hasattr(event, 'bypass_category'):
return syslog.LOG_CRIT # Critical for bypass attempts
elif event.event_type.startswith('crypto'):
return syslog.LOG_WARNING
else:
return syslog.LOG_INFO
def format_siem_message(self, event):
return f"dexray_event={event.event_type} timestamp={event.timestamp} data={event.get_event_data()}"
Next Steps
Explore TypeScript API for custom hooks: TypeScript API Reference
Learn about development workflows: Development Guide
See practical examples in the user guide: User Guide