Python API Reference

This section documents the Python API for Dexray Intercept, which lets you control profiling and analyze collected events programmatically.

Core Classes

AppProfiler

class dexray_intercept.AppProfiler(process, verbose_mode: bool = False, output_format: str = 'CMD', base_path: str | None = None, deactivate_unlink: bool = False, path_filters: List[str] | None = None, hook_config: Dict[str, bool] | None = None, enable_stacktrace: bool = False, enable_fritap: bool = False, fritap_output_dir: str = './fritap_output', target_name: str | None = None, spawn_mode: bool = False, custom_scripts: List[str] | None = None)[source]

Main application profiler class.

This class orchestrates the profiling process by coordinating between:
  • InstrumentationService: Manages Frida script loading and communication

  • ProfileCollector: Handles event collection and processing

  • HookManager: Manages hook configuration

set_process_session(process_session)[source]

Update the process session after friTap spawns the target

send_interrupt_to_fritap()[source]

Send SIGINT (Ctrl+C) to the friTap process

wait_for_fritap(timeout: float | None = None)[source]

Wait for friTap to finish

Parameters:

timeout – Maximum time to wait in seconds (None for indefinite)

Returns:

True if friTap finished, False if the timeout expired
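
The two friTap helpers above are naturally combined at shutdown: interrupt first, then wait with a bound. The following is a minimal sketch, assuming `profiler` is an AppProfiler created with enable_fritap=True. The helper name `shutdown_fritap` and the 10-second default are illustrative and not part of the library.

```python
def shutdown_fritap(profiler, timeout=10.0):
    """Ask friTap to stop, then wait up to `timeout` seconds for it to exit.

    `profiler` is any object exposing send_interrupt_to_fritap() and
    wait_for_fritap(timeout), as documented above.
    """
    profiler.send_interrupt_to_fritap()  # documented as sending SIGINT (Ctrl+C)
    finished = profiler.wait_for_fritap(timeout=timeout)
    if not finished:
        print(f"friTap did not exit within {timeout} s")
    return finished
```

Returning the boolean lets the caller decide whether a timeout is fatal or can be ignored.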

start_profiling(app_name: str | None = None) frida.core.Script[source]

Start the profiling process

stop_profiling()[source]

Stop the profiling process

enable_hook(hook_name: str, enabled: bool = True)[source]

Enable or disable a specific hook at runtime

get_enabled_hooks() List[str][source]

Return list of currently enabled hooks

enable_all_hooks()[source]

Enable all available hooks

enable_hook_group(group_name: str)[source]

Enable a group of related hooks

get_profile_data() ProfileData[source]

Get the collected profile data

get_profiling_log_as_json() str[source]

Get profile data as JSON string

write_profiling_log(filename: str = 'profile.json') str[source]

Write profile data to file

get_event_count(category: str | None = None) int[source]

Get event count for category or total

get_categories() List[str][source]

Get all categories with events
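
get_categories() and get_event_count() together give a quick overview of a run. A small sketch, assuming only the two documented methods; `summarize_events` is a hypothetical helper name.

```python
def summarize_events(profiler):
    """Return ({category: count}, total) for the events collected so far."""
    counts = {
        category: profiler.get_event_count(category)
        for category in profiler.get_categories()
    }
    total = profiler.get_event_count()  # no argument: count across all categories
    return counts, total
```

The same function should also work with a ProfileData or ProfileCollector object, since both document the same two methods.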

instrument() frida.core.Script[source]

Legacy method - use start_profiling() instead

finish_app_profiling()[source]

Legacy method - use stop_profiling() instead

get_frida_script() str[source]

Get the path to the Frida script

set_job_script(script: frida.core.Script)[source]

Set script reference when using external job manager (e.g., AndroidFridaManager).

This allows Sandroid to load the script via JobManager while still using AppProfiler for message handling, hook config, and profile collection.

Parameters:

script – Frida script loaded by external job manager

update_script(script)[source]

Update script reference (for compatibility)

get_stats() Dict[str, Any][source]

Get profiling statistics

The main orchestrator class that coordinates Frida instrumentation, event collection, and profile generation.

Constructor Parameters:

AppProfiler(
    process,                    # Frida process session
    verbose_mode=False,         # Enable verbose output
    output_format="CMD",        # Output format ("CMD" or "JSON")
    base_path=None,            # Base path for file dumps
    deactivate_unlink=False,   # Disable file unlinking
    path_filters=None,         # Path filters for filesystem events
    hook_config=None,          # Hook configuration dict
    enable_stacktrace=False,   # Enable stack traces
    enable_fritap=False,       # Enable friTap integration
    fritap_output_dir="./fritap_output",  # friTap output directory
    target_name=None,          # Target app name
    spawn_mode=False,          # Whether target was spawned
    custom_scripts=None        # List of custom script paths
)

Basic Usage:

from dexray_intercept import AppProfiler
import frida

# Connect to device and attach to app
device = frida.get_usb_device()
session = device.attach("com.example.app")

# Create profiler with crypto and network monitoring
profiler = AppProfiler(
    session,
    hook_config={
        'aes_hooks': True,
        'web_hooks': True,
        'bypass_hooks': True
    },
    verbose_mode=True
)

# Start profiling
script = profiler.start_profiling("com.example.app")

# Let app run and collect events...
input("Press Enter to stop...")

# Stop and get results
profiler.stop_profiling()
profile_data = profiler.get_profile_data()

Key Methods:

start_profiling(app_name=None)

Start the profiling process and load Frida scripts.

Parameters:

app_name – Name of target application

Returns:

Loaded Frida script instance

Return type:

frida.core.Script

script = profiler.start_profiling("com.banking.app")

stop_profiling()

Stop profiling and cleanup resources including friTap processes.

profiler.stop_profiling()

get_profile_data()

Get the collected profile data object.

Returns:

Profile data containing all events

Return type:

ProfileData

data = profiler.get_profile_data()
crypto_events = data.get_events('CRYPTO_AES')

write_profiling_log(filename='profile.json')

Write profile data to JSON file with timestamp.

Parameters:

filename – Base filename for output

Returns:

Generated filename with timestamp

Return type:

str

output_file = profiler.write_profiling_log("banking_analysis")
print(f"Profile saved to: {output_file}")

Hook Management:

enable_hook(hook_name, enabled=True)

Enable or disable a specific hook at runtime.

Parameters:
  • hook_name – Name of hook to control

  • enabled – Whether to enable (True) or disable (False)

# Enable AES hooks during runtime
profiler.enable_hook('aes_hooks', True)

# Disable web hooks
profiler.enable_hook('web_hooks', False)

get_enabled_hooks()

Get list of currently enabled hooks.

Returns:

List of enabled hook names

Return type:

List[str]

enabled = profiler.get_enabled_hooks()
print(f"Active hooks: {enabled}")

enable_all_hooks()

Enable all available hook categories.

profiler.enable_all_hooks()

ProfileData

class dexray_intercept.models.profile.ProfileData[source]

Container for profile data with events organized by category

add_event(category: str, event: Event)[source]

Add an event to the specified category

get_events(category: str) List[Event][source]

Get all events for a category

get_categories() List[str][source]

Get all available categories

get_event_count(category: str | None = None) int[source]

Get event count for a category or total

remove_empty_categories()[source]

Remove categories with no events

to_dict() Dict[str, Any][source]

Convert profile data to dictionary for serialization

to_json(indent: int = 4) str[source]

Convert profile data to JSON string

write_to_file(filename: str)[source]

Write profile data to JSON file

Parameters:

filename – Either a full path (e.g. '/path/to/profile_app_2025-10-03_12-00-00.json') or just an app name (e.g. 'com.example.app') for standalone mode

Returns:

Path to the written file

Return type:

str

add_metadata(key: str, value: Any)[source]

Add metadata to the profile

get_summary() Dict[str, Any][source]

Get a summary of the profile data

Container class for organizing collected events by category with metadata.

Basic Usage:

# Get profile data from profiler
profile_data = profiler.get_profile_data()

# Access events by category
crypto_events = profile_data.get_events('CRYPTO_AES')
network_events = profile_data.get_events('WEB')

# Get metadata
total_events = profile_data.get_event_count()
categories = profile_data.get_categories()

# Convert to JSON
json_string = profile_data.to_json()

# Save to file
filename = profile_data.write_to_file("analysis_results.json")

Key Methods:

get_events(category)

Get all events for a specific category.

Parameters:

category – Event category name (e.g., 'CRYPTO_AES', 'WEB')

Returns:

List of events in category

Return type:

List[Event]

get_categories()

Get all available event categories.

Returns:

List of category names

Return type:

List[str]

get_event_count(category=None)

Get event count for category or total.

Parameters:

category – Specific category or None for total

Returns:

Number of events

Return type:

int

to_json(indent=4)

Convert profile data to JSON string.

Parameters:

indent – JSON indentation level

Returns:

JSON representation

Return type:

str

write_to_file(filename)

Write profile data to a JSON file.

Parameters:

filename – Either a full output path or just an app name (standalone mode)

Returns:

Path to the written file

Return type:

str
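
When only part of a profile is needed, get_events() and Event.to_dict() can be combined to serialize selected categories. This is a sketch under two assumptions: that get_events() returns an empty list for a category with no events, and that to_dict() returns JSON-compatible values. `export_categories` is a hypothetical helper name.

```python
import json

def export_categories(profile_data, categories, indent=2):
    """Serialize only the requested categories to a JSON string."""
    selected = {
        category: [event.to_dict() for event in profile_data.get_events(category)]
        for category in categories
    }
    # default=str is a fallback for values json cannot encode natively
    return json.dumps(selected, indent=indent, default=str)
```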

Event Classes

Base Event

class dexray_intercept.models.events.Event(event_type: str, timestamp: str | None = None)[source]

Base class for all security events

add_metadata(key: str, value: Any)[source]

Add metadata to the event

to_dict() Dict[str, Any][source]

Convert event to dictionary for serialization

abstract get_event_data() Dict[str, Any][source]

Get event-specific data

Base class for all event types providing common functionality.

Common Properties:
  • event_type - Specific event type identifier

  • timestamp - ISO 8601 timestamp

  • metadata - Additional metadata dictionary

Usage:

for event in crypto_events:
    print(f"Event: {event.event_type}")
    print(f"Time: {event.timestamp}")
    print(f"Data: {event.get_event_data()}")

CryptoEvent

class dexray_intercept.models.events.CryptoEvent(event_type: str, algorithm: str | None = None, timestamp: str | None = None)[source]

Cryptographic operation event

get_event_data() Dict[str, Any][source]

Get event-specific data

Specialized event for cryptographic operations.

Properties:
  • algorithm - Cryptographic algorithm used

  • operation_mode - Encryption/decryption mode

  • key_hex - Hexadecimal key representation

  • iv_hex - Initialization vector

  • plaintext - Extracted plaintext (when available)

Usage:

crypto_events = profile_data.get_events('CRYPTO_AES')
for event in crypto_events:
    if event.algorithm == 'AES':
        print(f"AES operation: {event.operation_mode_desc}")
        print(f"Key length: {event.key_length} bytes")
        if event.plaintext:
            print(f"Plaintext: {event.plaintext}")
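
The key_hex property listed above is enough to flag short keys without relying on any other attribute. A minimal sketch, assuming key_hex holds plain hexadecimal digits when a key was captured. `find_short_keys` and the 16-byte default threshold are illustrative choices.

```python
def find_short_keys(events, min_bytes=16):
    """Return (event, key_length_in_bytes) pairs for keys shorter than min_bytes."""
    flagged = []
    for event in events:
        key_hex = getattr(event, "key_hex", None)
        if not key_hex:
            continue  # no key captured for this event
        key_bytes = len(bytes.fromhex(key_hex))  # two hex digits per byte
        if key_bytes < min_bytes:
            flagged.append((event, key_bytes))
    return flagged
```

bytes.fromhex() raises ValueError on malformed input, so a caller that expects truncated or non-hex values may want to catch that.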

NetworkEvent

class dexray_intercept.models.events.NetworkEvent(event_type: str, timestamp: str | None = None)[source]

Network operation event

get_event_data() Dict[str, Any][source]

Get event-specific data

Event for network communications.

Properties:
  • url - Request URL

  • method - HTTP method

  • headers - Request/response headers

  • body_preview - Preview of request/response body

  • library - Network library used (OkHttp, Retrofit, etc.)

Usage:

network_events = profile_data.get_events('WEB')
for event in network_events:
    if event.url:
        print(f"{event.method} {event.url}")
        if event.headers:
            print(f"Headers: {event.headers}")
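
For a quick overview of where an app connects, the url property can be reduced to a per-host count. A short sketch using only the standard library; `count_hosts` is a hypothetical helper name.

```python
from collections import Counter
from urllib.parse import urlparse

def count_hosts(events):
    """Count network events per host name, ignoring events without a usable URL."""
    hosts = Counter()
    for event in events:
        url = getattr(event, "url", None)
        if not url:
            continue
        host = urlparse(url).hostname  # lower-cased host name without the port
        if host:
            hosts[host] += 1
    return hosts
```

Calling `count_hosts(network_events).most_common(5)` then lists the five most contacted hosts.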

Parsing and Processing

Event Parsers

Parsers convert raw Frida messages into structured Event objects.

Parser Factory:

from dexray_intercept.parsers.factory import parser_factory

# Get parser for category
crypto_parser = parser_factory.get_parser('CRYPTO_AES')

# Parse raw event data
event = crypto_parser.parse(raw_json_string, timestamp)

Custom Parsers:

from dexray_intercept.parsers.base import BaseParser
from dexray_intercept.parsers.factory import parser_factory
from dexray_intercept.models.events import Event

# CustomEvent is an Event subclass such as the one shown under "Custom Event Types"

class CustomParser(BaseParser):
    def parse_json_data(self, data, timestamp):
        # Create custom event from the decoded JSON data
        event = CustomEvent(data['event_type'], timestamp)
        event.custom_field = data.get('custom_field')
        return event

# Register custom parser
parser_factory.register_parser('CUSTOM_CATEGORY', CustomParser())
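
The factory calls above follow a common registry pattern: a mapping from category name to parser object. The stand-in below illustrates that pattern in isolation. It is not the library's implementation; `ParserRegistry` and `DictParser` are hypothetical names, and only the method names get_parser, register_parser and parse mirror the documented calls.

```python
import json

class ParserRegistry:
    """Toy registry mapping a category name to a parser object."""

    def __init__(self):
        self._parsers = {}

    def register_parser(self, category, parser):
        self._parsers[category] = parser

    def get_parser(self, category):
        return self._parsers.get(category)  # None when nothing is registered

class DictParser:
    """Toy parser: decodes the JSON string and attaches the timestamp."""

    def parse(self, raw_json, timestamp):
        data = json.loads(raw_json)
        data["timestamp"] = timestamp
        return data

registry = ParserRegistry()
registry.register_parser("CUSTOM_CATEGORY", DictParser())
parser = registry.get_parser("CUSTOM_CATEGORY")
event = parser.parse('{"event_type": "custom.event"}', "2025-10-03T12:00:00Z")
```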

Profile Collection

class dexray_intercept.services.profile_collector.ProfileCollector(output_format: str = 'CMD', verbose_mode: bool = False, enable_stacktrace: bool = False, path_filters: List[str] | None = None, base_path: str | None = None)[source]

Service for collecting and processing profile events

process_frida_message(message: Dict[str, Any], data: Any | None = None) bool[source]

Process a message from Frida script

get_profile_data() ProfileData[source]

Get the collected profile data

get_profile_json() str[source]

Get profile data as JSON string

write_profile_to_file(filename: str = 'profile.json') str[source]

Write profile data to file

get_event_count(category: str | None = None) int[source]

Get event count for category or total

get_categories() List[str][source]

Get all categories with events

clear_profile_data()[source]

Clear collected profile data

Handles event collection and processing from Frida messages.

Usage:

from dexray_intercept.services.profile_collector import ProfileCollector

collector = ProfileCollector(
    output_format="JSON",
    verbose_mode=True,
    enable_stacktrace=True
)

# Process Frida message
success = collector.process_frida_message(message, data)

# Get collected data
profile_data = collector.get_profile_data()
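
When a ProfileCollector is driven by hand, it helps to separate Frida's 'send' messages from script errors before forwarding. A minimal sketch, assuming the standard Frida message shape, in which 'type' is 'send' or 'error' and errors carry a 'description'. `route_message` is a hypothetical helper name.

```python
def route_message(collector, message, data=None):
    """Forward 'send' messages to the collector and report script errors."""
    message_type = message.get("type")
    if message_type == "error":
        # Uncaught exceptions in the injected script arrive as error messages
        print(f"Script error: {message.get('description', 'unknown')}")
        return False
    if message_type != "send":
        return False  # ignore anything that is not a payload message
    return collector.process_frida_message(message, data)
```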

Hook Management

HookManager

class dexray_intercept.services.hook_manager.HookManager(initial_config: Dict[str, bool] | None = None)[source]

Manages hook configuration and runtime control

enable_hook(hook_name: str, enabled: bool = True) bool[source]

Enable or disable a specific hook

is_hook_enabled(hook_name: str) bool[source]

Check if a hook is enabled

get_enabled_hooks() List[str][source]

Return list of currently enabled hooks

get_disabled_hooks() List[str][source]

Return list of currently disabled hooks

enable_all_hooks()[source]

Enable all available hooks

disable_all_hooks()[source]

Disable all hooks

enable_hook_group(group_name: str)[source]

Enable a group of related hooks

get_hook_config() Dict[str, bool][source]

Get current hook configuration

update_config(new_config: Dict[str, bool])[source]

Update hook configuration with new values

get_available_hooks() List[str][source]

Get list of all available hooks

get_hook_stats() Dict[str, int][source]

Get statistics about hook configuration

Manages hook configuration and state.

Usage:

from dexray_intercept.services.hook_manager import HookManager

# Create with initial config
hook_config = {
    'aes_hooks': True,
    'web_hooks': True,
    'bypass_hooks': False
}

manager = HookManager(hook_config)

# Runtime management
manager.enable_hook('bypass_hooks', True)
enabled_hooks = manager.get_enabled_hooks()

# Get full configuration
config = manager.get_hook_config()
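
is_hook_enabled() and enable_hook() are sufficient to switch a hook on for a limited scope and restore its previous state afterwards. The sketch below changes only the manager's own state; whether a running script picks up the change depends on how the manager is wired, which the source does not specify. `hook_enabled` is a hypothetical helper name.

```python
from contextlib import contextmanager

@contextmanager
def hook_enabled(manager, hook_name):
    """Enable hook_name inside the with-block, then restore its previous state."""
    previous = manager.is_hook_enabled(hook_name)
    manager.enable_hook(hook_name, True)
    try:
        yield manager
    finally:
        manager.enable_hook(hook_name, previous)
```

For example, `with hook_enabled(manager, 'bypass_hooks'): ...` leaves 'bypass_hooks' disabled again after the block if it was disabled before.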

Frida Integration

InstrumentationService

class dexray_intercept.services.instrumentation.InstrumentationService(process, frida_agent_script: str = 'profiling.js', custom_scripts: List[str] | None = None)[source]

Service for managing Frida instrumentation

load_script() frida.core.Script[source]

Load and create the Frida script

set_message_handler(handler: Callable)[source]

Set the message handler for script communication

send_message(message: dict)[source]

Send a message to the Frida script

unload_script()[source]

Unload the Frida script and all custom scripts

is_script_loaded() bool[source]

Check if script is loaded

get_script_path() str[source]

Get the script path (public method for compatibility)

restart_script()[source]

Restart the Frida script

Manages Frida script loading and communication.

Usage:

from dexray_intercept.services.instrumentation import InstrumentationService

service = InstrumentationService(
    process_session,
    custom_scripts=['./my_hooks.js']
)

# Set message handler
service.set_message_handler(message_callback)

# Load and start script
script = service.load_script()

# Send message to script
service.send_message({'type': 'config', 'data': config})

Utility Functions

Device Management

from dexray_intercept.services.instrumentation import setup_frida_device

# Connect to USB device
device = setup_frida_device()

# Connect to remote device
device = setup_frida_device("192.168.1.100:27042")

# Enable spawn gating
device = setup_frida_device(enable_spawn_gating=True)

Command Line Integration

from dexray_intercept.ammm import parse_hook_config
from argparse import Namespace

# Parse CLI arguments to hook config
args = Namespace()
args.hooks_crypto = True
args.enable_bypass = True

hook_config = parse_hook_config(args)
# Result: {'aes_hooks': True, 'encodings_hooks': True, 'keystore_hooks': True, 'bypass_hooks': True}

Advanced Usage Patterns

Real-time Event Processing

class RealTimeAnalyzer:
    def __init__(self):
        self.alerts = []

    def message_handler(self, message, data):
        payload = message.get('payload', {})

        # Real-time analysis
        if payload.get('profileType') == 'CRYPTO_AES':
            self.analyze_crypto_event(payload)
        elif payload.get('profileType') == 'BYPASS_DETECTION':
            self.analyze_bypass_event(payload)

    def analyze_crypto_event(self, payload):
        # Check for weak encryption
        content = payload.get('profileContent', {})
        if content.get('key_length', 0) < 16:
            self.alerts.append("Weak encryption detected")

    def analyze_bypass_event(self, payload):
        # Alert on evasion attempts
        content = payload.get('profileContent', {})
        if content.get('bypass_category') == 'root_detection':
            self.alerts.append("Root detection evasion detected")

# Usage
analyzer = RealTimeAnalyzer()
profiler = AppProfiler(session)
profiler.instrumentation.set_message_handler(analyzer.message_handler)
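
The usage above installs the analyzer as the message handler. If set_message_handler() replaces a previously installed handler (an assumption; the source does not say), events may no longer reach the profiler's own collection logic. One way around this is a fan-out wrapper that calls several handlers in turn. `chain_handlers` is a hypothetical helper name.

```python
def chain_handlers(*handlers):
    """Build one (message, data) callback that invokes every handler in order."""
    def dispatch(message, data):
        for handler in handlers:
            try:
                handler(message, data)
            except Exception as exc:
                # One failing handler should not starve the others
                name = getattr(handler, "__name__", repr(handler))
                print(f"handler {name} failed: {exc}")
    return dispatch
```

A combined handler could then be built from a ProfileCollector's process_frida_message and analyzer.message_handler, and passed to set_message_handler().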

Custom Event Types

from dexray_intercept.models.events import Event
from dexray_intercept.services.profile_collector import ProfileCollector

class CustomEvent(Event):
    def __init__(self, event_type, timestamp):
        super().__init__(event_type, timestamp)
        self.custom_data = {}

    def get_event_data(self):
        # get_event_data() is abstract on Event, so build the dict here
        # rather than relying on a base-class implementation
        return dict(self.custom_data)

# Custom processor
class CustomProfileCollector(ProfileCollector):
    def _handle_custom_events(self, content, timestamp):
        event = CustomEvent("custom.event", timestamp)
        event.custom_data = content
        self.profile_data.add_event("CUSTOM", event)
        return True

Batch Processing

import glob
import json

def analyze_profile_batch(profile_pattern):
    results = {}

    for profile_path in glob.glob(profile_pattern):
        with open(profile_path, 'r') as f:
            profile = json.load(f)

        # Analyze profile
        results[profile_path] = {
            'total_events': profile.get('_metadata', {}).get('total_events', 0),
            'crypto_events': len(profile.get('CRYPTO_AES', [])),
            'network_events': len(profile.get('WEB', [])),
            'bypass_events': len(profile.get('BYPASS_DETECTION', []))
        }

    return results

# Usage
batch_results = analyze_profile_batch("./analysis_*/profile_*.json")
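
The per-profile dictionaries returned above can be summed into totals across the whole batch. A short sketch using collections.Counter; `total_counts` is a hypothetical helper name.

```python
from collections import Counter

def total_counts(batch_results):
    """Sum the per-profile counters returned by analyze_profile_batch()."""
    totals = Counter()
    for counts in batch_results.values():
        totals.update(counts)  # adds the values key by key
    return dict(totals)
```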

Error Handling

Exception Classes

class dexray_intercept.services.instrumentation.FridaBasedException[source]

Custom exception for Frida-related errors


Common Error Scenarios:

from dexray_intercept import AppProfiler
from dexray_intercept.services.instrumentation import FridaBasedException
import frida

try:
    device = frida.get_usb_device()
    session = device.attach("com.example.app")
    profiler = AppProfiler(session)
    profiler.start_profiling()

except frida.ProcessNotFoundError:
    print("Target app not found or not running")
except frida.TransportError:
    print("Connection to device lost")
except FridaBasedException as e:
    print(f"Frida instrumentation error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Best Practices

Resource Management:

# Always clean up resources
profiler = AppProfiler(session)  # create outside try so finally can rely on it
try:
    profiler.start_profiling()
    # ... analysis code ...
finally:
    profiler.stop_profiling()
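
The same start/stop pairing can be wrapped in a context manager so that cleanup cannot be forgotten. A minimal sketch; `profiling` is a hypothetical helper name and relies only on the documented start_profiling() and stop_profiling() methods.

```python
from contextlib import contextmanager

@contextmanager
def profiling(profiler, app_name=None):
    """Start profiling on entry and always stop it on exit."""
    script = profiler.start_profiling(app_name)
    try:
        yield script
    finally:
        profiler.stop_profiling()
```

If start_profiling() itself raises, stop_profiling() is not called, because nothing was started.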

Performance Optimization:

# Use selective hooks for better performance
hook_config = {
    'aes_hooks': True,        # Only what you need
    'web_hooks': True,
    # 'hooks_all': False     # Avoid unless necessary
}

# Process events efficiently
def efficient_message_handler(message, data):
    payload = message.get('payload', {})

    # Quick filtering
    if payload.get('profileType') not in ['CRYPTO_AES', 'WEB']:
        return

    # Process only relevant events
    # (process_relevant_event is a placeholder for your own logic)
    process_relevant_event(payload)

Threading Considerations:

import threading
from queue import Queue

class ThreadSafeAnalyzer:
    def __init__(self):
        self.event_queue = Queue()
        self.processing_thread = threading.Thread(target=self.process_events)
        self.processing_thread.daemon = True
        self.processing_thread.start()

    def message_handler(self, message, data):
        # Add to queue for background processing
        self.event_queue.put((message, data))

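    def process_events(self):
        while True:
            message, data = self.event_queue.get()
            try:
                # Process in background thread
                self.analyze_event(message, data)
            finally:
                self.event_queue.task_done()

    def analyze_event(self, message, data):
        # Placeholder: override with your own analysis logic
        pass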
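
The queue-based analyzer above runs until the interpreter exits. Where a clean shutdown is needed, a sentinel object can tell the worker to stop after it has drained the queue. The following is a sketch of that variant; `StoppableWorker` and its `handle` callback are hypothetical names.

```python
import threading
from queue import Queue

_STOP = object()  # sentinel telling the worker thread to exit

class StoppableWorker:
    def __init__(self, handle):
        self._handle = handle  # callable taking (message, data)
        self._queue = Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def message_handler(self, message, data):
        # Called on Frida's thread: only enqueue, never block on analysis
        self._queue.put((message, data))

    def _run(self):
        while True:
            item = self._queue.get()
            try:
                if item is _STOP:
                    return
                self._handle(*item)
            except Exception as exc:
                print(f"event handling failed: {exc}")
            finally:
                self._queue.task_done()

    def stop(self, timeout=5.0):
        """Process everything queued so far, then stop; True if the thread exited."""
        self._queue.put(_STOP)
        self._thread.join(timeout)
        return not self._thread.is_alive()
```

Because the queue is FIFO, every message enqueued before stop() is handled before the sentinel is reached.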

Integration Examples

Threat Intelligence Integration:

from urllib.parse import urlparse

class ThreatIntelIntegrator:
    def __init__(self, ti_api_key):
        # ThreatIntelAPI is a placeholder for your threat intelligence client
        self.ti_api = ThreatIntelAPI(ti_api_key)

    def analyze_profile(self, profile_data):
        # Extract IOCs
        iocs = self.extract_iocs(profile_data)

        # Check against threat intelligence
        for ioc in iocs['domains']:
            threat_info = self.ti_api.check_domain(ioc)
            if threat_info.is_malicious:
                print(f"Malicious domain detected: {ioc}")

    def extract_iocs(self, profile_data):
        iocs = {'domains': [], 'ips': [], 'urls': []}

        for event in profile_data.get_events('WEB'):
            if hasattr(event, 'url') and event.url:
                iocs['urls'].append(event.url)
                # Extract the host name (without port) from the URL
                domain = urlparse(event.url).hostname
                if domain and domain not in iocs['domains']:
                    iocs['domains'].append(domain)

        return iocs

SIEM Integration:

import syslog

class SIEMIntegrator:
    def __init__(self):
        syslog.openlog("dexray-intercept")

    def send_alert(self, event):
        severity = self.get_severity(event)
        message = self.format_siem_message(event)
        syslog.syslog(severity, message)

    def get_severity(self, event):
        if hasattr(event, 'bypass_category'):
            return syslog.LOG_CRIT  # Critical for bypass attempts
        elif event.event_type.startswith('crypto'):
            return syslog.LOG_WARNING
        else:
            return syslog.LOG_INFO

    def format_siem_message(self, event):
        return f"dexray_event={event.event_type} timestamp={event.timestamp} data={event.get_event_data()}"

Next Steps