Core Framework API

The core framework provides the foundational classes and components for Dexray Insight’s modular analysis architecture.

Analysis Engine

class dexray_insight.core.analysis_engine.AnalysisEngine(config: Configuration)

Bases: object

Main analysis engine that orchestrates all APK analysis activities.

The AnalysisEngine serves as the central coordinator for the entire analysis workflow. It manages module execution, external tool integration, result aggregation, and security assessment orchestration.

Key Features:

  • Modular architecture with pluggable analysis modules

  • Dependency-aware execution planning and parallel processing

  • External tool integration (APKID, Kavanoz, JADX, etc.)

  • Comprehensive error handling and resilience

  • Security assessment engine integration

  • Result aggregation and structured output generation

Architecture Patterns:

  • Registry Pattern: For module discovery and management

  • Strategy Pattern: For different analysis approaches

  • Factory Pattern: For result object creation

  • Template Method: For analysis workflow orchestration

SOLID Principles:

  • Single Responsibility: Orchestrates analysis workflow

  • Open/Closed: Extensible through module registration

  • Dependency Inversion: Depends on abstractions (Configuration, modules)

Usage:

config = Configuration()
engine = AnalysisEngine(config)
results = engine.analyze_apk('/path/to/app.apk')

__init__(config: Configuration)

analyze_apk(apk_path: str, requested_modules: List[str] | None = None, androguard_obj: Any | None = None, timestamp: str | None = None) → FullAnalysisResults

Perform comprehensive APK analysis

Parameters:
  • apk_path – Path to the APK file

  • requested_modules – Optional list of specific modules to run

  • androguard_obj – Optional pre-initialized Androguard object

  • timestamp – Optional timestamp for temporal directory naming

Returns:

FullAnalysisResults containing all analysis results

The AnalysisEngine is the central orchestrator that manages the execution of analysis modules, handles dependencies, and coordinates parallel execution.

Key Methods:

  • analyze_apk(apk_path, requested_modules=None, androguard_obj=None, timestamp=None) - Main entry point for APK analysis

  • _execute_modules(context) - Execute registered analysis modules

  • _resolve_dependencies(modules) - Resolve module execution order based on dependencies
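The internals of _resolve_dependencies are not shown in this reference. As a minimal sketch of the general technique, assuming each module reports its prerequisites through get_dependencies(), a topological sort yields a valid execution order. The function name resolve_execution_order and the dependency map below are hypothetical and only illustrate the idea.

```python
from graphlib import TopologicalSorter
from typing import Dict, List


def resolve_execution_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return module names ordered so that every dependency runs first.

    `dependencies` maps a module name to the names it depends on, i.e. what
    each module's get_dependencies() would return. graphlib raises
    CycleError if the dependencies are circular.
    """
    sorter = TopologicalSorter(dependencies)
    return list(sorter.static_order())


# Hypothetical dependency map, for illustration only
deps = {
    "string_analysis": [],
    "tracker_analysis": ["string_analysis"],
    "custom_analysis": ["string_analysis", "tracker_analysis"],
}
order = resolve_execution_order(deps)
print(order)
```

The real engine may additionally take module priorities and enabled flags into account; this sketch covers ordering by dependency only.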

Usage Example:

from dexray_insight.core import AnalysisEngine, Configuration
from dexray_insight.Utils.androguardObjClass import Androguard_Obj

# Create configuration and engine
config = Configuration()
engine = AnalysisEngine(config)

# Create androguard object
androguard_obj = Androguard_Obj("app.apk")

# Run analysis
results = engine.analyze_apk("app.apk", androguard_obj=androguard_obj)
print(results.to_json())

Configuration

class dexray_insight.core.configuration.Configuration(config_path: str | None = None, config_dict: Dict[str, Any] | None = None)

Bases: object

Centralized configuration management for Dexray Insight

DEFAULT_CONFIG = {'analysis': {'parallel_execution': {'enabled': True, 'max_workers': 4}, 'timeout': {'module_timeout': 300, 'tool_timeout': 600}}, 'logging': {'file': None, 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', 'level': 'INFO'}, 'modules': {'api_invocation': {'enabled': False, 'priority': 40, 'reflection_analysis': True}, 'apk_diffing': {'enabled': False, 'priority': 100}, 'behaviour_analysis': {'deep_mode': False, 'enabled': True, 'priority': 1000}, 'library_detection': {'class_similarity_threshold': 0.7, 'confidence_threshold': 0.7, 'enable_heuristic': True, 'enable_similarity': True, 'enabled': True, 'priority': 25, 'similarity_threshold': 0.85, 'version_analysis': {'api_timeout': 5, 'cache_duration_hours': 24, 'console_output': {'enabled': True, 'group_by_risk': True, 'show_recommendations': True, 'show_summary': True}, 'enabled': True, 'security_analysis_only': True, 'sources': {'custom_database': False, 'maven_central': True, 'npm_registry': True, 'pypi': True}}}, 'manifest_analysis': {'analyze_exported_components': True, 'enabled': True, 'extract_intent_filters': True, 'priority': 15}, 'native_analysis': {'architectures': ['arm64-v8a'], 'enabled': True, 'file_patterns': ['*.so'], 'modules': {'string_extraction': {'enabled': True, 'encoding': 'utf-8', 'fallback_encodings': ['latin1', 'ascii'], 'max_string_length': 1024, 'min_string_length': 4}}, 'priority': 50, 'requires_temporal_analysis': True}, 'permission_analysis': {'critical_permissions_file': None, 'enabled': True, 'priority': 20, 'use_default_critical_list': True}, 'signature_detection': {'enabled': True, 'priority': 10, 'providers': {'koodous': {'api_key': None, 'enabled': False}, 'triage': {'api_key': None, 'enabled': True}, 'virustotal': {'api_key': None, 'enabled': False, 'rate_limit': 4}}}, 'string_analysis': {'enabled': True, 'filters': {'exclude_patterns': [], 'min_string_length': 2}, 'patterns': {'base64_strings': True, 'domains': True, 'email_addresses': True, 
'ip_addresses': True, 'urls': True}, 'priority': 30}, 'tracker_analysis': {'api_timeout': 10, 'enabled': True, 'exodus_api_url': 'https://reports.exodus-privacy.eu.org/api/trackers', 'fetch_exodus_trackers': True, 'priority': 35}}, 'output': {'filename_template': 'asam_{apk_name}_{timestamp}.json', 'format': 'json', 'include_timestamps': True, 'output_directory': './results', 'pretty_print': True}, 'security': {'assessments': {'broken_access_control': {'check_exported_components': True, 'check_permissions': True, 'enabled': True}, 'broken_authentication': {'check_hardcoded_secrets': True, 'check_weak_crypto': True, 'enabled': True}, 'injection': {'command_patterns': ['exec', 'system', 'runtime'], 'enabled': True, 'sql_patterns': ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP']}, 'insufficient_logging': {'check_logging_practices': True, 'enabled': True}, 'security_misconfiguration': {'check_debug_flags': True, 'check_network_security': True, 'enabled': True}, 'sensitive_data': {'crypto_keys_check': True, 'enabled': True, 'pii_patterns': ['email', 'phone', 'ssn', 'credit_card']}, 'vulnerable_components': {'check_known_libraries': True, 'enabled': True}}, 'enable_owasp_assessment': False}, 'temporal_analysis': {'base_directory': './temp_analysis', 'cleanup_after_analysis': False, 'directory_structure': {'apktool_folder': 'apktoolResults', 'jadx_folder': 'jadxResults', 'logs_folder': 'logs', 'unzipped_folder': 'unzipped'}, 'enabled': True, 'preserve_on_error': True}, 'tools': {'androguard': {'enabled': True, 'logging_level': 'WARNING'}, 'apkid': {'enabled': True, 'options': [], 'timeout': 300}, 'apktool': {'enabled': True, 'java_options': ['-Xmx2g'], 'options': ['--no-debug-info'], 'path': None, 'timeout': 600}, 'jadx': {'enabled': True, 'options': ['--no-debug-info', '--no-inline-anonymous', '--show-bad-code'], 'path': None, 'timeout': 900}, 'kavanoz': {'enabled': True, 'output_dir': None, 'timeout': 600}, 'radare2': {'enabled': True, 'options': ['-2'], 'path': None, 
'timeout': 120}}}
__init__(config_path: str | None = None, config_dict: Dict[str, Any] | None = None)

Initialize configuration

Parameters:
  • config_path – Path to configuration file (JSON or YAML)

  • config_dict – Configuration dictionary (overrides file)

get_module_config(module_name: str) → Dict[str, Any]

Get configuration for a specific module

get_tool_config(tool_name: str) → Dict[str, Any]

Get configuration for a specific external tool

get_temporal_analysis_config() → Dict[str, Any]

Get temporal analysis configuration

get_security_config() → Dict[str, Any]

Get security assessment configuration

get_output_config() → Dict[str, Any]

Get output configuration

property enable_security_assessment: bool

Check if OWASP security assessment is enabled

property parallel_execution_enabled: bool

Check if parallel execution is enabled

property max_workers: int

Get maximum number of parallel workers

to_dict() → Dict[str, Any]

Get complete configuration as dictionary

save_to_file(file_path: str, format: str = 'json')

Save configuration to file

Parameters:
  • file_path – Path to save configuration

  • format – File format (‘json’ or ‘yaml’)

update_from_kwargs(**kwargs)

Update configuration from keyword arguments (for backward compatibility)

validate() → bool

Validate configuration

The Configuration class loads configuration from a JSON or YAML file or from a dictionary, validates it, and provides access to module and tool settings.

Key Methods:

  • __init__(config_path=None, config_dict=None) - Initialize configuration from file or dictionary

  • validate() - Validate configuration structure and values

  • get_module_config(module_name) - Get configuration for specific module

  • get_tool_config(tool_name) - Get configuration for external tool

Configuration Loading Priority:

  1. Explicit config_dict parameter

  2. Configuration file specified by config_path

  3. Default dexray.yaml in current directory

  4. Built-in default configuration
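The priority list above can be read as layering: start from the built-in defaults, then apply each higher-priority source on top. The following is a minimal sketch of that idea, not the library's actual loader. The helpers deep_merge and build_config are hypothetical, and whether Dexray Insight merges nested keys recursively is an assumption.

```python
import copy
from typing import Any, Dict, Optional


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `base` with `override` merged in recursively."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Both sides are dicts: merge key by key instead of replacing
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def build_config(defaults: Dict[str, Any],
                 file_config: Optional[Dict[str, Any]] = None,
                 config_dict: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Apply layers from lowest to highest priority."""
    result = copy.deepcopy(defaults)
    # file_config stands for whichever file was found (config_path or dexray.yaml)
    for layer in (file_config, config_dict):
        if layer:
            result = deep_merge(result, layer)
    return result


defaults = {
    "logging": {"level": "INFO", "file": None},
    "analysis": {"parallel_execution": {"enabled": True, "max_workers": 4}},
}
file_config = {"logging": {"level": "WARNING"}}
config_dict = {"logging": {"level": "DEBUG"}}

effective = build_config(defaults, file_config, config_dict)
print(effective["logging"]["level"])
```

Here the explicit dictionary wins over the file, and keys that no layer overrides keep their default values.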

Usage Example:

from dexray_insight.core.configuration import Configuration

# Load from file
config = Configuration(config_path="my_config.yaml")

# Load from dictionary
config_dict = {
    'modules': {'signature_detection': {'enabled': True}},
    'logging': {'level': 'DEBUG'}
}
config = Configuration(config_dict=config_dict)

# Access configuration
sig_config = config.get_module_config('signature_detection')
tool_config = config.get_tool_config('radare2')

Base Classes

class dexray_insight.core.base_classes.BaseAnalysisModule(config: Dict[str, Any])

Bases: ABC

Abstract base class for all analysis modules in the Dexray Insight framework.

This class defines the standard interface that all analysis modules must implement. It provides common functionality like configuration handling, logging setup, and standardized method signatures for the analysis workflow.

Responsibilities:

  • Define the contract for analysis modules (analyze, get_dependencies)

  • Provide common initialization and configuration handling

  • Set up standardized logging for all modules

  • Enforce consistent return types (BaseResult)

Design Pattern: Template Method (defines algorithm structure)

SOLID Principles:

  • Interface Segregation (focused interface for analysis modules)

  • Liskov Substitution (all modules can be used interchangeably)

Implementation Requirements:

  • Must implement analyze() method for core analysis logic

  • Must implement get_dependencies() to declare module dependencies

  • Should return results wrapped in BaseResult or its subclasses

config

Configuration dictionary passed from AnalysisEngine

name

Module class name for identification

enabled

Flag indicating if module is enabled for execution

logger

Configured logger instance for this module

__init__(config: Dict[str, Any])

abstract analyze(apk_path: str, context: AnalysisContext) → BaseResult

Perform the core analysis logic for this module.

This is the main entry point for module execution. Implementations should perform their specific analysis tasks and return structured results wrapped in a BaseResult object.

Parameters:
  • apk_path – Absolute path to the APK file being analyzed

  • context – AnalysisContext containing shared data, configuration, and results from previously executed modules

Returns:

Analysis results with status, data, and error information.

Should include all relevant findings from this module’s analysis.

Return type:

BaseResult

Raises:
  • Implementations should handle internal exceptions and return results with FAILURE status rather than propagating exceptions to the engine.

Implementation Guidelines:

  • Use self.logger for consistent logging

  • Access configuration via self.config

  • Use context to access shared data and previous results

  • Return meaningful error messages in BaseResult on failure

  • Follow single responsibility principle in analysis logic

abstract get_dependencies() → List[str]

Return list of module names this module depends on

Returns:

List of module names that must be executed before this module

validate_config() → bool

Validate module configuration

Returns:

True if configuration is valid, False otherwise

is_enabled() → bool

Check if module is enabled

get_priority() → int

Get execution priority (lower numbers = higher priority)
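To make "lower numbers = higher priority" concrete, the sketch below sorts the module priorities listed in DEFAULT_CONFIG. It assumes that get_priority() returns the module's configured priority value and that modules are ordered by a plain ascending sort. The helper order_by_priority is hypothetical, and the engine's real ordering may also depend on dependencies and enabled flags.

```python
from typing import Dict, List

# Priority values copied from DEFAULT_CONFIG (lower number = runs earlier)
module_priorities: Dict[str, int] = {
    "api_invocation": 40,
    "apk_diffing": 100,
    "behaviour_analysis": 1000,
    "library_detection": 25,
    "manifest_analysis": 15,
    "native_analysis": 50,
    "permission_analysis": 20,
    "signature_detection": 10,
    "string_analysis": 30,
    "tracker_analysis": 35,
}


def order_by_priority(priorities: Dict[str, int]) -> List[str]:
    """Return module names sorted by ascending priority value."""
    # sorted() is stable, so modules with equal priority keep insertion order
    return sorted(priorities, key=lambda name: priorities[name])


execution_order = order_by_priority(module_priorities)
print(execution_order)
```

With the default values, signature_detection (10) sorts first and behaviour_analysis (1000) sorts last.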

Abstract base class for all analysis modules. Provides the standard interface that all modules must implement.

Required Methods:

  • analyze(apk_path, context) - Perform module-specific analysis

  • get_dependencies() - Return list of module dependencies

Standard Methods:

  • is_enabled() - Check if module is enabled in configuration

  • get_timeout() - Get module execution timeout

  • get_priority() - Get module execution priority

class dexray_insight.core.base_classes.BaseResult(module_name: str, status: AnalysisStatus, execution_time: float = 0.0, error_message: str | None = None)

Bases: object

Base class for all analysis results

module_name: str
status: AnalysisStatus
execution_time: float = 0.0
error_message: str | None = None
to_dict() → Dict[str, Any]

Convert result to dictionary for serialization

to_json() → str

Convert result to JSON string

__init__(module_name: str, status: AnalysisStatus, execution_time: float = 0.0, error_message: str | None = None) → None

Base class for all analysis results. Provides standardized result structure and serialization methods.

Standard Fields:

  • module_name - Name of the analysis module

  • status - Analysis execution status (SUCCESS, FAILURE, PARTIAL, SKIPPED)

  • execution_time - Time taken for analysis (seconds)

  • error_message - Error details if analysis failed

Key Methods:

  • to_dict() - Convert result to dictionary for JSON serialization

  • is_successful() - Check if analysis completed successfully
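The serialization methods can be pictured with a small stand-in. This is a sketch under assumptions, not the library's code: ResultSketch and StatusSketch are hypothetical classes that mirror the documented fields, and converting the enum to its string value is one plausible way to make the result JSON-serializable.

```python
import json
from dataclasses import asdict, dataclass
from enum import Enum
from typing import Any, Dict, Optional


class StatusSketch(Enum):
    """Hypothetical stand-in for AnalysisStatus (two members are enough here)."""
    SUCCESS = "success"
    FAILURE = "failure"


@dataclass
class ResultSketch:
    """Hypothetical stand-in mirroring the documented BaseResult fields."""
    module_name: str
    status: StatusSketch
    execution_time: float = 0.0
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        data = asdict(self)
        # Enum members are not JSON-serializable, so store the plain value
        data["status"] = self.status.value
        return data

    def to_json(self) -> str:
        return json.dumps(self.to_dict(), indent=2)


result = ResultSketch("string_analysis", StatusSketch.FAILURE,
                      execution_time=1.5, error_message="APK not readable")
print(result.to_json())
```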

class dexray_insight.core.base_classes.AnalysisContext(apk_path: str, config: Dict[str, Any], androguard_obj: Any | None = None, unzip_path: str | None = None, module_results: Dict[str, Any] | None = None, temporal_paths: Any | None = None, jadx_available: bool = False, apktool_available: bool = False)

Bases: object

Context object passed between modules containing shared data and results.

The AnalysisContext serves as a shared data container that is passed between analysis modules during the analysis workflow. It contains APK information, configuration, and accumulated results from previous modules.

This design enables:

  • Data sharing between dependent modules

  • Centralized configuration access

  • Progressive result accumulation

  • Temporal directory management

apk_path

File path to the APK being analyzed

Type:

str

config

Configuration dictionary from the engine

Type:

Dict[str, Any]

androguard_obj

Optional pre-loaded Androguard analysis object

Type:

Any | None

unzip_path

Legacy field for backwards compatibility (deprecated)

Type:

str | None

module_results

Dictionary storing results from completed modules

Type:

Dict[str, Any]

temporal_paths

Modern temporal directory management object

Type:

Any | None

jadx_available

Flag indicating JADX decompiler availability

Type:

bool

apktool_available

Flag indicating APKTool availability

Type:

bool

Design Pattern: Context Object (shares state between modules)

SOLID Principles: Single Responsibility (data container and accessor)

apk_path: str
config: Dict[str, Any]
androguard_obj: Any | None = None
unzip_path: str | None = None
module_results: Dict[str, Any] = None
temporal_paths: Any | None = None
jadx_available: bool = False
apktool_available: bool = False
add_result(module_name: str, result: Any)

Add a module result to the context for use by dependent modules.

This method allows completed modules to store their results in the shared context where they can be accessed by dependent modules.

Parameters:
  • module_name – Name of the module storing the result

  • result – Analysis result object or data structure

Side Effects:

Modifies self.module_results dictionary

get_unzipped_dir() → str | None

Get path to unzipped APK directory (temporal or legacy).

This method provides backwards compatibility by checking both modern temporal paths and legacy unzip paths.

Returns:

Path to unzipped APK directory, or None if not available

Return type:

str | None

Design Pattern: Facade (hides complexity of path resolution)
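The fallback described for get_unzipped_dir() can be sketched as follows. This is an illustration under assumptions rather than the actual implementation: ContextSketch is a hypothetical stand-in with only the two path-related fields, and it assumes the temporal paths object exposes an unzipped_dir attribute.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath
from types import SimpleNamespace
from typing import Any, Optional


@dataclass
class ContextSketch:
    """Hypothetical stand-in for the path-related part of AnalysisContext."""
    unzip_path: Optional[str] = None
    temporal_paths: Optional[Any] = None

    def get_unzipped_dir(self) -> Optional[str]:
        # Prefer the temporal directory layout when it is present
        if self.temporal_paths is not None:
            unzipped = getattr(self.temporal_paths, "unzipped_dir", None)
            if unzipped is not None:
                return str(unzipped)
        # Otherwise fall back to the deprecated legacy field (may be None)
        return self.unzip_path


modern = ContextSketch(
    temporal_paths=SimpleNamespace(unzipped_dir=PurePosixPath("/tmp/run/unzipped"))
)
legacy = ContextSketch(unzip_path="/tmp/legacy_unzip")

print(modern.get_unzipped_dir())
print(legacy.get_unzipped_dir())
```

Callers only ever ask for "the unzipped directory" and do not need to know which of the two mechanisms supplied it.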

get_jadx_dir() → str | None

Get path to JADX decompiled directory

get_apktool_dir() → str | None

Get path to apktool results directory

get_result(module_name: str) → Any | None

Get a result from a previously executed module

__init__(apk_path: str, config: Dict[str, Any], androguard_obj: Any | None = None, unzip_path: str | None = None, module_results: Dict[str, Any] | None = None, temporal_paths: Any | None = None, jadx_available: bool = False, apktool_available: bool = False) → None

Shared context object passed between analysis modules containing APK information and intermediate results.

Key Attributes:

  • apk_path - Path to the APK file being analyzed

  • androguard_obj - Androguard analysis object

  • temporal_paths - Paths to temporary analysis directories

  • module_results - Dictionary storing results from completed modules

  • config - Configuration dictionary from the engine

Usage Example:

# Access context in a module
def analyze(self, apk_path: str, context: AnalysisContext):
    # Access previous module results
    string_results = context.module_results.get('string_analysis', [])

    # Access temporal directories
    if context.temporal_paths:
        unzipped_dir = context.temporal_paths.unzipped_dir

    # Share data with other modules
    context.add_result('my_module_data', analysis_results)

class dexray_insight.core.base_classes.AnalysisStatus(value)

Bases: Enum

Enumeration of analysis module execution statuses.

Used to track the execution state of individual analysis modules and provide consistent status reporting across the framework.

Values:

  • SUCCESS: Module completed successfully with results

  • FAILURE: Module failed to execute due to errors

  • PARTIAL: Module completed with some issues or warnings

  • SKIPPED: Module was not executed (disabled, missing dependencies, etc.)

SUCCESS = 'success'
FAILURE = 'failure'
PARTIAL = 'partial'
SKIPPED = 'skipped'

Enumeration defining possible analysis execution states.

Values:

  • SUCCESS - Analysis completed successfully

  • FAILURE - Analysis failed due to error

  • PARTIAL - Analysis completed with some issues or warnings

  • SKIPPED - Analysis was skipped (dependencies not met, disabled, etc.)
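A common use of the status values is a per-run summary. The sketch below is illustrative only: the enum is a local stand-in that mirrors the four members shown in the class reference (SUCCESS, FAILURE, PARTIAL, SKIPPED), and summarize is a hypothetical helper, not part of the framework.

```python
from collections import Counter
from enum import Enum
from typing import Dict


class AnalysisStatus(Enum):
    """Local stand-in mirroring the members listed in the class reference."""
    SUCCESS = "success"
    FAILURE = "failure"
    PARTIAL = "partial"
    SKIPPED = "skipped"


def summarize(statuses: Dict[str, AnalysisStatus]) -> Dict[str, int]:
    """Count how many modules ended in each status (zero counts included)."""
    counts = Counter(status.value for status in statuses.values())
    return {status.value: counts.get(status.value, 0) for status in AnalysisStatus}


# Hypothetical outcome of one analysis run
run = {
    "manifest_analysis": AnalysisStatus.SUCCESS,
    "string_analysis": AnalysisStatus.SUCCESS,
    "native_analysis": AnalysisStatus.PARTIAL,
    "apk_diffing": AnalysisStatus.SKIPPED,
}
summary = summarize(run)
print(summary)
```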

Module Registry

dexray_insight.core.base_classes.register_module(name: str)

Decorator for registering analysis modules

Decorator function for registering analysis modules with the framework.

Usage Example:

from dexray_insight.core.base_classes import register_module, BaseAnalysisModule, AnalysisContext

@register_module('my_custom_module')
class MyCustomModule(BaseAnalysisModule):
    def analyze(self, apk_path: str, context: AnalysisContext):
        # Implementation here
        pass

    def get_dependencies(self):
        return ['string_analysis']  # Depends on string analysis

Temporal Analysis

Container for temporary directory paths used during analysis.

Key Attributes:

  • base_dir - Root temporary directory

  • unzipped_dir - Directory containing unzipped APK contents

  • jadx_dir - Directory for JADX decompilation results

  • apktool_dir - Directory for APKTool analysis results

  • logs_dir - Directory for tool execution logs

Usage Example:

# Check if temporal analysis is available
if context.temporal_paths:
    # Access native libraries
    lib_dir = context.temporal_paths.unzipped_dir / 'lib'
    if lib_dir.exists():
        so_files = list(lib_dir.rglob('*.so'))
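The container's attributes line up with the temporal_analysis settings in DEFAULT_CONFIG (base_directory and directory_structure). The sketch below shows one way such a container could be derived from those settings. TemporalPathsSketch and build_paths are hypothetical, and the per-run folder name built from the APK name and timestamp is an assumption, not documented behavior.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Dict


@dataclass(frozen=True)
class TemporalPathsSketch:
    """Hypothetical container mirroring the attributes listed above."""
    base_dir: Path
    unzipped_dir: Path
    jadx_dir: Path
    apktool_dir: Path
    logs_dir: Path


def build_paths(base_directory: str, apk_name: str, timestamp: str,
                structure: Dict[str, str]) -> TemporalPathsSketch:
    # Assumed layout: one folder per analysis run below the base directory
    base = Path(base_directory) / f"{apk_name}_{timestamp}"
    return TemporalPathsSketch(
        base_dir=base,
        unzipped_dir=base / structure["unzipped_folder"],
        jadx_dir=base / structure["jadx_folder"],
        apktool_dir=base / structure["apktool_folder"],
        logs_dir=base / structure["logs_folder"],
    )


# Folder names copied from DEFAULT_CONFIG['temporal_analysis']['directory_structure']
directory_structure = {
    "apktool_folder": "apktoolResults",
    "jadx_folder": "jadxResults",
    "logs_folder": "logs",
    "unzipped_folder": "unzipped",
}
paths = build_paths("./temp_analysis", "app", "20240101_120000", directory_structure)
print(paths.jadx_dir)
```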

Error Handling

The core framework provides standardized error handling patterns:

Module Timeouts:

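Modules that exceed their configured timeout (analysis.timeout.module_timeout, 300 seconds in DEFAULT_CONFIG) are automatically terminated. The AnalysisStatus enumeration documented above defines SUCCESS, FAILURE, PARTIAL, and SKIPPED but no TIMEOUT member, so check the result's status together with its error_message to detect a timeout.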

Exception Handling:

Unhandled exceptions in modules are caught and converted to AnalysisStatus.FAILURE results with error details.

Dependency Resolution:

Missing dependencies result in modules being skipped with AnalysisStatus.SKIPPED status.
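The engine-side behavior described above can be approximated with a small wrapper. This is a sketch under assumptions, not the engine's code: run_guarded and ResultSketch are hypothetical, statuses are plain strings for brevity, and a timed-out call is reported as a failure. Note that Python cannot forcibly stop a running thread, so the sketch only stops waiting for it.

```python
import concurrent.futures
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ResultSketch:
    """Hypothetical stand-in for a BaseResult-like object."""
    module_name: str
    status: str
    execution_time: float = 0.0
    error_message: Optional[str] = None


def run_guarded(name: str, func: Callable[[], object], timeout: float) -> ResultSketch:
    """Run `func`, converting timeouts and exceptions into failure results."""
    start = time.time()
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func)
    try:
        future.result(timeout=timeout)
        status, error = "success", None
    except concurrent.futures.TimeoutError:
        status, error = "failure", f"Module exceeded the {timeout}s timeout"
    except Exception as exc:  # any error raised inside the module
        status, error = "failure", str(exc)
    finally:
        # Do not block on a worker that may still be running
        executor.shutdown(wait=False)
    return ResultSketch(name, status, time.time() - start, error)


outcome = run_guarded("demo_module", lambda: None, timeout=1.0)
print(outcome.status)
```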

Example Error Handling in Modules:

def analyze(self, apk_path: str, context: AnalysisContext):
    start_time = time.time()  # requires `import time` at module level
    try:
        # Analysis implementation
        result_data = self._perform_analysis(apk_path, context)

        return MyModuleResult(
            module_name=self.name,
            status=AnalysisStatus.SUCCESS,
            execution_time=time.time() - start_time,
            data=result_data
        )

    except TimeoutError:
        # AnalysisStatus has no TIMEOUT member, so report the timeout as a failure
        return MyModuleResult(
            module_name=self.name,
            status=AnalysisStatus.FAILURE,
            execution_time=time.time() - start_time,
            error_message="Analysis timed out"
        )

    except Exception as e:
        self.logger.error(f"Analysis failed: {e}")
        return MyModuleResult(
            module_name=self.name,
            status=AnalysisStatus.FAILURE,
            execution_time=time.time() - start_time,
            error_message=str(e)
        )

Parallel Execution

The framework supports parallel module execution for improved performance:

Configuration:

analysis:
  parallel_execution:
    enabled: true
    max_workers: 4

Dependency-Aware Scheduling:

Modules with dependencies are automatically scheduled after their prerequisites complete, even in parallel execution mode.
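One way to picture dependency-aware scheduling is to run modules in "waves": every module whose prerequisites are finished runs in parallel, then the next wave becomes ready. The sketch below uses the standard library for this. It is an illustration only, and run_in_waves, the dependency map, and the wave-by-wave strategy are assumptions, not the engine's actual scheduler.

```python
from concurrent.futures import ThreadPoolExecutor
from graphlib import TopologicalSorter
from typing import Callable, Dict, List


def run_in_waves(dependencies: Dict[str, List[str]],
                 run_module: Callable[[str], None],
                 max_workers: int = 4) -> List[List[str]]:
    """Run modules in parallel, never starting one before its dependencies."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises CycleError on circular dependencies
    waves: List[List[str]] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            ready = list(sorter.get_ready())
            # Run the whole wave in parallel and wait for it to finish
            list(pool.map(run_module, ready))
            sorter.done(*ready)
            waves.append(sorted(ready))
    return waves


# Hypothetical dependency map, for illustration only
deps = {
    "manifest_analysis": [],
    "string_analysis": [],
    "tracker_analysis": ["string_analysis"],
}
executed: List[str] = []
waves = run_in_waves(deps, executed.append, max_workers=4)
print(waves)
```

A production scheduler would usually start a module as soon as its own prerequisites finish instead of waiting for the whole wave, but the ordering guarantee is the same.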

Thread Safety:

Modules should be designed to be thread-safe when parallel execution is enabled. The AnalysisContext object is shared between modules and should be accessed carefully.
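As a sketch of what "accessed carefully" can mean in practice, shared state can be wrapped so that every read-modify-write happens under a lock. LockedStore below is a hypothetical helper for illustration only. Nothing in this reference states whether AnalysisContext does its own locking, so the sketch makes no assumption about it.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


class LockedStore:
    """Hypothetical thread-safe wrapper around a shared dictionary."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._data: Dict[str, Any] = {}

    def get(self, key: str, default: Any = None) -> Any:
        with self._lock:
            return self._data.get(key, default)

    def update(self, key: str, func: Callable[[Any], Any], default: Any) -> None:
        # Read, modify, and write under one lock so no update is lost
        with self._lock:
            self._data[key] = func(self._data.get(key, default))


store = LockedStore()


def worker(_: int) -> None:
    for _ in range(1000):
        store.update("files_seen", lambda n: n + 1, 0)


with ThreadPoolExecutor(max_workers=4) as pool:
    list(pool.map(worker, range(4)))

print(store.get("files_seen"))
```

Four workers each perform 1000 increments, and because each increment is atomic with respect to the lock, the final count is 4000.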

Monitoring Parallel Execution:

# Check if running in parallel mode
if config.parallel_execution_enabled:
    max_workers = config.max_workers  # max_workers is a property, not a method
    self.logger.info(f"Running with {max_workers} parallel workers")

Extension Points

The core framework provides several extension points for customization:

Custom Result Types:

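from dataclasses import dataclass
from typing import Any, Dict, Optional

from dexray_insight.core.base_classes import BaseResult

@dataclass
class MyCustomResult(BaseResult):
    # A default is required because BaseResult's trailing fields have defaults
    custom_data: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        # Extend the base serialization with the custom field
        base_dict = super().to_dict()
        base_dict['custom_data'] = self.custom_data
        return base_dict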

Custom Analysis Context Extensions:

# Add custom data to the shared context through the documented accessor
context.add_result('custom_analyzer', {
    'processed_files': [],
    'detected_patterns': []
})

Custom Configuration Validation:

def validate_custom_config(config_dict):
    # Each entry is "<section>.<key>"; the value must be present and non-empty
    required_fields = ['custom_module.api_key', 'custom_module.timeout']
    for field in required_fields:
        section, key = field.split('.', 1)
        if not config_dict.get(section, {}).get(key):
            raise ValueError(f"Missing required configuration: {field}")

Integration Examples

Creating a Custom Analysis Module:

import time
from typing import Dict, Any, List
from dataclasses import dataclass

from dexray_insight.core.base_classes import (
    BaseAnalysisModule, BaseResult, AnalysisContext,
    AnalysisStatus, register_module
)

@dataclass
class CustomAnalysisResult(BaseResult):
    findings: List[str] = None
    confidence_score: float = 0.0

    def __post_init__(self):
        if self.findings is None:
            self.findings = []

@register_module('custom_analysis')
class CustomAnalysisModule(BaseAnalysisModule):
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.custom_patterns = config.get('custom_patterns', [])

    def analyze(self, apk_path: str, context: AnalysisContext) -> CustomAnalysisResult:
        start_time = time.time()

        try:
            findings = []

            # Access string analysis results
            if 'string_analysis' in context.module_results:
                strings = context.module_results['string_analysis']
                findings = self._analyze_strings(strings)

            return CustomAnalysisResult(
                module_name='custom_analysis',
                status=AnalysisStatus.SUCCESS,
                execution_time=time.time() - start_time,
                findings=findings,
                confidence_score=len(findings) / 10.0
            )

        except Exception as e:
            return CustomAnalysisResult(
                module_name='custom_analysis',
                status=AnalysisStatus.FAILURE,
                execution_time=time.time() - start_time,
                error_message=str(e)
            )

    def get_dependencies(self) -> List[str]:
        return ['string_analysis']

    def _analyze_strings(self, strings: List[str]) -> List[str]:
        findings = []
        for string in strings:
            for pattern in self.custom_patterns:
                if pattern in string:
                    findings.append(f"Found pattern '{pattern}' in: {string}")
        return findings