Python API Reference
This section documents the Python API for Dexray Intercept, enabling programmatic control and analysis capabilities.
Core Classes
AppProfiler
- class dexray_intercept.AppProfiler(process, verbose_mode: bool = False, output_format: str = 'CMD', base_path: str | None = None, deactivate_unlink: bool = False, path_filters: List[str] | None = None, hook_config: Dict[str, bool] | None = None, enable_stacktrace: bool = False, enable_fritap: bool = False, fritap_output_dir: str = './fritap_output', target_name: str | None = None, spawn_mode: bool = False, custom_scripts: List[str] | None = None)
Main application profiler class.
This class orchestrates the profiling process by coordinating between:
- InstrumentationService: Manages Frida script loading and communication
- ProfileCollector: Handles event collection and processing
- HookManager: Manages hook configuration
- set_process_session(process_session)
Update the process session after friTap spawns the target.
- wait_for_fritap(timeout: float | None = None)
Wait for friTap to finish.
- Parameters:
timeout – Maximum time to wait in seconds (None for indefinite)
- Returns:
True if friTap finished, False if a timeout occurred
- start_profiling(app_name: str | None = None) -> frida.core.Script
Start the profiling process.
- enable_hook(hook_name: str, enabled: bool = True)
Enable or disable a specific hook at runtime.
- get_profile_data() -> ProfileData
Get the collected profile data.
The main orchestrator class that coordinates Frida instrumentation, event collection, and profile generation.
Constructor Parameters:
AppProfiler(
    process,                              # Frida process session
    verbose_mode=False,                   # Enable verbose output
    output_format="CMD",                  # Output format ("CMD" or "JSON")
    base_path=None,                       # Base path for file dumps
    deactivate_unlink=False,              # Disable file unlinking
    path_filters=None,                    # Path filters for filesystem events
    hook_config=None,                     # Hook configuration dict
    enable_stacktrace=False,              # Enable stack traces
    enable_fritap=False,                  # Enable friTap integration
    fritap_output_dir="./fritap_output",  # friTap output directory
    target_name=None,                     # Target app name
    spawn_mode=False,                     # Whether target was spawned
    custom_scripts=None                   # List of custom script paths
)
Basic Usage:
from dexray_intercept import AppProfiler
import frida
# Connect to device and attach to app
device = frida.get_usb_device()
session = device.attach("com.example.app")
# Create profiler with crypto and network monitoring
profiler = AppProfiler(
    session,
    hook_config={
        'aes_hooks': True,
        'web_hooks': True,
        'bypass_hooks': True
    },
    verbose_mode=True
)
# Start profiling
script = profiler.start_profiling("com.example.app")
# Let app run and collect events...
input("Press Enter to stop...")
# Stop and get results
profiler.stop_profiling()
profile_data = profiler.get_profile_data()
Key Methods:
- start_profiling(app_name=None)
Start the profiling process and load Frida scripts.
- Parameters:
app_name – Name of target application
- Returns:
Loaded Frida script instance
- Return type:
frida.core.Script
script = profiler.start_profiling("com.banking.app")
- stop_profiling()
Stop profiling and cleanup resources including friTap processes.
profiler.stop_profiling()
- get_profile_data()
Get the collected profile data object.
- Returns:
Profile data containing all events
- Return type:
ProfileData
data = profiler.get_profile_data()
crypto_events = data.get_events('CRYPTO_AES')
- write_profiling_log(filename='profile.json')
Write profile data to JSON file with timestamp.
- Parameters:
filename – Base filename for output
- Returns:
Generated filename with timestamp
- Return type:
str
output_file = profiler.write_profiling_log("banking_analysis")
print(f"Profile saved to: {output_file}")
Hook Management:
- enable_hook(hook_name, enabled=True)
Enable or disable a specific hook at runtime.
- Parameters:
hook_name – Name of hook to control
enabled – Whether to enable (True) or disable (False)
# Enable AES hooks during runtime
profiler.enable_hook('aes_hooks', True)
# Disable web hooks
profiler.enable_hook('web_hooks', False)
- get_enabled_hooks()
Get list of currently enabled hooks.
- Returns:
List of enabled hook names
- Return type:
List[str]
enabled = profiler.get_enabled_hooks()
print(f"Active hooks: {enabled}")
- enable_all_hooks()
Enable all available hook categories.
profiler.enable_all_hooks()
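Because hook_config is a plain dictionary mapping hook names to booleans, it can be assembled programmatically before it is passed to AppProfiler. The following is a minimal sketch: the helper build_hook_config and the KNOWN_HOOKS tuple are hypothetical, and the tuple lists only the hook names that appear on this page, which may not be the complete set.

```python
from typing import Dict, Iterable

# Hook names taken from the examples on this page; the real list may be longer.
KNOWN_HOOKS = (
    "aes_hooks",
    "web_hooks",
    "bypass_hooks",
    "encodings_hooks",
    "keystore_hooks",
)


def build_hook_config(enabled: Iterable[str]) -> Dict[str, bool]:
    """Return a hook_config dict with only the requested hooks switched on."""
    requested = set(enabled)
    unknown = requested - set(KNOWN_HOOKS)
    if unknown:
        # Fail early on typos instead of silently ignoring them.
        raise ValueError(f"Unknown hook names: {sorted(unknown)}")
    return {name: name in requested for name in KNOWN_HOOKS}


config = build_hook_config(["aes_hooks", "web_hooks"])
```

The resulting dictionary has the same shape as the hook_config argument shown in Basic Usage.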
ProfileData
- class dexray_intercept.models.profile.ProfileData
Container for profile data with events organized by category
Container class for organizing collected events by category with metadata.
Basic Usage:
# Get profile data from profiler
profile_data = profiler.get_profile_data()
# Access events by category
crypto_events = profile_data.get_events('CRYPTO_AES')
network_events = profile_data.get_events('WEB')
# Get metadata
total_events = profile_data.get_event_count()
categories = profile_data.get_categories()
# Convert to JSON
json_string = profile_data.to_json()
# Save to file
filename = profile_data.write_to_file("analysis_results.json")
Key Methods:
- get_events(category)
Get all events for a specific category.
- Parameters:
category – Event category name (e.g., 'CRYPTO_AES', 'WEB')
- Returns:
List of events in category
- Return type:
List[Event]
- get_categories()
Get all available event categories.
- Returns:
List of category names
- Return type:
List[str]
- get_event_count(category=None)
Get event count for category or total.
- Parameters:
category – Specific category or None for total
- Returns:
Number of events
- Return type:
int
- to_json(indent=4)
Convert profile data to JSON string.
- Parameters:
indent – JSON indentation level
- Returns:
JSON representation
- Return type:
str
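The JSON string can be post-processed with the standard library alone. The sketch below assumes the serialized profile is a JSON object whose keys are category names mapping to lists of events, plus a "_metadata" entry, which is the layout the Batch Processing example on this page reads. The helper count_events_by_category and the sample data are hypothetical.

```python
import json
from typing import Dict


def count_events_by_category(json_string: str) -> Dict[str, int]:
    """Count events per category in a serialized profile."""
    profile = json.loads(json_string)
    return {
        category: len(events)
        for category, events in profile.items()
        # Skip bookkeeping keys such as "_metadata" and any non-list values.
        if not category.startswith("_") and isinstance(events, list)
    }


# Hand-written sample standing in for the output of profile_data.to_json()
sample = json.dumps({
    "_metadata": {"total_events": 3},
    "CRYPTO_AES": [{"event_type": "sample"}],
    "WEB": [{"url": "https://example.com"}, {"url": "https://example.org"}],
})
counts = count_events_by_category(sample)
```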
Event Classes
Base Event
- class dexray_intercept.models.events.Event(event_type: str, timestamp: str | None = None)
Base class for all security events
Base class for all event types providing common functionality.
- Common Properties:
event_type – Specific event type identifier
timestamp – ISO 8601 timestamp
metadata – Additional metadata dictionary
Usage:
for event in crypto_events:
    print(f"Event: {event.event_type}")
    print(f"Time: {event.timestamp}")
    print(f"Data: {event.get_event_data()}")
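Since timestamp is an ISO 8601 string, events from several categories can be merged and ordered chronologically. This is a minimal sketch that assumes the strings are in a form datetime.fromisoformat accepts (a trailing "Z" is only parsed from Python 3.11 onward) and that all timestamps are either naive or timezone-aware, not a mix. The helper sort_by_timestamp is hypothetical, and SimpleNamespace objects stand in for real Event instances.

```python
from datetime import datetime
from types import SimpleNamespace


def sort_by_timestamp(events):
    """Return events ordered oldest-first by their ISO 8601 timestamp."""
    return sorted(events, key=lambda e: datetime.fromisoformat(e.timestamp))


# Stand-in objects exposing the same two attributes as Event
events = [
    SimpleNamespace(event_type="b", timestamp="2024-05-01T10:00:05"),
    SimpleNamespace(event_type="a", timestamp="2024-05-01T10:00:01"),
]
ordered = sort_by_timestamp(events)
```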
CryptoEvent
- class dexray_intercept.models.events.CryptoEvent(event_type: str, algorithm: str | None = None, timestamp: str | None = None)
Cryptographic operation event
Specialized event for cryptographic operations.
- Properties:
algorithm – Cryptographic algorithm used
operation_mode – Encryption/decryption mode
key_hex – Hexadecimal key representation
iv_hex – Initialization vector
plaintext – Extracted plaintext (when available)
Usage:
crypto_events = profile_data.get_events('CRYPTO_AES')
for event in crypto_events:
    if event.algorithm == 'AES':
        print(f"AES operation: {event.operation_mode_desc}")
        print(f"Key length: {event.key_length} bytes")
        if event.plaintext:
            print(f"Plaintext: {event.plaintext}")
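When only key_hex is available, the key size can be derived from it directly. The sketch below assumes key_hex is a plain hexadecimal string with no "0x" prefix or separators. Both helper names are hypothetical, and the 16-byte threshold simply mirrors the weak-key check in the Real-time Event Processing example.

```python
def key_length_bytes(key_hex: str) -> int:
    """Return the key size in bytes for a hex-encoded key."""
    return len(bytes.fromhex(key_hex))


def is_weak_key(key_hex: str, minimum_bytes: int = 16) -> bool:
    """Flag keys shorter than minimum_bytes (16 bytes = 128 bits)."""
    return key_length_bytes(key_hex) < minimum_bytes


aes128_key = "00112233445566778899aabbccddeeff"  # 32 hex chars = 16 bytes
short_key = "0011223344556677"                   # 16 hex chars = 8 bytes
```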
NetworkEvent
- class dexray_intercept.models.events.NetworkEvent(event_type: str, timestamp: str | None = None)
Network operation event
Event for network communications.
- Properties:
url – Request URL
method – HTTP method
headers – Request/response headers
body_preview – Preview of request/response body
library – Network library used (OkHttp, Retrofit, etc.)
Usage:
network_events = profile_data.get_events('WEB')
for event in network_events:
    if event.url:
        print(f"{event.method} {event.url}")
        if event.headers:
            print(f"Headers: {event.headers}")
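A common follow-up is to group requests by host to see which endpoints an app contacts. The following sketch relies only on the url and method properties listed above. The helper group_by_host is hypothetical, and SimpleNamespace objects stand in for real NetworkEvent instances.

```python
from collections import defaultdict
from types import SimpleNamespace
from urllib.parse import urlparse


def group_by_host(events):
    """Map each hostname to the list of (method, url) pairs seen for it."""
    hosts = defaultdict(list)
    for event in events:
        url = getattr(event, "url", None)
        if not url:
            continue  # some events may carry no URL
        # .hostname drops any port and lowercases the host
        host = urlparse(url).hostname or "<unknown>"
        hosts[host].append((getattr(event, "method", None), url))
    return dict(hosts)


# Stand-in events; real ones would come from profile_data.get_events('WEB')
sample_events = [
    SimpleNamespace(method="POST", url="https://api.example.com:8443/login"),
    SimpleNamespace(method="GET", url="https://api.example.com/items"),
    SimpleNamespace(method="GET", url=None),
]
by_host = group_by_host(sample_events)
```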
Parsing and Processing
Event Parsers
Parsers convert raw Frida messages into structured Event objects.
Parser Factory:
from dexray_intercept.parsers.factory import parser_factory
# Get parser for category
crypto_parser = parser_factory.get_parser('CRYPTO_AES')
# Parse raw event data
event = crypto_parser.parse(raw_json_string, timestamp)
Custom Parsers:
from dexray_intercept.parsers.base import BaseParser
from dexray_intercept.parsers.factory import parser_factory
from dexray_intercept.models.events import Event
class CustomParser(BaseParser):
    def parse_json_data(self, data, timestamp):
        # Create custom event from data; CustomEvent is the Event subclass
        # defined under "Custom Event Types"
        event = CustomEvent(data['event_type'], timestamp)
        event.custom_field = data.get('custom_field')
        return event
# Register custom parser
parser_factory.register_parser('CUSTOM_CATEGORY', CustomParser())
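The register/look-up pattern used above can be illustrated without the library. The sketch below is a standalone stand-in, not the implementation of parser_factory: the class ParserRegistry, its parse method, and the dictionary-based event are all hypothetical. It shows one way a category-to-parser mapping might decode raw JSON and hand it to the registered parser.

```python
import json
from typing import Callable, Dict, Optional

# A parser here is any callable taking (data, timestamp) and returning an event.
Parser = Callable[[dict, str], dict]


class ParserRegistry:
    """Minimal category -> parser mapping (illustrative stand-in)."""

    def __init__(self) -> None:
        self._parsers: Dict[str, Parser] = {}

    def register_parser(self, category: str, parser: Parser) -> None:
        self._parsers[category] = parser

    def get_parser(self, category: str) -> Optional[Parser]:
        return self._parsers.get(category)

    def parse(self, category: str, raw_json: str, timestamp: str):
        parser = self.get_parser(category)
        if parser is None:
            return None  # no parser registered for this category
        try:
            data = json.loads(raw_json)
        except json.JSONDecodeError:
            return None  # malformed payload: skip instead of crashing
        return parser(data, timestamp)


registry = ParserRegistry()
registry.register_parser(
    "CUSTOM_CATEGORY",
    lambda data, ts: {"event_type": data.get("event_type"), "timestamp": ts},
)
event = registry.parse(
    "CUSTOM_CATEGORY", '{"event_type": "custom.event"}', "2024-05-01T10:00:00"
)
```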
Profile Collection
- class dexray_intercept.services.profile_collector.ProfileCollector(output_format: str = 'CMD', verbose_mode: bool = False, enable_stacktrace: bool = False, path_filters: List[str] | None = None, base_path: str | None = None)
Service for collecting and processing profile events.
- process_frida_message(message: Dict[str, Any], data: Any | None = None) -> bool
Process a message from the Frida script.
- get_profile_data() -> ProfileData
Get the collected profile data.
Handles event collection and processing from Frida messages.
Usage:
from dexray_intercept.services.profile_collector import ProfileCollector
collector = ProfileCollector(
    output_format="JSON",
    verbose_mode=True,
    enable_stacktrace=True
)
# Process Frida message
success = collector.process_frida_message(message, data)
# Get collected data
profile_data = collector.get_profile_data()
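Frida delivers script messages as dictionaries with a "type" of "send" (carrying a "payload") or "error". The sketch below shows how such a message might be unpacked before further processing. The helper split_frida_message is hypothetical, and the payload keys "profileType" and "profileContent" are taken from the Real-time Event Processing example on this page rather than from a documented schema.

```python
from typing import Any, Dict, Optional, Tuple


def split_frida_message(message: Dict[str, Any]) -> Optional[Tuple[str, Any]]:
    """Return (profileType, profileContent) for a 'send' message, else None."""
    if message.get("type") != "send":
        return None  # e.g. {'type': 'error', 'description': ...}
    payload = message.get("payload")
    if not isinstance(payload, dict) or "profileType" not in payload:
        return None  # payload is not a profile event
    return payload["profileType"], payload.get("profileContent")


# Hand-written sample messages
ok = split_frida_message({
    "type": "send",
    "payload": {"profileType": "WEB", "profileContent": {"url": "https://example.com"}},
})
err = split_frida_message({"type": "error", "description": "ReferenceError"})
```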
Hook Management
HookManager
- class dexray_intercept.services.hook_manager.HookManager(initial_config: Dict[str, bool] | None = None)
Manages hook configuration and runtime control
Manages hook configuration and state.
Usage:
from dexray_intercept.services.hook_manager import HookManager
# Create with initial config
hook_config = {
    'aes_hooks': True,
    'web_hooks': True,
    'bypass_hooks': False
}
manager = HookManager(hook_config)
# Runtime management
manager.enable_hook('bypass_hooks', True)
enabled_hooks = manager.get_enabled_hooks()
# Get full configuration
config = manager.get_hook_config()
Frida Integration
InstrumentationService
- class dexray_intercept.services.instrumentation.InstrumentationService(process, frida_agent_script: str = 'profiling.js', custom_scripts: List[str] | None = None)
Service for managing Frida instrumentation
Manages Frida script loading and communication.
Usage:
from dexray_intercept.services.instrumentation import InstrumentationService
service = InstrumentationService(
    process_session,
    custom_scripts=['./my_hooks.js']
)
# Set message handler
service.set_message_handler(message_callback)
# Load and start script
script = service.load_script()
# Send message to script
service.send_message({'type': 'config', 'data': config})
Utility Functions
Device Management
from dexray_intercept.services.instrumentation import setup_frida_device
# Connect to USB device
device = setup_frida_device()
# Connect to remote device
device = setup_frida_device("192.168.1.100:27042")
# Enable spawn gating
device = setup_frida_device(enable_spawn_gating=True)
Command Line Integration
from dexray_intercept.ammm import parse_hook_config
from argparse import Namespace
# Parse CLI arguments to hook config
args = Namespace()
args.hooks_crypto = True
args.enable_bypass = True
hook_config = parse_hook_config(args)
# Result: {'aes_hooks': True, 'encodings_hooks': True, 'keystore_hooks': True, 'bypass_hooks': True}
Advanced Usage Patterns
Real-time Event Processing
from dexray_intercept import AppProfiler
class RealTimeAnalyzer:
    def __init__(self):
        self.alerts = []
    def message_handler(self, message, data):
        payload = message.get('payload', {})
        # Real-time analysis
        if payload.get('profileType') == 'CRYPTO_AES':
            self.analyze_crypto_event(payload)
        elif payload.get('profileType') == 'BYPASS_DETECTION':
            self.analyze_bypass_event(payload)
    def analyze_crypto_event(self, payload):
        # Check for weak encryption
        content = payload.get('profileContent', {})
        if content.get('key_length', 0) < 16:
            self.alerts.append("Weak encryption detected")
    def analyze_bypass_event(self, payload):
        # Alert on evasion attempts
        content = payload.get('profileContent', {})
        if content.get('bypass_category') == 'root_detection':
            self.alerts.append("Root detection evasion detected")
# Usage (session is a Frida session, obtained as in Basic Usage)
analyzer = RealTimeAnalyzer()
profiler = AppProfiler(session)
profiler.instrumentation.set_message_handler(analyzer.message_handler)
Custom Event Types
from dexray_intercept.models.events import Event
from dexray_intercept.services.profile_collector import ProfileCollector
class CustomEvent(Event):
    def __init__(self, event_type, timestamp):
        super().__init__(event_type, timestamp)
        self.custom_data = {}
    def get_event_data(self):
        data = super().get_event_data()
        data.update(self.custom_data)
        return data
# Custom processor
class CustomProfileCollector(ProfileCollector):
    def _handle_custom_events(self, content, timestamp):
        event = CustomEvent("custom.event", timestamp)
        event.custom_data = content
        self.profile_data.add_event("CUSTOM", event)
        return True
Batch Processing
import glob
import json
def analyze_profile_batch(profile_pattern):
    results = {}
    for profile_path in glob.glob(profile_pattern):
        with open(profile_path, 'r') as f:
            profile = json.load(f)
        # Analyze profile
        results[profile_path] = {
            'total_events': profile.get('_metadata', {}).get('total_events', 0),
            'crypto_events': len(profile.get('CRYPTO_AES', [])),
            'network_events': len(profile.get('WEB', [])),
            'bypass_events': len(profile.get('BYPASS_DETECTION', []))
        }
    return results
# Usage
batch_results = analyze_profile_batch("./analysis_*/profile_*.json")
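The per-profile counts can then be rolled up into a single summary. The sketch below takes a dictionary in the shape returned by analyze_profile_batch; the helper summarize_batch and the sample paths are hypothetical.

```python
from typing import Dict

COUNT_KEYS = ("total_events", "crypto_events", "network_events", "bypass_events")


def summarize_batch(results: Dict[str, Dict[str, int]]) -> Dict[str, int]:
    """Sum the per-profile counters and record how many profiles were read."""
    totals = {key: 0 for key in COUNT_KEYS}
    for counts in results.values():
        for key in COUNT_KEYS:
            totals[key] += counts.get(key, 0)
    totals["profiles"] = len(results)
    return totals


# Hand-written sample in the shape produced by analyze_profile_batch()
sample_results = {
    "run_a/profile_1.json": {"total_events": 10, "crypto_events": 4,
                             "network_events": 5, "bypass_events": 1},
    "run_b/profile_2.json": {"total_events": 3, "crypto_events": 0,
                             "network_events": 3, "bypass_events": 0},
}
summary = summarize_batch(sample_results)
```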
Error Handling
Exception Classes
- class dexray_intercept.services.instrumentation.FridaBasedException
Custom exception for Frida-related errors.
Common Error Scenarios:
from dexray_intercept import AppProfiler, FridaBasedException
import frida
try:
    device = frida.get_usb_device()
    session = device.attach("com.example.app")
    profiler = AppProfiler(session)
    profiler.start_profiling()
except frida.ProcessNotFoundError:
    print("Target app not found or not running")
except frida.TransportError:
    print("Connection to device lost")
except FridaBasedException as e:
    print(f"Frida instrumentation error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
Best Practices
Resource Management:
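# Always cleanup resources
# Create the profiler before the try block so that "profiler" is
# guaranteed to exist when the finally clause runs
profiler = AppProfiler(session)
try:
    profiler.start_profiling()
    # ... analysis code ...
finally:
    profiler.stop_profiling()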
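The same start/stop pairing can be wrapped in a context manager so that cleanup cannot be forgotten. This is a minimal sketch: profiling_session is a hypothetical helper, not part of the library, and FakeProfiler is a stand-in that only records calls so the example runs without a device. Any object exposing start_profiling() and stop_profiling() would work in its place.

```python
from contextlib import contextmanager


@contextmanager
def profiling_session(profiler, app_name=None):
    """Start profiling on entry and always stop it on exit."""
    try:
        profiler.start_profiling(app_name)
        yield profiler
    finally:
        # Runs on normal exit and when the body (or start-up) raises
        profiler.stop_profiling()


class FakeProfiler:
    """Stand-in that records calls instead of talking to Frida."""

    def __init__(self):
        self.calls = []

    def start_profiling(self, app_name=None):
        self.calls.append("start")

    def stop_profiling(self):
        self.calls.append("stop")


fake = FakeProfiler()
try:
    with profiling_session(fake, "com.example.app"):
        raise RuntimeError("analysis failed")
except RuntimeError:
    pass  # stop_profiling() has already been called at this point
```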
Performance Optimization:
# Use selective hooks for better performance
hook_config = {
    'aes_hooks': True,  # Only what you need
    'web_hooks': True,
    # 'hooks_all': False  # Avoid unless necessary
}
# Process events efficiently
def efficient_message_handler(message, data):
    payload = message.get('payload', {})
    # Quick filtering
    if payload.get('profileType') not in ['CRYPTO_AES', 'WEB']:
        return
    # Process only relevant events
    process_relevant_event(payload)
Threading Considerations:
import threading
from queue import Queue
class ThreadSafeAnalyzer:
    def __init__(self):
        self.event_queue = Queue()
        self.processing_thread = threading.Thread(target=self.process_events)
        self.processing_thread.daemon = True
        self.processing_thread.start()
    def message_handler(self, message, data):
        # Add to queue for background processing
        self.event_queue.put((message, data))
    def process_events(self):
        while True:
            message, data = self.event_queue.get()
            # Process in background thread
            self.analyze_event(message, data)
            self.event_queue.task_done()
    def analyze_event(self, message, data):
        # Placeholder: replace with your own analysis logic
        pass
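A worker loop that runs forever relies on the daemon flag to end with the process, so queued events may be dropped at exit. The sketch below shows one way to drain the queue and stop cleanly using a sentinel object. The class StoppableAnalyzer and its close method are hypothetical, and the "analysis" step just records each payload for illustration.

```python
import threading
from queue import Queue

_STOP = object()  # sentinel that tells the worker to exit


class StoppableAnalyzer:
    def __init__(self):
        self.event_queue = Queue()
        self.processed = []
        self.worker = threading.Thread(target=self._run, daemon=True)
        self.worker.start()

    def message_handler(self, message, data):
        # Called from Frida's thread: enqueue and return quickly
        self.event_queue.put((message, data))

    def _run(self):
        while True:
            item = self.event_queue.get()
            try:
                if item is _STOP:
                    return
                message, _data = item
                # Stand-in for real analysis: remember the payload
                self.processed.append(message.get("payload"))
            finally:
                self.event_queue.task_done()

    def close(self, timeout=5.0):
        """Process everything queued so far, then stop the worker thread."""
        self.event_queue.put(_STOP)
        self.worker.join(timeout)


analyzer = StoppableAnalyzer()
analyzer.message_handler({"payload": 1}, None)
analyzer.message_handler({"payload": 2}, None)
analyzer.close()
```

Because the queue is first-in, first-out, the sentinel is handled only after every event queued before close() has been processed.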
Integration Examples
Threat Intelligence Integration:
from urllib.parse import urlparse
class ThreatIntelIntegrator:
    def __init__(self, ti_api_key):
        # ThreatIntelAPI is a placeholder for your threat intelligence client
        self.ti_api = ThreatIntelAPI(ti_api_key)
    def analyze_profile(self, profile_data):
        # Extract IOCs
        iocs = self.extract_iocs(profile_data)
        # Check against threat intelligence
        for ioc in iocs['domains']:
            threat_info = self.ti_api.check_domain(ioc)
            if threat_info.is_malicious:
                print(f"Malicious domain detected: {ioc}")
    def extract_iocs(self, profile_data):
        iocs = {'domains': [], 'ips': [], 'urls': []}
        for event in profile_data.get_events('WEB'):
            if hasattr(event, 'url') and event.url:
                iocs['urls'].append(event.url)
                # Extract domain from URL
                domain = urlparse(event.url).netloc
                iocs['domains'].append(domain)
        return iocs
SIEM Integration:
import syslog
class SIEMIntegrator:
    def __init__(self):
        syslog.openlog("dexray-intercept")
    def send_alert(self, event):
        severity = self.get_severity(event)
        message = self.format_siem_message(event)
        syslog.syslog(severity, message)
    def get_severity(self, event):
        if hasattr(event, 'bypass_category'):
            return syslog.LOG_CRIT  # Critical for bypass attempts
        elif event.event_type.startswith('crypto'):
            return syslog.LOG_WARNING
        else:
            return syslog.LOG_INFO
    def format_siem_message(self, event):
        return f"dexray_event={event.event_type} timestamp={event.timestamp} data={event.get_event_data()}"
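The message format above embeds the event data using Python's dict representation, which downstream tools generally cannot parse. A possible variant, sketched below, JSON-encodes the data field instead so the record stays on one line and remains machine-readable. The standalone function and its sample arguments are hypothetical; in practice the three values would come from event.event_type, event.timestamp, and event.get_event_data().

```python
import json


def format_siem_message(event_type, timestamp, data):
    """Build a single-line key=value record with a JSON-encoded data field."""
    # sort_keys gives stable output; default=str keeps values that JSON
    # cannot encode natively (bytes, datetimes) from raising TypeError.
    encoded = json.dumps(data, sort_keys=True, default=str)
    return f"dexray_event={event_type} timestamp={timestamp} data={encoded}"


line = format_siem_message(
    "sample.event",
    "2024-05-01T10:00:00",
    {"url": "https://example.com", "note": "line1\nline2"},
)
```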
Next Steps
Explore TypeScript API for custom hooks: TypeScript API Reference
Learn about development workflows: Development Guide
See practical examples in the user guide: User Guide