Python API Reference

This section documents the Python API for Dexray Intercept, enabling programmatic control and analysis capabilities.

Core Classes

AppProfiler

class dexray_intercept.AppProfiler(process, verbose_mode: bool = False, output_format: str = 'CMD', base_path: str | None = None, deactivate_unlink: bool = False, path_filters: List[str] | None = None, hook_config: Dict[str, bool] | None = None, enable_stacktrace: bool = False, enable_fritap: bool = False, fritap_output_dir: str = './fritap_output', target_name: str | None = None, spawn_mode: bool = False, custom_scripts: List[str] | None = None)[source]

Main application profiler class.

This class orchestrates the profiling process by coordinating between: - InstrumentationService: Manages Frida script loading and communication - ProfileCollector: Handles event collection and processing - HookManager: Manages hook configuration

set_process_session(process_session)[source]

Update the process session after fritap spawns the target

send_interrupt_to_fritap()[source]

Send SIGINT (Ctrl+C) to fritap process

wait_for_fritap(timeout: float | None = None)[source]

Wait for fritap to finish

Parameters:

timeout – Maximum time to wait in seconds (None for indefinite)

Returns:

True if fritap finished, False if timeout occurred

start_profiling(app_name: str | None = None) frida.core.Script[source]

Start the profiling process

stop_profiling()[source]

Stop the profiling process

enable_hook(hook_name: str, enabled: bool = True)[source]

Enable or disable a specific hook at runtime

get_enabled_hooks() List[str][source]

Return list of currently enabled hooks

enable_all_hooks()[source]

Enable all available hooks

enable_hook_group(group_name: str)[source]

Enable a group of related hooks

get_profile_data() ProfileData[source]

Get the collected profile data

get_profiling_log_as_json() str[source]

Get profile data as JSON string

write_profiling_log(filename: str = 'profile.json') str[source]

Write profile data to file

get_event_count(category: str | None = None) int[source]

Get event count for category or total

get_categories() List[str][source]

Get all categories with events

instrument() frida.core.Script[source]

Legacy method - use start_profiling() instead

finish_app_profiling()[source]

Legacy method - use stop_profiling() instead

get_frida_script() str[source]

Get the path to the Frida script

update_script(script)[source]

Update script reference (for compatibility)

get_stats() Dict[str, Any][source]

Get profiling statistics

The main orchestrator class that coordinates Frida instrumentation, event collection, and profile generation.

Constructor Parameters:

AppProfiler(
    process,                    # Frida process session
    verbose_mode=False,         # Enable verbose output
    output_format="CMD",        # Output format ("CMD" or "JSON")
    base_path=None,            # Base path for file dumps
    deactivate_unlink=False,   # Disable file unlinking
    path_filters=None,         # Path filters for filesystem events
    hook_config=None,          # Hook configuration dict
    enable_stacktrace=False,   # Enable stack traces
    enable_fritap=False,       # Enable friTap integration
    fritap_output_dir="./fritap_output",  # friTap output directory
    target_name=None,          # Target app name
    spawn_mode=False,          # Whether target was spawned
    custom_scripts=None        # List of custom script paths
)

Basic Usage:

from dexray_intercept import AppProfiler
import frida

# Connect to device and attach to app
device = frida.get_usb_device()
session = device.attach("com.example.app")

# Create profiler with crypto and network monitoring
profiler = AppProfiler(
    session,
    hook_config={
        'aes_hooks': True,
        'web_hooks': True,
        'bypass_hooks': True
    },
    verbose_mode=True
)

# Start profiling
script = profiler.start_profiling("com.example.app")

# Let app run and collect events...
input("Press Enter to stop...")

# Stop and get results
profiler.stop_profiling()
profile_data = profiler.get_profile_data()

Key Methods:

start_profiling(app_name=None)

Start the profiling process and load Frida scripts.

Parameters:

app_name – Name of target application

Returns:

Loaded Frida script instance

Return type:

frida.core.Script

script = profiler.start_profiling("com.banking.app")
stop_profiling()

Stop profiling and cleanup resources including friTap processes.

