From e45800290f206cbd82f9602be8169d06c7bfb846 Mon Sep 17 00:00:00 2001 From: Ernst Leierzopf Date: Thu, 31 Oct 2024 13:55:34 +0100 Subject: [PATCH 1/2] rewrite unittests for JsonConverterHandler. --- .../unit/events/JsonConverterHandlerTest.py | 116 +++++++++++++----- .../aminer/events/JsonConverterHandler.py | 88 +++++++------ 2 files changed, 137 insertions(+), 67 deletions(-) diff --git a/aecid-testsuite/unit/events/JsonConverterHandlerTest.py b/aecid-testsuite/unit/events/JsonConverterHandlerTest.py index e0ac900a8..70f172524 100644 --- a/aecid-testsuite/unit/events/JsonConverterHandlerTest.py +++ b/aecid-testsuite/unit/events/JsonConverterHandlerTest.py @@ -2,48 +2,100 @@ import unittest from aminer.events.JsonConverterHandler import JsonConverterHandler from aminer.input.LogAtom import LogAtom -from aminer.parsing.MatchContext import MatchContext -from aminer.parsing.FixedDataModelElement import FixedDataModelElement from aminer.parsing.ParserMatch import ParserMatch -from unit.TestBase import TestBase +from unit.TestBase import TestBase, DummyFixedDataModelElement, DummyMatchContext class JsonConverterHandlerTest(TestBase): """Unittests for the JsonConverterHandler.""" maxDiff = None - resource_name = b"testresource" output_logline = True - match_context = MatchContext(b' pid=') - fixed_dme = FixedDataModelElement('s1', b' pid=') - match_element = fixed_dme.get_match_element("match", match_context) - t = time.time() - - test_detector = 'Analysis.TestDetector' - event_message = 'An event happened!' - sorted_log_lines = ['Event happend at /path/ 5 times.', '', '', '', ''] - persistence_id = 'Default' - description = 'jsonConverterHandlerDescription' - expected_string = '{\n "AnalysisComponent": {\n "AnalysisComponentIdentifier": 0,\n' \ - ' "AnalysisComponentType": "%s",\n "AnalysisComponentName": "%s",\n "Message": "%s",\n' \ - ' "PersistenceFileName": "%s",\n "AffectedParserPaths": [\n "test/path/1",\n' \ - ' "test/path/2"\n ],\n "LogResource": "testresource"\n },\n "LogData": {\n "RawLogData": [\n " pid="\n ],\n ' \ - '"Timestamps": [\n %s\n ],\n "DetectionTimestamp": %s,\n "LogLinesCount": 5,\n' \ - ' "AnnotatedMatchElement": {\n "match/s1": " pid="\n }\n }%s\n}\n' - - def test1receive_expected_event(self): - """In this test case a normal Event happens and the json output should be sent to a StreamPrinterEventHandler.""" - json_converter_handler = JsonConverterHandler([self.stream_printer_event_handler], self.analysis_context) - log_atom = LogAtom(self.fixed_dme.fixed_data, ParserMatch(self.match_element), self.t, self) - self.analysis_context.register_component(self, self.description) - event_data = {'AnalysisComponent': {'AffectedParserPaths': ['test/path/1', 'test/path/2']}} - json_converter_handler.receive_event(self.test_detector, self.event_message, self.sorted_log_lines, event_data, log_atom, self) + resource_name = b"testresource" + persistence_id = "Default" + + def test1receive_event(self): + """Test if events are processed correctly and that edge cases are caught in exceptions.""" + match_context = DummyMatchContext(b" pid=") + fdme = DummyFixedDataModelElement("s1", b" pid=") + match_element = fdme.get_match_element("match", match_context) + t = time.time() + + test = "Analysis.TestDetector" + event_message = "An event happened!" + sorted_log_lines = ["Event happened at /path/ 5 times.", "", "", "", ""] + description = "jsonConverterHandlerDescription" + expected_string = '{\n "AnalysisComponent": {\n "AnalysisComponentIdentifier": 0,\n "AnalysisComponentType": "%s",\n ' \ + '"AnalysisComponentName": "%s",\n "Message": "%s",\n "PersistenceFileName": "%s",\n "AffectedParserPaths":' \ + ' [\n "test/path/1",\n "test/path/2"\n ],\n "LogResource": "testresource"\n },\n "LogData": ' \ + '{\n "RawLogData": [\n " pid="\n ],\n "Timestamps": [\n %s\n ],\n "DetectionTimestamp":' \ + ' %s,\n "LogLinesCount": 5,\n "AnnotatedMatchElement": {\n "match/s1": " pid="\n }\n }%s\n}\n' + + jch = JsonConverterHandler([self.stream_printer_event_handler], self.analysis_context) + log_atom = LogAtom(fdme.data, ParserMatch(match_element), t, self) + self.analysis_context.register_component(self, description) + event_data = {"AnalysisComponent": {"AffectedParserPaths": ["test/path/1", "test/path/2"]}} + jch.receive_event(test, event_message, sorted_log_lines, event_data, log_atom, self) detection_timestamp = None - for line in self.output_stream.getvalue().split('\n'): + for line in self.output_stream.getvalue().split("\n"): if "DetectionTimestamp" in line: - detection_timestamp = line.split(':')[1].strip(' ,') - self.assertEqual(self.output_stream.getvalue(), self.expected_string % ( - self.__class__.__name__, self.description, self.event_message, self.persistence_id, round(self.t, 2), detection_timestamp, "")) + detection_timestamp = line.split(":")[1].strip(" ,") + self.assertEqual(self.output_stream.getvalue(), expected_string % (self.__class__.__name__, description, event_message, self.persistence_id, round(t, 2), detection_timestamp, "")) + self.reset_output_stream() + + # test output_event_handlers + self.output_event_handlers = [] + self.assertTrue(jch.receive_event(test, event_message, sorted_log_lines, event_data, log_atom, self)) + self.assertEqual(self.output_stream.getvalue(), "") + + self.output_event_handlers = [jch] + self.assertTrue(jch.receive_event(test, event_message, sorted_log_lines, event_data, log_atom, self)) + val = self.output_stream.getvalue() + if val.endswith("\n\n"): + val = val[:-1] + detection_timestamp = None + for line in val.split("\n"): + if "DetectionTimestamp" in line: + detection_timestamp = line.split(":")[1].strip(" ,") + break + self.assertEqual(val, expected_string % (self.__class__.__name__, description, event_message, self.persistence_id, round(t, 2), detection_timestamp, "")) + self.reset_output_stream() + + # test suppress detector list + self.output_event_handlers = None + self.analysis_context.suppress_detector_list = [description] + self.assertTrue(jch.receive_event(test, event_message, sorted_log_lines, event_data, log_atom, self)) + self.assertEqual(self.output_stream.getvalue(), "") + + self.output_event_handlers = [jch] + self.analysis_context.suppress_detector_list = [] + self.assertTrue(jch.receive_event(test, event_message, sorted_log_lines, event_data, log_atom, self)) + self.assertEqual(val, expected_string % (self.__class__.__name__, description, event_message, self.persistence_id, round(t, 2), detection_timestamp, "")) + + def test2validate_parameters(self): + """Test all initialization parameters for the event handler. Input parameters must be validated in the class.""" + JsonConverterHandler([self.stream_printer_event_handler], self.analysis_context) + self.assertRaises(TypeError, JsonConverterHandler, ["default"], self.analysis_context) + self.assertRaises(TypeError, JsonConverterHandler, None, self.analysis_context) + self.assertRaises(TypeError, JsonConverterHandler, "Default", self.analysis_context) + self.assertRaises(TypeError, JsonConverterHandler, b"Default", self.analysis_context) + self.assertRaises(TypeError, JsonConverterHandler, True, self.analysis_context) + self.assertRaises(TypeError, JsonConverterHandler, 123, self.analysis_context) + self.assertRaises(TypeError, JsonConverterHandler, 123.3, self.analysis_context) + self.assertRaises(TypeError, JsonConverterHandler, {"id": "Default"}, self.analysis_context) + self.assertRaises(TypeError, JsonConverterHandler, (), self.analysis_context) + self.assertRaises(TypeError, JsonConverterHandler, set(), self.analysis_context) + self.assertRaises(ValueError, JsonConverterHandler, [], self.analysis_context) + + self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, ["default"]) + self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, None) + self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, "Default") + self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, b"Default") + self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, 123) + self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, 123.3) + self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, {"id": "Default"}) + self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, ()) + self.assertRaises(TypeError, JsonConverterHandler, [self.stream_printer_event_handler], self.analysis_context, set()) if __name__ == '__main__': diff --git a/source/root/usr/lib/logdata-anomaly-miner/aminer/events/JsonConverterHandler.py b/source/root/usr/lib/logdata-anomaly-miner/aminer/events/JsonConverterHandler.py index 7cd5c2630..acc1b68b5 100644 --- a/source/root/usr/lib/logdata-anomaly-miner/aminer/events/JsonConverterHandler.py +++ b/source/root/usr/lib/logdata-anomaly-miner/aminer/events/JsonConverterHandler.py @@ -14,9 +14,10 @@ import json import time import copy +import logging from aminer.events.EventInterfaces import EventHandlerInterface -from aminer import AminerConfig +from aminer.AminerConfig import DEBUG_LOG_NAME, ENCODING, KEY_LOG_LINE_IDENTIFIER, KEY_AMINER_ID class JsonConverterHandler(EventHandlerInterface): @@ -30,8 +31,24 @@ def __init__(self, json_event_handlers, analysis_context, pretty_print=True): @param analysis_context the analysis context used to get the component. @param pretty_print if true, the json is printed pretty; otherwise the json is printed with less space needed. """ + if not isinstance(json_event_handlers, list) or any(not isinstance(x, EventHandlerInterface) for x in json_event_handlers): + msg = "json_event_handlers must be a list of event handlers implementing the EventHandlerInterface." + logging.getLogger(DEBUG_LOG_NAME).error(msg) + raise TypeError(msg) + if not json_event_handlers: + msg = "json_event_handlers must not be empty." + logging.getLogger(DEBUG_LOG_NAME).error(msg) + raise ValueError(msg) + if not isinstance(json_event_handlers, list) or any(not isinstance(x, EventHandlerInterface) for x in json_event_handlers): + msg = "json_event_handlers must be a list of event handlers implementing the EventHandlerInterface." + logging.getLogger(DEBUG_LOG_NAME).error(msg) + raise TypeError(msg) self.json_event_handlers = json_event_handlers self.analysis_context = analysis_context + if not isinstance(pretty_print, bool): + msg = "pretty_print must be a boolean value." + logging.getLogger(DEBUG_LOG_NAME).error(msg) + raise TypeError(msg) self.pretty_print = pretty_print def receive_event(self, event_type, event_message, sorted_loglines, event_data, log_atom, event_source): @@ -47,55 +64,55 @@ def receive_event(self, event_type, event_message, sorted_loglines, event_data, @param log_atom the log atom which produced the event. @param event_source reference to detector generating the event. """ - if hasattr(event_source, 'output_event_handlers') and event_source.output_event_handlers is not None and self not in \ + if hasattr(event_source, "output_event_handlers") and event_source.output_event_handlers is not None and self not in \ event_source.output_event_handlers: - return + return True component_name = self.analysis_context.get_name_by_component(event_source) if component_name in self.analysis_context.suppress_detector_list: - return - if 'StatusInfo' in event_data: + return True + if "StatusInfo" in event_data: # No anomaly; do nothing on purpose pass else: log_data = {} try: - data = log_atom.raw_data.decode(AminerConfig.ENCODING) + data = log_atom.raw_data.decode(ENCODING) except UnicodeError: data = repr(log_atom.raw_data) - log_data['RawLogData'] = [data] + log_data["RawLogData"] = [data] if log_atom.get_timestamp() is None: log_atom.set_timestamp(time.time()) - log_data['Timestamps'] = [round(log_atom.atom_time, 2)] - log_data['DetectionTimestamp'] = round(time.time(), 2) - log_data['LogLinesCount'] = len(sorted_loglines) - if log_atom.parser_match is not None and hasattr(event_source, 'output_logline') and event_source.output_logline: - log_data['AnnotatedMatchElement'] = {} + log_data["Timestamps"] = [round(log_atom.atom_time, 2)] + log_data["DetectionTimestamp"] = round(time.time(), 2) + log_data["LogLinesCount"] = len(sorted_loglines) + if log_atom.parser_match is not None and hasattr(event_source, "output_logline") and event_source.output_logline: + log_data["AnnotatedMatchElement"] = {} for path, match in log_atom.parser_match.get_match_dictionary().items(): if isinstance(match, list): for match_element_id, match_element in enumerate(match): if isinstance(match_element.match_object, bytes): - log_data['AnnotatedMatchElement'][path + '/' + str(match_element_id)] = match_element.match_object.decode( - AminerConfig.ENCODING) + log_data["AnnotatedMatchElement"][path + "/" + str(match_element_id)] = match_element.match_object.decode( + ENCODING) else: - log_data['AnnotatedMatchElement'][path + '/' + str(match_element_id)] = str(match_element.match_object) + log_data["AnnotatedMatchElement"][path + "/" + str(match_element_id)] = str(match_element.match_object) elif isinstance(match.match_object, bytes): - log_data['AnnotatedMatchElement'][path] = match.match_object.decode(AminerConfig.ENCODING) + log_data["AnnotatedMatchElement"][path] = match.match_object.decode(ENCODING) else: - log_data['AnnotatedMatchElement'][path] = str(match.match_object) + log_data["AnnotatedMatchElement"][path] = str(match.match_object) - analysis_component = {'AnalysisComponentIdentifier': self.analysis_context.get_id_by_component(event_source)} - if event_source.__class__.__name__ == 'ExtractedData_class': - analysis_component['AnalysisComponentType'] = 'DistributionDetector' + analysis_component = {"AnalysisComponentIdentifier": self.analysis_context.get_id_by_component(event_source)} + if event_source.__class__.__name__ == "ExtractedData_class": + analysis_component["AnalysisComponentType"] = "DistributionDetector" else: - analysis_component['AnalysisComponentType'] = str(event_source.__class__.__name__) - analysis_component['AnalysisComponentName'] = self.analysis_context.get_name_by_component(event_source) - analysis_component['Message'] = event_message + analysis_component["AnalysisComponentType"] = str(event_source.__class__.__name__) + analysis_component["AnalysisComponentName"] = self.analysis_context.get_name_by_component(event_source) + analysis_component["Message"] = event_message if hasattr(event_source, "persistence_id"): - analysis_component['PersistenceFileName'] = event_source.persistence_id - if hasattr(event_source, 'learn_mode'): - analysis_component['TrainingMode'] = event_source.learn_mode + analysis_component["PersistenceFileName"] = event_source.persistence_id + if hasattr(event_source, "learn_mode"): + analysis_component["TrainingMode"] = event_source.learn_mode - detector_analysis_component = event_data.get('AnalysisComponent') + detector_analysis_component = event_data.get("AnalysisComponent") if detector_analysis_component is not None: for key in detector_analysis_component: if key in analysis_component: @@ -104,23 +121,23 @@ def receive_event(self, event_type, event_message, sorted_loglines, event_data, log_resource = log_atom.source.resource_name if log_resource is not None: - analysis_component['LogResource'] = log_resource.decode() + analysis_component["LogResource"] = log_resource.decode() - if 'LogData' not in event_data: - if self.analysis_context.aminer_config.config_properties.get(AminerConfig.KEY_LOG_LINE_IDENTIFIER): + if "LogData" not in event_data: + if self.analysis_context.aminer_config.config_properties.get(KEY_LOG_LINE_IDENTIFIER): event_data["LogLineIdentifier"] = log_atom.log_line_identifier - event_data['LogData'] = log_data - event_data['AnalysisComponent'] = analysis_component + event_data["LogData"] = log_data + event_data["AnalysisComponent"] = analysis_component - aminer_id = self.analysis_context.aminer_config.config_properties.get(AminerConfig.KEY_AMINER_ID) + aminer_id = self.analysis_context.aminer_config.config_properties.get(KEY_AMINER_ID) if aminer_id is not None: - event_data['AminerId'] = aminer_id + event_data["AminerId"] = aminer_id if self.pretty_print is True: json_data = json.dumps(event_data, indent=2) else: json_data = json.dumps(event_data) - res = [''] * len(sorted_loglines) + res = [""] * len(sorted_loglines) res[0] = str(json_data) for listener in self.json_event_handlers: @@ -129,3 +146,4 @@ def receive_event(self, event_type, event_message, sorted_loglines, event_data, event_source = copy.copy(event_source) event_source.output_event_handlers.append(listener) listener.receive_event(event_type, None, res, json_data, log_atom, event_source) + return True From 289e268e5a4b10ace65b8cdc5c2681cef807a7cc Mon Sep 17 00:00:00 2001 From: Ernst Leierzopf Date: Thu, 31 Oct 2024 15:36:52 +0100 Subject: [PATCH 2/2] rewrite unittests for StreamPrinterEventHandler. --- .../events/StreamPrinterEventHandlerTest.py | 134 ++++++++++-------- .../events/StreamPrinterEventHandler.py | 20 ++- 2 files changed, 84 insertions(+), 70 deletions(-) diff --git a/aecid-testsuite/unit/events/StreamPrinterEventHandlerTest.py b/aecid-testsuite/unit/events/StreamPrinterEventHandlerTest.py index 225abbd4b..4b663c828 100644 --- a/aecid-testsuite/unit/events/StreamPrinterEventHandlerTest.py +++ b/aecid-testsuite/unit/events/StreamPrinterEventHandlerTest.py @@ -1,81 +1,89 @@ import unittest -from aminer.parsing.MatchContext import MatchContext -from aminer.parsing.FixedDataModelElement import FixedDataModelElement +import sys +import io from time import time +from aminer.events.StreamPrinterEventHandler import StreamPrinterEventHandler from aminer.parsing.ParserMatch import ParserMatch from aminer.input.LogAtom import LogAtom -from unit.TestBase import TestBase +from unit.TestBase import TestBase, DummyFixedDataModelElement, DummyMatchContext from datetime import datetime class StreamPrinterEventHandlerTest(TestBase): """Unittests for the StreamPrinterEventHandler.""" - __expectedString = '%s New value for pathes %s: %s\n%s: "%s" (%d lines)\n%s\n' - pid = b' pid=' - test = 'Test.%s' - match_s1 = 'match/s1' - match_s2 = 'match/s2' - new_val = 'New value for pathes %s, %s: %s' - - def test1log_multiple_lines_event(self): - """In this test case the EventHandler receives multiple lines from the test class.""" + def test1receive_event(self): + """Test if events are processed correctly and that edge cases are caught in exceptions.""" + # In this test case the EventHandler receives multiple lines from the test class. description = "Test1StreamPrinterEventHandler" - match_context = MatchContext(self.pid) - fixed_dme = FixedDataModelElement('s1', self.pid) - match_element = fixed_dme.get_match_element("match", match_context) + exp = '%s New value for paths %s: %s\n%s: "%s" (%d lines)\n%s\n' + pid = b" pid=" + test = "Test.%s" + match_s1 = "match/s1" + match_s2 = "match/s2" + match_context = DummyMatchContext(pid) + fdme = DummyFixedDataModelElement("s1", pid) + match_element = fdme.get_match_element("match", match_context) - match_context = MatchContext(self.pid) - fixed_dme2 = FixedDataModelElement('s2', self.pid) - match_element2 = fixed_dme2.get_match_element("match", match_context) - self.analysis_context.register_component(self, description) - t = time() - log_atom = LogAtom(fixed_dme.fixed_data, ParserMatch(match_element), t, self) - - self.stream_printer_event_handler.receive_event( - self.test % self.__class__.__name__, self.new_val % (self.match_s1, self.match_s2, repr(match_element.match_object)), - [log_atom.raw_data, log_atom.raw_data], None, log_atom, self) - - self.assertEqual(self.output_stream.getvalue(), self.__expectedString % ( - datetime.fromtimestamp(t).strftime("%Y-%m-%d %H:%M:%S"), match_element.get_path() + ", " + match_element2.get_path(), - repr(match_element.get_match_object()), self.__class__.__name__, description, 2, - " " + match_element.get_match_string().decode() + "\n " + match_element2.get_match_string().decode() + "\n")) - - def test2log_no_line_event(self): - """In this test case the EventHandler receives no lines from the test class.""" - description = "Test2StreamPrinterEventHandler" - match_context = MatchContext(self.pid) - fixed_dme = FixedDataModelElement('s1', self.pid) - match_element = fixed_dme.get_match_element("match", match_context) - - match_context = MatchContext(self.pid) - fixed_dme2 = FixedDataModelElement('s2', self.pid) - match_element2 = fixed_dme2.get_match_element("match", match_context) - self.analysis_context.register_component(self, description) - t = time() - log_atom = LogAtom(fixed_dme.fixed_data, ParserMatch(match_element), t, self) - - self.stream_printer_event_handler.receive_event( - self.test % self.__class__.__name__, self.new_val % (self.match_s1, self.match_s2, repr(match_element.match_object)), [], - None, log_atom, self) - - self.assertEqual(self.output_stream.getvalue(), self.__expectedString % ( - datetime.fromtimestamp(t).strftime("%Y-%m-%d %H:%M:%S"), match_element.get_path() + ", " + match_element2.get_path(), - repr(match_element.get_match_object()), self.__class__.__name__, description, 0, "")) - - def test3event_data_not_log_atom(self): - """In this test case the EventHandler receives no logAtom from the test class and the method should raise an exception.""" - description = "Test3StreamPrinterEventHandler" - match_context = MatchContext(self.pid) - fixed_dme = FixedDataModelElement('s1', self.pid) - match_element = fixed_dme.get_match_element("match", match_context) self.analysis_context.register_component(self, description) t = time() - log_atom = LogAtom(fixed_dme.fixed_data, ParserMatch(match_element), t, self) + log_atom = LogAtom(fdme.data, ParserMatch(match_element), t, self) + + new_val = "New value for paths %s, %s: %s" % (match_s1, match_s2, repr(match_element.match_object)) + self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [log_atom.raw_data, log_atom.raw_data], None, log_atom, self) + + dt = datetime.fromtimestamp(t).strftime("%Y-%m-%d %H:%M:%S") + self.assertEqual(self.output_stream.getvalue(), exp % (dt, match_s1 + ", " + match_s2, "b' pid='", self.__class__.__name__, description, 2, " pid=\n pid=\n")) + self.reset_output_stream() + + #In this test case the EventHandler receives no lines from the test class. + self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [], None, log_atom, self) + + self.assertEqual(self.output_stream.getvalue(), exp % (dt, match_s1 + ", " + match_s2, "b' pid='", self.__class__.__name__, description, 0, "")) + self.reset_output_stream() + + #In this test case the EventHandler receives no logAtom from the test class and the method should raise an exception. + self.assertRaises(Exception, self.stream_printer_event_handler.receive_event, test % self.__class__.__name__, + new_val, [log_atom.raw_data, log_atom.raw_data], log_atom.get_parser_match(), self) + + # test output_event_handlers + self.output_event_handlers = [] + self.assertTrue(self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [log_atom.raw_data, log_atom.raw_data], None, log_atom, self)) + self.assertEqual(self.output_stream.getvalue(), "") + + self.output_event_handlers = [self.stream_printer_event_handler] + self.assertTrue(self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [log_atom.raw_data, log_atom.raw_data], None, log_atom, self)) + self.assertEqual(self.output_stream.getvalue(), exp % (dt, match_s1 + ", " + match_s2, "b' pid='", self.__class__.__name__, description, 2, " pid=\n pid=\n")) + self.reset_output_stream() + + # test suppress detector list + self.output_event_handlers = None + self.analysis_context.suppress_detector_list = [description] + self.assertTrue(self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [log_atom.raw_data, log_atom.raw_data], None, log_atom, self)) + self.assertEqual(self.output_stream.getvalue(), "") + + self.output_event_handlers = [self.stream_printer_event_handler] + self.analysis_context.suppress_detector_list = [] + self.assertTrue(self.stream_printer_event_handler.receive_event(test % self.__class__.__name__, new_val, [log_atom.raw_data, log_atom.raw_data], None, log_atom, self)) + self.assertEqual(self.output_stream.getvalue(), exp % (dt, match_s1 + ", " + match_s2, "b' pid='", self.__class__.__name__, description, 2, " pid=\n pid=\n")) + + + def test2validate_parameters(self): + """Test all initialization parameters for the event handler. Input parameters must be validated in the class.""" + self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, "") + self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, b"") + self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, ["default"]) + self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, None) + self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, True) + self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, 123) + self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, 123.3) + self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, {"id": "Default"}) + self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, ()) + self.assertRaises(TypeError, StreamPrinterEventHandler, self.analysis_context, set()) + StreamPrinterEventHandler(self.analysis_context, sys.stdout) + StreamPrinterEventHandler(self.analysis_context, sys.stderr) + StreamPrinterEventHandler(self.analysis_context, self.output_stream) - self.assertRaises(Exception, self.stream_printer_event_handler.receive_event, self.test % self.__class__.__name__, - self.new_val % (self.match_s1, self.match_s2, repr(match_element.match_object)), - [log_atom.raw_data, log_atom.raw_data], log_atom.get_parser_match(), self) if __name__ == "__main__": diff --git a/source/root/usr/lib/logdata-anomaly-miner/aminer/events/StreamPrinterEventHandler.py b/source/root/usr/lib/logdata-anomaly-miner/aminer/events/StreamPrinterEventHandler.py index 5f1b050cb..99a21c5b0 100644 --- a/source/root/usr/lib/logdata-anomaly-miner/aminer/events/StreamPrinterEventHandler.py +++ b/source/root/usr/lib/logdata-anomaly-miner/aminer/events/StreamPrinterEventHandler.py @@ -10,9 +10,10 @@ You should have received a copy of the GNU General Public License along with this program. If not, see . """ - import sys - +import io +import logging +from aminer.AminerConfig import DEBUG_LOG_NAME from aminer.events.EventInterfaces import EventHandlerInterface from aminer.events.EventData import EventData @@ -31,6 +32,10 @@ def __init__(self, analysis_context, stream=sys.stdout): @param stream the output stream of the event handler. """ self.analysis_context = analysis_context + if not isinstance(stream, io.IOBase): + msg = "The stream variable has to be a stream." + logging.getLogger(DEBUG_LOG_NAME).error(msg) + raise TypeError(msg) self.stream = stream def receive_event(self, event_type, event_message, sorted_loglines, event_data, log_atom, event_source): @@ -46,16 +51,17 @@ def receive_event(self, event_type, event_message, sorted_loglines, event_data, @param log_atom the log atom which produced the event. @param event_source reference to detector generating the event. """ - if hasattr(event_source, 'output_event_handlers') and event_source.output_event_handlers is not None and self not in \ + if hasattr(event_source, "output_event_handlers") and event_source.output_event_handlers is not None and self not in \ event_source.output_event_handlers: - return + return True component_name = self.analysis_context.get_name_by_component(event_source) if component_name in self.analysis_context.suppress_detector_list: - return + return True event_data_obj = EventData(event_type, event_message, sorted_loglines, event_data, log_atom, event_source, self.analysis_context) - message = f'{event_data_obj.receive_event_string()}\n' - if hasattr(self.stream, 'buffer'): + message = f"{event_data_obj.receive_event_string()}\n" + if hasattr(self.stream, "buffer"): self.stream.buffer.write(message.encode()) else: self.stream.write(message) self.stream.flush() + return True