diff --git a/contentctl/actions/inspect.py b/contentctl/actions/inspect.py index e5ebcb06..4a59b135 100644 --- a/contentctl/actions/inspect.py +++ b/contentctl/actions/inspect.py @@ -1,23 +1,45 @@ -import sys -from dataclasses import dataclass -import pathlib -import json import datetime -import timeit +import json +import pathlib +import sys import time +import timeit +from dataclasses import dataclass +from io import BufferedReader -from requests import Session, post, get +from requests import Session, get, post from requests.auth import HTTPBasicAuth from contentctl.objects.config import inspect -from contentctl.objects.savedsearches_conf import SavedsearchesConf from contentctl.objects.errors import ( - MetadataValidationError, DetectionIDError, DetectionMissingError, - VersionDecrementedError, + MetadataValidationError, VersionBumpingError, + VersionDecrementedError, ) +from contentctl.objects.savedsearches_conf import SavedsearchesConf + +""" +The following list includes all appinspect tags available from: +https://dev.splunk.com/enterprise/reference/appinspect/appinspecttagreference/ + +This allows contentctl to be as forward-leaning as possible in catching +any potential issues on the widest variety of stacks. +""" +INCLUDED_TAGS_LIST = [ + "aarch64_compatibility", + "ast", + "cloud", + "future", + "manual", + "packaging_standards", + "private_app", + "private_classic", + "private_victoria", + "splunk_appinspect", +] +INCLUDED_TAGS_STRING = ",".join(INCLUDED_TAGS_LIST) @dataclass(frozen=True) @@ -28,7 +50,6 @@ class InspectInputDto: class Inspect: def execute(self, config: inspect) -> str: if config.build_app or config.build_api: - self.inspectAppCLI(config) appinspect_token = self.inspectAppAPI(config) if config.enable_metadata_validation: @@ -49,10 +70,6 @@ def inspectAppAPI(self, config: inspect) -> str: session.auth = HTTPBasicAuth( config.splunk_api_username, config.splunk_api_password ) - if config.stack_type not in ["victoria", "classic"]: - raise Exception( - f"stack_type MUST be either 'classic' or 'victoria', NOT '{config.stack_type}'" - ) APPINSPECT_API_LOGIN = "https://api.splunk.com/2.0/rest/login/splunk" @@ -64,10 +81,6 @@ def inspectAppAPI(self, config: inspect) -> str: APPINSPECT_API_VALIDATION_REQUEST = ( "https://appinspect.splunk.com/v1/app/validate" ) - headers = { - "Authorization": f"bearer {authorization_bearer}", - "Cache-Control": "no-cache", - } package_path = config.getPackageFilePath(include_version=False) if not package_path.is_file(): @@ -77,18 +90,43 @@ def inspectAppAPI(self, config: inspect) -> str: "trying to 'contentctl deploy_acs' the package BEFORE running 'contentctl build'?" ) - files = { + """ + Some documentation on "files" argument for requests.post exists here: + https://docs.python-requests.org/en/latest/api/ + The type (None, INCLUDED_TAGS_STRING) is intentional, and the None is important. + In curl syntax, the request we make below is equivalent to + curl -X POST \ + -H "Authorization: bearer " \ + -H "Cache-Control: no-cache" \ + -F "app_package=@" \ + -F "included_tags=cloud" \ + --url "https://appinspect.splunk.com/v1/app/validate" + + This is confirmed by the great resource: + https://curlconverter.com/ + """ + data: dict[str, tuple[None, str] | BufferedReader] = { "app_package": open(package_path, "rb"), - "included_tags": (None, "cloud"), + "included_tags": ( + None, + INCLUDED_TAGS_STRING, + ), # tuple with None is intentional here } - res = post(APPINSPECT_API_VALIDATION_REQUEST, headers=headers, files=files) + headers = { + "Authorization": f"bearer {authorization_bearer}", + "Cache-Control": "no-cache", + } + + res = post(APPINSPECT_API_VALIDATION_REQUEST, files=data, headers=headers) res.raise_for_status() request_id = res.json().get("request_id", None) - APPINSPECT_API_VALIDATION_STATUS = f"https://appinspect.splunk.com/v1/app/validate/status/{request_id}?included_tags=private_{config.stack_type}" - headers = headers = {"Authorization": f"bearer {authorization_bearer}"} + APPINSPECT_API_VALIDATION_STATUS = ( + f"https://appinspect.splunk.com/v1/app/validate/status/{request_id}" + ) + startTime = timeit.default_timer() # the first time, wait for 40 seconds. subsequent times, wait for less. # this is because appinspect takes some time to return, so there is no sense @@ -114,7 +152,9 @@ def inspectAppAPI(self, config: inspect) -> str: raise Exception(f"Error - Unknown Appinspect API status '{status}'") # We have finished running appinspect, so get the report - APPINSPECT_API_REPORT = f"https://appinspect.splunk.com/v1/app/report/{request_id}?included_tags=private_{config.stack_type}" + APPINSPECT_API_REPORT = ( + f"https://appinspect.splunk.com/v1/app/report/{request_id}" + ) # Get human-readable HTML report headers = headers = { "Authorization": f"bearer {authorization_bearer}", @@ -159,14 +199,14 @@ def inspectAppCLI(self, config: inspect) -> None: "\t - https://dev.splunk.com/enterprise/docs/developapps/testvalidate/appinspect/useappinspectclitool/" ) from splunk_appinspect.main import ( - validate, - MODE_OPTION, APP_PACKAGE_ARGUMENT, - OUTPUT_FILE_OPTION, - LOG_FILE_OPTION, - INCLUDED_TAGS_OPTION, EXCLUDED_TAGS_OPTION, + INCLUDED_TAGS_OPTION, + LOG_FILE_OPTION, + MODE_OPTION, + OUTPUT_FILE_OPTION, TEST_MODE, + validate, ) except Exception as e: print(e) diff --git a/contentctl/contentctl.py b/contentctl/contentctl.py index d5864eb7..2541dd5d 100644 --- a/contentctl/contentctl.py +++ b/contentctl/contentctl.py @@ -1,7 +1,9 @@ import pathlib +import random import sys import traceback import warnings +from dataclasses import dataclass import tyro @@ -155,6 +157,35 @@ def test_common_func(config: test_common): """ +def get_random_compliment(): + compliments = [ + "Your detection rules are like a zero-day shield! 🛡️", + "You catch threats like it's child's play! 🎯", + "Your correlation rules are pure genius! 🧠", + "Threat actors fear your detection engineering! ⚔️", + "You're the SOC's secret weapon! 🦾", + "Your false positive rate is impressively low! 📊", + "Malware trembles at your detection logic! 🦠", + "You're the threat hunter extraordinaire! 🔍", + "Your MITRE mappings are a work of art! 🎨", + "APTs have nightmares about your detections! 👻", + "Your content testing is bulletproof! 🎯", + "You're the detection engineering MVP! 🏆", + ] + return random.choice(compliments) + + +def recognize_func(): + print(get_random_compliment()) + + +@dataclass +class RecognizeCommand: + """Dummy subcommand for 'recognize' with no parameters.""" + + pass + + def main(): print(CONTENTCTL_5_WARNING) try: @@ -210,6 +241,7 @@ def main(): "test_servers": test_servers.model_construct(**t.__dict__), "release_notes": release_notes.model_construct(**config_obj), "deploy_acs": deploy_acs.model_construct(**t.__dict__), + "recognize": RecognizeCommand(), } ) @@ -240,6 +272,8 @@ def main(): deploy_acs_func(updated_config) elif type(config) is test or type(config) is test_servers: test_common_func(config) + elif type(config) is RecognizeCommand: + recognize_func() else: raise Exception(f"Unknown command line type '{type(config).__name__}'") except FileNotFoundError as e: diff --git a/contentctl/objects/abstract_security_content_objects/detection_abstract.py b/contentctl/objects/abstract_security_content_objects/detection_abstract.py index 1231548c..fc0505ce 100644 --- a/contentctl/objects/abstract_security_content_objects/detection_abstract.py +++ b/contentctl/objects/abstract_security_content_objects/detection_abstract.py @@ -383,21 +383,17 @@ def providing_technologies(self) -> List[ProvidingTechnology]: @computed_field @property def risk(self) -> list[dict[str, Any]]: - risk_objects: list[dict[str, str | int]] = [] - - for entity in self.rba.risk_objects: - risk_object: dict[str, str | int] = dict() - risk_object["risk_object_type"] = entity.type - risk_object["risk_object_field"] = entity.field - risk_object["risk_score"] = entity.score - risk_objects.append(risk_object) - - for entity in self.rba.threat_objects: - threat_object: dict[str, str] = dict() - threat_object["threat_object_field"] = entity.field - threat_object["threat_object_type"] = entity.type - risk_objects.append(threat_object) - return risk_objects + if self.rba is None: + raise Exception( + f"Attempting to serialize rba section of [{self.name}], however RBA section is None" + ) + """ + action.risk.param._risk + of the conf file only contains a list of dicts. We do not eant to + include the message here, so we do not return it. + """ + rba_dict = self.rba.model_dump() + return rba_dict["risk_objects"] + rba_dict["threat_objects"] @computed_field @property diff --git a/contentctl/objects/config.py b/contentctl/objects/config.py index e72a60ec..ac6cef78 100644 --- a/contentctl/objects/config.py +++ b/contentctl/objects/config.py @@ -425,7 +425,6 @@ class inspect(build): "enforcement (defaults to the latest release of the app published on Splunkbase)." ), ) - stack_type: StackType = Field(description="The type of your Splunk Cloud Stack") @field_validator("enrichments", mode="after") @classmethod @@ -496,6 +495,9 @@ class new(Config_Base): class deploy_acs(inspect): model_config = ConfigDict(validate_default=False, arbitrary_types_allowed=True) + + stack_type: StackType = Field(description="The type of your Splunk Cloud Stack") + # ignore linter error splunk_cloud_jwt_token: str = Field( exclude=True, diff --git a/contentctl/objects/rba.py b/contentctl/objects/rba.py index d33da47c..a63c043e 100644 --- a/contentctl/objects/rba.py +++ b/contentctl/objects/rba.py @@ -1,9 +1,12 @@ -from enum import Enum -from pydantic import BaseModel, computed_field, Field +from __future__ import annotations + from abc import ABC -from typing import Set, Annotated -from contentctl.objects.enums import RiskSeverity +from enum import Enum +from typing import Annotated, Set +from pydantic import BaseModel, Field, computed_field, model_serializer + +from contentctl.objects.enums import RiskSeverity RiskScoreValue_Type = Annotated[int, Field(ge=1, le=100)] @@ -51,6 +54,28 @@ class RiskObject(BaseModel): def __hash__(self): return hash((self.field, self.type, self.score)) + def __lt__(self, other: RiskObject) -> bool: + if ( + f"{self.field}{self.type}{self.score}" + < f"{other.field}{other.type}{other.score}" + ): + return True + return False + + @model_serializer + def serialize_risk_object(self) -> dict[str, str | int]: + """ + We define this explicitly for two reasons, even though the automatic + serialization works correctly. First we want to enforce a specific + field order for reasons of readability. Second, some of the fields + actually have different names than they do in the object. + """ + return { + "risk_object_field": self.field, + "risk_object_type": self.type, + "risk_score": self.score, + } + class ThreatObject(BaseModel): field: str @@ -59,6 +84,24 @@ class ThreatObject(BaseModel): def __hash__(self): return hash((self.field, self.type)) + def __lt__(self, other: ThreatObject) -> bool: + if f"{self.field}{self.type}" < f"{other.field}{other.type}": + return True + return False + + @model_serializer + def serialize_threat_object(self) -> dict[str, str]: + """ + We define this explicitly for two reasons, even though the automatic + serialization works correctly. First we want to enforce a specific + field order for reasons of readability. Second, some of the fields + actually have different names than they do in the object. + """ + return { + "threat_object_field": self.field, + "threat_object_type": self.type, + } + class RBAObject(BaseModel, ABC): message: str @@ -94,3 +137,11 @@ def severity(self) -> RiskSeverity: raise Exception( f"Error getting severity - risk_score must be between 0-100, but was actually {self.risk_score}" ) + + @model_serializer + def serialize_rba(self) -> dict[str, str | list[dict[str, str | int]]]: + return { + "message": self.message, + "risk_objects": [obj.model_dump() for obj in sorted(self.risk_objects)], + "threat_objects": [obj.model_dump() for obj in sorted(self.threat_objects)], + } diff --git a/pyproject.toml b/pyproject.toml index 3c478963..6c1cc8d6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "contentctl" -version = "5.0.0" +version = "5.0.1" description = "Splunk Content Control Tool" authors = ["STRT "]