profiler.stop_profiling()
get_profile_data()

Get the collected profile data object.

Returns:

Profile data containing all events

Return type:

ProfileData

data = profiler.get_profile_data()
crypto_events = data.get_events('CRYPTO_AES')
write_profiling_log(filename='profile.json')

Write profile data to JSON file with timestamp.

Parameters:

filename – Base filename for output

Returns:

Generated filename with timestamp

Return type:

str

output_file = profiler.write_profiling_log("banking_analysis")
print(f"Profile saved to: {output_file}")

Hook Management:

enable_hook(hook_name, enabled=True)

Enable or disable a specific hook at runtime.

Parameters:
  • hook_name – Name of hook to control

  • enabled – Whether to enable (True) or disable (False)

# Enable AES hooks during runtime
profiler.enable_hook('aes_hooks', True)

# Disable web hooks
profiler.enable_hook('web_hooks', False)
get_enabled_hooks()

Get list of currently enabled hooks.

Returns:

List of enabled hook names

Return type:

List[str]

enabled = profiler.get_enabled_hooks()
print(f"Active hooks: {enabled}")
enable_all_hooks()

Enable all available hook categories.

profiler.enable_all_hooks()

ProfileData

class dexray_intercept.models.profile.ProfileData[source]

Container for profile data with events organized by category

add_event(category: str, event: Event)[source]

Add an event to the specified category

get_events(category: str) List[Event][source]

Get all events for a category

get_categories() List[str][source]

Get all available categories

get_event_count(category: str | None = None) int[source]

Get event count for a category or total

remove_empty_categories()[source]

Remove categories with no events

to_dict() Dict[str, Any][source]

Convert profile data to dictionary for serialization

to_json(indent: int = 4) str[source]

Convert profile data to JSON string

write_to_file(filename: str)[source]

Write profile data to JSON file

add_metadata(key: str, value: Any)[source]

Add metadata to the profile

get_summary() Dict[str, Any][source]

Get a summary of the profile data

Container class for organizing collected events by category with metadata.

Basic Usage:

# Get profile data from profiler
profile_data = profiler.get_profile_data()

# Access events by category
crypto_events = profile_data.get_events('CRYPTO_AES')
network_events = profile_data.get_events('WEB')

# Get metadata
total_events = profile_data.get_event_count()
categories = profile_data.get_categories()

# Convert to JSON
json_string = profile_data.to_json()

# Save to file
filename = profile_data.write_to_file("analysis_results.json")

Key Methods:

get_events(category)

Get all events for a specific category.

Parameters:

category – Event category name (e.g., ‘CRYPTO_AES’, ‘WEB’)

Returns:

List of events in category

Return type:

List[Event]

get_categories()

Get all available event categories.

Returns:

List of category names

Return type:

List[str]

get_event_count(category=None)

Get event count for category or total.

Parameters:

category – Specific category or None for total

Returns:

Number of events

Return type:

int

to_json(indent=4)

Convert profile data to JSON string.

Parameters:

indent – JSON indentation level

Returns:

JSON representation

Return type:

str

write_to_file(filename)

Write profile data to timestamped JSON file.

Parameters:

filename – Base filename

Returns:

Generated filename with timestamp

Return type:

str

Event Classes

Base Event

class dexray_intercept.models.events.Event(event_type: str, timestamp: str | None = None)[source]

Base class for all security events

add_metadata(key: str, value: Any)[source]

Add metadata to the event

to_dict() Dict[str, Any][source]

Convert event to dictionary for serialization

abstract get_event_data() Dict[str, Any][source]

Get event-specific data

Base class for all event types providing common functionality.

Common Properties:
  • event_type - Specific event type identifier

  • timestamp - ISO 8601 timestamp

  • metadata - Additional metadata dictionary

Usage:

for event in crypto_events:
    print(f"Event: {event.event_type}")
    print(f"Time: {event.timestamp}")
    print(f"Data: {event.get_event_data()}")

CryptoEvent

class dexray_intercept.models.events.CryptoEvent(event_type: str, algorithm: str | None = None, timestamp: str | None = None)[source]

