diff options
Diffstat (limited to 'tools/testing/kunit/kunit_parser.py')
-rw-r--r-- | tools/testing/kunit/kunit_parser.py | 1015 |
1 files changed, 702 insertions, 313 deletions
diff --git a/tools/testing/kunit/kunit_parser.py b/tools/testing/kunit/kunit_parser.py index 6310a641b151..3355196d0515 100644 --- a/tools/testing/kunit/kunit_parser.py +++ b/tools/testing/kunit/kunit_parser.py @@ -1,11 +1,15 @@ # SPDX-License-Identifier: GPL-2.0 # -# Parses test results from a kernel dmesg log. +# Parses KTAP test results from a kernel dmesg log and incrementally prints +# results with reader-friendly format. Stores and returns test results in a +# Test object. # # Copyright (C) 2019, Google LLC. # Author: Felix Guo <felixguoxiuping@gmail.com> # Author: Brendan Higgins <brendanhiggins@google.com> +# Author: Rae Moar <rmoar@google.com> +from __future__ import annotations import re from collections import namedtuple @@ -14,33 +18,52 @@ from enum import Enum, auto from functools import reduce from typing import Iterable, Iterator, List, Optional, Tuple -TestResult = namedtuple('TestResult', ['status','suites','log']) - -class TestSuite(object): +TestResult = namedtuple('TestResult', ['status','test','log']) + +class Test(object): + """ + A class to represent a test parsed from KTAP results. All KTAP + results within a test log are stored in a main Test object as + subtests. + + Attributes: + status : TestStatus - status of the test + name : str - name of the test + expected_count : int - expected number of subtests (0 if single + test case and None if unknown expected number of subtests) + subtests : List[Test] - list of subtests + log : List[str] - log of KTAP lines that correspond to the test + counts : TestCounts - counts of the test statuses and errors of + subtests or of the test itself if the test is a single + test case. + """ def __init__(self) -> None: - self.status = TestStatus.SUCCESS - self.name = '' - self.cases = [] # type: List[TestCase] - - def __str__(self) -> str: - return 'TestSuite(' + str(self.status) + ',' + self.name + ',' + str(self.cases) + ')' - - def __repr__(self) -> str: - return str(self) - -class TestCase(object): - def __init__(self) -> None: - self.status = TestStatus.SUCCESS + """Creates Test object with default attributes.""" + self.status = TestStatus.TEST_CRASHED self.name = '' + self.expected_count = 0 # type: Optional[int] + self.subtests = [] # type: List[Test] self.log = [] # type: List[str] + self.counts = TestCounts() def __str__(self) -> str: - return 'TestCase(' + str(self.status) + ',' + self.name + ',' + str(self.log) + ')' + """Returns string representation of a Test class object.""" + return ('Test(' + str(self.status) + ', ' + self.name + + ', ' + str(self.expected_count) + ', ' + + str(self.subtests) + ', ' + str(self.log) + ', ' + + str(self.counts) + ')') def __repr__(self) -> str: + """Returns string representation of a Test class object.""" return str(self) + def add_error(self, error_message: str) -> None: + """Records an error that occurred while parsing this test.""" + self.counts.errors += 1 + print_error('Test ' + self.name + ': ' + error_message) + class TestStatus(Enum): + """An enumeration class to represent the status of a test.""" SUCCESS = auto() FAILURE = auto() SKIPPED = auto() @@ -48,381 +71,747 @@ class TestStatus(Enum): NO_TESTS = auto() FAILURE_TO_PARSE_TESTS = auto() +class TestCounts: + """ + Tracks the counts of statuses of all test cases and any errors within + a Test. + + Attributes: + passed : int - the number of tests that have passed + failed : int - the number of tests that have failed + crashed : int - the number of tests that have crashed + skipped : int - the number of tests that have skipped + errors : int - the number of errors in the test and subtests + """ + def __init__(self): + """Creates TestCounts object with counts of all test + statuses and test errors set to 0. + """ + self.passed = 0 + self.failed = 0 + self.crashed = 0 + self.skipped = 0 + self.errors = 0 + + def __str__(self) -> str: + """Returns the string representation of a TestCounts object. + """ + return ('Passed: ' + str(self.passed) + + ', Failed: ' + str(self.failed) + + ', Crashed: ' + str(self.crashed) + + ', Skipped: ' + str(self.skipped) + + ', Errors: ' + str(self.errors)) + + def total(self) -> int: + """Returns the total number of test cases within a test + object, where a test case is a test with no subtests. + """ + return (self.passed + self.failed + self.crashed + + self.skipped) + + def add_subtest_counts(self, counts: TestCounts) -> None: + """ + Adds the counts of another TestCounts object to the current + TestCounts object. Used to add the counts of a subtest to the + parent test. + + Parameters: + counts - a different TestCounts object whose counts + will be added to the counts of the TestCounts object + """ + self.passed += counts.passed + self.failed += counts.failed + self.crashed += counts.crashed + self.skipped += counts.skipped + self.errors += counts.errors + + def get_status(self) -> TestStatus: + """Returns the aggregated status of a Test using test + counts. + """ + if self.total() == 0: + return TestStatus.NO_TESTS + elif self.crashed: + # If one of the subtests crash, the expected status + # of the Test is crashed. + return TestStatus.TEST_CRASHED + elif self.failed: + # Otherwise if one of the subtests fail, the + # expected status of the Test is failed. + return TestStatus.FAILURE + elif self.passed: + # Otherwise if one of the subtests pass, the + # expected status of the Test is passed. + return TestStatus.SUCCESS + else: + # Finally, if none of the subtests have failed, + # crashed, or passed, the expected status of the + # Test is skipped. + return TestStatus.SKIPPED + + def add_status(self, status: TestStatus) -> None: + """ + Increments count of inputted status. + + Parameters: + status - status to be added to the TestCounts object + """ + if status == TestStatus.SUCCESS: + self.passed += 1 + elif status == TestStatus.FAILURE: + self.failed += 1 + elif status == TestStatus.SKIPPED: + self.skipped += 1 + elif status != TestStatus.NO_TESTS: + self.crashed += 1 + class LineStream: - """Provides a peek()/pop() interface over an iterator of (line#, text).""" + """ + A class to represent the lines of kernel output. + Provides a peek()/pop() interface over an iterator of + (line#, text). + """ _lines: Iterator[Tuple[int, str]] _next: Tuple[int, str] _done: bool def __init__(self, lines: Iterator[Tuple[int, str]]): + """Creates a new LineStream that wraps the given iterator.""" self._lines = lines self._done = False self._next = (0, '') self._get_next() def _get_next(self) -> None: + """Advances the LineSteam to the next line.""" try: self._next = next(self._lines) except StopIteration: self._done = True def peek(self) -> str: + """Returns the current line, without advancing the LineStream. + """ return self._next[1] def pop(self) -> str: + """Returns the current line and advances the LineStream to + the next line. + """ n = self._next self._get_next() return n[1] def __bool__(self) -> bool: + """Returns True if stream has more lines.""" return not self._done # Only used by kunit_tool_test.py. def __iter__(self) -> Iterator[str]: + """Empties all lines stored in LineStream object into + Iterator object and returns the Iterator object. + """ while bool(self): yield self.pop() def line_number(self) -> int: + """Returns the line number of the current line.""" return self._next[0] -kunit_start_re = re.compile(r'TAP version [0-9]+$') -kunit_end_re = re.compile('(List of all partitions:|' - 'Kernel panic - not syncing: VFS:|reboot: System halted)') +# Parsing helper methods: + +KTAP_START = re.compile(r'KTAP version ([0-9]+)$') +TAP_START = re.compile(r'TAP version ([0-9]+)$') +KTAP_END = re.compile('(List of all partitions:|' + 'Kernel panic - not syncing: VFS:|reboot: System halted)') def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream: - def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]: + """Extracts KTAP lines from the kernel output.""" + def isolate_ktap_output(kernel_output: Iterable[str]) \ + -> Iterator[Tuple[int, str]]: line_num = 0 started = False for line in kernel_output: line_num += 1 - line = line.rstrip() # line always has a trailing \n - if kunit_start_re.search(line): + line = line.rstrip() # remove trailing \n + if not started and KTAP_START.search(line): + # start extracting KTAP lines and set prefix + # to number of characters before version line + prefix_len = len( + line.split('KTAP version')[0]) + started = True + yield line_num, line[prefix_len:] + elif not started and TAP_START.search(line): + # start extracting KTAP lines and set prefix + # to number of characters before version line prefix_len = len(line.split('TAP version')[0]) started = True yield line_num, line[prefix_len:] - elif kunit_end_re.search(line): + elif started and KTAP_END.search(line): + # stop extracting KTAP lines break elif started: - yield line_num, line[prefix_len:] - return LineStream(lines=isolate_kunit_output(kernel_output)) - -DIVIDER = '=' * 60 - -RESET = '\033[0;0m' - -def red(text) -> str: - return '\033[1;31m' + text + RESET - -def yellow(text) -> str: - return '\033[1;33m' + text + RESET - -def green(text) -> str: - return '\033[1;32m' + text + RESET - -def print_with_timestamp(message) -> None: - print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message)) - -def format_suite_divider(message) -> str: - return '======== ' + message + ' ========' + # remove prefix and any indention and yield + # line with line number + line = line[prefix_len:].lstrip() + yield line_num, line + return LineStream(lines=isolate_ktap_output(kernel_output)) + +KTAP_VERSIONS = [1] +TAP_VERSIONS = [13, 14] + +def check_version(version_num: int, accepted_versions: List[int], + version_type: str, test: Test) -> None: + """ + Adds error to test object if version number is too high or too + low. + + Parameters: + version_num - The inputted version number from the parsed KTAP or TAP + header line + accepted_version - List of accepted KTAP or TAP versions + version_type - 'KTAP' or 'TAP' depending on the type of + version line. + test - Test object for current test being parsed + """ + if version_num < min(accepted_versions): + test.add_error(version_type + + ' version lower than expected!') + elif version_num > max(accepted_versions): + test.add_error( + version_type + ' version higher than expected!') + +def parse_ktap_header(lines: LineStream, test: Test) -> bool: + """ + Parses KTAP/TAP header line and checks version number. + Returns False if fails to parse KTAP/TAP header line. + + Accepted formats: + - 'KTAP version [version number]' + - 'TAP version [version number]' + + Parameters: + lines - LineStream of KTAP output to parse + test - Test object for current test being parsed + + Return: + True if successfully parsed KTAP/TAP header line + """ + ktap_match = KTAP_START.match(lines.peek()) + tap_match = TAP_START.match(lines.peek()) + if ktap_match: + version_num = int(ktap_match.group(1)) + check_version(version_num, KTAP_VERSIONS, 'KTAP', test) + elif tap_match: + version_num = int(tap_match.group(1)) + check_version(version_num, TAP_VERSIONS, 'TAP', test) + else: + return False + test.log.append(lines.pop()) + return True -def print_suite_divider(message) -> None: - print_with_timestamp(DIVIDER) - print_with_timestamp(format_suite_divider(message)) +TEST_HEADER = re.compile(r'^# Subtest: (.*)$') -def print_log(log) -> None: - for m in log: - print_with_timestamp(m) +def parse_test_header(lines: LineStream, test: Test) -> bool: + """ + Parses test header and stores test name in test object. + Returns False if fails to parse test header line. -TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*# (Subtest:|.*: kunit test case crashed!)).*$') + Accepted format: + - '# Subtest: [test name]' -def consume_non_diagnostic(lines: LineStream) -> None: - while lines and not TAP_ENTRIES.match(lines.peek()): - lines.pop() + Parameters: + lines - LineStream of KTAP output to parse + test - Test object for current test being parsed -def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None: - while lines and not TAP_ENTRIES.match(lines.peek()): - test_case.log.append(lines.peek()) - lines.pop() + Return: + True if successfully parsed test header line + """ + match = TEST_HEADER.match(lines.peek()) + if not match: + return False + test.log.append(lines.pop()) + test.name = match.group(1) + return True -OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text']) +TEST_PLAN = re.compile(r'1\.\.([0-9]+)') -OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$') +def parse_test_plan(lines: LineStream, test: Test) -> bool: + """ + Parses test plan line and stores the expected number of subtests in + test object. Reports an error if expected count is 0. + Returns False and reports missing test plan error if fails to parse + test plan. -OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$') + Accepted format: + - '1..[number of subtests]' -OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$') + Parameters: + lines - LineStream of KTAP output to parse + test - Test object for current test being parsed -def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool: - save_non_diagnostic(lines, test_case) - if not lines: - test_case.status = TestStatus.TEST_CRASHED - return True - line = lines.peek() - match = OK_NOT_OK_SUBTEST.match(line) - while not match and lines: - line = lines.pop() - match = OK_NOT_OK_SUBTEST.match(line) - if match: - test_case.log.append(lines.pop()) - test_case.name = match.group(2) - skip_match = OK_NOT_OK_SKIP.match(line) - if skip_match: - test_case.status = TestStatus.SKIPPED - return True - if test_case.status == TestStatus.TEST_CRASHED: - return True - if match.group(1) == 'ok': - test_case.status = TestStatus.SUCCESS - else: - test_case.status = TestStatus.FAILURE - return True - else: + Return: + True if successfully parsed test plan line + """ + match = TEST_PLAN.match(lines.peek()) + if not match: + test.expected_count = None + test.add_error('missing plan line!') return False - -SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$') -DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$') - -def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool: - save_non_diagnostic(lines, test_case) - if not lines: + test.log.append(lines.pop()) + expected_count = int(match.group(1)) + test.expected_count = expected_count + if expected_count == 0: + test.status = TestStatus.NO_TESTS + test.add_error('0 tests run!') + return True + +TEST_RESULT = re.compile(r'^(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$') + +TEST_RESULT_SKIP = re.compile(r'^(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$') + +def peek_test_name_match(lines: LineStream, test: Test) -> bool: + """ + Matches current line with the format of a test result line and checks + if the name matches the name of the current test. + Returns False if fails to match format or name. + + Accepted format: + - '[ok|not ok] [test number] [-] [test name] [optional skip + directive]' + + Parameters: + lines - LineStream of KTAP output to parse + test - Test object for current test being parsed + + Return: + True if matched a test result line and the name matching the + expected test name + """ + line = lines.peek() + match = TEST_RESULT.match(line) + if not match: return False + name = match.group(4) + return (name == test.name) + +def parse_test_result(lines: LineStream, test: Test, + expected_num: int) -> bool: + """ + Parses test result line and stores the status and name in the test + object. Reports an error if the test number does not match expected + test number. + Returns False if fails to parse test result line. + + Note that the SKIP directive is the only direction that causes a + change in status. + + Accepted format: + - '[ok|not ok] [test number] [-] [test name] [optional skip + directive]' + + Parameters: + lines - LineStream of KTAP output to parse + test - Test object for current test being parsed + expected_num - expected test number for current test + + Return: + True if successfully parsed a test result line. + """ line = lines.peek() - match = SUBTEST_DIAGNOSTIC.match(line) - if match: - test_case.log.append(lines.pop()) - crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line) - if crash_match: - test_case.status = TestStatus.TEST_CRASHED - return True - else: + match = TEST_RESULT.match(line) + skip_match = TEST_RESULT_SKIP.match(line) + + # Check if line matches test result line format + if not match: return False + test.log.append(lines.pop()) -def parse_test_case(lines: LineStream) -> Optional[TestCase]: - test_case = TestCase() - save_non_diagnostic(lines, test_case) - while parse_diagnostic(lines, test_case): - pass - if parse_ok_not_ok_test_case(lines, test_case): - return test_case + # Set name of test object + if skip_match: + test.name = skip_match.group(4) else: - return None - -SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$') - -def parse_subtest_header(lines: LineStream) -> Optional[str]: - consume_non_diagnostic(lines) - if not lines: - return None - match = SUBTEST_HEADER.match(lines.peek()) - if match: - lines.pop() - return match.group(1) + test.name = match.group(4) + + # Check test num + num = int(match.group(2)) + if num != expected_num: + test.add_error('Expected test number ' + + str(expected_num) + ' but found ' + str(num)) + + # Set status of test object + status = match.group(1) + if skip_match: + test.status = TestStatus.SKIPPED + elif status == 'ok': + test.status = TestStatus.SUCCESS else: - return None + test.status = TestStatus.FAILURE + return True + +def parse_diagnostic(lines: LineStream) -> List[str]: + """ + Parse lines that do not match the format of a test result line or + test header line and returns them in list. + + Line formats that are not parsed: + - '# Subtest: [test name]' + - '[ok|not ok] [test number] [-] [test name] [optional skip + directive]' + + Parameters: + lines - LineStream of KTAP output to parse + + Return: + Log of diagnostic lines + """ + log = [] # type: List[str] + while lines and not TEST_RESULT.match(lines.peek()) and not \ + TEST_HEADER.match(lines.peek()): + log.append(lines.pop()) + return log + +DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^# .*?: kunit test case crashed!$') + +def parse_crash_in_log(test: Test) -> bool: + """ + Iterate through the lines of the log to parse for crash message. + If crash message found, set status to crashed and return True. + Otherwise return False. + + Parameters: + test - Test object for current test being parsed + + Return: + True if crash message found in log + """ + for line in test.log: + if DIAGNOSTIC_CRASH_MESSAGE.match(line): + test.status = TestStatus.TEST_CRASHED + return True + return False -SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)') -def parse_subtest_plan(lines: LineStream) -> Optional[int]: - consume_non_diagnostic(lines) - match = SUBTEST_PLAN.match(lines.peek()) - if match: - lines.pop() - return int(match.group(1)) - else: - return None - -def max_status(left: TestStatus, right: TestStatus) -> TestStatus: - if left == right: - return left - elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED: - return TestStatus.TEST_CRASHED - elif left == TestStatus.FAILURE or right == TestStatus.FAILURE: - return TestStatus.FAILURE - elif left == TestStatus.SKIPPED: - return right - else: - return left +# Printing helper methods: -def parse_ok_not_ok_test_suite(lines: LineStream, - test_suite: TestSuite, - expected_suite_index: int) -> bool: - consume_non_diagnostic(lines) - if not lines: - test_suite.status = TestStatus.TEST_CRASHED - return False - line = lines.peek() - match = OK_NOT_OK_MODULE.match(line) - if match: - lines.pop() - if match.group(1) == 'ok': - test_suite.status = TestStatus.SUCCESS - else: - test_suite.status = TestStatus.FAILURE - skip_match = OK_NOT_OK_SKIP.match(line) - if skip_match: - test_suite.status = TestStatus.SKIPPED - suite_index = int(match.group(2)) - if suite_index != expected_suite_index: - print_with_timestamp( - red('[ERROR] ') + 'expected_suite_index ' + - str(expected_suite_index) + ', but got ' + - str(suite_index)) - return True - else: - return False +DIVIDER = '=' * 60 -def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus: - return reduce(max_status, status_list, TestStatus.SKIPPED) +RESET = '\033[0;0m' -def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus: - max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases) - return max_status(max_test_case_status, test_suite.status) +def red(text: str) -> str: + """Returns inputted string with red color code.""" + return '\033[1;31m' + text + RESET -def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]: - if not lines: - return None - consume_non_diagnostic(lines) - test_suite = TestSuite() - test_suite.status = TestStatus.SUCCESS - name = parse_subtest_header(lines) - if not name: - return None - test_suite.name = name - expected_test_case_num = parse_subtest_plan(lines) - if expected_test_case_num is None: - return None - while expected_test_case_num > 0: - test_case = parse_test_case(lines) - if not test_case: - break - test_suite.cases.append(test_case) - expected_test_case_num -= 1 - if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index): - test_suite.status = bubble_up_test_case_errors(test_suite) - return test_suite - elif not lines: - print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token') - return test_suite - else: - print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}') - return None +def yellow(text: str) -> str: + """Returns inputted string with yellow color code.""" + return '\033[1;33m' + text + RESET -TAP_HEADER = re.compile(r'^TAP version 14$') +def green(text: str) -> str: + """Returns inputted string with green color code.""" + return '\033[1;32m' + text + RESET -def parse_tap_header(lines: LineStream) -> bool: - consume_non_diagnostic(lines) - if TAP_HEADER.match(lines.peek()): - lines.pop() - return True - else: - return False +ANSI_LEN = len(red('')) -TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)') +def print_with_timestamp(message: str) -> None: + """Prints message with timestamp at beginning.""" + print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message)) -def parse_test_plan(lines: LineStream) -> Optional[int]: - consume_non_diagnostic(lines) - match = TEST_PLAN.match(lines.peek()) - if match: - lines.pop() - return int(match.group(1)) - else: - return None - -def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus: - return bubble_up_errors(x.status for x in test_suites) - -def parse_test_result(lines: LineStream) -> TestResult: - consume_non_diagnostic(lines) - if not lines or not parse_tap_header(lines): - return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines) - expected_test_suite_num = parse_test_plan(lines) - if expected_test_suite_num == 0: - return TestResult(TestStatus.NO_TESTS, [], lines) - elif expected_test_suite_num is None: - return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines) - test_suites = [] - for i in range(1, expected_test_suite_num + 1): - test_suite = parse_test_suite(lines, i) - if test_suite: - test_suites.append(test_suite) +def format_test_divider(message: str, len_message: int) -> str: + """ + Returns string with message centered in fixed width divider. + + Example: + '===================== message example =====================' + + Parameters: + message - message to be centered in divider line + len_message - length of the message to be printed such that + any characters of the color codes are not counted + + Return: + String containing message centered in fixed width divider + """ + default_count = 3 # default number of dashes + len_1 = default_count + len_2 = default_count + difference = len(DIVIDER) - len_message - 2 # 2 spaces added + if difference > 0: + # calculate number of dashes for each side of the divider + len_1 = int(difference / 2) + len_2 = difference - len_1 + return ('=' * len_1) + ' ' + message + ' ' + ('=' * len_2) + +def print_test_header(test: Test) -> None: + """ + Prints test header with test name and optionally the expected number + of subtests. + + Example: + '=================== example (2 subtests) ===================' + + Parameters: + test - Test object representing current test being printed + """ + message = test.name + if test.expected_count: + if test.expected_count == 1: + message += (' (' + str(test.expected_count) + + ' subtest)') else: - print_with_timestamp( - red('[ERROR] ') + ' expected ' + - str(expected_test_suite_num) + - ' test suites, but got ' + str(i - 2)) - break - test_suite = parse_test_suite(lines, -1) - if test_suite: - print_with_timestamp(red('[ERROR] ') + - 'got unexpected test suite: ' + test_suite.name) - if test_suites: - return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines) - else: - return TestResult(TestStatus.NO_TESTS, [], lines) + message += (' (' + str(test.expected_count) + + ' subtests)') + print_with_timestamp(format_test_divider(message, len(message))) -class TestCounts: - passed: int - failed: int - crashed: int - skipped: int +def print_log(log: Iterable[str]) -> None: + """ + Prints all strings in saved log for test in yellow. - def __init__(self): - self.passed = 0 - self.failed = 0 - self.crashed = 0 - self.skipped = 0 - - def total(self) -> int: - return self.passed + self.failed + self.crashed + self.skipped - -def print_and_count_results(test_result: TestResult) -> TestCounts: - counts = TestCounts() - for test_suite in test_result.suites: - if test_suite.status == TestStatus.SUCCESS: - print_suite_divider(green('[PASSED] ') + test_suite.name) - elif test_suite.status == TestStatus.SKIPPED: - print_suite_divider(yellow('[SKIPPED] ') + test_suite.name) - elif test_suite.status == TestStatus.TEST_CRASHED: - print_suite_divider(red('[CRASHED] ' + test_suite.name)) - else: - print_suite_divider(red('[FAILED] ') + test_suite.name) - for test_case in test_suite.cases: - if test_case.status == TestStatus.SUCCESS: - counts.passed += 1 - print_with_timestamp(green('[PASSED] ') + test_case.name) - elif test_case.status == TestStatus.SKIPPED: - counts.skipped += 1 - print_with_timestamp(yellow('[SKIPPED] ') + test_case.name) - elif test_case.status == TestStatus.TEST_CRASHED: - counts.crashed += 1 - print_with_timestamp(red('[CRASHED] ' + test_case.name)) - print_log(map(yellow, test_case.log)) - print_with_timestamp('') + Parameters: + log - Iterable object with all strings saved in log for test + """ + for m in log: + print_with_timestamp(yellow(m)) + +def format_test_result(test: Test) -> str: + """ + Returns string with formatted test result with colored status and test + name. + + Example: + '[PASSED] example' + + Parameters: + test - Test object representing current test being printed + + Return: + String containing formatted test result + """ + if test.status == TestStatus.SUCCESS: + return (green('[PASSED] ') + test.name) + elif test.status == TestStatus.SKIPPED: + return (yellow('[SKIPPED] ') + test.name) + elif test.status == TestStatus.TEST_CRASHED: + print_log(test.log) + return (red('[CRASHED] ') + test.name) + else: + print_log(test.log) + return (red('[FAILED] ') + test.name) + +def print_test_result(test: Test) -> None: + """ + Prints result line with status of test. + + Example: + '[PASSED] example' + + Parameters: + test - Test object representing current test being printed + """ + print_with_timestamp(format_test_result(test)) + +def print_test_footer(test: Test) -> None: + """ + Prints test footer with status of test. + + Example: + '===================== [PASSED] example =====================' + + Parameters: + test - Test object representing current test being printed + """ + message = format_test_result(test) + print_with_timestamp(format_test_divider(message, + len(message) - ANSI_LEN)) + +def print_summary_line(test: Test) -> None: + """ + Prints summary line of test object. Color of line is dependent on + status of test. Color is green if test passes, yellow if test is + skipped, and red if the test fails or crashes. Summary line contains + counts of the statuses of the tests subtests or the test itself if it + has no subtests. + + Example: + "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0, + Errors: 0" + + test - Test object representing current test being printed + """ + if test.status == TestStatus.SUCCESS: + color = green + elif test.status == TestStatus.SKIPPED or test.status == TestStatus.NO_TESTS: + color = yellow + else: + color = red + counts = test.counts + print_with_timestamp(color('Testing complete. ' + str(counts))) + +def print_error(error_message: str) -> None: + """ + Prints error message with error format. + + Example: + "[ERROR] Test example: missing test plan!" + + Parameters: + error_message - message describing error + """ + print_with_timestamp(red('[ERROR] ') + error_message) + +# Other methods: + +def bubble_up_test_results(test: Test) -> None: + """ + If the test has subtests, add the test counts of the subtests to the + test and check if any of the tests crashed and if so set the test + status to crashed. Otherwise if the test has no subtests add the + status of the test to the test counts. + + Parameters: + test - Test object for current test being parsed + """ + parse_crash_in_log(test) + subtests = test.subtests + counts = test.counts + status = test.status + for t in subtests: + counts.add_subtest_counts(t.counts) + if counts.total() == 0: + counts.add_status(status) + elif test.counts.get_status() == TestStatus.TEST_CRASHED: + test.status = TestStatus.TEST_CRASHED + +def parse_test(lines: LineStream, expected_num: int, log: List[str]) -> Test: + """ + Finds next test to parse in LineStream, creates new Test object, + parses any subtests of the test, populates Test object with all + information (status, name) about the test and the Test objects for + any subtests, and then returns the Test object. The method accepts + three formats of tests: + + Accepted test formats: + + - Main KTAP/TAP header + + Example: + + KTAP version 1 + 1..4 + [subtests] + + - Subtest header line + + Example: + + # Subtest: name + 1..3 + [subtests] + ok 1 name + + - Test result line + + Example: + + ok 1 - test + + Parameters: + lines - LineStream of KTAP output to parse + expected_num - expected test number for test to be parsed + log - list of strings containing any preceding diagnostic lines + corresponding to the current test + + Return: + Test object populated with characteristics and any subtests + """ + test = Test() + test.log.extend(log) + parent_test = False + main = parse_ktap_header(lines, test) + if main: + # If KTAP/TAP header is found, attempt to parse + # test plan + test.name = "main" + parse_test_plan(lines, test) + else: + # If KTAP/TAP header is not found, test must be subtest + # header or test result line so parse attempt to parser + # subtest header + parent_test = parse_test_header(lines, test) + if parent_test: + # If subtest header is found, attempt to parse + # test plan and print header + parse_test_plan(lines, test) + print_test_header(test) + expected_count = test.expected_count + subtests = [] + test_num = 1 + while expected_count is None or test_num <= expected_count: + # Loop to parse any subtests. + # Break after parsing expected number of tests or + # if expected number of tests is unknown break when test + # result line with matching name to subtest header is found + # or no more lines in stream. + sub_log = parse_diagnostic(lines) + sub_test = Test() + if not lines or (peek_test_name_match(lines, test) and + not main): + if expected_count and test_num <= expected_count: + # If parser reaches end of test before + # parsing expected number of subtests, print + # crashed subtest and record error + test.add_error('missing expected subtest!') + sub_test.log.extend(sub_log) + test.counts.add_status( + TestStatus.TEST_CRASHED) + print_test_result(sub_test) else: - counts.failed += 1 - print_with_timestamp(red('[FAILED] ') + test_case.name) - print_log(map(yellow, test_case.log)) - print_with_timestamp('') - return counts + test.log.extend(sub_log) + break + else: + sub_test = parse_test(lines, test_num, sub_log) + subtests.append(sub_test) + test_num += 1 + test.subtests = subtests + if not main: + # If not main test, look for test result line + test.log.extend(parse_diagnostic(lines)) + if (parent_test and peek_test_name_match(lines, test)) or \ + not parent_test: + parse_test_result(lines, test, expected_num) + else: + test.add_error('missing subtest result line!') + # Add statuses to TestCounts attribute in Test object + bubble_up_test_results(test) + if parent_test: + # If test has subtests and is not the main test object, print + # footer. + print_test_footer(test) + elif not main: + print_test_result(test) + return test def parse_run_tests(kernel_output: Iterable[str]) -> TestResult: - counts = TestCounts() + """ + Using kernel output, extract KTAP lines, parse the lines for test + results and print condensed test results and summary line . + + Parameters: + kernel_output - Iterable object contains lines of kernel output + + Return: + TestResult - Tuple containg status of main test object, main test + object with all subtests, and log of all KTAP lines. + """ + print_with_timestamp(DIVIDER) lines = extract_tap_lines(kernel_output) - test_result = parse_test_result(lines) - if test_result.status == TestStatus.NO_TESTS: - print(red('[ERROR] ') + yellow('no tests run!')) - elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS: - print(red('[ERROR] ') + yellow('could not parse test results!')) + test = Test() + if not lines: + test.add_error('invalid KTAP input!') + test.status = TestStatus.FAILURE_TO_PARSE_TESTS else: - counts = print_and_count_results(test_result) + test = parse_test(lines, 0, []) + if test.status != TestStatus.NO_TESTS: + test.status = test.counts.get_status() print_with_timestamp(DIVIDER) - if test_result.status == TestStatus.SUCCESS: - fmt = green - elif test_result.status == TestStatus.SKIPPED: - fmt = yellow - else: - fmt =red - print_with_timestamp( - fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' % - (counts.total(), counts.failed, counts.crashed, counts.skipped))) - return test_result + print_summary_line(test) + return TestResult(test.status, test, lines) |