Skip to content

Commit

Permalink
Generic handling of backend-specific options
Browse files Browse the repository at this point in the history
* removed backend-specific --min-time and --max-time options.
* added --backend-option (-O) parameter for specification of backend options.
* Updated to latest pySigma
  • Loading branch information
thomaspatzke committed Feb 20, 2023
1 parent 4046d80 commit 8039870
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 19 deletions.
6 changes: 3 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sigma-cli"
version = "0.6.1"
version = "0.7.0"
description = "Sigma Command Line Interface (conversion, check etc.) based on pySigma"
authors = ["Thomas Patzke <thomas@patzke.org>"]
license = "LGPL-2.1-or-later"
Expand Down
61 changes: 48 additions & 13 deletions sigma/cli/convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import json
import pathlib
import textwrap
from typing import Any, Optional
import click

from sigma.conversion.base import Backend
Expand All @@ -15,6 +16,24 @@
backends = plugins.backends
pipelines = plugins.pipelines

class KeyValueParamType(click.ParamType):
"""
key=value type for backend-specific options.
"""
def convert(self, value, param, ctx):
if not isinstance(value, str):
self.fail(f"Value must be a string with format key=value", param, ctx)
try:
k, v = value.split("=", 1)
except ValueError:
self.fail(f"Value '{value}' has not format key=value", param, ctx)

try:
return { k: int(v) }
except ValueError:
return { k: v }


@click.command()
@click.option(
"--target", "-t",
Expand Down Expand Up @@ -76,20 +95,19 @@
help="Pretty-print and indent JSON output with given indentation width per level."
)
@click.option(
"--min-time",
help="Minimal search time in backend-specific format. Must be supported by backend and output format."
)
@click.option(
"--max-time",
help="Maximal search time in backend-specific format. Must be supported by backend and output format."
"--backend-option",
"-O",
type=KeyValueParamType(),
multiple=True,
help="Backend-specific options provided as key=value pair.",
)
@click.argument(
"input",
nargs=-1,
required=True,
type=click.Path(exists=True, path_type=pathlib.Path),
)
def convert(target, pipeline, without_pipeline, pipeline_check, format, skip_unsupported, min_time, max_time, output, encoding, json_indent, input, file_pattern):
def convert(target, pipeline, without_pipeline, pipeline_check, format, skip_unsupported, output, encoding, json_indent, backend_option, input, file_pattern):
"""
Convert Sigma rules into queries. INPUT can be multiple files or directories. This command automatically recurses
into directories and converts all files matching the pattern in --file-pattern.
Expand Down Expand Up @@ -129,15 +147,32 @@ def convert(target, pipeline, without_pipeline, pipeline_check, format, skip_uns
check with --disable-pipeline-check.
"""))

# Merge backend options: multiple occurences of a key result in array of values
backend_options = dict()
for option in backend_option:
for k, v in option.items():
backend_options.setdefault(k, list()).append(v)
backend_options = {
k: (
v[0] # if there's only one item, return it.
if len(v) == 1
else v
)
for k, v in backend_options.items()
}

# Initialize processing pipeline and backend
backend_class = backends[target]
processing_pipeline = pipeline_resolver.resolve(pipeline)
backend : Backend = backend_class(
processing_pipeline=processing_pipeline,
collect_errors=skip_unsupported,
min_time=min_time,
max_time=max_time,
)
try:
backend : Backend = backend_class(
processing_pipeline=processing_pipeline,
collect_errors=skip_unsupported,
**backend_options
)
except TypeError as e:
param = str(e).split("'")[1]
raise click.BadParameter(f"Parameter '{param}' is not supported by backend '{target}'.", param_hint="backend_option")

if format not in backends[target].formats.keys():
raise click.BadParameter(f"Output format '{format}' is not supported by backend '{target}'.", param_hint="format")
Expand Down
31 changes: 29 additions & 2 deletions tests/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,32 @@ def test_convert_wrong_pipeline():
def test_yml_pipeline_doesnt_trigger_wrong_pipeline():
cli = CliRunner()
result = cli.invoke(convert, ["-t", "test", "-p", "test", "-p", "tests/files/custom_pipeline.yml", "tests/files/valid"])
print(result.stdout)
assert "some_other_string" in result.stdout
assert "some_other_string" in result.stdout

def test_backend_option_invalid_format():
cli = CliRunner()
result = cli.invoke(convert, ["-t", "test", "-O", "invalid", "tests/files/valid"])
assert result.exit_code != 0
assert "not format key=value" in result.stdout

def test_backend_option_invalid_type():
cli = CliRunner()
result = cli.invoke(convert, ["-t", "test", "-O", 123, "tests/files/valid"])
assert result.exit_code != 0
assert "must be a string" in result.stdout

def test_backend_option_unknown_by_backend():
cli = CliRunner()
result = cli.invoke(convert, ["-t", "test", "-O", "unknown=parameter", "tests/files/valid"])
assert result.exit_code != 0
assert "Parameter 'unknown' is not supported" in result.stdout

def test_convert_output_backend_option():
cli = CliRunner()
result = cli.invoke(convert, ["-t", "test", "-f", "list_of_dict", "-O", "testparam=testvalue", "tests/files/valid"])
assert 'testvalue' in result.stdout

def test_convert_output_backend_option_list():
cli = CliRunner()
result = cli.invoke(convert, ["-t", "test", "-f", "list_of_dict", "-O", "testparam=123", "-O", "testparam=test", "tests/files/valid"])
assert '[123, "test"]' in result.stdout

0 comments on commit 8039870

Please sign in to comment.