Core Framework API
The core framework provides the foundational classes and components for Dexray Insight’s modular analysis architecture.
Analysis Engine
- class dexray_insight.core.analysis_engine.AnalysisEngine(config: Configuration)[source]
Bases:
object
Main analysis engine that orchestrates all APK analysis activities.
The AnalysisEngine serves as the central coordinator for the entire analysis workflow. It manages module execution, external tool integration, result aggregation, and security assessment orchestration.
Key Features:
- Modular architecture with pluggable analysis modules
- Dependency-aware execution planning and parallel processing
- External tool integration (APKID, Kavanoz, JADX, etc.)
- Comprehensive error handling and resilience
- Security assessment engine integration
- Result aggregation and structured output generation
Architecture Patterns:
- Registry Pattern: For module discovery and management
- Strategy Pattern: For different analysis approaches
- Factory Pattern: For result object creation
- Template Method: For analysis workflow orchestration
SOLID Principles:
- Single Responsibility: Orchestrates analysis workflow
- Open/Closed: Extensible through module registration
- Dependency Inversion: Depends on abstractions (Configuration, modules)
- Usage:
config = Configuration()
engine = AnalysisEngine(config)
results = engine.analyze_apk('/path/to/app.apk')
- __init__(config: Configuration)[source]
- analyze_apk(apk_path: str, requested_modules: List[str] | None = None, androguard_obj: Any | None = None, timestamp: str | None = None) FullAnalysisResults [source]
Perform comprehensive APK analysis
- Parameters:
apk_path – Path to the APK file
requested_modules – Optional list of specific modules to run
androguard_obj – Optional pre-initialized Androguard object
timestamp – Optional timestamp for temporal directory naming
- Returns:
FullAnalysisResults containing all analysis results
The AnalysisEngine is the central orchestrator that manages the execution of analysis modules, handles dependencies, and coordinates parallel execution.
Key Methods:
- analyze_apk(apk_path, requested_modules, androguard_obj, timestamp) - Main entry point for APK analysis
- _execute_modules(context) - Execute registered analysis modules
- _resolve_dependencies(modules) - Resolve module execution order based on dependencies
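Resolving an execution order from declared dependencies is a topological sort. The sketch below uses only the standard library to illustrate the idea; it is not the engine's actual _resolve_dependencies implementation, and the dependency map is hypothetical.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Dict, List


def resolve_execution_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Order module names so that every module comes after its dependencies."""
    sorter = TopologicalSorter()
    for module, deps in dependencies.items():
        sorter.add(module, *deps)
    # static_order() raises graphlib.CycleError if the dependencies are circular
    return list(sorter.static_order())


# Hypothetical map: module name -> what its get_dependencies() would return
deps = {
    "manifest_analysis": [],
    "string_analysis": [],
    "tracker_analysis": ["string_analysis"],
    "library_detection": ["string_analysis", "manifest_analysis"],
}
order = resolve_execution_order(deps)
print(order)
```

The relative order of independent modules is not fixed by the sort; only the "dependency first" constraint is guaranteed.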
Usage Example:
from dexray_insight.core import AnalysisEngine, Configuration
from dexray_insight.Utils.androguardObjClass import Androguard_Obj
# Create configuration and engine
config = Configuration()
engine = AnalysisEngine(config)
# Create androguard object
androguard_obj = Androguard_Obj("app.apk")
# Run analysis
results = engine.analyze_apk("app.apk", androguard_obj=androguard_obj)
print(results.to_json())
Configuration
- class dexray_insight.core.configuration.Configuration(config_path: str | None = None, config_dict: Dict[str, Any] | None = None)[source]
Bases:
object
Centralized configuration management for Dexray Insight
- DEFAULT_CONFIG = {'analysis': {'parallel_execution': {'enabled': True, 'max_workers': 4}, 'timeout': {'module_timeout': 300, 'tool_timeout': 600}}, 'logging': {'file': None, 'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s', 'level': 'INFO'}, 'modules': {'api_invocation': {'enabled': False, 'priority': 40, 'reflection_analysis': True}, 'apk_diffing': {'enabled': False, 'priority': 100}, 'behaviour_analysis': {'deep_mode': False, 'enabled': True, 'priority': 1000}, 'library_detection': {'class_similarity_threshold': 0.7, 'confidence_threshold': 0.7, 'enable_heuristic': True, 'enable_similarity': True, 'enabled': True, 'priority': 25, 'similarity_threshold': 0.85, 'version_analysis': {'api_timeout': 5, 'cache_duration_hours': 24, 'console_output': {'enabled': True, 'group_by_risk': True, 'show_recommendations': True, 'show_summary': True}, 'enabled': True, 'security_analysis_only': True, 'sources': {'custom_database': False, 'maven_central': True, 'npm_registry': True, 'pypi': True}}}, 'manifest_analysis': {'analyze_exported_components': True, 'enabled': True, 'extract_intent_filters': True, 'priority': 15}, 'native_analysis': {'architectures': ['arm64-v8a'], 'enabled': True, 'file_patterns': ['*.so'], 'modules': {'string_extraction': {'enabled': True, 'encoding': 'utf-8', 'fallback_encodings': ['latin1', 'ascii'], 'max_string_length': 1024, 'min_string_length': 4}}, 'priority': 50, 'requires_temporal_analysis': True}, 'permission_analysis': {'critical_permissions_file': None, 'enabled': True, 'priority': 20, 'use_default_critical_list': True}, 'signature_detection': {'enabled': True, 'priority': 10, 'providers': {'koodous': {'api_key': None, 'enabled': False}, 'triage': {'api_key': None, 'enabled': True}, 'virustotal': {'api_key': None, 'enabled': False, 'rate_limit': 4}}}, 'string_analysis': {'enabled': True, 'filters': {'exclude_patterns': [], 'min_string_length': 2}, 'patterns': {'base64_strings': True, 'domains': True, 'email_addresses': True, 
'ip_addresses': True, 'urls': True}, 'priority': 30}, 'tracker_analysis': {'api_timeout': 10, 'enabled': True, 'exodus_api_url': 'https://reports.exodus-privacy.eu.org/api/trackers', 'fetch_exodus_trackers': True, 'priority': 35}}, 'output': {'filename_template': 'asam_{apk_name}_{timestamp}.json', 'format': 'json', 'include_timestamps': True, 'output_directory': './results', 'pretty_print': True}, 'security': {'assessments': {'broken_access_control': {'check_exported_components': True, 'check_permissions': True, 'enabled': True}, 'broken_authentication': {'check_hardcoded_secrets': True, 'check_weak_crypto': True, 'enabled': True}, 'injection': {'command_patterns': ['exec', 'system', 'runtime'], 'enabled': True, 'sql_patterns': ['SELECT', 'INSERT', 'UPDATE', 'DELETE', 'DROP']}, 'insufficient_logging': {'check_logging_practices': True, 'enabled': True}, 'security_misconfiguration': {'check_debug_flags': True, 'check_network_security': True, 'enabled': True}, 'sensitive_data': {'crypto_keys_check': True, 'enabled': True, 'pii_patterns': ['email', 'phone', 'ssn', 'credit_card']}, 'vulnerable_components': {'check_known_libraries': True, 'enabled': True}}, 'enable_owasp_assessment': False}, 'temporal_analysis': {'base_directory': './temp_analysis', 'cleanup_after_analysis': False, 'directory_structure': {'apktool_folder': 'apktoolResults', 'jadx_folder': 'jadxResults', 'logs_folder': 'logs', 'unzipped_folder': 'unzipped'}, 'enabled': True, 'preserve_on_error': True}, 'tools': {'androguard': {'enabled': True, 'logging_level': 'WARNING'}, 'apkid': {'enabled': True, 'options': [], 'timeout': 300}, 'apktool': {'enabled': True, 'java_options': ['-Xmx2g'], 'options': ['--no-debug-info'], 'path': None, 'timeout': 600}, 'jadx': {'enabled': True, 'options': ['--no-debug-info', '--no-inline-anonymous', '--show-bad-code'], 'path': None, 'timeout': 900}, 'kavanoz': {'enabled': True, 'output_dir': None, 'timeout': 600}, 'radare2': {'enabled': True, 'options': ['-2'], 'path': None, 
'timeout': 120}}}
- __init__(config_path: str | None = None, config_dict: Dict[str, Any] | None = None)[source]
Initialize configuration
- Parameters:
config_path – Path to configuration file (JSON or YAML)
config_dict – Configuration dictionary (overrides file)
- get_module_config(module_name: str) Dict[str, Any] [source]
Get configuration for a specific module
- get_tool_config(tool_name: str) Dict[str, Any] [source]
Get configuration for a specific external tool
- save_to_file(file_path: str, format: str = 'json')[source]
Save configuration to file
- Parameters:
file_path – Path to save configuration
format – File format (‘json’ or ‘yaml’)
The Configuration class loads configuration from JSON or YAML files or from a dictionary, validates it, and provides access to module and tool settings.
Key Methods:
- __init__(config_path=None, config_dict=None) - Initialize configuration from file or dictionary
- validate() - Validate configuration structure and values
- get_module_config(module_name) - Get configuration for specific module
- get_tool_config(tool_name) - Get configuration for external tool
Configuration Loading Priority:
1. Explicit config_dict parameter
2. Configuration file specified by config_path
3. Default dexray.yaml in current directory
4. Built-in default configuration
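One way to picture the priority order is as layers merged on top of the built-in defaults, with higher-priority layers overriding individual keys. The sketch below assumes the layers are deep-merged rather than one replacing the others wholesale, which this reference does not state; deep_merge and build_config are hypothetical helpers, not part of the Configuration API.

```python
import copy
from typing import Any, Dict, Optional


def deep_merge(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of base with override merged in; override wins on conflicts."""
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


def build_config(
    defaults: Dict[str, Any],
    file_config: Optional[Dict[str, Any]] = None,
    config_dict: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Apply layers from lowest to highest priority."""
    result = copy.deepcopy(defaults)
    for layer in (file_config, config_dict):
        if layer:
            result = deep_merge(result, layer)
    return result


defaults = {
    "logging": {"level": "INFO", "file": None},
    "analysis": {"parallel_execution": {"enabled": True, "max_workers": 4}},
}
file_config = {"logging": {"level": "WARNING"}}   # e.g. parsed from a config file
config_dict = {"logging": {"level": "DEBUG"}}     # explicit dictionary, highest priority
effective = build_config(defaults, file_config, config_dict)
```

Keys that no layer overrides, such as max_workers here, keep their default values.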
Usage Example:
from dexray_insight.core.configuration import Configuration
# Load from file
config = Configuration(config_path="my_config.yaml")
# Load from dictionary
config_dict = {
    'modules': {'signature_detection': {'enabled': True}},
    'logging': {'level': 'DEBUG'}
}
config = Configuration(config_dict=config_dict)
# Access configuration
sig_config = config.get_module_config('signature_detection')
tool_config = config.get_tool_config('radare2')
Base Classes
- class dexray_insight.core.base_classes.BaseAnalysisModule(config: Dict[str, Any])[source]
Bases:
ABC
Abstract base class for all analysis modules in the Dexray Insight framework.
This class defines the standard interface that all analysis modules must implement. It provides common functionality like configuration handling, logging setup, and standardized method signatures for the analysis workflow.
Responsibilities:
- Define the contract for analysis modules (analyze, get_dependencies)
- Provide common initialization and configuration handling
- Set up standardized logging for all modules
- Enforce consistent return types (BaseResult)
Design Pattern: Template Method (defines algorithm structure)
SOLID Principles:
- Interface Segregation (focused interface for analysis modules)
- Liskov Substitution (all modules can be used interchangeably)
Implementation Requirements:
- Must implement analyze() method for core analysis logic
- Must implement get_dependencies() to declare module dependencies
- Should return results wrapped in BaseResult or its subclasses
- config
Configuration dictionary passed from AnalysisEngine
- name
Module class name for identification
- enabled
Flag indicating if module is enabled for execution
- logger
Configured logger instance for this module
- abstract analyze(apk_path: str, context: AnalysisContext) BaseResult [source]
Perform the core analysis logic for this module.
This is the main entry point for module execution. Implementations should perform their specific analysis tasks and return structured results wrapped in a BaseResult object.
- Parameters:
apk_path – Absolute path to the APK file being analyzed
context – AnalysisContext containing shared data, configuration, and results from previously executed modules
- Returns:
Analysis results with status, data, and error information. Should include all relevant findings from this module's analysis.
- Return type:
BaseResult
- Raises:
Implementations should handle internal exceptions and return results with FAILURE status rather than propagating exceptions to the engine.
Implementation Guidelines:
- Use self.logger for consistent logging
- Access configuration via self.config
- Use context to access shared data and previous results
- Return meaningful error messages in BaseResult on failure
- Follow single responsibility principle in analysis logic
- abstract get_dependencies() List[str] [source]
Return list of module names this module depends on
- Returns:
List of module names that must be executed before this module
Abstract base class for all analysis modules. Provides the standard interface that all modules must implement.
Required Methods:
- analyze(apk_path, context) - Perform module-specific analysis
- get_dependencies() - Return list of module dependencies
Standard Methods:
- is_enabled() - Check if module is enabled in configuration
- get_timeout() - Get module execution timeout
- get_priority() - Get module execution priority
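Because the base class derives from ABC, a subclass that omits either required method cannot be instantiated. The following stand-in (SketchModule is hypothetical and only mirrors the documented attributes; it is not BaseAnalysisModule itself) demonstrates that contract with the standard library alone.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List


class SketchModule(ABC):
    """Stand-in for an abstract analysis module; not the framework's class."""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.name = self.__class__.__name__
        # Assumption: the enabled flag is read from the module's own config
        self.enabled = config.get("enabled", True)

    @abstractmethod
    def analyze(self, apk_path: str, context: Any) -> Any: ...

    @abstractmethod
    def get_dependencies(self) -> List[str]: ...

    def is_enabled(self) -> bool:
        return self.enabled


class Incomplete(SketchModule):
    def analyze(self, apk_path: str, context: Any) -> Any:
        return None
    # get_dependencies() is deliberately missing


class Complete(Incomplete):
    def get_dependencies(self) -> List[str]:
        return []


try:
    Incomplete({})
    instantiated = True
except TypeError:
    # ABC refuses to instantiate classes with unimplemented abstract methods
    instantiated = False

module = Complete({"enabled": False})
```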
- class dexray_insight.core.base_classes.BaseResult(module_name: str, status: AnalysisStatus, execution_time: float = 0.0, error_message: str | None = None)[source]
Bases:
object
Base class for all analysis results
- status: AnalysisStatus
Base class for all analysis results. Provides standardized result structure and serialization methods.
Standard Fields:
- module_name - Name of the analysis module
- status - Analysis execution status (an AnalysisStatus member such as SUCCESS, FAILURE, PARTIAL, or SKIPPED)
- execution_time - Time taken for analysis (seconds)
- error_message - Error details if analysis failed
Key Methods:
- to_dict() - Convert result to dictionary for JSON serialization
- is_successful() - Check if analysis completed successfully
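As a rough illustration of the result structure, the sketch below builds a stand-in dataclass with the four documented fields. The dictionary keys produced by to_dict and the enum-to-string conversion are assumptions; the real BaseResult may serialize differently.

```python
import json
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, Optional


class Status(Enum):
    """Stand-in mirroring the AnalysisStatus members listed in this reference."""
    SUCCESS = "success"
    FAILURE = "failure"
    PARTIAL = "partial"
    SKIPPED = "skipped"


@dataclass
class SketchResult:
    module_name: str
    status: Status
    execution_time: float = 0.0
    error_message: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        # Enum members are not JSON-serializable, so emit the plain value
        return {
            "module_name": self.module_name,
            "status": self.status.value,
            "execution_time": self.execution_time,
            "error_message": self.error_message,
        }

    def is_successful(self) -> bool:
        return self.status is Status.SUCCESS


ok = SketchResult("string_analysis", Status.SUCCESS, execution_time=1.5)
failed = SketchResult("native_analysis", Status.FAILURE, error_message="tool not found")
payload = json.dumps(ok.to_dict())
```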
- class dexray_insight.core.base_classes.AnalysisContext(apk_path: str, config: Dict[str, Any], androguard_obj: Any | None = None, unzip_path: str | None = None, module_results: Dict[str, Any] | None = None, temporal_paths: Any | None = None, jadx_available: bool = False, apktool_available: bool = False)[source]
Bases:
object
Context object passed between modules containing shared data and results.
The AnalysisContext serves as a shared data container that is passed between analysis modules during the analysis workflow. It contains APK information, configuration, and accumulated results from previous modules.
This design enables:
- Data sharing between dependent modules
- Centralized configuration access
- Progressive result accumulation
- Temporal directory management
- androguard_obj
Optional pre-loaded Androguard analysis object
- Type:
Any | None
- temporal_paths
Modern temporal directory management object
- Type:
Any | None
Design Pattern: Context Object (shares state between modules)
SOLID Principles: Single Responsibility (data container and accessor)
- add_result(module_name: str, result: Any)[source]
Add a module result to the context for use by dependent modules.
This method allows completed modules to store their results in the shared context where they can be accessed by dependent modules.
- Parameters:
module_name – Name of the module storing the result
result – Analysis result object or data structure
- Side Effects:
Modifies self.module_results dictionary
- get_unzipped_dir() str | None [source]
Get path to unzipped APK directory (temporal or legacy).
This method provides backwards compatibility by checking both modern temporal paths and legacy unzip paths.
- Returns:
Path to unzipped APK directory, or None if not available
- Return type:
str | None
Design Pattern: Facade (hides complexity of path resolution)
Shared context object passed between analysis modules containing APK information and intermediate results.
Key Attributes:
- apk_path - Path to the APK file being analyzed
- androguard_obj - Androguard analysis object
- temporal_paths - Paths to temporary analysis directories
- module_results - Dictionary storing results from completed modules
- shared_data - Dictionary for sharing data between modules
Usage Example:
# Access context in a module
def analyze(self, apk_path: str, context: AnalysisContext):
    # Access previous module results
    string_results = context.module_results.get('string_analysis', [])
    # Access temporal directories
    if context.temporal_paths:
        unzipped_dir = context.temporal_paths.unzipped_dir
    # Placeholder for this module's own findings
    analysis_results = {}
    # Share data with other modules
    context.shared_data['my_module_data'] = analysis_results
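The behaviour described for add_result() and get_unzipped_dir() can be sketched with a small stand-in. SketchContext and SketchPaths are hypothetical names, and the lookup order (temporal paths first, then the legacy unzip_path) is an assumption based on the method description above.

```python
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional


@dataclass
class SketchPaths:
    """Hypothetical stand-in for the temporal paths object."""
    unzipped_dir: Path


@dataclass
class SketchContext:
    apk_path: str
    config: Dict[str, Any]
    unzip_path: Optional[str] = None
    # default_factory gives every context its own dictionary
    module_results: Dict[str, Any] = field(default_factory=dict)
    temporal_paths: Optional[SketchPaths] = None

    def add_result(self, module_name: str, result: Any) -> None:
        self.module_results[module_name] = result

    def get_unzipped_dir(self) -> Optional[str]:
        # Assumed order: prefer temporal paths, fall back to the legacy path
        if self.temporal_paths is not None:
            return str(self.temporal_paths.unzipped_dir)
        return self.unzip_path


legacy = SketchContext("app.apk", {}, unzip_path="/tmp/app_unzipped")
modern = SketchContext("app.apk", {}, temporal_paths=SketchPaths(Path("work") / "unzipped"))
empty = SketchContext("app.apk", {})
legacy.add_result("string_analysis", {"urls": ["https://example.com"]})
```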
- class dexray_insight.core.base_classes.AnalysisStatus(value)[source]
Bases:
Enum
Enumeration of analysis module execution statuses.
Used to track the execution state of individual analysis modules and provide consistent status reporting across the framework.
- Values:
SUCCESS: Module completed successfully with results
FAILURE: Module failed to execute due to errors
PARTIAL: Module completed with some issues or warnings
SKIPPED: Module was not executed (disabled, missing dependencies, etc.)
- SUCCESS = 'success'
- FAILURE = 'failure'
- PARTIAL = 'partial'
- SKIPPED = 'skipped'
Enumeration defining possible analysis execution states.
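Values:
- SUCCESS - Analysis completed successfully
- FAILURE - Analysis failed due to error
- PARTIAL - Analysis completed with some issues or warnings
- SKIPPED - Analysis was skipped (dependencies not met, disabled, etc.)
- TIMEOUT - Analysis exceeded timeout limit (not among the members in the generated listing above; confirm it exists in your installed version)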
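Since each member carries a lowercase string value, statuses can be restored from serialized output and tallied per run. The sketch below uses a stand-in enum with the four members from the generated listing; parse_status and summarize are hypothetical helpers, not framework functions.

```python
from enum import Enum
from typing import Dict, Iterable


class Status(Enum):
    """Stand-in with the members shown in the generated listing."""
    SUCCESS = "success"
    FAILURE = "failure"
    PARTIAL = "partial"
    SKIPPED = "skipped"


def parse_status(raw: str) -> Status:
    """Map a serialized status string back to its enum member (lookup by value)."""
    return Status(raw.lower())


def summarize(statuses: Iterable[Status]) -> Dict[str, int]:
    """Count how many modules ended in each state."""
    counts = {member.value: 0 for member in Status}
    for status in statuses:
        counts[status.value] += 1
    return counts


summary = summarize([Status.SUCCESS, Status.SUCCESS, Status.SKIPPED])
```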
Module Registry
- dexray_insight.core.base_classes.register_module(name: str)[source]
Decorator for registering analysis modules
Decorator function for registering analysis modules with the framework.
Usage Example:
from dexray_insight.core.base_classes import (
    register_module, BaseAnalysisModule, AnalysisContext
)
@register_module('my_custom_module')
class MyCustomModule(BaseAnalysisModule):
    def analyze(self, apk_path: str, context: AnalysisContext):
        # Implementation here
        pass
    def get_dependencies(self):
        return ['string_analysis']  # Depends on string analysis
Temporal Analysis
Container for temporary directory paths used during analysis.
Key Attributes:
- base_dir - Root temporary directory
- unzipped_dir - Directory containing unzipped APK contents
- jadx_dir - Directory for JADX decompilation results
- apktool_dir - Directory for APKTool analysis results
- logs_dir - Directory for tool execution logs
Usage Example:
# Check if temporal analysis is available
if context.temporal_paths:
    # Access native libraries
    lib_dir = context.temporal_paths.unzipped_dir / 'lib'
    if lib_dir.exists():
        so_files = list(lib_dir.rglob('*.so'))
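The container can be pictured as a frozen dataclass of pathlib.Path objects. In the sketch below the folder names come from the temporal_analysis.directory_structure defaults shown in DEFAULT_CONFIG; the class name SketchTemporalPaths and its create() helper are hypothetical.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class SketchTemporalPaths:
    """Hypothetical stand-in for the temporal paths container."""
    base_dir: Path
    unzipped_dir: Path
    jadx_dir: Path
    apktool_dir: Path
    logs_dir: Path

    @classmethod
    def create(cls, base_dir: Path) -> "SketchTemporalPaths":
        paths = cls(
            base_dir=base_dir,
            unzipped_dir=base_dir / "unzipped",
            jadx_dir=base_dir / "jadxResults",
            apktool_dir=base_dir / "apktoolResults",
            logs_dir=base_dir / "logs",
        )
        # Create every sub-directory (and base_dir itself) on disk
        for directory in (paths.unzipped_dir, paths.jadx_dir,
                          paths.apktool_dir, paths.logs_dir):
            directory.mkdir(parents=True, exist_ok=True)
        return paths


with tempfile.TemporaryDirectory() as tmp:
    paths = SketchTemporalPaths.create(Path(tmp) / "example_run")
    created = sorted(p.name for p in paths.base_dir.iterdir())
```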
Error Handling
The core framework provides standardized error handling patterns:
Module Timeouts:
Modules that exceed their configured timeout are automatically terminated and marked with AnalysisStatus.TIMEOUT.
Exception Handling:
Unhandled exceptions in modules are caught and converted to AnalysisStatus.FAILURE results with error details.
Dependency Resolution:
Missing dependencies result in modules being skipped with AnalysisStatus.SKIPPED status.
Example Error Handling in Modules:
import time
def analyze(self, apk_path: str, context: AnalysisContext):
    # Record the start so every result can report its execution time
    start_time = time.time()
    try:
        # Analysis implementation
        result_data = self._perform_analysis(apk_path, context)
        return MyModuleResult(
            module_name=self.name,
            status=AnalysisStatus.SUCCESS,
            execution_time=time.time() - start_time,
            data=result_data
        )
    except TimeoutError:
        return MyModuleResult(
            module_name=self.name,
            # Falls back to FAILURE if this AnalysisStatus version has no TIMEOUT member
            status=getattr(AnalysisStatus, 'TIMEOUT', AnalysisStatus.FAILURE),
            execution_time=time.time() - start_time,
            error_message="Analysis timed out"
        )
    except Exception as e:
        self.logger.error(f"Analysis failed: {e}")
        return MyModuleResult(
            module_name=self.name,
            status=AnalysisStatus.FAILURE,
            execution_time=time.time() - start_time,
            error_message=str(e)
        )
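On the engine side, converting an unhandled exception into a failure result amounts to wrapping each module call in a guard. The sketch below illustrates that idea with stand-in types; run_guarded and SketchResult are hypothetical and do not reproduce the engine's actual code.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class SketchResult:
    module_name: str
    status: str  # "success" or "failure" in this sketch
    execution_time: float = 0.0
    error_message: Optional[str] = None


def run_guarded(module_name: str, analyze: Callable[[], Any]) -> SketchResult:
    """Run a module callable and turn any exception into a failure result."""
    start = time.perf_counter()
    try:
        outcome = analyze()
    except Exception as exc:
        return SketchResult(
            module_name,
            "failure",
            time.perf_counter() - start,
            f"{type(exc).__name__}: {exc}",
        )
    if isinstance(outcome, SketchResult):
        return outcome  # the module already produced a structured result
    return SketchResult(module_name, "success", time.perf_counter() - start)


def broken() -> None:
    raise RuntimeError("classes.dex is corrupt")


failed = run_guarded("string_analysis", broken)
passed = run_guarded("manifest_analysis", lambda: None)
```

With a guard like this, one failing module does not stop the remaining modules from running.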
Parallel Execution
The framework supports parallel module execution for improved performance:
Configuration:
analysis:
  parallel_execution:
    enabled: true
    max_workers: 4
Dependency-Aware Scheduling:
Modules with dependencies are automatically scheduled after their prerequisites complete, even in parallel execution mode.
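Dependency-aware scheduling can be sketched with a thread pool and an incremental topological sort: a module is submitted only once all of its prerequisites have finished. This is an illustration using the standard library, not the engine's scheduler; run_in_dependency_order and the example dependency map are hypothetical.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter  # standard library, Python 3.9+
from typing import Callable, Dict, List


def run_in_dependency_order(
    tasks: Dict[str, Callable[[], None]],
    dependencies: Dict[str, List[str]],
    max_workers: int = 4,
) -> List[str]:
    """Run tasks in parallel; return names in the order they completed."""
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    finished: List[str] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        running = {}
        while sorter.is_active():
            # Submit every module whose prerequisites are all done
            for name in sorter.get_ready():
                running[pool.submit(tasks[name])] = name
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for future in done:
                name = running.pop(future)
                future.result()  # re-raise any exception from the task
                finished.append(name)
                sorter.done(name)  # unblocks modules that depend on this one
    return finished


deps = {
    "manifest_analysis": [],
    "string_analysis": [],
    "tracker_analysis": ["string_analysis"],
}
tasks = {name: (lambda: None) for name in deps}
completed = run_in_dependency_order(tasks, deps)
```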
Thread Safety:
Modules should be designed to be thread-safe when parallel execution is enabled. The AnalysisContext object is shared between modules and should be accessed carefully.
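One common way to access shared state carefully is to guard every read-modify-write with a lock. The sketch below shows the pattern on a stand-in store; LockedResults is hypothetical, and this reference does not say whether AnalysisContext performs any locking itself.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict


class LockedResults:
    """Hypothetical shared store whose updates are serialized by a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._data: Dict[str, Any] = {}

    def add_result(self, module_name: str, result: Any) -> None:
        with self._lock:
            self._data[module_name] = result

    def append_shared(self, key: str, item: Any) -> None:
        # Read-modify-write happens under the lock so no append is lost
        with self._lock:
            self._data.setdefault(key, []).append(item)

    def snapshot(self) -> Dict[str, Any]:
        with self._lock:
            return dict(self._data)


store = LockedResults()
with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(100):
        pool.submit(store.append_shared, "findings", i)
# Leaving the with-block waits for all submitted calls to finish
collected = store.snapshot()["findings"]
```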
Monitoring Parallel Execution:
# Check if running in parallel mode
if config.parallel_execution_enabled:
    max_workers = config.get_max_workers()
    self.logger.info(f"Running with {max_workers} parallel workers")
Extension Points
The core framework provides several extension points for customization:
Custom Result Types:
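from dataclasses import dataclass
from typing import Any, Dict, Optional
from dexray_insight.core.base_classes import BaseResult
@dataclass
class MyCustomResult(BaseResult):
    # Optional because the field defaults to None
    custom_data: Optional[Dict[str, Any]] = None
    def to_dict(self) -> Dict[str, Any]:
        # Extend the base serialization with the custom field
        base_dict = super().to_dict()
        base_dict['custom_data'] = self.custom_data
        return base_dict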
Custom Analysis Context Extensions:
# Add custom data to shared context
context.shared_data['custom_analyzer'] = {
    'processed_files': [],
    'detected_patterns': []
}
Custom Configuration Validation:
def validate_custom_config(config_dict):
    required_fields = ['custom_module.api_key', 'custom_module.timeout']
    for field in required_fields:
        section, key = field.split('.')
        if not config_dict.get(section, {}).get(key):
            raise ValueError(f"Missing required configuration: {field}")
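The check above handles exactly two levels of nesting and treats any falsy value (such as 0 or False) as missing. A variant that walks dotted keys of any depth and only flags absent or None values might look like the following sketch; lookup and missing_fields are hypothetical helpers, not part of the framework.

```python
from typing import Any, Dict, Iterable, List

_MISSING = object()  # sentinel distinguishing "absent" from a stored None


def lookup(config: Dict[str, Any], dotted_key: str) -> Any:
    """Follow a dotted key through nested dictionaries; return _MISSING if absent."""
    current: Any = config
    for part in dotted_key.split("."):
        if not isinstance(current, dict) or part not in current:
            return _MISSING
        current = current[part]
    return current


def missing_fields(config: Dict[str, Any], required: Iterable[str]) -> List[str]:
    """Return the required keys that are absent or explicitly set to None."""
    missing = []
    for key in required:
        value = lookup(config, key)
        if value is _MISSING or value is None:
            missing.append(key)
    return missing


config = {"custom_module": {"api_key": None, "timeout": 0}}
problems = missing_fields(
    config,
    ["custom_module.api_key", "custom_module.timeout", "custom_module.retries.max"],
)
```

Here timeout is not reported because 0 is a present value; whether 0 should count as valid is a policy choice for the module author.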
Integration Examples
Creating a Custom Analysis Module:
import time
from typing import Dict, Any, List
from dataclasses import dataclass
from dexray_insight.core.base_classes import (
    BaseAnalysisModule, BaseResult, AnalysisContext,
    AnalysisStatus, register_module
)
@dataclass
class CustomAnalysisResult(BaseResult):
    findings: List[str] = None
    confidence_score: float = 0.0
    def __post_init__(self):
        if self.findings is None:
            self.findings = []
@register_module('custom_analysis')
class CustomAnalysisModule(BaseAnalysisModule):
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        self.custom_patterns = config.get('custom_patterns', [])
    def analyze(self, apk_path: str, context: AnalysisContext) -> CustomAnalysisResult:
        start_time = time.time()
        try:
            findings = []
            # Access string analysis results
            if 'string_analysis' in context.module_results:
                strings = context.module_results['string_analysis']
                findings = self._analyze_strings(strings)
            return CustomAnalysisResult(
                module_name='custom_analysis',
                status=AnalysisStatus.SUCCESS,
                execution_time=time.time() - start_time,
                findings=findings,
                confidence_score=len(findings) / 10.0
            )
        except Exception as e:
            return CustomAnalysisResult(
                module_name='custom_analysis',
                status=AnalysisStatus.FAILURE,
                execution_time=time.time() - start_time,
                error_message=str(e)
            )
    def get_dependencies(self) -> List[str]:
        return ['string_analysis']
    def _analyze_strings(self, strings: List[str]) -> List[str]:
        findings = []
        for string in strings:
            for pattern in self.custom_patterns:
                if pattern in string:
                    findings.append(f"Found pattern '{pattern}' in: {string}")
        return findings