diff options
Diffstat (limited to 'poky/meta/lib/oeqa/oetest.py')
-rw-r--r-- | poky/meta/lib/oeqa/oetest.py | 616 |
1 files changed, 616 insertions, 0 deletions
diff --git a/poky/meta/lib/oeqa/oetest.py b/poky/meta/lib/oeqa/oetest.py new file mode 100644 index 0000000000..f7171260e7 --- /dev/null +++ b/poky/meta/lib/oeqa/oetest.py @@ -0,0 +1,616 @@ +# Copyright (C) 2013 Intel Corporation +# +# Released under the MIT license (see COPYING.MIT) + +# Main unittest module used by testimage.bbclass +# This provides the oeRuntimeTest base class which is inherited by all tests in meta/lib/oeqa/runtime. + +# It also has some helper functions and it's responsible for actually starting the tests + +import os, re, mmap, sys +import unittest +import inspect +import subprocess +import signal +import shutil +import functools +try: + import bb +except ImportError: + pass +import logging + +import oeqa.runtime +# Exported test doesn't require sdkext +try: + import oeqa.sdkext +except ImportError: + pass +from oeqa.utils.decorators import LogResults, gettag, getResults + +logger = logging.getLogger("BitBake") + +def getVar(obj): + #extend form dict, if a variable didn't exists, need find it in testcase + class VarDict(dict): + def __getitem__(self, key): + return gettag(obj, key) + return VarDict() + +def checkTags(tc, tagexp): + return eval(tagexp, None, getVar(tc)) + +def filterByTagExp(testsuite, tagexp): + if not tagexp: + return testsuite + caseList = [] + for each in testsuite: + if not isinstance(each, unittest.BaseTestSuite): + if checkTags(each, tagexp): + caseList.append(each) + else: + caseList.append(filterByTagExp(each, tagexp)) + return testsuite.__class__(caseList) + +@LogResults +class oeTest(unittest.TestCase): + + pscmd = "ps" + longMessage = True + + @classmethod + def hasPackage(self, pkg): + """ + True if the full package name exists in the manifest, False otherwise. + """ + return pkg in oeTest.tc.pkgmanifest + + @classmethod + def hasPackageMatch(self, match): + """ + True if match exists in the manifest as a regular expression substring, + False otherwise. + """ + for s in oeTest.tc.pkgmanifest: + if re.match(match, s): + return True + return False + + @classmethod + def hasFeature(self,feature): + if feature in oeTest.tc.imagefeatures or \ + feature in oeTest.tc.distrofeatures: + return True + else: + return False + +class oeRuntimeTest(oeTest): + def __init__(self, methodName='runTest'): + self.target = oeRuntimeTest.tc.target + super(oeRuntimeTest, self).__init__(methodName) + + def setUp(self): + # Install packages in the DUT + self.tc.install_uninstall_packages(self.id()) + + # Check if test needs to run + if self.tc.sigterm: + self.fail("Got SIGTERM") + elif (type(self.target).__name__ == "QemuTarget"): + self.assertTrue(self.target.check(), msg = "Qemu not running?") + + self.setUpLocal() + + # a setup method before tests but after the class instantiation + def setUpLocal(self): + pass + + def tearDown(self): + # Uninstall packages in the DUT + self.tc.install_uninstall_packages(self.id(), False) + + res = getResults() + # If a test fails or there is an exception dump + # for QemuTarget only + if (type(self.target).__name__ == "QemuTarget" and + (self.id() in res.getErrorList() or + self.id() in res.getFailList())): + self.tc.host_dumper.create_dir(self._testMethodName) + self.tc.host_dumper.dump_host() + self.target.target_dumper.dump_target( + self.tc.host_dumper.dump_dir) + print ("%s dump data stored in %s" % (self._testMethodName, + self.tc.host_dumper.dump_dir)) + + self.tearDownLocal() + + # Method to be run after tearDown and implemented by child classes + def tearDownLocal(self): + pass + +def getmodule(pos=2): + # stack returns a list of tuples containg frame information + # First element of the list the is current frame, caller is 1 + frameinfo = inspect.stack()[pos] + modname = inspect.getmodulename(frameinfo[1]) + #modname = inspect.getmodule(frameinfo[0]).__name__ + return modname + +def skipModule(reason, pos=2): + modname = getmodule(pos) + if modname not in oeTest.tc.testsrequired: + raise unittest.SkipTest("%s: %s" % (modname, reason)) + else: + raise Exception("\nTest %s wants to be skipped.\nReason is: %s" \ + "\nTest was required in TEST_SUITES, so either the condition for skipping is wrong" \ + "\nor the image really doesn't have the required feature/package when it should." % (modname, reason)) + +def skipModuleIf(cond, reason): + + if cond: + skipModule(reason, 3) + +def skipModuleUnless(cond, reason): + + if not cond: + skipModule(reason, 3) + +_buffer_logger = "" +def custom_verbose(msg, *args, **kwargs): + global _buffer_logger + if msg[-1] != "\n": + _buffer_logger += msg + else: + _buffer_logger += msg + try: + bb.plain(_buffer_logger.rstrip("\n"), *args, **kwargs) + except NameError: + logger.info(_buffer_logger.rstrip("\n"), *args, **kwargs) + _buffer_logger = "" + +class TestContext(object): + def __init__(self, d, exported=False): + self.d = d + + self.testsuites = self._get_test_suites() + + if exported: + path = [os.path.dirname(os.path.abspath(__file__))] + extrapath = "" + else: + path = d.getVar("BBPATH").split(':') + extrapath = "lib/oeqa" + + self.testslist = self._get_tests_list(path, extrapath) + self.testsrequired = self._get_test_suites_required() + + self.filesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "runtime/files") + self.corefilesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "files") + self.imagefeatures = d.getVar("IMAGE_FEATURES").split() + self.distrofeatures = d.getVar("DISTRO_FEATURES").split() + + # get testcase list from specified file + # if path is a relative path, then relative to build/conf/ + def _read_testlist(self, fpath, builddir): + if not os.path.isabs(fpath): + fpath = os.path.join(builddir, "conf", fpath) + if not os.path.exists(fpath): + bb.fatal("No such manifest file: ", fpath) + tcs = [] + for line in open(fpath).readlines(): + line = line.strip() + if line and not line.startswith("#"): + tcs.append(line) + return " ".join(tcs) + + # return test list by type also filter if TEST_SUITES is specified + def _get_tests_list(self, bbpath, extrapath): + testslist = [] + + type = self._get_test_namespace() + + # This relies on lib/ under each directory in BBPATH being added to sys.path + # (as done by default in base.bbclass) + for testname in self.testsuites: + if testname != "auto": + if testname.startswith("oeqa."): + testslist.append(testname) + continue + found = False + for p in bbpath: + if os.path.exists(os.path.join(p, extrapath, type, testname + ".py")): + testslist.append("oeqa." + type + "." + testname) + found = True + break + elif os.path.exists(os.path.join(p, extrapath, type, testname.split(".")[0] + ".py")): + testslist.append("oeqa." + type + "." + testname) + found = True + break + if not found: + bb.fatal('Test %s specified in TEST_SUITES could not be found in lib/oeqa/runtime under BBPATH' % testname) + + if "auto" in self.testsuites: + def add_auto_list(path): + files = sorted([f for f in os.listdir(path) if f.endswith('.py') and not f.startswith('_')]) + for f in files: + module = 'oeqa.' + type + '.' + f[:-3] + if module not in testslist: + testslist.append(module) + + for p in bbpath: + testpath = os.path.join(p, 'lib', 'oeqa', type) + bb.debug(2, 'Searching for tests in %s' % testpath) + if os.path.exists(testpath): + add_auto_list(testpath) + + return testslist + + def getTestModules(self): + """ + Returns all the test modules in the testlist. + """ + + import pkgutil + + modules = [] + for test in self.testslist: + if re.search("\w+\.\w+\.test_\S+", test): + test = '.'.join(t.split('.')[:3]) + module = pkgutil.get_loader(test) + modules.append(module) + + return modules + + def getModulefromID(self, test_id): + """ + Returns the test module based on a test id. + """ + + module_name = ".".join(test_id.split(".")[:3]) + modules = self.getTestModules() + for module in modules: + if module.name == module_name: + return module + + return None + + def getTests(self, test): + '''Return all individual tests executed when running the suite.''' + # Unfortunately unittest does not have an API for this, so we have + # to rely on implementation details. This only needs to work + # for TestSuite containing TestCase. + method = getattr(test, '_testMethodName', None) + if method: + # leaf case: a TestCase + yield test + else: + # Look into TestSuite. + tests = getattr(test, '_tests', []) + for t1 in tests: + for t2 in self.getTests(t1): + yield t2 + + def loadTests(self): + setattr(oeTest, "tc", self) + + testloader = unittest.TestLoader() + testloader.sortTestMethodsUsing = None + suites = [testloader.loadTestsFromName(name) for name in self.testslist] + suites = filterByTagExp(suites, getattr(self, "tagexp", None)) + + # Determine dependencies between suites by looking for @skipUnlessPassed + # method annotations. Suite A depends on suite B if any method in A + # depends on a method on B. + for suite in suites: + suite.dependencies = [] + suite.depth = 0 + for test in self.getTests(suite): + methodname = getattr(test, '_testMethodName', None) + if methodname: + method = getattr(test, methodname) + depends_on = getattr(method, '_depends_on', None) + if depends_on: + for dep_suite in suites: + if depends_on in [getattr(t, '_testMethodName', None) for t in self.getTests(dep_suite)]: + if dep_suite not in suite.dependencies and \ + dep_suite is not suite: + suite.dependencies.append(dep_suite) + break + else: + logger.warning("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." % + (test, depends_on)) + + # Use brute-force topological sort to determine ordering. Sort by + # depth (higher depth = must run later), with original ordering to + # break ties. + def set_suite_depth(suite): + for dep in suite.dependencies: + new_depth = set_suite_depth(dep) + 1 + if new_depth > suite.depth: + suite.depth = new_depth + return suite.depth + + for index, suite in enumerate(suites): + set_suite_depth(suite) + suite.index = index + + def cmp(a, b): + return (a > b) - (a < b) + + def cmpfunc(a, b): + return cmp((a.depth, a.index), (b.depth, b.index)) + + suites.sort(key=functools.cmp_to_key(cmpfunc)) + + self.suite = testloader.suiteClass(suites) + + return self.suite + + def runTests(self): + logger.info("Test modules %s" % self.testslist) + if hasattr(self, "tagexp") and self.tagexp: + logger.info("Filter test cases by tags: %s" % self.tagexp) + logger.info("Found %s tests" % self.suite.countTestCases()) + runner = unittest.TextTestRunner(verbosity=2) + if 'bb' in sys.modules: + runner.stream.write = custom_verbose + + return runner.run(self.suite) + +class RuntimeTestContext(TestContext): + def __init__(self, d, target, exported=False): + super(RuntimeTestContext, self).__init__(d, exported) + + self.target = target + + self.pkgmanifest = {} + manifest = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"), + d.getVar("IMAGE_LINK_NAME") + ".manifest") + nomanifest = d.getVar("IMAGE_NO_MANIFEST") + if nomanifest is None or nomanifest != "1": + try: + with open(manifest) as f: + for line in f: + (pkg, arch, version) = line.strip().split() + self.pkgmanifest[pkg] = (version, arch) + except IOError as e: + bb.fatal("No package manifest file found. Did you build the image?\n%s" % e) + + def _get_test_namespace(self): + return "runtime" + + def _get_test_suites(self): + testsuites = [] + + manifests = (self.d.getVar("TEST_SUITES_MANIFEST") or '').split() + if manifests: + for manifest in manifests: + testsuites.extend(self._read_testlist(manifest, + self.d.getVar("TOPDIR")).split()) + + else: + testsuites = self.d.getVar("TEST_SUITES").split() + + return testsuites + + def _get_test_suites_required(self): + return [t for t in self.d.getVar("TEST_SUITES").split() if t != "auto"] + + def loadTests(self): + super(RuntimeTestContext, self).loadTests() + if oeTest.hasPackage("procps"): + oeRuntimeTest.pscmd = "ps -ef" + + def extract_packages(self): + """ + Find packages that will be needed during runtime. + """ + + modules = self.getTestModules() + bbpaths = self.d.getVar("BBPATH").split(":") + + shutil.rmtree(self.d.getVar("TEST_EXTRACTED_DIR")) + shutil.rmtree(self.d.getVar("TEST_PACKAGED_DIR")) + for module in modules: + json_file = self._getJsonFile(module) + if json_file: + needed_packages = self._getNeededPackages(json_file) + self._perform_package_extraction(needed_packages) + + def _perform_package_extraction(self, needed_packages): + """ + Extract packages that will be needed during runtime. + """ + + import oe.path + + extracted_path = self.d.getVar("TEST_EXTRACTED_DIR") + packaged_path = self.d.getVar("TEST_PACKAGED_DIR") + + for key,value in needed_packages.items(): + packages = () + if isinstance(value, dict): + packages = (value, ) + elif isinstance(value, list): + packages = value + else: + bb.fatal("Failed to process needed packages for %s; " + "Value must be a dict or list" % key) + + for package in packages: + pkg = package["pkg"] + rm = package.get("rm", False) + extract = package.get("extract", True) + if extract: + dst_dir = os.path.join(extracted_path, pkg) + else: + dst_dir = os.path.join(packaged_path) + + # Extract package and copy it to TEST_EXTRACTED_DIR + pkg_dir = self._extract_in_tmpdir(pkg) + if extract: + + # Same package used for more than one test, + # don't need to extract again. + if os.path.exists(dst_dir): + continue + oe.path.copytree(pkg_dir, dst_dir) + shutil.rmtree(pkg_dir) + + # Copy package to TEST_PACKAGED_DIR + else: + self._copy_package(pkg) + + def _getJsonFile(self, module): + """ + Returns the path of the JSON file for a module, empty if doesn't exitst. + """ + + module_file = module.path + json_file = "%s.json" % module_file.rsplit(".", 1)[0] + if os.path.isfile(module_file) and os.path.isfile(json_file): + return json_file + else: + return "" + + def _getNeededPackages(self, json_file, test=None): + """ + Returns a dict with needed packages based on a JSON file. + + + If a test is specified it will return the dict just for that test. + """ + + import json + + needed_packages = {} + + with open(json_file) as f: + test_packages = json.load(f) + for key,value in test_packages.items(): + needed_packages[key] = value + + if test: + if test in needed_packages: + needed_packages = needed_packages[test] + else: + needed_packages = {} + + return needed_packages + + def _extract_in_tmpdir(self, pkg): + """" + Returns path to a temp directory where the package was + extracted without dependencies. + """ + + from oeqa.utils.package_manager import get_package_manager + + pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg) + pm = get_package_manager(self.d, pkg_path) + extract_dir = pm.extract(pkg) + shutil.rmtree(pkg_path) + + return extract_dir + + def _copy_package(self, pkg): + """ + Copy the RPM, DEB or IPK package to dst_dir + """ + + from oeqa.utils.package_manager import get_package_manager + + pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg) + dst_dir = self.d.getVar("TEST_PACKAGED_DIR") + pm = get_package_manager(self.d, pkg_path) + pkg_info = pm.package_info(pkg) + file_path = pkg_info[pkg]["filepath"] + shutil.copy2(file_path, dst_dir) + shutil.rmtree(pkg_path) + + def install_uninstall_packages(self, test_id, pkg_dir, install): + """ + Check if the test requires a package and Install/Uninstall it in the DUT + """ + + test = test_id.split(".")[4] + module = self.getModulefromID(test_id) + json = self._getJsonFile(module) + if json: + needed_packages = self._getNeededPackages(json, test) + if needed_packages: + self._install_uninstall_packages(needed_packages, pkg_dir, install) + + def _install_uninstall_packages(self, needed_packages, pkg_dir, install=True): + """ + Install/Uninstall packages in the DUT without using a package manager + """ + + if isinstance(needed_packages, dict): + packages = [needed_packages] + elif isinstance(needed_packages, list): + packages = needed_packages + + for package in packages: + pkg = package["pkg"] + rm = package.get("rm", False) + extract = package.get("extract", True) + src_dir = os.path.join(pkg_dir, pkg) + + # Install package + if install and extract: + self.target.connection.copy_dir_to(src_dir, "/") + + # Uninstall package + elif not install and rm: + self.target.connection.delete_dir_structure(src_dir, "/") + +class ImageTestContext(RuntimeTestContext): + def __init__(self, d, target, host_dumper): + super(ImageTestContext, self).__init__(d, target) + + self.tagexp = d.getVar("TEST_SUITES_TAGS") + + self.host_dumper = host_dumper + + self.sigterm = False + self.origsigtermhandler = signal.getsignal(signal.SIGTERM) + signal.signal(signal.SIGTERM, self._sigterm_exception) + + def _sigterm_exception(self, signum, stackframe): + bb.warn("TestImage received SIGTERM, shutting down...") + self.sigterm = True + self.target.stop() + + def install_uninstall_packages(self, test_id, install=True): + """ + Check if the test requires a package and Install/Uninstall it in the DUT + """ + + pkg_dir = self.d.getVar("TEST_EXTRACTED_DIR") + super(ImageTestContext, self).install_uninstall_packages(test_id, pkg_dir, install) + +class ExportTestContext(RuntimeTestContext): + def __init__(self, d, target, exported=False, parsedArgs={}): + """ + This class is used when exporting tests and when are executed outside OE environment. + + parsedArgs can contain the following: + - tag: Filter test by tag. + """ + super(ExportTestContext, self).__init__(d, target, exported) + + tag = parsedArgs.get("tag", None) + self.tagexp = tag if tag != None else d.getVar("TEST_SUITES_TAGS") + + self.sigterm = None + + def install_uninstall_packages(self, test_id, install=True): + """ + Check if the test requires a package and Install/Uninstall it in the DUT + """ + + export_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) + extracted_dir = self.d.getVar("TEST_EXPORT_EXTRACTED_DIR") + pkg_dir = os.path.join(export_dir, extracted_dir) + super(ExportTestContext, self).install_uninstall_packages(test_id, pkg_dir, install) |