Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rewrite unittests for JsonConverterHandler and StreamPrinterEventHandler. #1355

Open
wants to merge 2 commits into
base: development
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 84 additions & 32 deletions aecid-testsuite/unit/events/JsonConverterHandlerTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,100 @@
import unittest
from aminer.events.JsonConverterHandler import JsonConverterHandler
from aminer.input.LogAtom import LogAtom
from aminer.parsing.MatchContext import MatchContext
from aminer.parsing.FixedDataModelElement import FixedDataModelElement
from aminer.parsing.ParserMatch import ParserMatch
from unit.TestBase import TestBase
from unit.TestBase import TestBase, DummyFixedDataModelElement, DummyMatchContext


class JsonConverterHandlerTest(TestBase):
"""Unittests for the JsonConverterHandler."""

maxDiff = None
resource_name = b"testresource"
output_logline = True
match_context = MatchContext(b' pid=')
fixed_dme = FixedDataModelElement('s1', b' pid=')
match_element = fixed_dme.get_match_element("match", match_context)
t = time.time()

test_detector = 'Analysis.TestDetector'
event_message = 'An event happened!'
sorted_log_lines = ['Event happend at /path/ 5 times.', '', '', '', '']
persistence_id = 'Default'
description = 'jsonConverterHandlerDescription'
expected_string = '{\n "AnalysisComponent": {\n "AnalysisComponentIdentifier": 0,\n' \
' "AnalysisComponentType": "%s",\n "AnalysisComponentName": "%s",\n "Message": "%s",\n' \
' "PersistenceFileName": "%s",\n "AffectedParserPaths": [\n "test/path/1",\n' \
' "test/path/2"\n ],\n "LogResource": "testresource"\n },\n "LogData": {\n "RawLogData": [\n " pid="\n ],\n ' \
'"Timestamps": [\n %s\n ],\n "DetectionTimestamp": %s,\n "LogLinesCount": 5,\n' \
' "AnnotatedMatchElement": {\n "match/s1": " pid="\n }\n }%s\n}\n'

def test1receive_expected_event(self):
"""In this test case a normal Event happens and the json output should be sent to a StreamPrinterEventHandler."""
json_converter_handler = JsonConverterHandler([self.stream_printer_event_handler], self.analysis_context)
log_atom = LogAtom(self.fixed_dme.fixed_data, ParserMatch(self.match_element), self.t, self)
self.analysis_context.register_component(self, self.description)
event_data = {'AnalysisComponent': {'AffectedParserPaths': ['test/path/1', 'test/path/2']}}
json_converter_handler.receive_event(self.test_detector, self.event_message, self.sorted_log_lines, event_data, log_atom, self)
resource_name = b"testresource"
persistence_id = "Default"

def test1receive_event(self):
"""Test if events are processed correctly and that edge cases are caught in exceptions."""
match_context = DummyMatchContext(b" pid=")
fdme = DummyFixedDataModelElement("s1", b" pid=")
match_element = fdme.get_match_element("match", match_context)
t = time.time()

test = "Analysis.TestDetector"
event_message = "An event happened!"
sorted_log_lines = ["Event happened at /path/ 5 times.", "", "", "", ""]
description = "jsonConverterHandlerDescription"
expected_string = '{\n "AnalysisComponent": {\n "AnalysisComponentIdentifier": 0,\n "AnalysisComponentType": "%s",\n ' \
'"AnalysisComponentName": "%s",\n "Message": "%s",\n "PersistenceFileName": "%s",\n "AffectedParserPaths":' \
' [\n "test/path/1",\n "test/path/2"\n ],\n "LogResource": "testresource"\n },\n "LogData": ' \
'{\n "RawLogData": [\n " pid="\n ],\n "Timestamps": [\n %s\n ],\n "DetectionTimestamp":' \
' %s,\n "LogLinesCount": 5,\n "AnnotatedMatchElement": {\n "match/s1": " pid="\n }\n }%s\n}\n'