Cryptographic operation event

get_event_data() Dict[str, Any][source]

Get event-specific data

Specialized event for cryptographic operations.

Properties:
  • algorithm - Cryptographic algorithm used

  • operation_mode - Encryption/decryption mode

  • key_hex - Hexadecimal key representation

  • iv_hex - Initialization vector

  • plaintext - Extracted plaintext (when available)

Usage:

crypto_events = profile_data.get_events('CRYPTO_AES')
for event in crypto_events:
    if event.algorithm == 'AES':
        print(f"AES operation: {event.operation_mode_desc}")
        print(f"Key length: {event.key_length} bytes")
        if event.plaintext:
            print(f"Plaintext: {event.plaintext}")

NetworkEvent

class dexray_intercept.models.events.NetworkEvent(event_type: str, timestamp: str | None = None)[source]

Network operation event

get_event_data() Dict[str, Any][source]

Get event-specific data

Event for network communications.

Properties:
  • url - Request URL

  • method - HTTP method

  • headers - Request/response headers

  • body_preview - Preview of request/response body

  • library - Network library used (OkHttp, Retrofit, etc.)

Usage:

network_events = profile_data.get_events('WEB')
for event in network_events:
    if event.url:
        print(f"{event.method} {event.url}")
        if event.headers:
            print(f"Headers: {event.headers}")

Parsing and Processing

Event Parsers

Parsers convert raw Frida messages into structured Event objects.

Parser Factory:

from dexray_intercept.parsers.factory import parser_factory

# Get parser for category
crypto_parser = parser_factory.get_parser('CRYPTO_AES')

# Parse raw event data
event = crypto_parser.parse(raw_json_string, timestamp)

Custom Parsers:

from dexray_intercept.parsers.base import BaseParser
from dexray_intercept.models.events import Event

class CustomParser(BaseParser):
    def parse_json_data(self, data, timestamp):
        # Create custom event from data
        event = CustomEvent(data['event_type'], timestamp)
        event.custom_field = data.get('custom_field')
        return event

# Register custom parser
parser_factory.register_parser('CUSTOM_CATEGORY', CustomParser())

Profile Collection

class dexray_intercept.services.profile_collector.ProfileCollector(output_format: str = 'CMD', verbose_mode: bool = False, enable_stacktrace: bool = False, path_filters: List[str] | None = None, base_path: str | None = None)[source]

Service for collecting and processing profile events

process_frida_message(message: Dict[str, Any], data: Any | None = None) bool[source]

Process a message from Frida script

get_profile_data() ProfileData[source]

Get the collected profile data

get_profile_json() str[source]

Get profile data as JSON string

write_profile_to_file(filename: str = 'profile.json') str[source]

Write profile data to file

get_event_count(category: str | None = None) int[source]

Get event count for category or total

get_categories() List[str][source]

Get all categories with events

clear_profile_data()[source]

Clear collected profile data

Handles event collection and processing from Frida messages.

Usage:

from dexray_intercept.services.profile_collector import ProfileCollector

collector = ProfileCollector(
    output_format="JSON",
    verbose_mode=True,
    enable_stacktrace=True
)

# Process Frida message
success = collector.process_frida_message(message, data)

# Get collected data
profile_data = collector.get_profile_data()

Hook Management

HookManager

class dexray_intercept.services.hook_manager.HookManager(initial_config: Dict[str, bool] | None = None)[source]

Manages hook configuration and runtime control

enable_hook(hook_name: str, enabled: bool = True) bool[source]

Enable or disable a specific hook

is_hook_enabled(hook_name: str) bool[source]

Check if a hook is enabled

get_enabled_hooks() List[str][source]

Return list of currently enabled hooks

get_disabled_hooks() List[str][source]

Return list of currently disabled hooks

enable_all_hooks()[source]

Enable all available hooks

disable_all_hooks()[source]

Disable all hooks

enable_hook_group(group_name: str)[source]

Enable a group of related hooks

get_hook_config() Dict[str, bool][source]

Get current hook configuration

