#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Configuration management system for Dexray Insight analysis framework."""
# #!/usr/bin/env python3
# # -*- coding: utf-8 -*-
#
# # Copyright (C) {{ year }} Dexray Insight Contributors
# #
# # This file is part of Dexray Insight - Android APK Security Analysis Tool
# #
# # Licensed under the Apache License, Version 2.0 (the "License");
# # you may not use this file except in compliance with the License.
# # You may obtain a copy of the License at
# #
# # http://www.apache.org/licenses/LICENSE-2.0
# #
# # Unless required by applicable law or agreed to in writing, software
# # distributed under the License is distributed on an "AS IS" BASIS,
# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# # See the License for the specific language governing permissions and
# # limitations under the License.
import json
import os
from pathlib import Path
from typing import Any
from typing import Optional
[docs]
class Configuration:
"""Centralized configuration management for Dexray Insight."""
DEFAULT_CONFIG = {
"analysis": {
"parallel_execution": {"enabled": True, "max_workers": 4},
"timeout": {"module_timeout": 300, "tool_timeout": 600}, # 5 minutes per module # 10 minutes per tool
},
"modules": {
"signature_detection": {
"enabled": True,
"priority": 10,
"providers": {
"virustotal": {"enabled": False, "api_key": None, "rate_limit": 4}, # requests per minute
"koodous": {"enabled": False, "api_key": None},
"triage": {"enabled": True, "api_key": None},
},
},
"permission_analysis": {
"enabled": True,
"priority": 20,
"critical_permissions_file": None, # Path to custom permissions file
"use_default_critical_list": True,
},
"string_analysis": {
"enabled": True,
"priority": 30,
"patterns": {
"ip_addresses": True,
"urls": True,
"email_addresses": True,
"domains": True,
"base64_strings": True,
},
"filters": {"min_string_length": 2, "exclude_patterns": []},
},
"api_invocation": {"enabled": False, "priority": 40, "reflection_analysis": True},
"manifest_analysis": {
"enabled": True,
"priority": 15,
"extract_intent_filters": True,
"analyze_exported_components": True,
},
"apk_diffing": {"enabled": False, "priority": 100},
"tracker_analysis": {
"enabled": True,
"priority": 35,
"fetch_exodus_trackers": True,
"exodus_api_url": "https://reports.exodus-privacy.eu.org/api/trackers",
"api_timeout": 10,
},
"behaviour_analysis": {
"enabled": True,
"priority": 1000, # Lowest priority to run last
"deep_mode": False, # Fast mode by default, deep mode via --deep flag
},
"library_detection": {
"enabled": True,
"priority": 25,
"enable_heuristic": True,
"enable_similarity": True,
"confidence_threshold": 0.7,
"similarity_threshold": 0.85,
"class_similarity_threshold": 0.7,
"version_analysis": {
"enabled": True,
"security_analysis_only": True, # Only run during security analysis
"api_timeout": 5,
"cache_duration_hours": 24,
"sources": {"maven_central": True, "npm_registry": True, "pypi": True, "custom_database": False},
"console_output": {
"enabled": True,
"show_recommendations": True,
"group_by_risk": True,
"show_summary": True,
},
},
},
"native_analysis": {
"enabled": True,
"priority": 50, # Run after library_detection and string_analysis
"requires_temporal_analysis": True, # Only run when APK is unzipped
"architectures": ["arm64-v8a"], # Primary 64-bit ARM
"file_patterns": ["*.so"], # Native shared libraries
"modules": {
"string_extraction": {
"enabled": True,
"min_string_length": 4,
"max_string_length": 1024,
"encoding": "utf-8",
"fallback_encodings": ["latin1", "ascii"],
}
},
},
},
"tools": {
"apkid": {"enabled": True, "timeout": 300, "options": []},
"kavanoz": {"enabled": True, "timeout": 600, "output_dir": None}, # If None, uses temp directory
"androguard": {"enabled": True, "logging_level": "WARNING"},
# Decompilation and Analysis Tools
"jadx": {
"enabled": True,
"path": None, # Set to JADX executable path
"timeout": 900, # 15 minutes
"options": ["--no-debug-info", "--no-inline-anonymous", "--show-bad-code"],
},
"apktool": {
"enabled": True,
"path": None, # Set to apktool JAR path
"timeout": 600, # 10 minutes
"java_options": ["-Xmx2g"], # Java heap options
"options": ["--no-debug-info"],
},
# Native Binary Analysis Tools
"radare2": {
"enabled": True,
"path": None, # Set to radare2 binary path (uses system PATH if None)
"timeout": 120, # 2 minutes per binary analysis
"options": ["-2"], # Radare2 options: -2 for no stderr output
},
},
"temporal_analysis": {
"enabled": True,
"base_directory": "./temp_analysis", # Base directory for temporal analysis
"cleanup_after_analysis": False, # Set to True to cleanup directories after analysis
"directory_structure": {
"unzipped_folder": "unzipped", # Folder name for unzipped APK contents
"jadx_folder": "jadxResults", # Folder name for JADX decompiled results
"apktool_folder": "apktoolResults", # Folder name for apktool results
"logs_folder": "logs", # Folder name for tool execution logs
},
"preserve_on_error": True, # Keep directories if analysis fails
},
"security": {
"enable_owasp_assessment": False,
"assessments": {
"injection": {
"enabled": True,
"sql_patterns": ["SELECT", "INSERT", "UPDATE", "DELETE", "DROP"],
"command_patterns": ["exec", "system", "runtime"],
},
"broken_authentication": {"enabled": True, "check_weak_crypto": True, "check_hardcoded_secrets": True},
"sensitive_data": {
"enabled": True,
"pii_patterns": ["email", "phone", "ssn", "credit_card"],
"crypto_keys_check": True,
},
"broken_access_control": {
"enabled": True,
"check_exported_components": True,
"check_permissions": True,
},
"security_misconfiguration": {
"enabled": True,
"check_debug_flags": True,
"check_network_security": True,
},
"vulnerable_components": {"enabled": True, "check_known_libraries": True},
"insufficient_logging": {"enabled": True, "check_logging_practices": True},
},
},
"output": {
"format": "json",
"pretty_print": True,
"include_timestamps": True,
"output_directory": "./results",
"filename_template": "asam_{apk_name}_{timestamp}.json",
},
"logging": {
"level": "INFO",
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s",
"file": None, # If None, logs to console only
},
}
[docs]
def __init__(self, config_path: Optional[str] = None, config_dict: Optional[dict[str, Any]] = None):
"""
Initialize configuration.
Args:
config_path: Path to configuration file (JSON or YAML)
config_dict: Configuration dictionary (overrides file)
"""
self.config = self.DEFAULT_CONFIG.copy()
# Try to load default config file first
self._load_default_config()
if config_path:
self._load_from_file(config_path)
if config_dict:
self._merge_config(config_dict)
# Load environment variables (highest priority)
self._load_from_environment()
def _load_default_config(self):
"""Try to load default config file from project root."""
# Look for dexray.yaml in project root (relative to this file)
current_dir = Path(__file__).parent
project_root = current_dir.parent.parent.parent # Go up to project root
default_config_path = project_root / "dexray.yaml"
if default_config_path.exists():
try:
self._load_from_file(str(default_config_path))
except Exception as e:
# Don't fail if default config can't be loaded, just warn
print(f"Warning: Could not load default config from {default_config_path}: {e}")
else:
# Also check current working directory
cwd_config = Path.cwd() / "dexray.yaml"
if cwd_config.exists():
try:
self._load_from_file(str(cwd_config))
except Exception as e:
print(f"Warning: Could not load config from {cwd_config}: {e}")
def _load_from_file(self, config_path: str):
"""Load configuration from file."""
path = Path(config_path)
if not path.exists():
raise FileNotFoundError(f"Configuration file not found: {config_path}")
try:
with open(path) as f:
if path.suffix.lower() in [".yml", ".yaml"]:
# Try to import yaml, fallback to JSON if not available
try:
import yaml
file_config = yaml.safe_load(f)
except ImportError:
raise ValueError("YAML files require PyYAML to be installed")
else:
file_config = json.load(f)
self._merge_config(file_config)
except Exception as e:
raise ValueError(f"Failed to load configuration from {config_path}: {str(e)}")
def _load_from_environment(self):
"""Load configuration from environment variables."""
# API Keys
vt_key = os.getenv("VIRUSTOTAL_API_KEY")
if vt_key:
self.config["modules"]["signature_detection"]["providers"]["virustotal"]["api_key"] = vt_key
self.config["modules"]["signature_detection"]["providers"]["virustotal"]["enabled"] = True
koodous_key = os.getenv("KOODOUS_API_KEY")
if koodous_key:
self.config["modules"]["signature_detection"]["providers"]["koodous"]["api_key"] = koodous_key
self.config["modules"]["signature_detection"]["providers"]["koodous"]["enabled"] = True
triage_key = os.getenv("TRIAGE_API_KEY")
if triage_key:
self.config["modules"]["signature_detection"]["providers"]["triage"]["api_key"] = triage_key
# Other environment overrides
log_level = os.getenv("DEXRAY_LOG_LEVEL")
if log_level:
self.config["logging"]["level"] = log_level.upper()
output_dir = os.getenv("DEXRAY_OUTPUT_DIR")
if output_dir:
self.config["output"]["output_directory"] = output_dir
def _merge_config(self, new_config: dict[str, Any]):
"""Recursively merge new configuration with existing."""
def merge_dict(base: dict[str, Any], update: dict[str, Any]):
for key, value in update.items():
if key in base and isinstance(base[key], dict) and isinstance(value, dict):
merge_dict(base[key], value)
else:
base[key] = value
merge_dict(self.config, new_config)
[docs]
def get_module_config(self, module_name: str) -> dict[str, Any]:
"""Get configuration for a specific module."""
return self.config.get("modules", {}).get(module_name, {})
[docs]
def get_temporal_analysis_config(self) -> dict[str, Any]:
"""Get temporal analysis configuration."""
return self.config.get("temporal_analysis", {})
[docs]
def get_security_config(self) -> dict[str, Any]:
"""Get security assessment configuration."""
return self.config.get("security", {})
[docs]
def get_output_config(self) -> dict[str, Any]:
"""Get output configuration."""
return self.config.get("output", {})
@property
def enable_security_assessment(self) -> bool:
"""Check if OWASP security assessment is enabled."""
return self.config.get("security", {}).get("enable_owasp_assessment", False)
@property
def parallel_execution_enabled(self) -> bool:
"""Check if parallel execution is enabled."""
return self.config.get("analysis", {}).get("parallel_execution", {}).get("enabled", True)
@property
def max_workers(self) -> int:
"""Get maximum number of parallel workers."""
return self.config.get("analysis", {}).get("parallel_execution", {}).get("max_workers", 4)
[docs]
def to_dict(self) -> dict[str, Any]:
"""Get complete configuration as dictionary."""
return self.config.copy()
[docs]
def save_to_file(self, file_path: str, format: str = "json"):
"""
Save configuration to file.
Args:
file_path: Path to save configuration
format: File format ('json' or 'yaml')
"""
path = Path(file_path)
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
if format.lower() == "yaml":
try:
import yaml
yaml.dump(self.config, f, default_flow_style=False, indent=2)
except ImportError:
raise ValueError("YAML output requires PyYAML to be installed")
else:
json.dump(self.config, f, indent=2)
[docs]
def update_from_kwargs(self, **kwargs):
"""Update configuration from keyword arguments (for backward compatibility)."""
# Map common CLI arguments to configuration
mapping = {
"do_signature_check": ["modules", "signature_detection", "enabled"],
"is_verbose": ["logging", "level"], # Would map 'DEBUG' if True
"do_sec_analysis": ["security", "enable_owasp_assessment"],
}
for arg_name, arg_value in kwargs.items():
if arg_name in mapping:
config_path = mapping[arg_name]
self._set_nested_value(config_path, arg_value)
def _set_nested_value(self, path: list, value: Any):
"""Set a nested configuration value."""
current = self.config
for key in path[:-1]:
if key not in current:
current[key] = {}
current = current[key]
# Special handling for verbose flag
if path[-1] == "level" and isinstance(value, bool):
current[path[-1]] = "DEBUG" if value else "INFO"
else:
current[path[-1]] = value
[docs]
def validate(self) -> bool:
"""Validate configuration."""
# Check required API keys if services are enabled
sig_config = self.get_module_config("signature_detection")
for provider, config in sig_config.get("providers", {}).items():
if config.get("enabled", False) and not config.get("api_key"):
print(f"Warning: {provider} is enabled but no API key provided")
# Check output directory
output_dir = self.get_output_config().get("output_directory")
if output_dir:
path = Path(output_dir)
if not path.exists():
try:
path.mkdir(parents=True, exist_ok=True)
except Exception as e:
print(f"Warning: Cannot create output directory {output_dir}: {e}")
return False
return True