jch = JsonConverterHandler([self.stream_printer_event_handler], self.analysis_context)
log_atom = LogAtom(fdme.data, ParserMatch(match_element), t, self)
self.analysis_context.register_component(self, description)
event_data = {"AnalysisComponent": {"AffectedParserPaths": ["test/path/1", "test/path/2"]}}
jch.receive_event(test, event_message, sorted_log_lines, event_data, log_atom, self)
detection_timestamp = None
for line in self.output_stream.getvalue().split('\n'):
for line in self.output_stream.getvalue().split("\n"):
if "DetectionTimestamp" in line:
detection_timestamp = line.split(':')[1].strip(' ,')
self.assertEqual(self.output_stream.getvalue(), self.expected_string % (
self.__class__.__name__, self.description, self.event_message, self.persistence_id, round(self.t, 2), detection_timestamp, ""))
detection_timestamp = line.split(":")[1].strip(" ,")
self.assertEqual(self.output_stream.getvalue(), expected_string % (self.__class__.__name__, description, event_message, self.persistence_id, round(t, 2), detection_timestamp, ""))
self.reset_output_stream()

# test output_event_handlers
self.output_event_handlers = []
self.assertTrue(jch.receive_event(test, event_message, sorted_log_lines, event_data, log_atom, self))
self.assertEqual(self.output_stream.getvalue(), "")

self.output_event_handlers = [jch]
self.assertTrue(jch.receive_event(test, event_message, sorted_log_lines, event_data, log_atom, self))
val = self.output_stream.getvalue()
if val.endswith("\n\n"):
val = val[:-1]
detection_timestamp = None
for line in val.split("\n"):
if "DetectionTimestamp" in line:
detection_timestamp = line.split(":")[1].strip(" ,")
break
self.assertEqual(val, expected_string % (self.__class__.__name__, description, event_message, self.persistence_id, round(t, 2), detection_timestamp, ""))
self.reset_output_stream()

# test suppress detector list
self.output_event_handlers = None
self.analysis_context.suppress_detector_list = [description]
self.assertTrue(jch.receive_event(test, event_message, sorted_log_lines, event_data, log_atom, self))
self.assertEqual(self.output_stream.getvalue(), "")

self.output_event_handlers = [jch]
self.analysis_context.suppress_detector_list = []
self.assertTrue(jch.receive_event(test, event_message, sorted_log_lines, event_data, log_atom, self))
self.assertEqual(val, expected_string % (self.__class__.__name__, description, event_message, self.persistence_id, round(t, 2), detection_timestamp, ""))

def test2validate_parameters(self):
"""Test all initialization parameters for the event handler. Input parameters must be validated in the class."""
JsonConverterHandler([self.stream_printer_event_handler], self.analysis_context)
self.assertRaises(TypeError, JsonConverterHandler, ["default"], self.analysis_context)
self.assertRaises(TypeError, JsonConverterHandler, None, self.analysis_context)
self.assertRaises(TypeError, JsonConverterHandler, "Default", self.analysis_context)
self.assertRaises(TypeError, JsonConverterHandler, b"Default", self.analysis_context)
self.assertRaises(TypeError, JsonConverterHandler, True, self.analysis_context)
self.assertRaises(TypeError, JsonConverterHandler, 123, self.analysis_context)
self.assertRaises(TypeError, JsonConverterHandler, 123.3, self.analysis_context)
self.assertRaises(TypeError, JsonConverterHandler, {"id": "Default"}, self.analysis_context)
self.assertRaises(TypeError, JsonConverterHandler, (), self.analysis_context)
self.assertRaises(TypeError, JsonConverterHandler, set(), self.analysis_context)
self.assertRaises(ValueError, JsonConverterHandler, [], self.analysis_context)

self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, ["default"])
self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, None)
self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, "Default")
self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, b"Default")
self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, 123)
self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, 123.3)
self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, {"id": "Default"})
self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, ())
self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, set())


