Source code for dexray_insight.core.configuration

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Configuration management system for Dexray Insight analysis framework."""

# #!/usr/bin/env python3
# # -*- coding: utf-8 -*-
#
# # Copyright (C) {{ year }} Dexray Insight Contributors
# #
# # This file is part of Dexray Insight - Android APK Security Analysis Tool
# #
# # Licensed under the Apache License, Version 2.0 (the "License");
# # you may not use this file except in compliance with the License.
# # You may obtain a copy of the License at
# #
# #     http://www.apache.org/licenses/LICENSE-2.0
# #
# # Unless required by applicable law or agreed to in writing, software
# # distributed under the License is distributed on an "AS IS" BASIS,
# # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# # See the License for the specific language governing permissions and
# # limitations under the License.

import json
import os
from pathlib import Path
from typing import Any
from typing import Optional


[docs] class Configuration: """Centralized configuration management for Dexray Insight.""" DEFAULT_CONFIG = { "analysis": { "parallel_execution": {"enabled": True, "max_workers": 4}, "timeout": {"module_timeout": 300, "tool_timeout": 600}, # 5 minutes per module # 10 minutes per tool }, "modules": { "signature_detection": { "enabled": True, "priority": 10, "providers": { "virustotal": {"enabled": False, "api_key": None, "rate_limit": 4}, # requests per minute "koodous": {"enabled": False, "api_key": None}, "triage": {"enabled": True, "api_key": None}, }, }, "permission_analysis": { "enabled": True, "priority": 20, "critical_permissions_file": None, # Path to custom permissions file "use_default_critical_list": True, }, "string_analysis": { "enabled": True, "priority": 30, "patterns": { "ip_addresses": True, "urls": True, "email_addresses": True, "domains": True, "base64_strings": True, }, "filters": {"min_string_length": 2, "exclude_patterns": []}, }, "api_invocation": {"enabled": False, "priority": 40, "reflection_analysis": True}, "manifest_analysis": { "enabled": True, "priority": 15, "extract_intent_filters": True, "analyze_exported_components": True, }, "apk_diffing": {"enabled": False, "priority": 100}, "tracker_analysis": { "enabled": True, "priority": 35, "fetch_exodus_trackers": True, "exodus_api_url": "https://reports.exodus-privacy.eu.org/api/trackers", "api_timeout": 10, }, "behaviour_analysis": { "enabled": True, "priority": 1000, # Lowest priority to run last "deep_mode": False, # Fast mode by default, deep mode via --deep flag }, "library_detection": { "enabled": True, "priority": 25, "enable_heuristic": True, "enable_similarity": True, "confidence_threshold": 0.7, "similarity_threshold": 0.85, "class_similarity_threshold": 0.7, "version_analysis": { "enabled": True, "security_analysis_only": True, # Only run during security analysis "api_timeout": 5, "cache_duration_hours": 24, "sources": {"maven_central": True, "npm_registry": True, "pypi": True, "custom_database": False}, "console_output": { "enabled": True, "show_recommendations": True, "group_by_risk": True, "show_summary": True, }, }, }, "native_analysis": { "enabled": True, "priority": 50, # Run after library_detection and string_analysis "requires_temporal_analysis": True, # Only run when APK is unzipped "architectures": ["arm64-v8a"], # Primary 64-bit ARM "file_patterns": ["*.so"], # Native shared libraries "modules": { "string_extraction": { "enabled": True, "min_string_length": 4, "max_string_length": 1024, "encoding": "utf-8", "fallback_encodings": ["latin1", "ascii"], } }, }, }, "tools": { "apkid": {"enabled": True, "timeout": 300, "options": []}, "kavanoz": {"enabled": True, "timeout": 600, "output_dir": None}, # If None, uses temp directory "androguard": {"enabled": True, "logging_level": "WARNING"}, # Decompilation and Analysis Tools "jadx": { "enabled": True, "path": None, # Set to JADX executable path "timeout": 900, # 15 minutes "options": ["--no-debug-info", "--no-inline-anonymous", "--show-bad-code"], }, "apktool": { "enabled": True, "path": None, # Set to apktool JAR path "timeout": 600, # 10 minutes "java_options": ["-Xmx2g"], # Java heap options "options": ["--no-debug-info"], }, # Native Binary Analysis Tools "radare2": { "enabled": True, "path": None, # Set to radare2 binary path (uses system PATH if None) "timeout": 120, # 2 minutes per binary analysis "options": ["-2"], # Radare2 options: -2 for no stderr output }, }, "temporal_analysis": { "enabled": True, "base_directory": "./temp_analysis", # Base directory for temporal analysis "cleanup_after_analysis": False, # Set to True to cleanup directories after analysis "directory_structure": { "unzipped_folder": "unzipped", # Folder name for unzipped APK contents "jadx_folder": "jadxResults", # Folder name for JADX decompiled results "apktool_folder": "apktoolResults", # Folder name for apktool results "logs_folder": "logs", # Folder name for tool execution logs }, "preserve_on_error": True, # Keep directories if analysis fails }, "security": { "enable_owasp_assessment": False, "assessments": { "injection": { "enabled": True, "sql_patterns": ["SELECT", "INSERT", "UPDATE", "DELETE", "DROP"], "command_patterns": ["exec", "system", "runtime"], }, "broken_authentication": {"enabled": True, "check_weak_crypto": True, "check_hardcoded_secrets": True}, "sensitive_data": { "enabled": True, "pii_patterns": ["email", "phone", "ssn", "credit_card"], "crypto_keys_check": True, }, "broken_access_control": { "enabled": True, "check_exported_components": True, "check_permissions": True, }, "security_misconfiguration": { "enabled": True, "check_debug_flags": True, "check_network_security": True, }, "vulnerable_components": {"enabled": True, "check_known_libraries": True}, "insufficient_logging": {"enabled": True, "check_logging_practices": True}, }, }, "output": { "format": "json", "pretty_print": True, "include_timestamps": True, "output_directory": "./results", "filename_template": "asam_{apk_name}_{timestamp}.json", }, "logging": { "level": "INFO", "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s", "file": None, # If None, logs to console only }, }
[docs] def __init__(self, config_path: Optional[str] = None, config_dict: Optional[dict[str, Any]] = None): """ Initialize configuration. Args: config_path: Path to configuration file (JSON or YAML) config_dict: Configuration dictionary (overrides file) """ self.config = self.DEFAULT_CONFIG.copy() # Try to load default config file first self._load_default_config() if config_path: self._load_from_file(config_path) if config_dict: self._merge_config(config_dict) # Load environment variables (highest priority) self._load_from_environment()
def _load_default_config(self): """Try to load default config file from project root.""" # Look for dexray.yaml in project root (relative to this file) current_dir = Path(__file__).parent project_root = current_dir.parent.parent.parent # Go up to project root default_config_path = project_root / "dexray.yaml" if default_config_path.exists(): try: self._load_from_file(str(default_config_path)) except Exception as e: # Don't fail if default config can't be loaded, just warn print(f"Warning: Could not load default config from {default_config_path}: {e}") else: # Also check current working directory cwd_config = Path.cwd() / "dexray.yaml" if cwd_config.exists(): try: self._load_from_file(str(cwd_config)) except Exception as e: print(f"Warning: Could not load config from {cwd_config}: {e}") def _load_from_file(self, config_path: str): """Load configuration from file.""" path = Path(config_path) if not path.exists(): raise FileNotFoundError(f"Configuration file not found: {config_path}") try: with open(path) as f: if path.suffix.lower() in [".yml", ".yaml"]: # Try to import yaml, fallback to JSON if not available try: import yaml file_config = yaml.safe_load(f) except ImportError: raise ValueError("YAML files require PyYAML to be installed") else: file_config = json.load(f) self._merge_config(file_config) except Exception as e: raise ValueError(f"Failed to load configuration from {config_path}: {str(e)}") def _load_from_environment(self): """Load configuration from environment variables.""" # API Keys vt_key = os.getenv("VIRUSTOTAL_API_KEY") if vt_key: self.config["modules"]["signature_detection"]["providers"]["virustotal"]["api_key"] = vt_key self.config["modules"]["signature_detection"]["providers"]["virustotal"]["enabled"] = True koodous_key = os.getenv("KOODOUS_API_KEY") if koodous_key: self.config["modules"]["signature_detection"]["providers"]["koodous"]["api_key"] = koodous_key self.config["modules"]["signature_detection"]["providers"]["koodous"]["enabled"] = True triage_key = os.getenv("TRIAGE_API_KEY") if triage_key: self.config["modules"]["signature_detection"]["providers"]["triage"]["api_key"] = triage_key # Other environment overrides log_level = os.getenv("DEXRAY_LOG_LEVEL") if log_level: self.config["logging"]["level"] = log_level.upper() output_dir = os.getenv("DEXRAY_OUTPUT_DIR") if output_dir: self.config["output"]["output_directory"] = output_dir def _merge_config(self, new_config: dict[str, Any]): """Recursively merge new configuration with existing.""" def merge_dict(base: dict[str, Any], update: dict[str, Any]): for key, value in update.items(): if key in base and isinstance(base[key], dict) and isinstance(value, dict): merge_dict(base[key], value) else: base[key] = value merge_dict(self.config, new_config)
[docs] def get_module_config(self, module_name: str) -> dict[str, Any]: """Get configuration for a specific module.""" return self.config.get("modules", {}).get(module_name, {})
[docs] def get_tool_config(self, tool_name: str) -> dict[str, Any]: """Get configuration for a specific external tool.""" return self.config.get("tools", {}).get(tool_name, {})
[docs] def get_temporal_analysis_config(self) -> dict[str, Any]: """Get temporal analysis configuration.""" return self.config.get("temporal_analysis", {})
[docs] def get_security_config(self) -> dict[str, Any]: """Get security assessment configuration.""" return self.config.get("security", {})
[docs] def get_output_config(self) -> dict[str, Any]: """Get output configuration.""" return self.config.get("output", {})
@property def enable_security_assessment(self) -> bool: """Check if OWASP security assessment is enabled.""" return self.config.get("security", {}).get("enable_owasp_assessment", False) @property def parallel_execution_enabled(self) -> bool: """Check if parallel execution is enabled.""" return self.config.get("analysis", {}).get("parallel_execution", {}).get("enabled", True) @property def max_workers(self) -> int: """Get maximum number of parallel workers.""" return self.config.get("analysis", {}).get("parallel_execution", {}).get("max_workers", 4)
[docs] def to_dict(self) -> dict[str, Any]: """Get complete configuration as dictionary.""" return self.config.copy()
[docs] def save_to_file(self, file_path: str, format: str = "json"): """ Save configuration to file. Args: file_path: Path to save configuration format: File format ('json' or 'yaml') """ path = Path(file_path) path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w") as f: if format.lower() == "yaml": try: import yaml yaml.dump(self.config, f, default_flow_style=False, indent=2) except ImportError: raise ValueError("YAML output requires PyYAML to be installed") else: json.dump(self.config, f, indent=2)
[docs] def update_from_kwargs(self, **kwargs): """Update configuration from keyword arguments (for backward compatibility).""" # Map common CLI arguments to configuration mapping = { "do_signature_check": ["modules", "signature_detection", "enabled"], "is_verbose": ["logging", "level"], # Would map 'DEBUG' if True "do_sec_analysis": ["security", "enable_owasp_assessment"], } for arg_name, arg_value in kwargs.items(): if arg_name in mapping: config_path = mapping[arg_name] self._set_nested_value(config_path, arg_value)
def _set_nested_value(self, path: list, value: Any): """Set a nested configuration value.""" current = self.config for key in path[:-1]: if key not in current: current[key] = {} current = current[key] # Special handling for verbose flag if path[-1] == "level" and isinstance(value, bool): current[path[-1]] = "DEBUG" if value else "INFO" else: current[path[-1]] = value
[docs] def validate(self) -> bool: """Validate configuration.""" # Check required API keys if services are enabled sig_config = self.get_module_config("signature_detection") for provider, config in sig_config.get("providers", {}).items(): if config.get("enabled", False) and not config.get("api_key"): print(f"Warning: {provider} is enabled but no API key provided") # Check output directory output_dir = self.get_output_config().get("output_directory") if output_dir: path = Path(output_dir) if not path.exists(): try: path.mkdir(parents=True, exist_ok=True) except Exception as e: print(f"Warning: Cannot create output directory {output_dir}: {e}") return False return True