update_config(new_config: Dict[str, bool])[source]

Update hook configuration with new values

get_available_hooks() List[str][source]

Get list of all available hooks

get_hook_stats() Dict[str, int][source]

Get statistics about hook configuration

Manages hook configuration and state.

Usage:

from dexray_intercept.services.hook_manager import HookManager

# Create with initial config
hook_config = {
    'aes_hooks': True,
    'web_hooks': True,
    'bypass_hooks': False
}

manager = HookManager(hook_config)

# Runtime management
manager.enable_hook('bypass_hooks', True)
enabled_hooks = manager.get_enabled_hooks()

# Get full configuration
config = manager.get_hook_config()

Frida Integration

InstrumentationService

class dexray_intercept.services.instrumentation.InstrumentationService(process, frida_agent_script: str = 'profiling.js', custom_scripts: List[str] | None = None)[source]

Service for managing Frida instrumentation

load_script() frida.core.Script[source]

Load and create the Frida script

set_message_handler(handler: Callable)[source]

Set the message handler for script communication

send_message(message: dict)[source]

Send a message to the Frida script

unload_script()[source]

Unload the Frida script and all custom scripts

is_script_loaded() bool[source]

Check if script is loaded

get_script_path() str[source]

Get the script path (public method for compatibility)

restart_script()[source]

Restart the Frida script

Manages Frida script loading and communication.

Usage:

from dexray_intercept.services.instrumentation import InstrumentationService

service = InstrumentationService(
    process_session,
    custom_scripts=['./my_hooks.js']
)

# Set message handler
service.set_message_handler(message_callback)

# Load and start script
script = service.load_script()

# Send message to script
service.send_message({'type': 'config', 'data': config})

Utility Functions

Device Management

from dexray_intercept.services.instrumentation import setup_frida_device

# Connect to USB device
device = setup_frida_device()

# Connect to remote device
device = setup_frida_device("192.168.1.100:27042")

# Enable spawn gating
device = setup_frida_device(enable_spawn_gating=True)

Command Line Integration

from dexray_intercept.ammm import parse_hook_config
from argparse import Namespace

# Parse CLI arguments to hook config
args = Namespace()
args.hooks_crypto = True
args.enable_bypass = True

hook_config = parse_hook_config(args)
# Result: {'aes_hooks': True, 'encodings_hooks': True, 'keystore_hooks': True, 'bypass_hooks': True}

Advanced Usage Patterns

Real-time Event Processing

class RealTimeAnalyzer:
    def __init__(self):
        self.alerts = []

    def message_handler(self, message, data):
        payload = message.get('payload', {})

        # Real-time analysis
        if payload.get('profileType') == 'CRYPTO_AES':
            self.analyze_crypto_event(payload)
        elif payload.get('profileType') == 'BYPASS_DETECTION':
            self.analyze_bypass_event(payload)

    def analyze_crypto_event(self, payload):
        # Check for weak encryption
        content = payload.get('profileContent', {})
        if content.get('key_length', 0) < 16:
            self.alerts.append("Weak encryption detected")

    def analyze_bypass_event(self, payload):
        # Alert on evasion attempts
        content = payload.get('profileContent', {})
        if content.get('bypass_category') == 'root_detection':
            self.alerts.append("Root detection evasion detected")

# Usage
analyzer = RealTimeAnalyzer()
profiler = AppProfiler(session)
profiler.instrumentation.set_message_handler(analyzer.message_handler)

Custom Event Types

from dexray_intercept.models.events import Event

class CustomEvent(Event):
    def __init__(self, event_type, timestamp):
        super().__init__(event_type, timestamp)
        self.custom_data = {}

    def get_event_data(self):
        data = super().get_event_data()
        data.update(self.custom_data)
        return data

# Custom processor
class CustomProfileCollector(ProfileCollector):
    def _handle_custom_events(self, content, timestamp):
        event = CustomEvent("custom.event", timestamp)
        event.custom_data = content
        self.profile_data.add_event("CUSTOM", event)
        return True

Batch Processing

import glob
import json

