From f8730dd7a40d9bc9e367c7f1d7e65e57a31d9543 Mon Sep 17 00:00:00 2001 From: pyth0n1c Date: Fri, 7 Feb 2025 13:53:20 -0800 Subject: [PATCH] draft for more explicit info on security content object mapping failure --- contentctl/input/director.py | 44 +++++++-------- .../security_content_object_abstract.py | 53 +++++++++++++------ 2 files changed, 59 insertions(+), 38 deletions(-) diff --git a/contentctl/input/director.py b/contentctl/input/director.py index 1b637101..e382b1f5 100644 --- a/contentctl/input/director.py +++ b/contentctl/input/director.py @@ -1,30 +1,29 @@ import os import sys -from pathlib import Path from dataclasses import dataclass, field -from pydantic import ValidationError +from pathlib import Path from uuid import UUID -from contentctl.input.yml_reader import YmlReader -from contentctl.objects.detection import Detection -from contentctl.objects.story import Story +from pydantic import ValidationError -from contentctl.objects.baseline import Baseline -from contentctl.objects.investigation import Investigation -from contentctl.objects.playbook import Playbook -from contentctl.objects.deployment import Deployment -from contentctl.objects.macro import Macro -from contentctl.objects.lookup import LookupAdapter, Lookup -from contentctl.objects.atomic import AtomicEnrichment -from contentctl.objects.security_content_object import SecurityContentObject -from contentctl.objects.data_source import DataSource -from contentctl.objects.dashboard import Dashboard from contentctl.enrichments.attack_enrichment import AttackEnrichment from contentctl.enrichments.cve_enrichment import CveEnrichment - +from contentctl.helper.utils import Utils +from contentctl.input.yml_reader import YmlReader +from contentctl.objects.atomic import AtomicEnrichment +from contentctl.objects.baseline import Baseline from contentctl.objects.config import validate +from contentctl.objects.dashboard import Dashboard +from contentctl.objects.data_source import DataSource +from contentctl.objects.deployment import Deployment +from contentctl.objects.detection import Detection from contentctl.objects.enums import SecurityContentType -from contentctl.helper.utils import Utils +from contentctl.objects.investigation import Investigation +from contentctl.objects.lookup import Lookup, LookupAdapter +from contentctl.objects.macro import Macro +from contentctl.objects.playbook import Playbook +from contentctl.objects.security_content_object import SecurityContentObject +from contentctl.objects.story import Story @dataclass @@ -117,15 +116,16 @@ def execute(self, input_dto: validate) -> None: MISSING_SOURCES, ) - if len(MISSING_SOURCES) > 0: - missing_sources_string = "\n 🟡 ".join(sorted(list(MISSING_SOURCES))) + try: + DataSource.mapNamesToSecurityContentObjects( + list(MISSING_SOURCES), self.output_dto + ) + except Exception as e: print( "WARNING: The following data_sources have been used in detections, but are not yet defined.\n" "This is not yet an error since not all data_sources have been defined, but will be convered to an error soon:\n 🟡 " - f"{missing_sources_string}" + f"{e}" ) - else: - print("No missing data_sources!") def createSecurityContent(self, contentType: SecurityContentType) -> None: if contentType in [ diff --git a/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py b/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py index f43c380c..143837a0 100644 --- a/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/security_content_object_abstract.py @@ -12,6 +12,7 @@ import pathlib import pprint import uuid +from difflib import get_close_matches from functools import cached_property from typing import List, Optional, Tuple, Union @@ -160,16 +161,18 @@ def getReferencesListForJson(self) -> List[str]: def mapNamesToSecurityContentObjects( cls, v: list[str], director: Union[DirectorOutputDto, None] ) -> list[Self]: - if director is not None: - name_map = director.name_to_content_map - else: - name_map = {} + if director is None: + raise Exception( + "Direction was 'None' when passed to " + "'mapNamesToSecurityContentObjects'. This is " + "an error in the contentctl codebase which must be resolved." + ) mappedObjects: list[Self] = [] mistyped_objects: list[SecurityContentObject_Abstract] = [] missing_objects: list[str] = [] for object_name in v: - found_object = name_map.get(object_name, None) + found_object = director.name_to_content_map.get(object_name, None) if not found_object: missing_objects.append(object_name) elif not isinstance(found_object, cls): @@ -178,22 +181,40 @@ def mapNamesToSecurityContentObjects( mappedObjects.append(found_object) errors: list[str] = [] - if len(missing_objects) > 0: + for missing_object in missing_objects: + if missing_object.endswith("_filter"): + # Most filter macros are defined as empty at runtime, so we do not + # want to make any suggestions. It is time consuming and not helpful + # to make these suggestions, so we just skip them in this check. + continue + matches = get_close_matches( + missing_object, + director.name_to_content_map.keys(), + n=3, + ) + if matches == []: + matches = ["NO SUGGESTIONS"] + + matches_string = ", ".join(matches) errors.append( - f"Failed to find the following '{cls.__name__}': {missing_objects}" + f"Unable to find: {missing_object}\n Suggestions: {matches_string}" + ) + + for mistyped_object in mistyped_objects: + matches = get_close_matches( + mistyped_object.name, director.name_to_content_map.keys(), n=3 + ) + + errors.append( + f"'{mistyped_object.name}' expected to have type '{cls.__name__}', but actually " + f"had type '{type(mistyped_object).__name__}'" ) - if len(mistyped_objects) > 0: - for mistyped_object in mistyped_objects: - errors.append( - f"'{mistyped_object.name}' expected to have type '{cls}', but actually " - f"had type '{type(mistyped_object)}'" - ) if len(errors) > 0: - error_string = "\n - ".join(errors) + error_string = "\n\n - ".join(errors) raise ValueError( - f"Found {len(errors)} issues when resolving references Security Content Object " - f"names:\n - {error_string}" + f"Found {len(errors)} issues when resolving references to '{cls.__name__}' objects:\n" + f" - {error_string}" ) # Sort all objects sorted by name