if __name__ == '__main__':
Expand Down
134 changes: 71 additions & 63 deletions aecid-testsuite/unit/events/StreamPrinterEventHandlerTest.py
Original file line number Diff line number Diff line change
@@ -1,81 +1,89 @@
import unittest
from aminer.parsing.MatchContext import MatchContext
from aminer.parsing.FixedDataModelElement import FixedDataModelElement
import sys
import io
from time import time
from aminer.events.StreamPrinterEventHandler import StreamPrinterEventHandler
from aminer.parsing.ParserMatch import ParserMatch
from aminer.input.LogAtom import LogAtom
from unit.TestBase import TestBase
from unit.TestBase import TestBase, DummyFixedDataModelElement, DummyMatchContext
from datetime import datetime


class StreamPrinterEventHandlerTest(TestBase):
"""Unittests for the StreamPrinterEventHandler."""

__expectedString = '%s New value for pathes %s: %s\n%s: "%s" (%d lines)\n%s\n'
pid = b' pid='
test = 'Test.%s'
match_s1 = 'match/s1'
match_s2 = 'match/s2'
new_val = 'New value for pathes %s, %s: %s'

def test1log_multiple_lines_event(self):
"""In this test case the EventHandler receives multiple lines from the test class."""
def test1receive_event(self):
"""Test if events are processed correctly and that edge cases are caught in exceptions."""
# In this test case the EventHandler receives multiple lines from the test class.
description = "Test1StreamPrinterEventHandler"
match_context = MatchContext(self.pid)
fixed_dme = FixedDataModelElement('s1', self.pid)
match_element = fixed_dme.get_match_element("match", match_context)
exp = '%s New value for paths %s: %s\n%s: "%s" (%d lines)\n%s\n'
pid = b" pid="
test = "Test.%s"
match_s1 = "match/s1"
match_s2 = "match/s2"
match_context = DummyMatchContext(pid)
fdme = DummyFixedDataModelElement("s1", pid)
match_element = fdme.get_match_element("match", match_context)

match_context = MatchContext(self.pid)
fixed_dme2 = FixedDataModelElement('s2', self.pid)
match_element2 = fixed_dme2.get_match_element("match", match_context)
self.analysis_context.register_component(self, description)
t = time()
log_atom = LogAtom(fixed_dme.fixed_data, ParserMatch(match_element), t, self)

self.stream_printer_event_handler.receive_event(
self.test % self.__class__.__name__, self.new_val % (self.match_s1, self.match_s2, repr(match_element.match_object)),
[log_atom.raw_data, log_atom.raw_data], None, log_atom, self)

self.assertEqual(self.output_stream.getvalue(), self.__expectedString % (
datetime.fromtimestamp(t).strftime("%Y-%m-%d %H:%M:%S"), match_element.get_path() + ", " + match_element2.get_path(),
repr(match_element.get_match_object()), self.__class__.__name__, description, 2,
" " + match_element.get_match_string().decode() + "\n " + match_element2.get_match_string().decode() + "\n"))

def test2log_no_line_event(self):
"""In this test case the EventHandler receives no lines from the test class."""
description = "Test2StreamPrinterEventHandler"
match_context = MatchContext(self.pid)
fixed_dme = FixedDataModelElement('s1', self.pid)
match_element = fixed_dme.get_match_element("match", match_context)

match_context = MatchContext(self.pid)
fixed_dme2 = FixedDataModelElement('s2', self.pid)
match_element2 = fixed_dme2.get_match_element("match", match_context)
self.analysis_context.register_component(self, description)
t = time()
log_atom = LogAtom(fixed_dme.fixed_data, ParserMatch(match_element), t, self)

self.stream_printer_event_handler.receive_event(
self.test % self.__class__.__name__, self.new_val % (self.match_s1, self.match_s2, repr(match_element.match_object)), [],
None, log_atom, self)

self.assertEqual(self.output_stream.getvalue(), self.__expectedString % (
datetime.fromtimestamp(t).strftime("%Y-%m-%d %H:%M:%S"), match_element.get_path() + ", " + match_element2.get_path(),
repr(match_element.get_match_object()), self.__class__.__name__, description, 0, ""))

