Utilities API
Dexray Insight provides a comprehensive set of utility functions and classes that support the core analysis framework. These utilities handle common tasks such as file operations, logging, JSON serialization, and Android-specific operations.
File Utilities
- dexray_insight.Utils.file_utils.backup_and_replace_with_template(original_file_path: str, template_cs_file: str) -> tuple[str, str]
Backs up the original file and replaces it with a template from the root directory.
- Parameters:
original_file_path – Path to the original file (e.g., “/project/…/targetapk.csproj”)
template_cs_file – Name of template file in root directory (e.g., “template.csproj”)
- Returns:
(backup_path, new_file_path)
- Return type:
tuple[str, str]
- Raises:
FileNotFoundError – If original or template files are missing
- dexray_insight.Utils.file_utils.get_parent_directory(path: str) -> str
Returns the parent directory of the given path.
Example:
Input: “/project/targetapk_2025-03-08_20-28-38_asam_results/targetapk_unzipped”
Output: “/project/targetapk_2025-03-08_20-28-38_asam_results”
- dexray_insight.Utils.file_utils.create_new_directory(dir_name: str) -> str
Creates an asam analysis directory and raises an error if the directory already exists.
- dexray_insight.Utils.file_utils.unzip_apk_with_skip(app_name: str, apk_path: str) -> Tuple[str, List[str]]
Unzips an APK while ignoring CRC errors and returns (destination_path, skipped_files).
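The skip-on-CRC-error behavior can be approximated with the standard library alone. The following is a minimal sketch, not the library's implementation: the name `unzip_with_skip_sketch` and its `dest_dir` parameter are hypothetical, and it assumes a bad member should simply be recorded and skipped.

```python
import zipfile
from typing import List, Tuple


def unzip_with_skip_sketch(apk_path: str, dest_dir: str) -> Tuple[str, List[str]]:
    """Extract every readable member of an APK (a ZIP archive).

    Members that fail to extract are collected by name instead of
    aborting the whole extraction.
    """
    skipped: List[str] = []
    with zipfile.ZipFile(apk_path) as archive:
        for member in archive.infolist():
            try:
                archive.extract(member, dest_dir)
            except (zipfile.BadZipFile, OSError):
                # zipfile reports a CRC mismatch as BadZipFile while reading the member.
                skipped.append(member.filename)
    return dest_dir, skipped
```

A caller can then decide whether a non-empty skipped list should fail the analysis or only be logged.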
- dexray_insight.Utils.file_utils.unzip_apk(app_name: str, apk_path: str) -> str
Unzips an APK file into a folder named after the app.
- Parameters:
app_name (str)
apk_path (str)
- Returns:
Path to the created directory with unzipped contents
- Return type:
str
- Raises:
FileNotFoundError – If the APK file doesn’t exist
ValueError – If the APK file is invalid
- dexray_insight.Utils.file_utils.split_path_file_extension(file_path)
Splits a file path into directory path, filename without extension, and the extension.
- dexray_insight.Utils.file_utils.calculate_file_hash(file_path, hash_func)
Calculate the hash of a file using the specified hash function.
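Hashing an APK is usually done in chunks so that large files are never loaded into memory at once. The sketch below illustrates that idea under an assumption: `hash_func` is taken to be a `hashlib` constructor such as `hashlib.sha256`, which this page does not confirm. The name `file_hash_sketch` and the `chunk_size` parameter are hypothetical.

```python
import hashlib
from typing import Callable


def file_hash_sketch(file_path: str,
                     hash_func: Callable = hashlib.sha256,
                     chunk_size: int = 65536) -> str:
    """Return the hex digest of a file, reading it in fixed-size chunks."""
    digest = hash_func()
    with open(file_path, "rb") as handle:
        # iter() with a sentinel stops when read() returns an empty bytes object.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```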
- class dexray_insight.Utils.file_utils.CustomJSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)
- default(obj)
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
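Which types CustomJSONEncoder actually converts is not stated on this page. As an illustration of the `default` hook described above, here is a minimal sketch of an encoder that handles a few types commonly found in analysis results. The class name `SketchEncoder` and the chosen conversions are assumptions.

```python
import json
from datetime import datetime
from pathlib import Path


class SketchEncoder(json.JSONEncoder):
    """Illustrative encoder; not the library's CustomJSONEncoder."""

    def default(self, o):
        if isinstance(o, datetime):
            return o.isoformat()
        if isinstance(o, (set, frozenset)):
            # Sort by string form so the output is deterministic.
            return sorted(o, key=str)
        if isinstance(o, Path):
            return str(o)
        # Anything else: let the base class raise TypeError.
        return super().default(o)


payload = {"scanned_at": datetime(2025, 3, 8, 20, 28, 38), "permissions": {"b", "a"}}
encoded = json.dumps(payload, cls=SketchEncoder)
print(encoded)  # {"scanned_at": "2025-03-08T20:28:38", "permissions": ["a", "b"]}
```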
The file utilities module provides functions for handling file paths, JSON operations, and file system interactions.
Core Functions
- dexray_insight.Utils.file_utils.split_path_file_extension(file_path)
Splits a file path into directory, filename (without extension), and extension components.
- Parameters:
file_path (str): Full path to the file
- Returns:
tuple[str, str, str]: (directory, filename_without_extension, extension)
Usage Example:
from dexray_insight.Utils.file_utils import split_path_file_extension
# Basic usage
base_dir, name, ext = split_path_file_extension("/path/to/app.apk")
print(f"Directory: {base_dir}") # /path/to
print(f"Name: {name}") # app
print(f"Extension: {ext}") # apk
# Handle complex filenames
base_dir, name, ext = split_path_file_extension("/path/to/com.example.app-v1.2.3.apk")
print(f"Name: {name}") # com.example.app-v1.2.3
print(f"Extension: {ext}") # apk
Cross-Platform Considerations:
The function handles different path separators and behaves consistently across Windows and Unix-like systems:
# Unix path
split_path_file_extension("/home/user/app.apk")
# Returns: ('/home/user', 'app', 'apk')
# Windows path (on Windows)
split_path_file_extension("C:\\Users\\user\\app.apk")
# Returns: ('C:\\Users\\user', 'app', 'apk')
# Relative paths
split_path_file_extension("./app.apk")
# Returns: ('.', 'app', 'apk')
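The return values listed above can be reproduced with `pathlib`'s pure path classes, which parse Windows or POSIX paths regardless of the host system. This is a hedged sketch of equivalent behavior, not the library's code; `split_path_sketch` and its `windows` flag are hypothetical.

```python
from pathlib import PurePosixPath, PureWindowsPath
from typing import Tuple


def split_path_sketch(file_path: str, windows: bool = False) -> Tuple[str, str, str]:
    """Split a path into (directory, name without extension, extension without dot)."""
    path_cls = PureWindowsPath if windows else PurePosixPath
    path = path_cls(file_path)
    # Only the final suffix counts as the extension, so dotted package
    # names such as com.example.app-v1.2.3.apk keep their dots in the name.
    return str(path.parent), path.stem, path.suffix.lstrip(".")


print(split_path_sketch("/path/to/com.example.app-v1.2.3.apk"))
# ('/path/to', 'com.example.app-v1.2.3', 'apk')
```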
dump_json(filename, data, indent=2)
Serializes Python objects to JSON format and saves them to a file.
- Parameters:
filename (str): Output filename
data (Any): Python object to serialize
indent (int, optional): JSON indentation level (default: 2)
Usage Example:
from dexray_insight.Utils.file_utils import dump_json
# Save analysis results
analysis_data = {
    'package_name': 'com.example.app',
    'permissions': ['android.permission.INTERNET'],
    'urls': ['https://api.example.com']
}
dump_json('analysis_results.json', analysis_data)
# Custom indentation
dump_json('compact_results.json', analysis_data, indent=0)
Error Handling:
The function handles common serialization errors and provides meaningful error messages:
from datetime import datetime
# Handle non-serializable objects
data = {'timestamp': datetime.now()}
try:
    dump_json('results.json', data)
except TypeError as e:
    print(f"Serialization error: {e}")
    # Convert to serializable format
    data['timestamp'] = str(data['timestamp'])
    dump_json('results.json', data)
Androguard Integration
Provides wrapper classes and utilities for integrating with the Androguard Android analysis library.
Androguard Object Wrapper
Main wrapper class for Androguard APK analysis objects, providing a simplified interface to Androguard’s functionality.
Key Methods:
__init__(apk_path) - Initialize Androguard analysis for APK file
get_package() - Get application package name
get_android_version() - Get target Android version
get_permissions() - Get declared permissions list
get_activities() - Get application activities
get_services() - Get application services
get_receivers() - Get broadcast receivers
get_providers() - Get content providers
get_libraries() - Get native libraries
is_valid_apk() - Check if APK is valid and parseable
Usage Example:
from dexray_insight.Utils.androguardObjClass import Androguard_Obj
# Initialize analysis
androguard_obj = Androguard_Obj("path/to/app.apk")
if androguard_obj.is_valid_apk():
    # Extract basic information
    package_name = androguard_obj.get_package()
    permissions = androguard_obj.get_permissions()
    activities = androguard_obj.get_activities()
    print(f"Package: {package_name}")
    print(f"Permissions: {len(permissions)}")
    print(f"Activities: {len(activities)}")
    # Get native libraries
    libraries = androguard_obj.get_libraries()
    native_libs = [lib for lib in libraries if lib.endswith('.so')]
    print(f"Native libraries: {len(native_libs)}")
else:
    print("Invalid or corrupted APK file")
Error Handling:
try:
    androguard_obj = Androguard_Obj("corrupted.apk")
    if not androguard_obj.is_valid_apk():
        raise ValueError("Invalid APK file")
except Exception as e:
    print(f"Failed to analyze APK: {e}")
Performance Considerations:
The Androguard_Obj class performs full APK parsing during initialization. For large APKs, consider:
import time
start_time = time.time()
androguard_obj = Androguard_Obj("large_app.apk")
load_time = time.time() - start_time
print(f"APK loading took {load_time:.2f} seconds")
# Cache the object for multiple analyses
cached_objects = {}
apk_path = "app.apk"
if apk_path not in cached_objects:
    cached_objects[apk_path] = Androguard_Obj(apk_path)
Logging Utilities
- class dexray_insight.Utils.log.CustomFormatter(fmt=None, datefmt=None, style='%', validate=True)
Custom logging formatter to add colors and custom prefixes, with dynamic format based on log level.
- FORMAT = {10: '\x1b[96m[DEBUG] %(filename)s : %(message)s\x1b[0m', 20: '\x1b[92m[+] %(message)s\x1b[0m', 30: '\x1b[93m[W] %(message)s\x1b[0m', 40: '\x1b[91m[-] %(message)s\x1b[0m'}
- format(record)
Format the specified record as text.
The record’s attribute dictionary is used as the operand to a string formatting operation which yields the returned string. Before formatting the dictionary, a couple of preparatory steps are carried out. The message attribute of the record is computed using LogRecord.getMessage(). If the formatting string uses the time (as determined by a call to usesTime()), formatTime() is called to format the event time. If there is exception information, it is formatted using formatException() and appended to the message.
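A formatter with a "dynamic format based on log level" can be built by looking up a format string per record. The sketch below shows one way to do this, reusing the prefixes from the FORMAT table but leaving out the ANSI color codes. The class name `LevelFormatterSketch` is hypothetical, and the library's own implementation may differ.

```python
import logging


class LevelFormatterSketch(logging.Formatter):
    """Pick the format string from the record's level (illustrative only)."""

    FORMATS = {
        logging.DEBUG: "[DEBUG] %(filename)s : %(message)s",
        logging.INFO: "[+] %(message)s",
        logging.WARNING: "[W] %(message)s",
        logging.ERROR: "[-] %(message)s",
    }

    def format(self, record: logging.LogRecord) -> str:
        # Fall back to the bare message for levels without an entry.
        fmt = self.FORMATS.get(record.levelno, "%(message)s")
        return logging.Formatter(fmt).format(record)


record = logging.LogRecord("demo", logging.WARNING, "demo.py", 1,
                           "low disk space", None, None)
print(LevelFormatterSketch().format(record))  # [W] low disk space
```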
- class dexray_insight.Utils.log.LogFilter(files_to_filter)
Provides logging configuration and utilities for the Dexray Insight framework.
Logger Configuration
set_logger(args)
Configures the logging system based on command-line arguments and configuration settings.
- Parameters:
args: Parsed command-line arguments containing logging configuration
Usage Example:
from dexray_insight.Utils.log import set_logger
import argparse
# Configure logging from command-line args
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--debug', default='ERROR')
parser.add_argument('-f', '--filter', nargs='+')
args = parser.parse_args()
# Set up logging
set_logger(args)
# Use logging in modules
import logging
logger = logging.getLogger(__name__)
logger.info("Analysis started")
logger.debug("Detailed debug information")
Logging Levels:
The logging system supports multiple levels:
DEBUG - Detailed diagnostic information
INFO - General information about analysis progress
WARNING - Warning messages for non-critical issues
ERROR - Error messages for failures
Log Filtering:
Filter logs by specific source files:
# Filter logs from specific modules
dexray-insight app.apk -d DEBUG -f string_analysis.py permission_analysis.py
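Filtering by source file maps naturally onto a `logging.Filter` that inspects `record.filename`. This page does not say whether the library's LogFilter keeps or drops the listed files, so the sketch below assumes it keeps only records from the listed files. The class name `FileNameFilterSketch` is hypothetical.

```python
import logging
from typing import Iterable


class FileNameFilterSketch(logging.Filter):
    """Pass only records emitted from the given source files (assumed semantics)."""

    def __init__(self, files_to_keep: Iterable[str]):
        super().__init__()
        self.files_to_keep = set(files_to_keep)

    def filter(self, record: logging.LogRecord) -> bool:
        # record.filename is the basename of the module that logged the message.
        return record.filename in self.files_to_keep


handler = logging.StreamHandler()
handler.addFilter(FileNameFilterSketch(["string_analysis.py", "permission_analysis.py"]))
```

Attaching the filter to a handler rather than to a logger means it applies to every record that reaches that handler, whichever logger produced it.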
Custom Logger Setup:
import logging
# Create module-specific logger
logger = logging.getLogger('my_custom_module')
logger.setLevel(logging.INFO)
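# Add custom formatter (a formatter only takes effect once attached to a handler)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# Log analysis progress
logger.info("Starting custom analysis")
logger.debug("Processing APK components")  # Not shown: logger level is INFO
logger.warning("Non-critical issue detected")
logger.error("Analysis failed")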
String Processing Utilities
Pattern Matching and Extraction
Dexray Insight includes several utility functions for string pattern matching and extraction:
URL Extraction:
import re
from typing import List

def extract_urls(text: str) -> List[str]:
    """Extract URLs from text using regex patterns"""
    url_pattern = r'https?://(?:[-\w.])+(?:\.[a-zA-Z]{2,})+(?:/[^?\s]*)?(?:\?[^#\s]*)?(?:#[^\s]*)?'
    return re.findall(url_pattern, text)
# Usage
text = "Visit https://example.com and http://api.test.com/v1"
urls = extract_urls(text)
print(urls) # ['https://example.com', 'http://api.test.com/v1']
IP Address Extraction:
def extract_ip_addresses(text: str) -> List[str]:
    """Extract IPv4 addresses and full-form (uncompressed) IPv6 addresses from text"""
    ipv4_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
    # Matches only the full eight-group form; compressed forms using "::" are not matched
    ipv6_pattern = r'\b(?:[0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}\b'
    ipv4_matches = re.findall(ipv4_pattern, text)
    ipv6_matches = re.findall(ipv6_pattern, text)
    return ipv4_matches + ipv6_matches
# Usage
text = "Connect to 192.168.1.1 or 2001:0db8:0000:0000:0000:0000:0000:0001"
ips = extract_ip_addresses(text)
print(ips) # ['192.168.1.1', '2001:0db8:0000:0000:0000:0000:0000:0001']
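The fixed-width IPv6 pattern above cannot match compressed addresses such as 2001:db8::1, and the IPv4 pattern accepts out-of-range octets such as 999.1.1.1. One alternative, sketched here as an assumption and not as the framework's method, is to collect loose candidates with a regex and let the standard `ipaddress` module decide which are valid. The name `extract_ips_sketch` is hypothetical.

```python
import ipaddress
import re
from typing import List


def extract_ips_sketch(text: str) -> List[str]:
    """Return substrings of text that parse as IPv4 or IPv6 addresses."""
    found: List[str] = []
    # Loose candidates: runs of hex digits, dots and colons.
    for candidate in re.findall(r"[0-9A-Fa-f:.]{3,}", text):
        token = candidate.rstrip(".")  # drop sentence-ending periods
        try:
            ipaddress.ip_address(token)
        except ValueError:
            continue  # not a valid address, e.g. "999.1.1.1" or "add"
        found.append(token)
    return found


print(extract_ips_sketch("Connect to 192.168.1.1 or 2001:db8::1"))
# ['192.168.1.1', '2001:db8::1']
```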
Email Extraction:
def extract_emails(text: str) -> List[str]:
    """Extract email addresses from text"""
    # Note: inside a character class "|" is a literal pipe, so the TLD class is [A-Za-z]
    email_pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return re.findall(email_pattern, text)
# Usage
text = "Contact support@example.com or admin@test.org"
emails = extract_emails(text)
print(emails) # ['support@example.com', 'admin@test.org']
Base64 Detection:
import base64
import re
from typing import List

def is_base64(s: str) -> bool:
    """Check if string is valid Base64"""
    try:
        if isinstance(s, str):
            s = s.encode('ascii')
        # Valid Base64 survives a decode/encode round trip unchanged
        return base64.b64encode(base64.b64decode(s)) == s
    except Exception:
        return False

def extract_base64_strings(text: str, min_length: int = 16) -> List[str]:
    """Extract potential Base64 encoded strings"""
    base64_pattern = r'[A-Za-z0-9+/]{16,}={0,2}'
    potential_b64 = re.findall(base64_pattern, text)
    valid_b64 = []
    for candidate in potential_b64:
        if len(candidate) >= min_length and is_base64(candidate):
            valid_b64.append(candidate)
    return valid_b64
Entropy Analysis
Utility functions for analyzing string entropy to detect potential encoded content:
import math
from collections import Counter

def calculate_entropy(data: str) -> float:
    """Calculate Shannon entropy of string (bits per character)"""
    if not data:
        return 0
    # Count character frequencies
    counter = Counter(data)
    length = len(data)
    # Calculate entropy
    entropy = 0
    for count in counter.values():
        probability = count / length
        if probability > 0:
            entropy -= probability * math.log2(probability)
    return entropy

def is_high_entropy(data: str, threshold: float = 4.5) -> bool:
    """Check if string has high entropy (potentially encoded)"""
    return calculate_entropy(data) > threshold
# Usage
normal_text = "This is normal text"
encoded_text = "dGhpcyBpcyBlbmNvZGVkIHRleHQ="
print(f"Normal text entropy: {calculate_entropy(normal_text):.2f}") # 3.68
print(f"Encoded text entropy: {calculate_entropy(encoded_text):.2f}") # 4.31
# False: this 28-character string stays below the default 4.5 threshold
print(f"High entropy: {is_high_entropy(encoded_text)}") # False
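Per-character Shannon entropy can never exceed log2 of the string length, so a fixed threshold of 4.5 is unreachable for strings shorter than 23 characters. One possible refinement, offered as a sketch and not as part of the framework, is to divide by that upper bound so short and long strings are compared on the same 0 to 1 scale. Both function names below are hypothetical.

```python
import math
from collections import Counter


def shannon_entropy(data: str) -> float:
    """Shannon entropy in bits per character."""
    if not data:
        return 0.0
    length = len(data)
    return -sum((n / length) * math.log2(n / length)
                for n in Counter(data).values())


def normalized_entropy(data: str) -> float:
    """Entropy divided by its maximum for this length, log2(len(data))."""
    if len(data) < 2:
        return 0.0
    return shannon_entropy(data) / math.log2(len(data))


for sample in ("This is normal text", "dGhpcyBpcyBlbmNvZGVkIHRleHQ="):
    print(f"{sample!r}: {shannon_entropy(sample):.2f} bits, "
          f"normalized {normalized_entropy(sample):.2f}")
```

Any threshold on the normalized value would still need tuning against real strings; the sketch only removes the dependence on length.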
Android Utilities
Android-specific utility functions for handling APK components and data structures.
Permission Utilities
from typing import Dict, List

def is_dangerous_permission(permission: str) -> bool:
    """Check if permission is considered dangerous"""
    dangerous_perms = [
        'android.permission.CAMERA',
        'android.permission.READ_CONTACTS',
        'android.permission.WRITE_CONTACTS',
        'android.permission.ACCESS_FINE_LOCATION',
        'android.permission.ACCESS_COARSE_LOCATION',
        'android.permission.RECORD_AUDIO',
        'android.permission.READ_PHONE_STATE',
        'android.permission.CALL_PHONE',
        'android.permission.READ_SMS',
        'android.permission.SEND_SMS',
        'android.permission.WRITE_EXTERNAL_STORAGE'
    ]
    return permission in dangerous_perms

def categorize_permissions(permissions: List[str]) -> Dict[str, List[str]]:
    """Categorize permissions by type"""
    categories = {
        'dangerous': [],
        'normal': [],
        'custom': [],
        'system': []
    }
    for perm in permissions:
        if is_dangerous_permission(perm):
            categories['dangerous'].append(perm)
        elif perm.startswith('android.permission.'):
            categories['normal'].append(perm)
        elif perm.startswith('android.'):
            categories['system'].append(perm)
        else:
            categories['custom'].append(perm)
    return categories
Framework Detection
from typing import List

def detect_framework(package_name: str, native_libraries: List[str],
                     classes: List[str]) -> str:
    """Detect application framework based on indicators"""
    # Flutter detection
    flutter_indicators = ['libflutter.so', 'libapp.so']
    if any(lib in native_libraries for lib in flutter_indicators):
        return 'Flutter'
    # React Native detection
    rn_indicators = ['libreactnativejni.so', 'libhermes.so']
    if any(lib in native_libraries for lib in rn_indicators):
        return 'React Native'
    # Xamarin detection
    xamarin_indicators = ['libmonodroid.so', 'libmonosgen-2.0.so']
    if any(lib in native_libraries for lib in xamarin_indicators):
        return 'Xamarin'
    # Cordova/PhoneGap detection
    cordova_classes = ['org.apache.cordova', 'org.apache.phonegap']
    if any(cls_prefix in str(classes) for cls_prefix in cordova_classes):
        return 'Cordova'
    # Unity detection
    unity_indicators = ['libunity.so', 'libil2cpp.so']
    if any(lib in native_libraries for lib in unity_indicators):
        return 'Unity'
    return 'Native'
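The membership tests above compare bare file names such as libflutter.so, so they only succeed if `native_libraries` holds basenames. If the list instead contains archive paths like lib/arm64-v8a/libflutter.so (an assumption; this page does not say what form the library list takes), a small normalization step is needed first. The helper name `library_basenames` is hypothetical.

```python
from pathlib import PurePosixPath
from typing import Iterable, List


def library_basenames(apk_entries: Iterable[str]) -> List[str]:
    """Reduce APK entry paths under lib/ to unique .so file names."""
    names = {
        PurePosixPath(entry).name
        for entry in apk_entries
        if entry.startswith("lib/") and entry.endswith(".so")
    }
    # The same library usually appears once per ABI; the set removes repeats.
    return sorted(names)


entries = [
    "lib/arm64-v8a/libflutter.so",
    "lib/armeabi-v7a/libflutter.so",
    "lib/arm64-v8a/libapp.so",
    "classes.dex",
]
print(library_basenames(entries))  # ['libapp.so', 'libflutter.so']
```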
Component Analysis
from typing import Dict, List

def analyze_component_security(components: List[Dict]) -> List[Dict]:
    """Analyze Android components for security issues"""
    issues = []
    for component in components:
        component_name = component.get('name', 'unknown')
        is_exported = component.get('exported', False)
        permission = component.get('permission')
        intent_filters = component.get('intent_filters', [])
        # Check for exported components without permission protection
        if is_exported and not permission and intent_filters:
            issues.append({
                'type': 'exported_without_permission',
                'component': component_name,
                'description': f'Component {component_name} is exported without permission protection',
                'severity': 'MEDIUM'
            })
        # Check for dangerous intent filters
        dangerous_actions = [
            'android.intent.action.BOOT_COMPLETED',
            'android.intent.action.PACKAGE_REPLACED',
            'android.net.conn.CONNECTIVITY_CHANGE'
        ]
        for intent_filter in intent_filters:
            if any(action in intent_filter for action in dangerous_actions):
                issues.append({
                    'type': 'dangerous_intent_filter',
                    'component': component_name,
                    'description': f'Component {component_name} uses sensitive intent filter',
                    'severity': 'HIGH'
                })
    return issues
Data Validation Utilities
Input Validation
from typing import Any, Dict, List

def validate_apk_path(apk_path: str) -> bool:
    """Validate APK file path"""
    from pathlib import Path
    path = Path(apk_path)
    # Check if file exists
    if not path.exists():
        return False
    # Check if it's a file (not directory)
    if not path.is_file():
        return False
    # Check file extension
    if path.suffix.lower() != '.apk':
        return False
    # Check file size (not empty)
    if path.stat().st_size == 0:
        return False
    return True

def validate_configuration(config_dict: Dict[str, Any]) -> List[str]:
    """Validate configuration dictionary"""
    errors = []
    # Check required sections
    required_sections = ['analysis', 'modules', 'tools']
    for section in required_sections:
        if section not in config_dict:
            errors.append(f"Missing required section: {section}")
    # Validate analysis configuration
    if 'analysis' in config_dict:
        analysis = config_dict['analysis']
        if 'timeout' in analysis:
            module_timeout = analysis['timeout'].get('module_timeout')
            if not isinstance(module_timeout, int) or module_timeout <= 0:
                errors.append("Invalid module_timeout: must be positive integer")
    return errors
Data Sanitization
def sanitize_filename(filename: str) -> str:
    """Sanitize filename for safe file system usage"""
    import re
    # Remove or replace invalid characters
    sanitized = re.sub(r'[<>:"/\\|?*]', '_', filename)
    # Remove leading/trailing whitespace and dots
    sanitized = sanitized.strip(' .')
    # Limit length
    if len(sanitized) > 255:
        sanitized = sanitized[:255]
    # Ensure not empty
    if not sanitized:
        sanitized = 'unnamed'
    return sanitized
from typing import Optional

def sanitize_url(url: str) -> Optional[str]:
    """Sanitize URL for safe processing; returns None if the URL is rejected"""
    import urllib.parse
    try:
        # Parse and reconstruct URL
        parsed = urllib.parse.urlparse(url)
        # Validate scheme
        if parsed.scheme not in ['http', 'https', 'ftp']:
            return None
        # Reconstruct clean URL
        clean_url = urllib.parse.urlunparse(parsed)
        return clean_url
    except Exception:
        return None
Caching Utilities
Result Caching
import functools
import hashlib
import json
import pickle
import time
from pathlib import Path
from typing import Any, Dict

def cache_analysis_result(cache_dir: str = ".cache"):
    """Decorator for caching analysis results"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Create cache directory
            cache_path = Path(cache_dir)
            cache_path.mkdir(parents=True, exist_ok=True)
            # Generate cache key from the string form of the arguments
            cache_key = hashlib.md5(
                json.dumps([str(arg) for arg in args] +
                           [f"{k}={v}" for k, v in kwargs.items()]).encode()
            ).hexdigest()
            cache_file = cache_path / f"{func.__name__}_{cache_key}.cache"
            # Check if cached result exists (only unpickle cache files you created yourself)
            if cache_file.exists():
                try:
                    with open(cache_file, 'rb') as f:
                        return pickle.load(f)
                except Exception:
                    # Cache corrupted, remove it
                    cache_file.unlink()
            # Execute function and cache result
            result = func(*args, **kwargs)
            try:
                with open(cache_file, 'wb') as f:
                    pickle.dump(result, f)
            except Exception:
                # Failed to cache, continue without caching
                pass
            return result
        return wrapper
    return decorator

# Usage
@cache_analysis_result()
def expensive_analysis(apk_path: str) -> Dict[str, Any]:
    """Expensive analysis function with caching"""
    # Perform expensive computation
    time.sleep(5) # Simulate expensive operation
    return {'result': 'analysis_complete'}
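Because the cache key above is derived from the string form of the arguments, a rebuilt APK saved under the same path would return the stale cached result. One way around this, shown here as a sketch under that assumption, is to derive the key from the file's contents. The helper name `content_cache_key` is hypothetical.

```python
import hashlib


def content_cache_key(apk_path: str, func_name: str, chunk_size: int = 65536) -> str:
    """Build a cache key from the function name and the APK's bytes."""
    digest = hashlib.sha256()
    digest.update(func_name.encode("utf-8"))
    digest.update(b"\0")  # separator between the name and the file contents
    with open(apk_path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()
```

The trade-off is that every lookup reads the whole file once, which is usually still far cheaper than repeating the analysis.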
Performance Monitoring
import time
import functools

def monitor_performance(func):
    """Decorator to monitor function performance"""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        memory_start = get_memory_usage() # Defined below
        success = False
        try:
            result = func(*args, **kwargs)
            success = True
            # Return from the try block so exceptions propagate unchanged
            return result
        finally:
            end_time = time.time()
            memory_end = get_memory_usage()
            # Log performance metrics
            execution_time = end_time - start_time
            memory_delta = memory_end - memory_start
            print(f"Function {func.__name__}:")
            print(f" Execution time: {execution_time:.2f}s")
            print(f" Memory change: {memory_delta:.2f}MB")
            print(f" Success: {success}")
    return wrapper

def get_memory_usage() -> float:
    """Get current memory usage in MB (0.0 if psutil is not installed)"""
    try:
        import psutil
        process = psutil.Process()
        return process.memory_info().rss / 1024 / 1024
    except ImportError:
        return 0.0
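When psutil is not installed, the helper above reports 0.0, so the memory figures carry no information. A standard-library alternative is `tracemalloc`, which tracks Python-level allocations only. The sketch below is an illustration under that caveat; `monitor_sketch` and its `last_metrics` attribute are hypothetical names.

```python
import functools
import time
import tracemalloc


def monitor_sketch(func):
    """Record wall time and peak Python allocations of the last call."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        tracemalloc.start()
        started = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - started
            _, peak = tracemalloc.get_traced_memory()
            # Note: stop() also ends any tracing started elsewhere in the process.
            tracemalloc.stop()
            wrapper.last_metrics = {
                "seconds": elapsed,
                "peak_mb": peak / 1024 / 1024,
            }
    wrapper.last_metrics = None
    return wrapper


@monitor_sketch
def build_list(n: int) -> list:
    return [0] * n


build_list(100_000)
print(build_list.last_metrics)
```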
Utility Integration Examples
Combining Utilities in Analysis Modules
import logging
from dexray_insight.Utils.file_utils import dump_json, split_path_file_extension
from dexray_insight.Utils.androguardObjClass import Androguard_Obj
from dexray_insight.Utils.log import set_logger
# validate_apk_path and sanitize_filename are the helpers shown earlier on this page;
# AnalysisContext is supplied by the framework.

class CustomAnalysisModule:
    def __init__(self, config):
        self.config = config
        self.logger = logging.getLogger(__name__)

    def analyze(self, apk_path: str, context: "AnalysisContext"):
        # Validate APK path
        if not validate_apk_path(apk_path):
            raise ValueError(f"Invalid APK path: {apk_path}")
        # Extract filename components
        base_dir, name, ext = split_path_file_extension(apk_path)
        # Initialize Androguard analysis
        androguard_obj = Androguard_Obj(apk_path)
        if not androguard_obj.is_valid_apk():
            raise ValueError("Invalid APK file")
        # Perform analysis (_perform_custom_analysis is the module's own logic, not shown)
        results = self._perform_custom_analysis(androguard_obj)
        # Sanitize and save results
        sanitized_name = sanitize_filename(name)
        output_file = f"custom_analysis_{sanitized_name}.json"
        dump_json(output_file, results)
        return results
Error Handling Utilities
import logging
from typing import Any, Dict, List

def safe_extract_urls(text: str) -> List[str]:
    """Safely extract URLs with error handling"""
    try:
        urls = extract_urls(text)
        # Sanitize URLs
        clean_urls = []
        for url in urls:
            clean_url = sanitize_url(url)
            if clean_url:
                clean_urls.append(clean_url)
        return clean_urls
    except Exception as e:
        logging.warning(f"Failed to extract URLs: {e}")
        return []

def safe_analyze_permissions(permissions: List[str]) -> Dict[str, Any]:
    """Safely analyze permissions with error handling"""
    try:
        return categorize_permissions(permissions)
    except Exception as e:
        logging.error(f"Failed to analyze permissions: {e}")
        return {'dangerous': [], 'normal': [], 'custom': [], 'system': []}
These utilities provide the foundation for reliable and consistent APK analysis throughout the Dexray Insight framework. They handle common edge cases, provide error recovery mechanisms, and ensure data integrity across the analysis pipeline.