def analyze_profile_batch(profile_pattern):
    results = {}

    for profile_path in glob.glob(profile_pattern):
        with open(profile_path, 'r') as f:
            profile = json.load(f)

        # Analyze profile
        results[profile_path] = {
            'total_events': profile.get('_metadata', {}).get('total_events', 0),
            'crypto_events': len(profile.get('CRYPTO_AES', [])),
            'network_events': len(profile.get('WEB', [])),
            'bypass_events': len(profile.get('BYPASS_DETECTION', []))
        }

    return results

# Usage
batch_results = analyze_profile_batch("./analysis_*/profile_*.json")

Error Handling

Exception Classes

class dexray_intercept.services.instrumentation.FridaBasedException[source]

Custom exception for Frida-related errors

Custom exception for Frida-related errors.

Common Error Scenarios:

from dexray_intercept import AppProfiler, FridaBasedException
import frida

try:
    device = frida.get_usb_device()
    session = device.attach("com.example.app")
    profiler = AppProfiler(session)
    profiler.start_profiling()

except frida.ProcessNotFoundError:
    print("Target app not found or not running")
except frida.TransportError:
    print("Connection to device lost")
except FridaBasedException as e:
    print(f"Frida instrumentation error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Best Practices

Resource Management:

# Always cleanup resources
try:
    profiler = AppProfiler(session)
    profiler.start_profiling()
    # ... analysis code ...
finally:
    profiler.stop_profiling()

Performance Optimization:

# Use selective hooks for better performance
hook_config = {
    'aes_hooks': True,        # Only what you need
    'web_hooks': True,
    # 'hooks_all': False     # Avoid unless necessary
}

# Process events efficiently
def efficient_message_handler(message, data):
    payload = message.get('payload', {})

    # Quick filtering
    if payload.get('profileType') not in ['CRYPTO_AES', 'WEB']:
        return

    # Process only relevant events
    process_relevant_event(payload)

Threading Considerations:

import threading
from queue import Queue

class ThreadSafeAnalyzer:
    def __init__(self):
        self.event_queue = Queue()
        self.processing_thread = threading.Thread(target=self.process_events)
        self.processing_thread.daemon = True
        self.processing_thread.start()

    def message_handler(self, message, data):
        # Add to queue for background processing
        self.event_queue.put((message, data))

    def process_events(self):
        while True:
            message, data = self.event_queue.get()
            # Process in background thread
            self.analyze_event(message, data)
            self.event_queue.task_done()

Integration Examples

Threat Intelligence Integration:

class ThreatIntelIntegrator:
    def __init__(self, ti_api_key):
        self.ti_api = ThreatIntelAPI(ti_api_key)

    def analyze_profile(self, profile_data):
        # Extract IOCs
        iocs = self.extract_iocs(profile_data)

        # Check against threat intelligence
        for ioc in iocs['domains']:
            threat_info = self.ti_api.check_domain(ioc)
            if threat_info.is_malicious:
                print(f"Malicious domain detected: {ioc}")

    def extract_iocs(self, profile_data):
        iocs = {'domains': [], 'ips': [], 'urls': []}

        for event in profile_data.get_events('WEB'):
            if hasattr(event, 'url') and event.url:
                iocs['urls'].append(event.url)
                # Extract domain from URL
                from urllib.parse import urlparse
                domain = urlparse(event.url).netloc
                iocs['domains'].append(domain)

        return iocs

SIEM Integration:

import syslog

class SIEMIntegrator:
    def __init__(self):
        syslog.openlog("dexray-intercept")

    def send_alert(self, event):
        severity = self.get_severity(event)
        message = self.format_siem_message(event)
        syslog.syslog(severity, message)

    def get_severity(self, event):
        if hasattr(event, 'bypass_category'):
            return syslog.LOG_CRIT  # Critical for bypass attempts
        elif event.event_type.startswith('crypto'):
            return syslog.LOG_WARNING
        else:
            return syslog.LOG_INFO

    def format_siem_message(self, event):
        return f"dexray_event={event.event_type} timestamp={event.timestamp} data={event.get_event_data()}"

Next Steps