Added update-packs command to check whether packs have all the detections they should have
melenevskyi committed Dec 12, 2023
1 parent 2996ba0 commit b8854ca
Showing 2 changed files with 87 additions and 21 deletions.
10 changes: 10 additions & 0 deletions panther_analysis_tool/analysis_utils.py
@@ -55,6 +55,16 @@ def __init__(self, file_name: str, dir_name: str, analysis_spec: Dict[str, Any])
         self.dir_name = dir_name
         self.analysis_spec = analysis_spec
 
+    def is_deprecated(self) -> bool:
+        display_name = self.analysis_spec["DisplayName"]
+        description = self.analysis_spec.get("Description", "")
+        if "deprecated" in display_name.lower() or "deprecated" in description.lower():
+            return True
+
+        if "deprecated" in self.analysis_spec.get("Tags", []):
+            return True
+        return False
+
 
 @dataclasses.dataclass
 class ClassifiedAnalysisContainer:
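For reference, the new `is_deprecated` helper flags an item when "deprecated" appears (case-insensitively) in its `DisplayName` or `Description`, or as a literal lowercase `deprecated` entry in `Tags`. A minimal standalone sketch of the same check against a plain spec dict (hypothetical helper name and example specs, not part of the commit):

from typing import Any, Dict


def spec_is_deprecated(analysis_spec: Dict[str, Any]) -> bool:
    # Mirrors the logic added above: case-insensitive match on DisplayName/Description,
    # plus an exact lowercase "deprecated" entry in Tags.
    display_name = analysis_spec["DisplayName"]
    description = analysis_spec.get("Description", "")
    if "deprecated" in display_name.lower() or "deprecated" in description.lower():
        return True
    return "deprecated" in analysis_spec.get("Tags", [])


# Hypothetical specs: the first is caught via its description, the second is not deprecated.
assert spec_is_deprecated({"DisplayName": "Old Asana Rule", "Description": "DEPRECATED: use X instead"})
assert not spec_is_deprecated({"DisplayName": "Asana Service Account Created", "Tags": ["Asana"]})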
98 changes: 77 additions & 21 deletions panther_analysis_tool/main.py
@@ -680,30 +680,17 @@ def upload_assets_github(upload_url: str, headers: dict, release_dir: str) -> in
     return return_code
 
 
-# pylint: disable=too-many-locals
-def test_analysis(
-    args: argparse.Namespace, backend: typing.Optional[BackendClient] = None
-) -> Tuple[int, list]:
-    """Imports each policy or rule and runs their tests.
-    Args:
-        args: The populated Argparse namespace with parsed command-line arguments.
-    Returns:
-        A tuple of the return code, and a list of tuples containing invalid specs and their error.
-    """
-    logging.info("Testing analysis items in %s\n", args.path)
-
-    ignored_files = args.ignore_files
-    search_directories = [args.path]
-
+def load_analysis(
+    path: str, ignore_table_names: bool, valid_table_names: List[str]
+) -> Tuple[Any, List[Any]]:
+    search_directories = [path]
     for directory in (
         HELPERS_LOCATION,
         "." + HELPERS_LOCATION,  # Try the parent directory as well
         DATA_MODEL_LOCATION,
         "." + DATA_MODEL_LOCATION,  # Try the parent directory as well
     ):
-        absolute_dir_path = os.path.abspath(os.path.join(args.path, directory))
+        absolute_dir_path = os.path.abspath(os.path.join(path, directory))
         absolute_helper_path = os.path.abspath(directory)
 
         if os.path.exists(absolute_dir_path):
@@ -713,11 +700,30 @@

     # First classify each file, always include globals and data models location
     specs, invalid_specs = classify_analysis(
-        list(load_analysis_specs(search_directories, ignore_files=ignored_files)),
-        ignore_table_names=args.ignore_table_names,
-        valid_table_names=args.valid_table_names,
+        list(load_analysis_specs(search_directories, ignore_files=[])),
+        ignore_table_names=ignore_table_names,
+        valid_table_names=valid_table_names,
     )
 
+    return specs, invalid_specs
+
+
+# pylint: disable=too-many-locals
+def test_analysis(
+    args: argparse.Namespace, backend: typing.Optional[BackendClient] = None
+) -> Tuple[int, list]:
+    """Imports each policy or rule and runs their tests.
+    Args:
+        args: The populated Argparse namespace with parsed command-line arguments.
+    Returns:
+        A tuple of the return code, and a list of tuples containing invalid specs and their error.
+    """
+    logging.info("Testing analysis items in %s\n", args.path)
+
+    # First classify each file, always include globals and data models location
+    specs, invalid_specs = load_analysis(args.path, args.ignore_table_names, args.valid_table_names)
     if specs.empty():
         if invalid_specs:
             return 1, invalid_specs
@@ -1262,6 +1268,47 @@ def enrich_test_data(backend: BackendClient, args: argparse.Namespace) -> Tuple[
     return (0, result_str)
 
 
+def check_packs(args: argparse.Namespace) -> Tuple[int, str]:
+    specs, _ = load_analysis(args.path, False, [])
+
+    analysis_type_to_key_mapping = {
+        AnalysisTypes.POLICY: "PolicyID",
+        AnalysisTypes.RULE: "RuleID",
+        AnalysisTypes.SCHEDULED_RULE: "RuleID",
+    }
+    packs_with_missing_detections = {}
+    for pack in specs.packs:
+        pack_name = pack.file_name.replace(".yml", "").split("/")[-1]
+        included_rules = []
+        detections = [detection for detection in specs.detections if not detection.is_deprecated()]
+        for detection in detections:
+            # remove leading ./
+            # ./some-dir -> some-dir
+            dir_name = detection.dir_name.strip("./")
+
+            # rules/asana_rules/asana_service_account_created -> [rules, asana_rules, asana_service_account_created]
+            # if pack name is "asana" we can assume that the detection is part of the pack
+            path_to_detection = detection.file_name[detection.file_name.find(dir_name) :]
+            pieces = path_to_detection.split("/")
+
+            matching_pieces = [piece.startswith(pack_name) for piece in pieces]
+            if any(matching_pieces):
+                key = analysis_type_to_key_mapping[detection.analysis_spec["AnalysisType"]]
+                included_rules.append(detection.analysis_spec[key])
+
+        diff = set(included_rules).difference(set(pack.analysis_spec["PackDefinition"]["IDs"]))
+        if diff:
+            packs_with_missing_detections[pack_name] = list(diff)
+
+    if packs_with_missing_detections:
+        error_string = "There are packs that are potentially missing detections:\n"
+        for pack_name, detections in packs_with_missing_detections.items():
+            detections_str = ",".join(detections)
+            error_string += f"{pack_name}.yml: {detections_str}\n\n"
+        return 1, error_string
+    return 0, "Looks like packs are up to date"
+
+
 def lookup_analysis_id(analysis_spec: Any, analysis_type: str) -> str:
     analysis_id = "UNKNOWN_ID"
     if analysis_type == AnalysisTypes.DATA_MODEL:
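The pack-matching heuristic in `check_packs` above is purely path-based: a non-deprecated detection is attributed to a pack when any component of its path, relative to its analysis directory, starts with the pack's file name (e.g. `asana_rules` matches an `asana` pack). A small self-contained sketch of that heuristic with hypothetical paths and pack names (illustrative only, not the committed implementation):

from typing import List


def detection_belongs_to_pack(detection_path: str, pack_name: str) -> bool:
    # "./rules/asana_rules/asana_service_account_created.yml" -> ["rules", "asana_rules", ...]
    # "asana_rules".startswith("asana") -> the detection is assumed to belong to the "asana" pack.
    pieces: List[str] = detection_path.strip("./").split("/")
    return any(piece.startswith(pack_name) for piece in pieces)


# Hypothetical examples:
assert detection_belongs_to_pack("./rules/asana_rules/asana_service_account_created.yml", "asana")
assert not detection_belongs_to_pack("./rules/okta_rules/okta_admin_access.yml", "asana")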
@@ -1962,6 +2009,15 @@ def setup_parser() -> argparse.ArgumentParser:
     enrich_test_data_parser.add_argument(valid_table_names_name, **valid_table_names_arg)
     enrich_test_data_parser.set_defaults(func=pat_utils.func_with_backend(enrich_test_data))
 
+    check_packs_parser = subparsers.add_parser(
+        "check-packs",
+        help="Update Packs content, rules and policies",
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+    )
+    check_packs_parser.add_argument(path_name, **path_arg)
+    check_packs_parser.set_defaults(func=check_packs)
+    standard_args.for_public_api(check_packs_parser, required=False)
+
     return parser
 
 
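The new subcommand is registered through the same argparse pattern as the existing ones: `set_defaults(func=check_packs)` attaches the handler, and the shared path argument selects the directory to scan. A stripped-down sketch of that dispatch pattern under assumed names (`--path`, `check_packs_stub`); the real argument wiring and exit handling live in `setup_parser` and the surrounding CLI code and may differ:

import argparse
from typing import Tuple


def check_packs_stub(args: argparse.Namespace) -> Tuple[int, str]:
    # Stand-in for the real check_packs shown in the diff above.
    return 0, f"Looks like packs are up to date (scanned {args.path})"


parser = argparse.ArgumentParser(prog="panther_analysis_tool")
subparsers = parser.add_subparsers(dest="command", required=True)

check_packs_parser = subparsers.add_parser(
    "check-packs",
    help="Check whether packs include all the detections they should",
)
check_packs_parser.add_argument("--path", default=".", help="Directory containing the analysis content")
check_packs_parser.set_defaults(func=check_packs_stub)

args = parser.parse_args(["check-packs", "--path", "."])
return_code, message = args.func(args)  # handlers return a (code, message) tuple
print(return_code, message)

Because the handler returns a (code, message) tuple and the committed check_packs returns 1 when a pack appears to be missing detections, callers can treat a non-zero return as a failed check.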
