diff --git a/web/reNgine/celery_custom_task.py b/web/reNgine/celery_custom_task.py
index 37bbdbbb..a50f1911 100644
--- a/web/reNgine/celery_custom_task.py
+++ b/web/reNgine/celery_custom_task.py
@@ -109,7 +109,10 @@ def __call__(self, *args, **kwargs):
# Create ScanActivity for this task and send start scan notifs
if self.track:
- logger.warning(f'Task {self.task_name} is RUNNING')
+ if self.domain:
+ logger.warning(f'Task {self.task_name} for {self.subdomain.name if self.subdomain else self.domain.name} is RUNNING')
+ else:
+ logger.warning(f'Task {self.task_name} is RUNNING')
self.create_scan_activity()
if RENGINE_CACHE_ENABLED:
@@ -119,7 +122,10 @@ def __call__(self, *args, **kwargs):
if result and result != b'null':
self.status = SUCCESS_TASK
if RENGINE_RECORD_ENABLED and self.track:
- logger.warning(f'Task {self.task_name} status is SUCCESS (CACHED)')
+ if self.domain:
+ logger.warning(f'Task {self.task_name} for {self.subdomain.name if self.subdomain else self.domain.name} status is SUCCESS (CACHED)')
+ else:
+ logger.warning(f'Task {self.task_name} status is SUCCESS (CACHED)')
self.update_scan_activity()
return json.loads(result)
@@ -150,7 +156,10 @@ def __call__(self, *args, **kwargs):
self.write_results()
if RENGINE_RECORD_ENABLED and self.track:
- msg = f'Task {self.task_name} status is {self.status_str}'
+ if self.domain:
+ msg = f'Task {self.task_name} for {self.subdomain.name if self.subdomain else self.domain.name} status is {self.status_str}'
+ else:
+ msg = f'Task {self.task_name} status is {self.status_str}'
msg += f' | Error: {self.error}' if self.error else ''
logger.warning(msg)
self.update_scan_activity()
diff --git a/web/reNgine/common_func.py b/web/reNgine/common_func.py
index 4cba169c..1aacf507 100644
--- a/web/reNgine/common_func.py
+++ b/web/reNgine/common_func.py
@@ -506,6 +506,45 @@ def extract_path_from_url(url):
return reconstructed_url
+def is_valid_url(url):
+ """Check if a URL is valid, including both full URLs and domain:port format.
+
+ Args:
+ url (str): URL to validate (https://domain.com or domain.com:port)
+
+ Returns:
+ bool: True if valid URL, False otherwise
+ """
+ logger.debug(f'Validating URL: {url}')
+
+ # Handle URLs with scheme (http://, https://)
+ if url.startswith(('http://', 'https://')):
+ return validators.url(url)
+
+ # Handle domain:port format
+ try:
+ if ':' in url:
+ domain, port = url.rsplit(':', 1)
+ # Validate port
+ port = int(port)
+ if not 1 <= port <= 65535:
+ logger.debug(f'Invalid port number: {port}')
+ return False
+ else:
+ domain = url
+
+ # Validate domain
+ if validators.domain(domain) or validators.ipv4(domain) or validators.ipv6(domain):
+ logger.debug(f'Valid domain/IP found: {domain}')
+ return True
+
+ logger.debug(f'Invalid domain/IP: {domain}')
+ return False
+
+ except (ValueError, ValidationError) as e:
+ logger.debug(f'Validation error: {str(e)}')
+ return False
+
#-------#
# Utils #
#-------#
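The new is_valid_url helper above accepts both scheme-prefixed URLs and bare host:port targets. Below is a standalone sketch of the host:port branch for reference; it assumes only the third-party validators package (already a reNgine dependency), and the helper name is hypothetical.

import validators

def valid_host_port(target):
    # Mirror of the domain:port branch of is_valid_url above (illustrative only)
    if ':' in target:
        host, port = target.rsplit(':', 1)
        # Port must be numeric and within 1-65535
        if not (port.isdigit() and 1 <= int(port) <= 65535):
            return False
    else:
        host = target
    # Accept registered domains as well as literal IPv4/IPv6 addresses
    return bool(validators.domain(host) or validators.ipv4(host) or validators.ipv6(host))

print(valid_host_port('example.com:8443'))   # True
print(valid_host_port('10.0.0.1:80'))        # True
print(valid_host_port('example.com:99999'))  # False, port out of range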
@@ -878,7 +917,7 @@ def get_task_cache_key(func_name, *args, **kwargs):
def get_output_file_name(scan_history_id, subscan_id, filename):
- title = f'#{scan_history_id}'
+ title = f'{scan_history_id}'
if subscan_id:
title += f'-{subscan_id}'
title += f'_{filename}'
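For reference, the change above only drops the leading '#' from generated result file names (presumably because '#' needs quoting in shell commands and is awkward in file paths). A minimal standalone sketch with hypothetical ids:

def output_file_name(scan_history_id, subscan_id, filename):
    # Same naming scheme as get_output_file_name above, without the leading '#'
    title = f'{scan_history_id}'
    if subscan_id:
        title += f'-{subscan_id}'
    return f'{title}_{filename}'

print(output_file_name(5, 12, 'subdomains.txt'))  # 5-12_subdomains.txt
print(output_file_name(5, None, 'ports.json'))    # 5_ports.json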
@@ -925,21 +964,28 @@ def get_nmap_cmd(
script=None,
script_args=None,
max_rate=None,
- service_detection=True,
flags=[]):
- if not cmd:
- cmd = 'nmap'
+ # Initialize base options
options = {
- "-sV": service_detection,
- "-p": ports,
+ "--max-rate": max_rate,
+ "-oX": output_file,
"--script": script,
"--script-args": script_args,
- "--max-rate": max_rate,
- "-oX": output_file
}
+
+ if not cmd:
+ cmd = 'nmap'
+ # Update options with nmap specific parameters
+ options.update({
+ "-sV": "",
+ "-p": ports,
+ })
+
+ # Build command with options
cmd = _build_cmd(cmd, options, flags)
+ # Add input source
if not input_file:
cmd += f" {host}" if host else ""
else:
@@ -1352,4 +1398,23 @@ def get_ips_from_cidr_range(target):
return [str(ip) for ip in ipaddress.IPv4Network(target)]
except ValueError:
logger.error(f'{target} is not a valid CIDR range. Skipping.')
- return []
\ No newline at end of file
+ return []
+
+def get_http_crawl_value(engine, config):
+ """Get HTTP crawl value from config.
+
+ Args:
+ engine: EngineType object
+ config: Configuration dictionary or None
+
+ Returns:
+ bool: True if HTTP crawl is enabled
+ """
+ # subscan engine value
+ enable_http_crawl = config.get(ENABLE_HTTP_CRAWL) if config else None
+ if enable_http_crawl is None:
+ # scan engine value
+ yaml_config = yaml.safe_load(engine.yaml_configuration)
+ enable_http_crawl = yaml_config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
+ logger.debug(f'Enable HTTP crawl: {enable_http_crawl}')
+ return enable_http_crawl
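get_http_crawl_value resolves the flag in two steps: a per-subscan config wins when it sets the key, otherwise the engine-level YAML (or the global default) applies. A standalone sketch of that precedence, with a hypothetical engine object and an assumed key name:

import yaml

ENABLE_HTTP_CRAWL = 'enable_http_crawl'   # assumed key name
DEFAULT_ENABLE_HTTP_CRAWL = False         # mirrors the new settings.py default

class Engine:                             # hypothetical stand-in for EngineType
    yaml_configuration = 'enable_http_crawl: true\nthreads: 20\n'

def http_crawl_enabled(engine, config):
    # Subscan-level value takes precedence when present
    enabled = config.get(ENABLE_HTTP_CRAWL) if config else None
    if enabled is None:
        # Fall back to the scan engine YAML, then to the global default
        yaml_config = yaml.safe_load(engine.yaml_configuration)
        enabled = yaml_config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
    return enabled

print(http_crawl_enabled(Engine(), {'enable_http_crawl': False}))  # False (subscan override)
print(http_crawl_enabled(Engine(), None))                          # True (engine YAML)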
diff --git a/web/reNgine/settings.py b/web/reNgine/settings.py
index 1a63d823..d1655f2b 100644
--- a/web/reNgine/settings.py
+++ b/web/reNgine/settings.py
@@ -45,7 +45,7 @@
DOMAIN_NAME = env('DOMAIN_NAME', default='localhost:8000')
TEMPLATE_DEBUG = env.bool('TEMPLATE_DEBUG', default=False)
SECRET_FILE = os.path.join(RENGINE_HOME, 'secret')
-DEFAULT_ENABLE_HTTP_CRAWL = env.bool('DEFAULT_ENABLE_HTTP_CRAWL', default=True)
+DEFAULT_ENABLE_HTTP_CRAWL = env.bool('DEFAULT_ENABLE_HTTP_CRAWL', default=False)
DEFAULT_RATE_LIMIT = env.int('DEFAULT_RATE_LIMIT', default=150) # requests / second
DEFAULT_HTTP_TIMEOUT = env.int('DEFAULT_HTTP_TIMEOUT', default=5) # seconds
DEFAULT_RETRIES = env.int('DEFAULT_RETRIES', default=1)
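Since this flips the global default to False, deployments that relied on automatic HTTP crawling can restore it through the environment. A minimal sketch assuming the usual django-environ boolean parsing used by settings.py:

import os
import environ

# e.g. DEFAULT_ENABLE_HTTP_CRAWL=true set in the stack's .env file
os.environ.setdefault('DEFAULT_ENABLE_HTTP_CRAWL', 'true')
env = environ.Env()
print(env.bool('DEFAULT_ENABLE_HTTP_CRAWL', default=False))  # True, old behaviour restored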
diff --git a/web/reNgine/tasks.py b/web/reNgine/tasks.py
index 4e016824..d6b65571 100644
--- a/web/reNgine/tasks.py
+++ b/web/reNgine/tasks.py
@@ -15,12 +15,12 @@
import shutil
from pathlib import Path
-from datetime import datetime
from urllib.parse import urlparse
from api.serializers import SubdomainSerializer
from celery import chain, chord, group
from celery.result import allow_join_result
from celery.utils.log import get_task_logger
+from django.db import transaction
from django.db.models import Count
from dotted_dict import DottedDict
from django.utils import timezone, html
@@ -82,7 +82,6 @@ def initiate_scan(
if CELERY_REMOTE_DEBUG:
debug()
- logger.info('Initiating scan on celery')
scan = None
try:
# Get scan engine
@@ -95,7 +94,6 @@ def initiate_scan(
# Get YAML config
config = yaml.safe_load(engine.yaml_configuration)
- enable_http_crawl = config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
gf_patterns = config.get(GF_PATTERNS, [])
# Get domain and set last_scan_date
@@ -103,6 +101,8 @@ def initiate_scan(
domain.last_scan_date = timezone.now()
domain.save()
+ logger.warning(f'Initiating scan for domain {domain.name} on celery')
+
# Get path filter
url_filter = url_filter.rstrip('/')
@@ -124,16 +124,26 @@ def initiate_scan(
scan.domain = domain
scan.start_scan_date = timezone.now()
scan.tasks = engine.tasks
- uuid_scan = uuid.uuid1()
- scan.results_dir = f'{results_dir}/{domain.name}/scans/{uuid_scan}'
+
+ # Create results directory
+ try:
+ uuid_scan = uuid.uuid1()
+ scan.results_dir = SafePath.create_safe_path(
+ base_dir=RENGINE_RESULTS,
+ components=[domain.name, 'scans', str(uuid_scan)]
+ )
+ except (ValueError, OSError) as e:
+ logger.error(f"Failed to create results directory: {str(e)}")
+ scan.scan_status = FAILED_TASK
+ scan.error_message = "Failed to create results directory, scan failed"
+ scan.save()
+ return {'success': False, 'error': scan.error_message}
+
add_gf_patterns = gf_patterns and 'fetch_url' in engine.tasks
if add_gf_patterns and is_iterable(gf_patterns):
scan.used_gf_patterns = ','.join(gf_patterns)
scan.save()
- # Create scan results dir
- os.makedirs(scan.results_dir, exist_ok=True)
-
# Build task context
ctx = {
'scan_history_id': scan_history_id,
@@ -162,20 +172,30 @@ def initiate_scan(
subdomain_name = domain.name
subdomain, _ = save_subdomain(subdomain_name, ctx=ctx)
-
- # If enable_http_crawl is set, create an initial root HTTP endpoint so that
- # HTTP crawling can start somewhere
+ # Create initial HTTP URL
http_url = f'{domain.name}{url_filter}' if url_filter else domain.name
- endpoint, _ = save_endpoint(
- http_url,
- ctx=ctx,
- crawl=enable_http_crawl,
- is_default=True,
- subdomain=subdomain
- )
+ endpoint = None
+
+ # Use Nmap to find web services ports
+ logger.warning(f'Using Nmap to find web services on {http_url}')
+ hosts_data = get_nmap_http_datas(http_url, ctx)
+ logger.debug(f'Identified hosts: {hosts_data}')
- save_subdomain_metadata(subdomain, endpoint)
+ if not hosts_data:
+ logger.warning(f'Nmap found no web services on host {http_url}. Scan failed.')
+ scan.scan_status = FAILED_TASK
+ scan.error_message = "Sorry, host does not seems to have any web service"
+ scan.save()
+ return {'success': False, 'error': scan.error_message}
+ # Create first HTTP endpoint
+ endpoint = create_first_endpoint_from_nmap_data(hosts_data, domain, subdomain, ctx)
+ if not endpoint:
+ logger.warning(f'Could not create any valid endpoints for {http_url}. Scan failed.')
+ scan.scan_status = FAILED_TASK
+ scan.error_message = "Failed to create valid endpoints"
+ scan.save()
+ return {'success': False, 'error': scan.error_message}
# Build Celery tasks, crafted according to the dependency graph below:
# subdomain_discovery --> port_scan --> fetch_url --> dir_file_fuzz
@@ -237,7 +257,7 @@ def initiate_subscan(
scan_history_id (int): ScanHistory id.
subdomain_id (int): Subdomain id.
engine_id (int): Engine ID.
- scan_type (int): Scan type (periodic, live).
+ scan_type (int): Scan type (port_scan, subdomain_discovery, vulnerability_scan...).
results_dir (str): Results directory.
url_filter (str): URL path. Default: ''
"""
@@ -245,78 +265,119 @@ def initiate_subscan(
if CELERY_REMOTE_DEBUG:
debug()
- # Get Subdomain, Domain and ScanHistory
- subdomain = Subdomain.objects.get(pk=subdomain_id)
- scan = ScanHistory.objects.get(pk=subdomain.scan_history.id)
- domain = Domain.objects.get(pk=subdomain.target_domain.id)
+ subscan = None
+ try:
+ # Get Subdomain, Domain and ScanHistory
+ subdomain = Subdomain.objects.get(pk=subdomain_id)
+ scan = ScanHistory.objects.get(pk=subdomain.scan_history.id)
+ domain = Domain.objects.get(pk=subdomain.target_domain.id)
- # Get EngineType
- engine_id = engine_id or scan.scan_type.id
- engine = EngineType.objects.get(pk=engine_id)
+ logger.info(f'Initiating subscan for subdomain {subdomain.name} on celery')
- # Get YAML config
- config = yaml.safe_load(engine.yaml_configuration)
- enable_http_crawl = config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
+ # Get EngineType
+ engine_id = engine_id or scan.scan_type.id
+ engine = EngineType.objects.get(pk=engine_id)
- # Create scan activity of SubScan Model
- subscan = SubScan(
- start_scan_date=timezone.now(),
- celery_ids=[initiate_subscan.request.id],
- scan_history=scan,
- subdomain=subdomain,
- type=scan_type,
- status=RUNNING_TASK,
- engine=engine)
- subscan.save()
-
- # Get YAML configuration
- config = yaml.safe_load(engine.yaml_configuration)
-
- # Create results directory
- uuid_scan = uuid.uuid1()
- results_dir = f'{results_dir}/{domain.name}/subscans/{uuid_scan}'
- os.makedirs(results_dir, exist_ok=True)
-
- # Run task
- method = globals().get(scan_type)
- if not method:
- logger.warning(f'Task {scan_type} is not supported by reNgine. Skipping')
- return
- scan.tasks.append(scan_type)
- scan.save()
+ # Get YAML config
+ config = yaml.safe_load(engine.yaml_configuration)
+ config_subscan = config.get(scan_type)
+ enable_http_crawl = get_http_crawl_value(engine, config_subscan)
- # Send start notif
- send_scan_notif.delay(
- scan.id,
- subscan_id=subscan.id,
- engine_id=engine_id,
- status='RUNNING')
-
- # Build context
- ctx = {
- 'scan_history_id': scan.id,
- 'subscan_id': subscan.id,
- 'engine_id': engine_id,
- 'domain_id': domain.id,
- 'subdomain_id': subdomain.id,
- 'yaml_configuration': config,
- 'results_dir': results_dir,
- 'url_filter': url_filter
- }
+ # Create scan activity of SubScan Model
+ subscan = SubScan(
+ start_scan_date=timezone.now(),
+ celery_ids=[initiate_subscan.request.id],
+ scan_history=scan,
+ subdomain=subdomain,
+ type=scan_type,
+ status=RUNNING_TASK,
+ engine=engine)
+ subscan.save()
- # Build header + callback
- workflow = method.si(ctx=ctx)
- callback = report.si(ctx=ctx).set(link_error=[report.si(ctx=ctx)])
+ # Create results directory
+ try:
+ uuid_scan = uuid.uuid1()
+ results_dir = SafePath.create_safe_path(
+ base_dir=RENGINE_RESULTS,
+ components=[domain.name, 'subscans', str(uuid_scan)]
+ )
+ except (ValueError, OSError) as e:
+ logger.error(f"Failed to create results directory: {str(e)}")
+ subscan.scan_status = FAILED_TASK
+ subscan.error_message = "Failed to create results directory, scan failed"
+ subscan.save()
+ return {
+ 'success': False,
+ 'error': subscan.error_message
+ }
- # Run Celery tasks
- task = chain(workflow, callback).on_error(callback).delay()
- subscan.celery_ids.append(task.id)
- subscan.save()
+ # Run task
+ method = globals().get(scan_type)
+ if not method:
+ logger.warning(f'Task {scan_type} is not supported by reNgine. Skipping')
+ return
+ scan.tasks.append(scan_type)
+ scan.save()
- return {
- 'success': True,
- 'task_id': task.id
- }
+ # Send start notif
+ send_scan_notif.delay(
+ scan.id,
+ subscan_id=subscan.id,
+ engine_id=engine_id,
+ status='RUNNING')
+
+ # Build context
+ ctx = {
+ 'scan_history_id': scan.id,
+ 'subscan_id': subscan.id,
+ 'engine_id': engine_id,
+ 'domain_id': domain.id,
+ 'subdomain_id': subdomain.id,
+ 'yaml_configuration': config,
+ 'yaml_configuration_subscan': config_subscan,
+ 'results_dir': results_dir,
+ 'url_filter': url_filter
+ }
+
+ ctx_str = json.dumps(ctx, indent=2)
+ logger.warning(f'Starting subscan {subscan.id} with context:\n{ctx_str}')
+
+ if enable_http_crawl:
+ results = http_crawl(
+ urls=[subdomain.http_url],
+ ctx=ctx)
+ if not results:
+ subscan.scan_status = FAILED_TASK
+ subscan.error_message = "Sorry, host does not seems to have any web service"
+ subscan.save()
+ return {
+ 'success': False,
+ 'error': subscan.error_message
+ }
+
+ # Build header + callback
+ workflow = method.si(ctx=ctx)
+ callback = report.si(ctx=ctx).set(link_error=[report.si(ctx=ctx)])
+
+ # Run Celery tasks
+ task = chain(workflow, callback).on_error(callback).delay()
+ subscan.celery_ids.append(task.id)
+ subscan.save()
+
+ return {
+ 'success': True,
+ 'task_id': task.id
+ }
+ except Exception as e:
+ logger.exception(e)
+ if subscan:
+ subscan.scan_status = FAILED_TASK
+ subscan.error_message = str(e)
+ subscan.save()
+ return {
+ 'success': False,
+ 'error': str(e)
+ }
@app.task(name='report', bind=False, queue='report_queue')
@@ -392,7 +453,7 @@ def subdomain_discovery(
# Config
config = self.yaml_configuration.get(SUBDOMAIN_DISCOVERY) or {}
- enable_http_crawl = config.get(ENABLE_HTTP_CRAWL) or self.yaml_configuration.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
+ enable_http_crawl = get_http_crawl_value(self, config)
threads = config.get(THREADS) or self.yaml_configuration.get(THREADS, DEFAULT_THREADS)
timeout = config.get(TIMEOUT) or self.yaml_configuration.get(TIMEOUT, DEFAULT_HTTP_TIMEOUT)
tools = config.get(USES_TOOLS, SUBDOMAIN_SCAN_DEFAULT_TOOLS)
@@ -564,7 +625,6 @@ def subdomain_discovery(
http_crawl(urls, ctx=ctx, update_subdomain_metadatas=True)
else:
url_filter = ctx.get('url_filter')
- enable_http_crawl = config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
# Find root subdomain endpoints
for subdomain in subdomains:
subdomain_name = subdomain.strip()
@@ -604,7 +664,6 @@ def subdomain_discovery(
return SubdomainSerializer(subdomains, many=True).data
-
@app.task(name='osint', queue='main_scan_queue', base=RengineTask, bind=True)
def osint(self, host=None, ctx={}, description=None):
"""Run Open-Source Intelligence tools on selected domain.
@@ -1183,7 +1242,6 @@ def screenshot(self, ctx={}, description=None):
output_path = str(Path(self.results_dir) / 'screenshots' / self.filename)
alive_endpoints_file = str(Path(self.results_dir) / 'endpoints_alive.txt')
config = self.yaml_configuration.get(SCREENSHOT) or {}
- enable_http_crawl = config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
intensity = config.get(INTENSITY) or self.yaml_configuration.get(INTENSITY, DEFAULT_SCAN_INTENSITY)
timeout = config.get(TIMEOUT) or self.yaml_configuration.get(TIMEOUT, DEFAULT_HTTP_TIMEOUT + 5)
threads = config.get(THREADS) or self.yaml_configuration.get(THREADS, DEFAULT_THREADS)
@@ -1192,13 +1250,16 @@ def screenshot(self, ctx={}, description=None):
strict = True if intensity == 'normal' else False
# Get URLs to take screenshot of
- get_http_urls(
- is_alive=enable_http_crawl,
+ urls = get_http_urls(
+ is_alive=True,
strict=strict,
write_filepath=alive_endpoints_file,
get_only_default_urls=True,
ctx=ctx
)
+ if not urls:
+ logger.error(f'No URLs to take screenshot of. Skipping.')
+ return
# Send start notif
notification = Notification.objects.first()
@@ -1226,7 +1287,6 @@ def screenshot(self, ctx={}, description=None):
indices = [header.index(col) for col in ["Protocol", "Port", "Domain", "Request Status", "Screenshot Path", " Source Path"]]
for row in reader:
protocol, port, subdomain_name, status, screenshot_path, source_path = extract_columns(row, indices)
- logger.info(f'{protocol}:{port}:{subdomain_name}:{status}')
subdomain_query = Subdomain.objects.filter(name=subdomain_name)
if self.scan:
subdomain_query = subdomain_query.filter(scan_history=self.scan)
@@ -1235,7 +1295,7 @@ def screenshot(self, ctx={}, description=None):
screenshot_paths.append(screenshot_path)
subdomain.screenshot_path = screenshot_path.replace(RENGINE_RESULTS, '')
subdomain.save()
- logger.warning(f'Added screenshot for {subdomain.name} to DB')
+ logger.warning(f'Added screenshot for {protocol}://{subdomain.name}:{port} to DB')
# Remove all db, html extra files in screenshot results
run_command(
@@ -1279,7 +1339,7 @@ def port_scan(self, hosts=[], ctx={}, description=None):
# Config
config = self.yaml_configuration.get(PORT_SCAN) or {}
- enable_http_crawl = config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
+ enable_http_crawl = get_http_crawl_value(self, config)
timeout = config.get(TIMEOUT) or self.yaml_configuration.get(TIMEOUT, DEFAULT_HTTP_TIMEOUT)
exclude_ports = config.get(NAABU_EXCLUDE_PORTS, [])
exclude_subdomains = config.get(NAABU_EXCLUDE_SUBDOMAINS, False)
@@ -1418,29 +1478,50 @@ def port_scan(self, hosts=[], ctx={}, description=None):
logger.info('Finished running naabu port scan.')
# Process nmap results: 1 process per host
- sigs = []
if nmap_enabled:
- logger.warning(f'Starting nmap scans ...')
+ logger.warning(f'Starting nmap scans on {len(ports_data)} hosts ...')
logger.warning(ports_data)
- for host, port_list in ports_data.items():
- ports_str = '_'.join([str(p) for p in port_list])
- ctx_nmap = ctx.copy()
- ctx_nmap['description'] = get_task_title(f'nmap_{host}', self.scan_id, self.subscan_id)
- ctx_nmap['track'] = False
- sig = nmap.si(
- cmd=nmap_cmd,
+ nmap_args = {
+ 'rate_limit': rate_limit,
+ 'nmap_cmd': nmap_cmd,
+ 'nmap_script': nmap_script,
+ 'nmap_script_args': nmap_script_args,
+ 'ports_data': ports_data
+ }
+ run_nmap(ctx, **nmap_args)
+
+ return ports_data
+
+@app.task(name='run_nmap', queue='main_scan_queue', base=RengineTask, bind=True)
+def run_nmap(self, ctx, **nmap_args):
+ """Run nmap scans in parallel for each host.
+
+ Args:
+ self: RengineTask instance
+ ctx: Scan context
+ nmap_args: Dictionary containing nmap configuration:
+ - rate_limit: Maximum packets per second (nmap --max-rate)
+ - nmap_cmd: Custom nmap command
+ - nmap_script: NSE scripts to run
+ - nmap_script_args: NSE script arguments
+ - ports_data: Dictionary mapping hosts to their open ports
+ """
+ sigs = []
+ for host, port_list in nmap_args.get('ports_data', {}).items():
+ ctx_nmap = ctx.copy()
+ ctx_nmap['description'] = get_task_title(f'nmap_{host}', self.scan_id, self.subscan_id)
+ ctx_nmap['track'] = False
+ sig = nmap.si(
+ cmd=nmap_args.get('nmap_cmd'),
ports=port_list,
host=host,
- script=nmap_script,
- script_args=nmap_script_args,
- max_rate=rate_limit,
+ script=nmap_args.get('nmap_script'),
+ script_args=nmap_args.get('nmap_script_args'),
+ max_rate=nmap_args.get('rate_limit'),
ctx=ctx_nmap)
- sigs.append(sig)
- task = group(sigs).apply_async()
- with allow_join_result():
- results = task.get()
-
- return ports_data
+ sigs.append(sig)
+ task = group(sigs).apply_async()
+ with allow_join_result():
+ task.get()
@app.task(name='nmap', queue='main_scan_queue', base=RengineTask, bind=True)
@@ -1469,7 +1550,7 @@ def nmap(
"""
notif = Notification.objects.first()
ports_str = ','.join(str(port) for port in ports)
- self.filename = self.filename.replace('.txt', '.xml')
+ self.filename = 'nmap.xml'
filename_vulns = self.filename.replace('.xml', '_vulns.json')
output_file = self.output_path
output_file_xml = f'{self.results_dir}/{host}_{self.filename}'
@@ -1496,7 +1577,11 @@ def nmap(
activity_id=self.activity_id)
# Get nmap XML results and convert to JSON
- vulns = parse_nmap_results(output_file_xml, output_file)
+ vulns = parse_nmap_results(output_file_xml, output_file, parse_type='vulnerabilities')
+ save_vulns(self, notif, vulns_file, vulns)
+ return vulns
+
+def save_vulns(self, notif, vulns_file, vulns):
with open(vulns_file, 'w') as f:
json.dump(vulns, f, indent=4)
@@ -1525,7 +1610,6 @@ def nmap(
if notif and notif.send_vuln_notif and vulns_str:
logger.warning(vulns_str)
self.notify(fields={'CVEs': vulns_str})
- return vulns
@app.task(name='waf_detection', queue='main_scan_queue', base=RengineTask, bind=True)
@@ -1541,15 +1625,17 @@ def waf_detection(self, ctx={}, description=None):
"""
input_path = str(Path(self.results_dir) / 'input_endpoints_waf_detection.txt')
config = self.yaml_configuration.get(WAF_DETECTION) or {}
- enable_http_crawl = config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
# Get alive endpoints from DB
- get_http_urls(
- is_alive=enable_http_crawl,
+ urls = get_http_urls(
+ is_alive=True,
write_filepath=input_path,
get_only_default_urls=True,
ctx=ctx
)
+ if not urls:
+ logger.error(f'No URLs to check for WAF. Skipping.')
+ return
cmd = f'wafw00f -i {input_path} -o {self.output_path}'
run_command(
@@ -1615,7 +1701,7 @@ def dir_file_fuzz(self, ctx={}, description=None):
if custom_header:
custom_header = generate_header_param(custom_header,'common')
auto_calibration = config.get(AUTO_CALIBRATION, True)
- enable_http_crawl = config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
+ enable_http_crawl = get_http_crawl_value(self, config)
rate_limit = config.get(RATE_LIMIT) or self.yaml_configuration.get(RATE_LIMIT, DEFAULT_RATE_LIMIT)
extensions = config.get(EXTENSIONS, DEFAULT_DIR_FILE_FUZZ_EXTENSIONS)
# prepend . on extensions
@@ -1791,7 +1877,7 @@ def fetch_url(self, urls=[], ctx={}, description=None):
config = self.yaml_configuration.get(FETCH_URL) or {}
should_remove_duplicate_endpoints = config.get(REMOVE_DUPLICATE_ENDPOINTS, True)
duplicate_removal_fields = config.get(DUPLICATE_REMOVAL_FIELDS, ENDPOINT_SCAN_DEFAULT_DUPLICATE_FIELDS)
- enable_http_crawl = config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
+ enable_http_crawl = get_http_crawl_value(self, config)
gf_patterns = config.get(GF_PATTERNS, DEFAULT_GF_PATTERNS)
ignore_file_extension = config.get(IGNORE_FILE_EXTENSION, DEFAULT_IGNORE_FILE_EXTENSIONS)
tools = config.get(USES_TOOLS, ENDPOINT_SCAN_DEFAULT_TOOLS)
@@ -1811,7 +1897,7 @@ def fetch_url(self, urls=[], ctx={}, description=None):
else:
logger.debug(f'URLs gathered from database')
urls = get_http_urls(
- is_alive=enable_http_crawl,
+ is_alive=True,
write_filepath=input_path,
exclude_subdomains=exclude_subdomains,
get_only_default_urls=True,
@@ -1937,8 +2023,25 @@ def fetch_url(self, urls=[], ctx={}, description=None):
base_url, urlpath = tuple(url.split(' - '))
if base_url and urlpath:
- subdomain = urlparse(base_url)
- url = f'{subdomain.scheme}://{subdomain.netloc}{urlpath}'
+ # Handle both cases: path-only and full URLs
+ if urlpath.startswith(('http://', 'https://')):
+ # Full URL case - check if in scope
+ parsed_url = urlparse(urlpath)
+ if self.domain.name in parsed_url.netloc:
+ url = urlpath # Use the full URL directly
+ logger.debug(f'Found in-scope URL: {url}')
+ else:
+ logger.debug(f'URL {urlpath} not in scope for domain {self.domain.name}. Skipping.')
+ continue
+ else:
+ # Path-only case
+ subdomain = urlparse(base_url)
+ # Remove ./ at beginning of urlpath
+ urlpath = urlpath.lstrip('./')
+ # Ensure urlpath starts with /
+ if not urlpath.startswith('/'):
+ urlpath = '/' + urlpath
+ url = f'{subdomain.scheme}://{subdomain.netloc}{urlpath}'
if not validators.url(url):
logger.warning(f'Invalid URL "{url}". Skipping.')
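The new branch above distinguishes tool output whose second field is already a full URL (kept only when in scope) from path-only output that must be joined onto the base URL. A standalone sketch of that handling, with hypothetical inputs:

from urllib.parse import urlparse

def join_endpoint(base_url, urlpath, domain_name):
    # Mirrors the fetch_url handling above for one "base_url - urlpath" pair
    if urlpath.startswith(('http://', 'https://')):
        # Full URL: keep it only if it belongs to the scanned domain
        return urlpath if domain_name in urlparse(urlpath).netloc else None
    parsed = urlparse(base_url)
    urlpath = urlpath.lstrip('./')            # strip leading '.' and '/' characters
    if not urlpath.startswith('/'):
        urlpath = '/' + urlpath
    return f'{parsed.scheme}://{parsed.netloc}{urlpath}'

print(join_endpoint('https://app.example.com', './admin/login', 'example.com'))
# https://app.example.com/admin/login
print(join_endpoint('https://app.example.com', 'https://evil.com/x', 'example.com'))
# None (out of scope)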
@@ -2377,7 +2480,7 @@ def nuclei_scan(self, urls=[], ctx={}, description=None):
# Config
config = self.yaml_configuration.get(VULNERABILITY_SCAN) or {}
input_path = str(Path(self.results_dir) / 'input_endpoints_vulnerability_scan.txt')
- enable_http_crawl = config.get(ENABLE_HTTP_CRAWL, DEFAULT_ENABLE_HTTP_CRAWL)
+ enable_http_crawl = get_http_crawl_value(self, config)
concurrency = config.get(NUCLEI_CONCURRENCY) or self.yaml_configuration.get(THREADS, DEFAULT_THREADS)
intensity = config.get(INTENSITY) or self.yaml_configuration.get(INTENSITY, DEFAULT_SCAN_INTENSITY)
rate_limit = config.get(RATE_LIMIT) or self.yaml_configuration.get(RATE_LIMIT, DEFAULT_RATE_LIMIT)
@@ -2402,13 +2505,18 @@ def nuclei_scan(self, urls=[], ctx={}, description=None):
with open(input_path, 'w') as f:
f.write('\n'.join(urls))
else:
- get_http_urls(
- is_alive=enable_http_crawl,
+ logger.debug(f'Getting alive endpoints for Nuclei scan')
+ urls = get_http_urls(
+ is_alive=True,
ignore_files=True,
write_filepath=input_path,
ctx=ctx
)
+ if not urls:
+ logger.error(f'No URLs to scan for Nuclei. Skipping.')
+ return
+
if intensity == 'normal': # reduce number of endpoints to scan
unfurl_filter = str(Path(self.results_dir) / 'urls_unfurled.txt')
run_command(
@@ -2518,13 +2626,17 @@ def dalfox_xss_scan(self, urls=[], ctx={}, description=None):
with open(input_path, 'w') as f:
f.write('\n'.join(urls))
else:
- get_http_urls(
- is_alive=False,
+ urls = get_http_urls(
+ is_alive=True,
ignore_files=False,
write_filepath=input_path,
ctx=ctx
)
+ if not urls:
+ logger.error(f'No URLs to scan for XSS. Skipping.')
+ return
+
notif = Notification.objects.first()
send_status = notif.send_scan_status_notif if notif else False
@@ -2646,13 +2758,17 @@ def crlfuzz_scan(self, urls=[], ctx={}, description=None):
with open(input_path, 'w') as f:
f.write('\n'.join(urls))
else:
- get_http_urls(
- is_alive=False,
+ urls = get_http_urls(
+ is_alive=True,
ignore_files=True,
write_filepath=input_path,
ctx=ctx
)
+ if not urls:
+ logger.error(f'No URLs to scan for CRLF. Skipping.')
+ return
+
notif = Notification.objects.first()
send_status = notif.send_scan_status_notif if notif else False
@@ -2758,7 +2874,17 @@ def s3scanner(self, ctx={}, description=None):
ctx (dict): Context
description (str, optional): Task description shown in UI.
"""
- input_path = str(Path(self.results_dir) / f'#{self.scan_id}_subdomain_discovery.txt')
+ input_path = str(Path(self.results_dir) / f'{self.scan_id}_s3_bucket_discovery.txt')
+
+ subdomains = Subdomain.objects.filter(scan_history=self.scan)
+ if not subdomains:
+ logger.error(f'No subdomains found for S3Scanner. Skipping.')
+ return
+
+ with open(input_path, 'w') as f:
+ for subdomain in subdomains:
+ f.write(subdomain.name + '\n')
+
vuln_config = self.yaml_configuration.get(VULNERABILITY_SCAN) or {}
s3_config = vuln_config.get(S3SCANNER) or {}
threads = s3_config.get(THREADS) or self.yaml_configuration.get(THREADS, DEFAULT_THREADS)
@@ -2841,6 +2967,9 @@ def http_crawl(
write_filepath=input_path,
ctx=ctx
)
+ if not http_urls:
+ logger.error(f'No URLs to crawl. Skipping.')
+ return
# Append endpoints
if http_urls:
@@ -2887,7 +3016,8 @@ def http_crawl(
logger.error(line)
continue
- logger.debug(line)
+ line_str = json.dumps(line, indent=2)
+ logger.debug(line_str)
# No response from endpoint
if line.get('failed', False):
@@ -3263,102 +3393,126 @@ def send_hackerone_report(vulnerability_id):
@app.task(name='parse_nmap_results', bind=False, queue='parse_nmap_results_queue')
-def parse_nmap_results(xml_file, output_file=None):
+def parse_nmap_results(xml_file, output_file=None, parse_type='vulnerabilities'):
"""Parse results from nmap output file.
Args:
xml_file (str): nmap XML report file path.
+ output_file (str, optional): JSON output file path.
+ parse_type (str): Type of parsing to perform:
+ - 'vulnerabilities': Parse vulnerabilities from nmap scripts
+ - 'services': Parse service banners from -sV
+ - 'ports': Parse only open ports
Returns:
- list: List of vulnerabilities found from nmap results.
+ list: List of parsed results depending on parse_type:
+ - vulnerabilities: List of vulnerability dictionaries
+ - services: List of service dictionaries
+ - ports: List of port dictionaries
"""
with open(xml_file, encoding='utf8') as f:
content = f.read()
try:
- nmap_results = xmltodict.parse(content) # parse XML to dict
+ nmap_results = xmltodict.parse(content)
except Exception as e:
- logger.exception(e)
+ logger.warning(e)
logger.error(f'Cannot parse {xml_file} to valid JSON. Skipping.')
return []
- # Write JSON to output file
if output_file:
with open(output_file, 'w') as f:
json.dump(nmap_results, f, indent=4)
- logger.warning(json.dumps(nmap_results, indent=4))
- hosts = (
- nmap_results
- .get('nmaprun', {})
- .get('host', {})
- )
- all_vulns = []
+
+ hosts = nmap_results.get('nmaprun', {}).get('host', {})
if isinstance(hosts, dict):
hosts = [hosts]
+ results = []
+
for host in hosts:
- # Grab hostname / IP from output
+ # Get hostname/IP
hostnames_dict = host.get('hostnames', {})
if hostnames_dict:
- # Ensure that hostnames['hostname'] is a list for consistency
hostnames_list = hostnames_dict['hostname'] if isinstance(hostnames_dict['hostname'], list) else [hostnames_dict['hostname']]
-
- # Extract all the @name values from the list of dictionaries
hostnames = [entry.get('@name') for entry in hostnames_list]
else:
hostnames = [host.get('address')['@addr']]
- # Iterate over each hostname for each port
+ # Process each hostname
for hostname in hostnames:
-
- # Grab ports from output
ports = host.get('ports', {}).get('port', [])
if isinstance(ports, dict):
ports = [ports]
for port in ports:
- url_vulns = []
port_number = port['@portid']
- url = sanitize_url(f'{hostname}:{port_number}')
- logger.info(f'Parsing nmap results for {hostname}:{port_number} ...')
if not port_number or not port_number.isdigit():
continue
+
port_protocol = port['@protocol']
- scripts = port.get('script', [])
- if isinstance(scripts, dict):
- scripts = [scripts]
-
- for script in scripts:
- script_id = script['@id']
- script_output = script['@output']
- script_output_table = script.get('table', [])
- logger.debug(f'Ran nmap script "{script_id}" on {port_number}/{port_protocol}:\n{script_output}\n')
- if script_id == 'vulscan':
- vulns = parse_nmap_vulscan_output(script_output)
- url_vulns.extend(vulns)
- elif script_id == 'vulners':
- vulns = parse_nmap_vulners_output(script_output)
- url_vulns.extend(vulns)
- # elif script_id == 'http-server-header':
- # TODO: nmap can help find technologies as well using the http-server-header script
- # regex = r'(\w+)/([\d.]+)\s?(?:\((\w+)\))?'
- # tech_name, tech_version, tech_os = re.match(regex, test_string).groups()
- # Technology.objects.get_or_create(...)
- # elif script_id == 'http_csrf':
- # vulns = parse_nmap_http_csrf_output(script_output)
- # url_vulns.extend(vulns)
- else:
- logger.warning(f'Script output parsing for script "{script_id}" is not supported yet.')
-
- # Add URL & source to vuln
- for vuln in url_vulns:
- vuln['source'] = NMAP
- # TODO: This should extend to any URL, not just HTTP
- vuln['http_url'] = url
- if 'http_path' in vuln:
- vuln['http_url'] += vuln['http_path']
- all_vulns.append(vuln)
-
- return all_vulns
+ port_state = port.get('state', {}).get('@state')
+
+ # Skip closed ports
+ if port_state != 'open':
+ continue
+
+ url = sanitize_url(f'{hostname}:{port_number}')
+
+ if parse_type == 'ports':
+ # Return only open ports info
+ results.append({
+ 'host': hostname,
+ 'port': port_number,
+ 'protocol': port_protocol,
+ 'state': port_state
+ })
+ continue
+
+ if parse_type == 'services':
+ # Parse service information from -sV
+ service = port.get('service', {})
+ results.append({
+ 'host': hostname,
+ 'port': port_number,
+ 'protocol': port_protocol,
+ 'service_name': service.get('@name'),
+ 'service_product': service.get('@product'),
+ 'service_version': service.get('@version'),
+ 'service_extrainfo': service.get('@extrainfo'),
+ 'service_ostype': service.get('@ostype'),
+ 'service_method': service.get('@method'),
+ 'service_conf': service.get('@conf')
+ })
+ continue
+
+ if parse_type == 'vulnerabilities':
+ # Original vulnerability parsing logic
+ url_vulns = []
+ scripts = port.get('script', [])
+ if isinstance(scripts, dict):
+ scripts = [scripts]
+
+ for script in scripts:
+ script_id = script['@id']
+ script_output = script['@output']
+
+ if script_id == 'vulscan':
+ vulns = parse_nmap_vulscan_output(script_output)
+ url_vulns.extend(vulns)
+ elif script_id == 'vulners':
+ vulns = parse_nmap_vulners_output(script_output)
+ url_vulns.extend(vulns)
+ else:
+ logger.warning(f'Script output parsing for script "{script_id}" is not supported yet.')
+
+ for vuln in url_vulns:
+ vuln['source'] = NMAP
+ vuln['http_url'] = url
+ if 'http_path' in vuln:
+ vuln['http_url'] += vuln['http_path']
+ results.append(vuln)
+
+ return results
def parse_nmap_http_csrf_output(script_output):
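With the new parse_type switch, the same XML report can be read three ways. A usage sketch, assuming it runs inside the reNgine Django/Celery context and that an nmap XML report exists at the (hypothetical) path below:

from reNgine.tasks import parse_nmap_results

xml_report = '/usr/src/scan_results/example.com_nmap.xml'   # hypothetical path

open_ports = parse_nmap_results(xml_report, parse_type='ports')
# e.g. [{'host': 'example.com', 'port': '443', 'protocol': 'tcp', 'state': 'open'}, ...]

services = parse_nmap_results(xml_report, parse_type='services')
# e.g. [{'host': 'example.com', 'port': '443', 'service_name': 'https', 'service_product': 'nginx', ...}]

vulns = parse_nmap_results(xml_report, parse_type='vulnerabilities')
# Vulnerability dicts built from vulscan/vulners NSE output, each tagged with source=NMAP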
@@ -3547,7 +3701,6 @@ def parse_s3scanner_result(line):
'size': bucket['bucket_size']
}
-
def parse_nuclei_result(line):
"""Parse results from nuclei JSON output.
@@ -3579,7 +3732,6 @@ def parse_nuclei_result(line):
'source': NUCLEI,
}
-
def parse_dalfox_result(line):
"""Parse results from nuclei JSON output.
@@ -4265,7 +4417,7 @@ def stream_command(cmd, cwd=None, shell=False, history_file=None, encoding='utf-
return_code = process.returncode
command_obj.return_code = return_code
command_obj.save()
- logger.info(f'Command returned exit code: {return_code}')
+ logger.debug(f'Command returned exit code: {return_code}')
if history_file:
write_history(history_file, cmd, return_code, output)
@@ -4558,78 +4710,93 @@ def save_endpoint(
Args:
http_url (str): Input HTTP URL.
is_default (bool): If the url is a default url for SubDomains.
- scan_history (startScan.models.ScanHistory): ScanHistory object.
- domain (startScan.models.Domain): Domain object.
- subdomain (starScan.models.Subdomain): Subdomain object.
- results_dir (str, optional): Results directory.
- crawl (bool, optional): Run httpx on endpoint if True. Default: False.
- force (bool, optional): Force crawl even if ENABLE_HTTP_CRAWL mode is on.
- subscan (startScan.models.SubScan, optional): SubScan object.
-
+ ctx (dict): Context containing scan and domain information.
+ crawl (bool): Run httpx on endpoint if True.
+ endpoint_data: Additional endpoint data (including subdomain).
+
Returns:
- tuple: (startScan.models.EndPoint, created) where `created` is a boolean
- indicating if the object is new or already existed.
+ tuple: (EndPoint, created) or (None, False) if invalid
"""
- # remove nulls
+ # Remove nulls and validate basic inputs
endpoint_data = replace_nulls(endpoint_data)
-
scheme = urlparse(http_url).scheme
- endpoint = None
- created = False
- if ctx.get('domain_id'):
- domain = Domain.objects.get(id=ctx.get('domain_id'))
- if domain.name not in http_url:
- logger.error(f"{http_url} is not a URL of domain {domain.name}. Skipping.")
- return None, False
+
+ if not scheme:
+ logger.error(f'{http_url} is missing scheme (http or https). Skipping.')
+ return None, False
+
+ if not is_valid_url(http_url):
+ logger.error(f'{http_url} is not a valid URL. Skipping.')
+ return None, False
+
+ # Get required objects
+ scan = ScanHistory.objects.filter(pk=ctx.get('scan_history_id')).first()
+ domain = Domain.objects.filter(pk=ctx.get('domain_id')).first()
+ subdomain = endpoint_data.get('subdomain')
+
+ if not all([scan, domain]):
+ logger.error('Missing scan or domain information')
+ return None, False
+
+ if domain.name not in http_url:
+ logger.error(f"{http_url} is not a URL of domain {domain.name}. Skipping.")
+ return None, False
+
+ http_url = sanitize_url(http_url)
+
+ # If this is a default endpoint, check if one already exists for this subdomain
+ if is_default and subdomain:
+ existing_default = EndPoint.objects.filter(
+ scan_history=scan,
+ target_domain=domain,
+ subdomain=subdomain,
+ is_default=True
+ ).first()
+
+ if existing_default:
+ logger.info(f'Default endpoint already exists for subdomain {subdomain}')
+ return existing_default, False
+
+ # Check for existing endpoint with same URL
+ existing_endpoint = EndPoint.objects.filter(
+ scan_history=scan,
+ target_domain=domain,
+ http_url=http_url
+ ).first()
+
+ if existing_endpoint:
+ return existing_endpoint, False
+
+ # Create new endpoint
if crawl:
ctx['track'] = False
- results = http_crawl(
- urls=[http_url],
- ctx=ctx)
- if results:
- endpoint_data = results[0]
- endpoint_id = endpoint_data['endpoint_id']
- created = endpoint_data['endpoint_created']
- endpoint = EndPoint.objects.get(pk=endpoint_id)
- elif not scheme:
- return None, False
- else: # add dumb endpoint without probing it
- scan = ScanHistory.objects.filter(pk=ctx.get('scan_history_id')).first()
- domain = Domain.objects.filter(pk=ctx.get('domain_id')).first()
- if not validators.url(http_url):
+ results = http_crawl(urls=[http_url], ctx=ctx)
+ if not results or results[0]['failed']:
+ logger.error(f'Endpoint for {http_url} does not seem to be up. Skipping.')
return None, False
- http_url = sanitize_url(http_url)
-
- # Try to get the first matching record (prevent duplicate error)
- endpoints = EndPoint.objects.filter(
+
+ endpoint_data = results[0]
+ endpoint = EndPoint.objects.get(pk=endpoint_data['endpoint_id'])
+ endpoint.is_default = is_default
+ endpoint.save()
+ created = endpoint_data['endpoint_created']
+ else:
+ endpoint = EndPoint.objects.create(
scan_history=scan,
target_domain=domain,
http_url=http_url,
+ is_default=is_default,
+ discovered_date=timezone.now(),
**endpoint_data
)
-
- if endpoints.exists():
- endpoint = endpoints.first()
- created = False
- else:
- # No existing record, create a new one
- endpoint = EndPoint.objects.create(
- scan_history=scan,
- target_domain=domain,
- http_url=http_url,
- **endpoint_data
- )
- created = True
-
- if created:
- endpoint.is_default = is_default
- endpoint.discovered_date = timezone.now()
+ created = True
+
+ # Add subscan relation if needed
+ if created and ctx.get('subscan_id'):
+ endpoint.endpoint_subscan_ids.add(ctx.get('subscan_id'))
endpoint.save()
- subscan_id = ctx.get('subscan_id')
- if subscan_id:
- endpoint.endpoint_subscan_ids.add(subscan_id)
- endpoint.save()
-
+
return endpoint, created
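The rewritten save_endpoint settles duplicates before touching the network: an existing default endpoint for the subdomain wins, then an existing endpoint with the same URL, and only then is a new one probed (crawl=True) or stored directly. A simplified, ORM-free sketch of that decision order, using an in-memory list as a stand-in for the EndPoint table and a fake probe() for http_crawl():

endpoints = []   # each entry: {'http_url': str, 'subdomain': str, 'is_default': bool}

def probe(url):
    # Hypothetical stand-in for http_crawl(); pretend every URL is alive
    return {'http_url': url, 'failed': False}

def save_endpoint_sketch(http_url, subdomain, is_default=False, crawl=False):
    if is_default:
        existing_default = next((e for e in endpoints
                                 if e['subdomain'] == subdomain and e['is_default']), None)
        if existing_default:
            return existing_default, False      # reuse the subdomain's default endpoint
    existing = next((e for e in endpoints if e['http_url'] == http_url), None)
    if existing:
        return existing, False                  # same URL already recorded
    if crawl and probe(http_url)['failed']:
        return None, False                      # endpoint not alive, skip it
    endpoint = {'http_url': http_url, 'subdomain': subdomain, 'is_default': is_default}
    endpoints.append(endpoint)
    return endpoint, True

print(save_endpoint_sketch('https://example.com', 'example.com', is_default=True, crawl=True))
print(save_endpoint_sketch('https://example.com', 'example.com', is_default=True))  # reused, created=False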
@@ -4700,7 +4867,14 @@ def save_subdomain_metadata(subdomain, endpoint, extra_datas={}):
subdomain.cdn_name = extra_datas.get('cdn_name')
for tech in endpoint.techs.all():
subdomain.technologies.add(tech)
- subdomain.save()
+ subdomain.save()
+ else:
+ http_url = extra_datas.get('http_url')
+ if http_url:
+ subdomain.http_url = http_url
+ subdomain.save()
+ else:
+ logger.error(f'No HTTP URL found for {subdomain.name}. Skipping.')
def save_email(email_address, scan_history=None):
if not validators.email(email_address):
@@ -5004,6 +5178,144 @@ def run_gf_list():
'message': str(e)
}
+def get_nmap_http_datas(host, ctx):
+ """Check if port 80 or 443 are opened and get HTTP status code for given hosts.
+
+ Args:
+ host (str): Initial hostname to scan
+ ctx (dict): Context dictionary
+
+ Returns:
+ dict: Dictionary of results per host:
+ {
+ 'host1': {'scheme': 'https', 'ports': [80, 443]},
+ 'host2': {'scheme': 'http', 'ports': [80]}
+ }
+
+ Returns None if the nmap scan fails after the maximum number of retries.
+ """
+ results_dir = ctx.get('results_dir', '/tmp')
+ filename = ctx.get('filename', 'nmap.xml')
+ try:
+ xml_file = SafePath.create_safe_path(
+ base_dir=results_dir,
+ components=[f"{host}_{filename}"],
+ create_dir=False
+ )
+ except (ValueError, OSError) as e:
+ logger.error(f"Failed to create safe path for XML file: {str(e)}")
+ return None
+
+ # Basic nmap scan for HTTP ports only
+ nmap_args = {
+ 'rate_limit': 150,
+ 'nmap_cmd': 'nmap -Pn -p 80,443',
+ 'nmap_script': None,
+ 'nmap_script_args': None,
+ 'ports_data': {host: [80, 443]},
+ }
+
+ # Add retry logic for nmap scan
+ max_retries = 3
+ retry_delay = 2
+
+ for attempt in range(max_retries):
+ try:
+ run_nmap(ctx, **nmap_args)
+ if os.path.exists(xml_file):
+ break
+ logger.warning(f"Attempt {attempt + 1}/{max_retries}: Nmap output file not found, retrying in {retry_delay}s...")
+ time.sleep(retry_delay)
+ except Exception as e:
+ logger.error(f"Attempt {attempt + 1}/{max_retries}: Nmap scan failed: {str(e)}")
+ if attempt == max_retries - 1:
+ logger.error(f"Nmap scan failed after {max_retries} attempts: {str(e)}")
+ return None
+ time.sleep(retry_delay)
+ else:
+ logger.error(f"Failed to generate output file after {max_retries} retries")
+ return None
+
+ # Parse results to get open ports
+ results = parse_nmap_results(xml_file, parse_type='ports')
+ results_str = json.dumps(results, indent=2)
+ logger.debug(f'Raw nmap results: {results_str}')
+
+ # Group results by host using atomic transaction
+ hosts_data = {}
+ with transaction.atomic():
+ for result in results:
+ hostname = result['host']
+ if hostname not in hosts_data:
+ hosts_data[hostname] = {'ports': []}
+
+ if result['state'] == 'open':
+ port = int(result['port'])
+ logger.info(f'Found open port {port} for host {hostname}')
+ if port not in hosts_data[hostname]['ports']:
+ hosts_data[hostname]['ports'].append(port)
+
+ # Determine scheme for each host
+ for hostname, data in hosts_data.items():
+ # Prefer HTTPS over HTTP if both are available
+ if 443 in data['ports']:
+ data['scheme'] = 'https'
+ elif 80 in data['ports']:
+ data['scheme'] = 'http'
+ else:
+ data['scheme'] = None
+ logger.debug(f'Host {hostname} - scheme: {data["scheme"]}, ports: {data["ports"]}')
+
+ return hosts_data
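For reference, this is the structure get_nmap_http_datas hands back to initiate_scan and to create_first_endpoint_from_nmap_data below; hosts and ports are hypothetical:

# Hypothetical return value of get_nmap_http_datas('example.com', ctx)
hosts_data = {
    'example.com': {'ports': [80, 443], 'scheme': 'https'},   # 443 open, so HTTPS is preferred
    'edge-1.example.com': {'ports': [80], 'scheme': 'http'},  # extra hostname reported by nmap
}
# create_first_endpoint_from_nmap_data() then builds '<scheme>://<host>' per entry,
# crawls it, and returns the first endpoint, preferring the main domain.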
+
+def create_first_endpoint_from_nmap_data(hosts_data, domain, subdomain, ctx):
+ """Create endpoints from Nmap service detection results.
+ Returns the first created endpoint or None if failed."""
+ endpoint = None
+
+ for hostname, data in hosts_data.items():
+ if not data['scheme']:
+ continue
+
+ host_url = f"{data['scheme']}://{hostname}"
+ current_subdomain = subdomain
+ logger.debug(f'Processing HTTP URL: {host_url}')
+
+ # Create subdomain for rDNS hostnames
+ if hostname != domain.name:
+ logger.info(f'Creating subdomain for rDNS hostname: {hostname}')
+ current_subdomain, _ = save_subdomain(hostname, ctx=ctx)
+ if not current_subdomain:
+ logger.warning(f'Could not create subdomain for rDNS hostname: {hostname}. Skipping this host.')
+ continue
+
+ # Create endpoint
+ current_endpoint, _ = save_endpoint(
+ host_url,
+ ctx=ctx,
+ crawl=True,
+ is_default=True,
+ subdomain=current_subdomain
+ )
+
+ # Save metadata for all endpoints (including rDNS)
+ if current_endpoint:
+ save_subdomain_metadata(
+ current_subdomain,
+ current_endpoint,
+ extra_datas={
+ 'http_url': host_url,
+ 'open_ports': data['ports']
+ }
+ )
+
+ # Keep track of first endpoint (prioritizing main domain)
+ if not endpoint or hostname == domain.name:
+ endpoint = current_endpoint
+
+ return endpoint
+
#----------------------#
# Remote debug #
#----------------------#
diff --git a/web/reNgine/utilities.py b/web/reNgine/utilities.py
index d985d2e0..e3608325 100644
--- a/web/reNgine/utilities.py
+++ b/web/reNgine/utilities.py
@@ -1,8 +1,11 @@
import os
-
+import re
+from pathlib import Path
+from typing import List, Union
+from celery.utils.log import get_task_logger, ColorFormatter
from celery._state import get_current_task
-from celery.utils.log import ColorFormatter
+logger = get_task_logger(__name__)
def is_safe_path(basedir, path, follow_symlinks=True):
# Source: https://security.openstack.org/guidelines/dg_using-file-paths.html
@@ -86,3 +89,90 @@ def replace_nulls(obj):
return {key: replace_nulls(value) for key, value in obj.items()}
else:
return obj
+
+
+class SafePath:
+ """Utility class for safe path handling and directory creation."""
+
+ @staticmethod
+ def sanitize_component(component: str) -> str:
+ """Sanitize a path component to prevent directory traversal.
+
+ Args:
+ component (str): Path component to sanitize
+
+ Returns:
+ str: Sanitized path component
+ """
+ # Replace anything that is not alphanumeric, '-', '_' or '.' with an underscore
+ return re.sub(r'[^a-zA-Z0-9\-\_\.]', '_', str(component))
+
+ @classmethod
+ def create_safe_path(
+ cls,
+ base_dir: Union[str, Path],
+ components: List[str],
+ create_dir: bool = True,
+ mode: int = 0o755
+ ) -> str:
+ """Create a safe path within the base directory.
+
+ Args:
+ base_dir (str|Path): Base directory
+ components (list): List of path components
+ create_dir (bool): Whether to create the directory
+ mode (int): Directory permissions if created
+
+ Returns:
+ str: Safe path object
+
+ Raises:
+ ValueError: If path would be outside base directory
+ OSError: If directory creation fails
+ """
+ try:
+ # Convert to Path objects
+ base_path = Path(base_dir).resolve()
+
+ # Sanitize all components
+ safe_components = [cls.sanitize_component(c) for c in components]
+
+ # Build full path
+ full_path = base_path.joinpath(*safe_components)
+
+ # Resolve to absolute path
+ abs_path = full_path.resolve()
+
+ # Check if path is within base directory
+ if not str(abs_path).startswith(str(base_path)):
+ raise ValueError(
+ f"Invalid path: {abs_path} is outside base directory {base_path}"
+ )
+
+ # Create directory if requested
+ if create_dir:
+ abs_path.mkdir(parents=True, mode=mode, exist_ok=True)
+ logger.debug(f"Created directory: {abs_path}")
+
+ return str(abs_path)
+
+ except Exception as e:
+ logger.error(f"Error creating safe path: {str(e)}")
+ raise
+
+ @classmethod
+ def is_safe_path(cls, base_dir: Union[str, Path], path: Union[str, Path], follow_symlinks: bool = True) -> bool:
+ """Enhanced version of is_safe_path that uses pathlib.
+ Maintains compatibility with existing code while adding more security."""
+ try:
+ base_path = Path(base_dir).resolve()
+ check_path = Path(path)
+
+ if follow_symlinks:
+ check_path = check_path.resolve()
+ else:
+ check_path = check_path.absolute()
+
+ return str(check_path).startswith(str(base_path))
+ except Exception:
+ return False
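A short usage sketch for SafePath: components are sanitized and joined under the base directory, and anything resolving outside it is rejected. Paths below are hypothetical; this assumes the reNgine environment so the celery logger import in utilities.py resolves:

from reNgine.utilities import SafePath

# Components are sanitized (unsafe characters become '_') and joined under base_dir
results_dir = SafePath.create_safe_path(
    base_dir='/usr/src/scan_results',               # hypothetical base directory
    components=['example.com', 'scans', 'uuid-1234'],
    create_dir=False,
)
print(results_dir)   # /usr/src/scan_results/example.com/scans/uuid-1234

# Path separators inside components are replaced with '_', and anything that still
# resolves outside base_dir (for example a bare '..' component) raises ValueError.
print(SafePath.is_safe_path('/usr/src/scan_results',
                            '/usr/src/scan_results/../etc/passwd'))  # False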
diff --git a/web/startScan/models.py b/web/startScan/models.py
index f49c310e..a042aedb 100644
--- a/web/startScan/models.py
+++ b/web/startScan/models.py
@@ -575,7 +575,7 @@ class Port(models.Model):
is_uncommon = models.BooleanField(default=False)
def __str__(self):
- return str(self.service_name)
+ return str(self.number)
class DirectoryFile(models.Model):
diff --git a/web/tests/test_data/nmap/basic_scan.xml b/web/tests/test_data/nmap/basic_scan.xml
new file mode 100644
index 00000000..a36e4672
--- /dev/null
+++ b/web/tests/test_data/nmap/basic_scan.xml
@@ -0,0 +1,28 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/web/tests/test_data/nmap/service_scan.xml b/web/tests/test_data/nmap/service_scan.xml
new file mode 100644
index 00000000..f92b7b86
--- /dev/null
+++ b/web/tests/test_data/nmap/service_scan.xml
@@ -0,0 +1,33 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/web/tests/test_data/nmap/vuln_scan.xml b/web/tests/test_data/nmap/vuln_scan.xml
new file mode 100644
index 00000000..fc4abea7
--- /dev/null
+++ b/web/tests/test_data/nmap/vuln_scan.xml
@@ -0,0 +1,38 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/web/tests/test_nmap.py b/web/tests/test_nmap.py
index 4091efc7..67092b85 100644
--- a/web/tests/test_nmap.py
+++ b/web/tests/test_nmap.py
@@ -2,6 +2,8 @@
import os
import unittest
import pathlib
+from pathlib import Path
os.environ['RENGINE_SECRET_KEY'] = 'secret'
os.environ['CELERY_ALWAYS_EAGER'] = 'True'
@@ -31,10 +33,39 @@ def setUp(self):
self.nmap_vulscan_multiple_xml
]
- def test_nmap_parse(self):
- for xml_file in self.all_xml:
- vulns = parse_nmap_results(self.nmap_vuln_single_xml)
- self.assertGreater(len(vulns), 0) # Fixed to use len(vulns)
+ def test_nmap_parse_vulnerabilities(self):
+     """Test parsing vulnerabilities from Nmap XML output files."""
+     xml_files = [
+         "web/tests/test_data/nmap/basic_scan.xml",
+         "web/tests/test_data/nmap/service_scan.xml",
+         "web/tests/test_data/nmap/vuln_scan.xml"
+     ]
+     for xml_file in xml_files:
+         with self.subTest(xml_file=xml_file):
+             # Arrange
+             xml_path = Path(xml_file)
+             self.assertTrue(xml_path.exists(), f"Test file {xml_file} not found")
+
+             # Act
+             vulns = parse_nmap_results(xml_path, parse_type='vulnerabilities')
+
+             # Assert
+             self.assertGreater(len(vulns), 0, f"No vulnerabilities found in {xml_file}")
+
+ def test_nmap_parse_ports(self):
+     """Test parsing ports from Nmap XML output files."""
+     xml_files = [
+         "web/tests/test_data/nmap/basic_scan.xml",
+         "web/tests/test_data/nmap/service_scan.xml",
+         "web/tests/test_data/nmap/vuln_scan.xml"
+     ]
+     for xml_file in xml_files:
+         with self.subTest(xml_file=xml_file):
+             # Arrange
+             xml_path = Path(xml_file)
+             self.assertTrue(xml_path.exists(), f"Test file {xml_file} not found")
+
+             # Act
+             ports = parse_nmap_results(xml_path, parse_type='ports')
+
+             # Assert
+             self.assertGreater(len(ports), 0, f"No ports found in {xml_file}")
def test_nmap_vuln_single(self):
pass