def test3event_data_not_log_atom(self):
"""In this test case the EventHandler receives no logAtom from the test class and the method should raise an exception."""
description = "Test3StreamPrinterEventHandler"
match_context = MatchContext(self.pid)
fixed_dme = FixedDataModelElement('s1', self.pid)
match_element = fixed_dme.get_match_element("match", match_context)
self.analysis_context.register_component(self, description)
t = time()
log_atom = LogAtom(fixed_dme.fixed_data, ParserMatch(match_element), t, self)
log_atom = LogAtom(fdme.data, ParserMatch(match_element), t, self)

new_val = "New value for paths %s, %s: %s" % (match_s1, match_s2, repr(match_element.match_object))
self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [log_atom.raw_data, log_atom.raw_data], None, log_atom, self)

dt = datetime.fromtimestamp(t).strftime("%Y-%m-%d %H:%M:%S")
self.assertEqual(self.output_stream.getvalue(), exp % (dt, match_s1 + ", " + match_s2, "b' pid='", self.__class__.__name__, description, 2, " pid=\n pid=\n"))
self.reset_output_stream()

#In this test case the EventHandler receives no lines from the test class.
self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [], None, log_atom, self)

self.assertEqual(self.output_stream.getvalue(), exp % (dt, match_s1 + ", " + match_s2, "b' pid='", self.__class__.__name__, description, 0, ""))
self.reset_output_stream()

#In this test case the EventHandler receives no logAtom from the test class and the method should raise an exception.
self.assertRaises(Exception, self.stream_printer_event_handler.receive_event, test % self.__class__.__name__,
new_val, [log_atom.raw_data, log_atom.raw_data], log_atom.get_parser_match(), self)

# test output_event_handlers
self.output_event_handlers = []
self.assertTrue(self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [log_atom.raw_data, log_atom.raw_data], None, log_atom, self))
self.assertEqual(self.output_stream.getvalue(), "")

self.output_event_handlers = [self.stream_printer_event_handler]
self.assertTrue(self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [log_atom.raw_data, log_atom.raw_data], None, log_atom, self))
self.assertEqual(self.output_stream.getvalue(), exp % (dt, match_s1 + ", " + match_s2, "b' pid='", self.__class__.__name__, description, 2, " pid=\n pid=\n"))
self.reset_output_stream()

# test suppress detector list
self.output_event_handlers = None
self.analysis_context.suppress_detector_list = [description]
self.assertTrue(self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [log_atom.raw_data, log_atom.raw_data], None, log_atom, self))
self.assertEqual(self.output_stream.getvalue(), "")

self.output_event_handlers = [self.stream_printer_event_handler]
self.analysis_context.suppress_detector_list = []
self.assertTrue(self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [log_atom.raw_data, log_atom.raw_data], None, log_atom, self))
self.assertEqual(self.output_stream.getvalue(), exp % (dt, match_s1 + ", " + match_s2, "b' pid='", self.__class__.__name__, description, 2, " pid=\n pid=\n"))


def test2validate_parameters(self):
"""Test all initialization parameters for the event handler. Input parameters must be validated in the class."""
self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, "")
self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, b"")
self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, ["default"])
self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, None)
self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, True)
self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, 123)
self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, 123.3)
self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, {"id": "Default"})
self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, ())
self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, set())
StreamPrinterEventHandler(self.analysis_context, sys.stdout)
StreamPrinterEventHandler(self.analysis_context, sys.stderr)
StreamPrinterEventHandler(self.analysis_context, self.output_stream)

self.assertRaises(Exception, self.stream_printer_event_handler.receive_event, self.test % self.__class__.__name__,
self.new_val % (self.match_s1, self.match_s2, repr(match_element.match_object)),
[log_atom.raw_data, log_atom.raw_data], log_atom.get_parser_match(), self)


if __name__ == "__main__":
Expand Down
Loading