Diffstat (limited to 'poky/bitbake/lib')
-rw-r--r--  poky/bitbake/lib/bb/__init__.py       |  14
-rw-r--r--  poky/bitbake/lib/bb/cooker.py         |   9
-rw-r--r--  poky/bitbake/lib/bb/fetch2/gcp.py     |   1
-rw-r--r--  poky/bitbake/lib/bb/fetch2/wget.py    |  25
-rw-r--r--  poky/bitbake/lib/bb/parse/__init__.py |  12
-rw-r--r--  poky/bitbake/lib/bb/runqueue.py       |  99
-rw-r--r--  poky/bitbake/lib/bb/siggen.py         |  11
-rw-r--r--  poky/bitbake/lib/bb/tests/fetch.py    |   8
-rw-r--r--  poky/bitbake/lib/hashserv/client.py   | 106
-rw-r--r--  poky/bitbake/lib/hashserv/tests.py    |  75
10 files changed, 294 insertions, 66 deletions
diff --git a/poky/bitbake/lib/bb/__init__.py b/poky/bitbake/lib/bb/__init__.py
index 15013540c2..cdec9e4d6c 100644
--- a/poky/bitbake/lib/bb/__init__.py
+++ b/poky/bitbake/lib/bb/__init__.py
@@ -9,7 +9,7 @@
 # SPDX-License-Identifier: GPL-2.0-only
 #
 
-__version__ = "2.9.0"
+__version__ = "2.8.0"
 
 import sys
 if sys.version_info < (3, 8, 0):
@@ -36,6 +36,7 @@ class BBHandledException(Exception):
 
 import os
 import logging
+from collections import namedtuple
 
 
 class NullHandler(logging.Handler):
@@ -227,3 +228,14 @@ def deprecate_import(current, modulename, fromlist, renames = None):
 
         setattr(sys.modules[current], newname, newobj)
 
+
+TaskData = namedtuple("TaskData", [
+    "pn",
+    "taskname",
+    "fn",
+    "deps",
+    "provides",
+    "taskhash",
+    "unihash",
+    "hashfn",
+    "taskhash_deps",
+])
diff --git a/poky/bitbake/lib/bb/cooker.py b/poky/bitbake/lib/bb/cooker.py
index c5bfef55d6..6318ef4a8f 100644
--- a/poky/bitbake/lib/bb/cooker.py
+++ b/poky/bitbake/lib/bb/cooker.py
@@ -315,13 +315,13 @@ class BBCooker:
         dbfile = (self.data.getVar("PERSISTENT_DIR") or self.data.getVar("CACHE")) + "/hashserv.db"
         upstream = self.data.getVar("BB_HASHSERVE_UPSTREAM") or None
         if upstream:
-            import socket
             try:
-                sock = socket.create_connection(upstream.split(":"), 5)
-                sock.close()
-            except socket.error as e:
+                with hashserv.create_client(upstream) as client:
+                    client.ping()
+            except (ConnectionError, ImportError) as e:
                 bb.warn("BB_HASHSERVE_UPSTREAM is not valid, unable to connect hash equivalence server at '%s': %s"
                         % (upstream, repr(e)))
+                upstream = None
 
         self.hashservaddr = "unix://%s/hashserve.sock" % self.data.getVar("TOPDIR")
         self.hashserv = hashserv.create_server(
@@ -1459,7 +1459,6 @@ class BBCooker:
                 if t in task or getAllTaskSignatures:
                     try:
-                        rq.rqdata.prepare_task_hash(tid)
                         sig.append([pn, t, rq.rqdata.get_task_unihash(tid)])
                     except KeyError:
                         sig.append(self.getTaskSignatures(target, [t])[0])
diff --git a/poky/bitbake/lib/bb/fetch2/gcp.py b/poky/bitbake/lib/bb/fetch2/gcp.py
index f40ce2eaa5..eb3e0c6a6b 100644
--- a/poky/bitbake/lib/bb/fetch2/gcp.py
+++ b/poky/bitbake/lib/bb/fetch2/gcp.py
@@ -23,6 +23,7 @@ import urllib.parse, urllib.error
 from bb.fetch2 import FetchMethod
 from bb.fetch2 import FetchError
 from bb.fetch2 import logger
+from bb.fetch2 import runfetchcmd
 
 class GCP(FetchMethod):
     """
diff --git a/poky/bitbake/lib/bb/fetch2/wget.py b/poky/bitbake/lib/bb/fetch2/wget.py
index fbfa6938ac..2e92117634 100644
--- a/poky/bitbake/lib/bb/fetch2/wget.py
+++ b/poky/bitbake/lib/bb/fetch2/wget.py
@@ -108,7 +108,8 @@ class Wget(FetchMethod):
 
         fetchcmd = self.basecmd
 
-        localpath = os.path.join(d.getVar("DL_DIR"), ud.localfile) + ".tmp"
+        dldir = os.path.realpath(d.getVar("DL_DIR"))
+        localpath = os.path.join(dldir, ud.localfile) + ".tmp"
         bb.utils.mkdirhier(os.path.dirname(localpath))
         fetchcmd += " -O %s" % shlex.quote(localpath)
 
@@ -128,12 +129,21 @@ class Wget(FetchMethod):
 
         uri = ud.url.split(";")[0]
         if os.path.exists(ud.localpath):
             # file exists, but we didnt complete it.. trying again..
-            fetchcmd += d.expand(" -c -P ${DL_DIR} '%s'" % uri)
+            fetchcmd += " -c -P " + dldir + " '" + uri + "'"
         else:
-            fetchcmd += d.expand(" -P ${DL_DIR} '%s'" % uri)
+            fetchcmd += " -P " + dldir + " '" + uri + "'"
 
         self._runwget(ud, d, fetchcmd, False)
 
+        # Sanity check since wget can pretend it succeed when it didn't
+        # Also, this used to happen if sourceforge sent us to the mirror page
+        if not os.path.exists(localpath):
+            raise FetchError("The fetch command returned success for url %s but %s doesn't exist?!" % (uri, localpath), uri)
+
+        if os.path.getsize(localpath) == 0:
+            os.remove(localpath)
+            raise FetchError("The fetch of %s resulted in a zero size file?! Deleting and failing since this isn't right." % (uri), uri)
+
         # Try and verify any checksum now, meaning if it isn't correct, we don't remove the
         # original file, which might be a race (imagine two recipes referencing the same
         # source, one with an incorrect checksum)
@@ -143,15 +153,6 @@ class Wget(FetchMethod):
         # Our lock prevents multiple writers but mirroring code may grab incomplete files
         os.rename(localpath, localpath[:-4])
 
-        # Sanity check since wget can pretend it succeed when it didn't
-        # Also, this used to happen if sourceforge sent us to the mirror page
-        if not os.path.exists(ud.localpath):
-            raise FetchError("The fetch command returned success for url %s but %s doesn't exist?!" % (uri, ud.localpath), uri)
-
-        if os.path.getsize(ud.localpath) == 0:
-            os.remove(ud.localpath)
-            raise FetchError("The fetch of %s resulted in a zero size file?! Deleting and failing since this isn't right." % (uri), uri)
-
         return True
 
     def checkstatus(self, fetch, ud, d, try_again=True):
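The bb.TaskData namedtuple added in bb/__init__.py above replaces the positional lists runqueue previously used for taskdepdata entries. A minimal sketch of what the named fields buy; the field values here are made up:

from collections import namedtuple

TaskData = namedtuple("TaskData", [
    "pn", "taskname", "fn", "deps", "provides",
    "taskhash", "unihash", "hashfn", "taskhash_deps",
])

td = TaskData(pn="zlib", taskname="do_compile", fn="zlib_1.3.bb",
              deps=set(), provides=["zlib"], taskhash="aa11",
              unihash="aa11", hashfn="sstate-hash", taskhash_deps=set())

# Named access replaces magic indexes like td[6]; _replace() returns an
# updated copy, which is how runqueue now refreshes unihash values
td = td._replace(unihash="bb22")
assert td.unihash == "bb22"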
diff --git a/poky/bitbake/lib/bb/parse/__init__.py b/poky/bitbake/lib/bb/parse/__init__.py
index a4358f1374..7ffdaa6fd7 100644
--- a/poky/bitbake/lib/bb/parse/__init__.py
+++ b/poky/bitbake/lib/bb/parse/__init__.py
@@ -49,20 +49,23 @@ class SkipPackage(SkipRecipe):
 __mtime_cache = {}
 def cached_mtime(f):
     if f not in __mtime_cache:
-        __mtime_cache[f] = os.stat(f)[stat.ST_MTIME]
+        res = os.stat(f)
+        __mtime_cache[f] = (res.st_mtime_ns, res.st_size, res.st_ino)
     return __mtime_cache[f]
 
 def cached_mtime_noerror(f):
     if f not in __mtime_cache:
         try:
-            __mtime_cache[f] = os.stat(f)[stat.ST_MTIME]
+            res = os.stat(f)
+            __mtime_cache[f] = (res.st_mtime_ns, res.st_size, res.st_ino)
         except OSError:
             return 0
     return __mtime_cache[f]
 
 def check_mtime(f, mtime):
     try:
-        current_mtime = os.stat(f)[stat.ST_MTIME]
+        res = os.stat(f)
+        current_mtime = (res.st_mtime_ns, res.st_size, res.st_ino)
         __mtime_cache[f] = current_mtime
     except OSError:
         current_mtime = 0
@@ -70,7 +73,8 @@ def check_mtime(f, mtime):
 
 def update_mtime(f):
     try:
-        __mtime_cache[f] = os.stat(f)[stat.ST_MTIME]
+        res = os.stat(f)
+        __mtime_cache[f] = (res.st_mtime_ns, res.st_size, res.st_ino)
     except OSError:
         if f in __mtime_cache:
             del __mtime_cache[f]
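The parse-cache change above swaps a bare mtime for a (st_mtime_ns, st_size, st_ino) tuple. An illustrative sketch of the stronger fingerprint; the helper name is hypothetical:

import os

def file_fingerprint(path):
    # Nanosecond mtime plus size and inode catches edits that one-second
    # mtime granularity would miss (e.g. two writes within the same second)
    # and files replaced in-place with identical timestamps
    res = os.stat(path)
    return (res.st_mtime_ns, res.st_size, res.st_ino)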
diff --git a/poky/bitbake/lib/bb/runqueue.py b/poky/bitbake/lib/bb/runqueue.py
index bc7e18175d..93079a9776 100644
--- a/poky/bitbake/lib/bb/runqueue.py
+++ b/poky/bitbake/lib/bb/runqueue.py
@@ -1273,27 +1273,41 @@ class RunQueueData:
 
         bb.parse.siggen.set_setscene_tasks(self.runq_setscene_tids)
 
+        starttime = time.time()
+        lasttime = starttime
+
         # Iterate over the task list and call into the siggen code
         dealtwith = set()
         todeal = set(self.runtaskentries)
         while todeal:
+            ready = set()
             for tid in todeal.copy():
                 if not (self.runtaskentries[tid].depends - dealtwith):
-                    dealtwith.add(tid)
-                    todeal.remove(tid)
-                    self.prepare_task_hash(tid)
-                    bb.event.check_for_interrupts(self.cooker.data)
+                    self.runtaskentries[tid].taskhash_deps = bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
+                    # get_taskhash for a given tid *must* be called before get_unihash* below
+                    self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
+                    ready.add(tid)
+            unihashes = bb.parse.siggen.get_unihashes(ready)
+            for tid in ready:
+                dealtwith.add(tid)
+                todeal.remove(tid)
+                self.runtaskentries[tid].unihash = unihashes[tid]
+
+            bb.event.check_for_interrupts(self.cooker.data)
+
+            if time.time() > (lasttime + 30):
+                lasttime = time.time()
+                hashequiv_logger.verbose("Initial setup loop progress: %s of %s in %s" % (len(todeal), len(self.runtaskentries), lasttime - starttime))
+
+        endtime = time.time()
+        if (endtime-starttime > 60):
+            hashequiv_logger.verbose("Initial setup loop took: %s" % (endtime-starttime))
 
         bb.parse.siggen.writeout_file_checksum_cache()
 
         #self.dump_data()
         return len(self.runtaskentries)
 
-    def prepare_task_hash(self, tid):
-        bb.parse.siggen.prep_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
-        self.runtaskentries[tid].hash = bb.parse.siggen.get_taskhash(tid, self.runtaskentries[tid].depends, self.dataCaches)
-        self.runtaskentries[tid].unihash = bb.parse.siggen.get_unihash(tid)
-
     def dump_data(self):
         """
         Dump some debug information on the internal data structures
@@ -2438,14 +2452,17 @@ class RunQueueExecute:
             taskdepdata_cache = {}
             for task in self.rqdata.runtaskentries:
                 (mc, fn, taskname, taskfn) = split_tid_mcfn(task)
-                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
-                deps = self.rqdata.runtaskentries[task].depends
-                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
-                taskhash = self.rqdata.runtaskentries[task].hash
-                unihash = self.rqdata.runtaskentries[task].unihash
-                deps = self.filtermcdeps(task, mc, deps)
-                hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
-                taskdepdata_cache[task] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
+                taskdepdata_cache[task] = bb.TaskData(
+                    pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn],
+                    taskname = taskname,
+                    fn = fn,
+                    deps = self.filtermcdeps(task, mc, self.rqdata.runtaskentries[task].depends),
+                    provides = self.rqdata.dataCaches[mc].fn_provides[taskfn],
+                    taskhash = self.rqdata.runtaskentries[task].hash,
+                    unihash = self.rqdata.runtaskentries[task].unihash,
+                    hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn],
+                    taskhash_deps = self.rqdata.runtaskentries[task].taskhash_deps,
+                )
 
             self.taskdepdata_cache = taskdepdata_cache
 
@@ -2460,9 +2477,11 @@ class RunQueueExecute:
         while next:
             additional = []
             for revdep in next:
-                self.taskdepdata_cache[revdep][6] = self.rqdata.runtaskentries[revdep].unihash
+                self.taskdepdata_cache[revdep] = self.taskdepdata_cache[revdep]._replace(
+                    unihash=self.rqdata.runtaskentries[revdep].unihash
+                )
                 taskdepdata[revdep] = self.taskdepdata_cache[revdep]
-                for revdep2 in self.taskdepdata_cache[revdep][3]:
+                for revdep2 in self.taskdepdata_cache[revdep].deps:
                     if revdep2 not in taskdepdata:
                         additional.append(revdep2)
             next = additional
@@ -2556,17 +2575,28 @@ class RunQueueExecute:
             elif self.rqdata.runtaskentries[p].depends.isdisjoint(total):
                 next.add(p)
 
+        starttime = time.time()
+        lasttime = starttime
+
         # When an item doesn't have dependencies in total, we can process it. Drop items from total when handled
         while next:
             current = next.copy()
             next = set()
+            ready = {}
             for tid in current:
                 if self.rqdata.runtaskentries[p].depends and not self.rqdata.runtaskentries[tid].depends.isdisjoint(total):
                     continue
+                # get_taskhash for a given tid *must* be called before get_unihash* below
+                ready[tid] = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches)
+
+            unihashes = bb.parse.siggen.get_unihashes(ready.keys())
+
+            for tid in ready:
                 orighash = self.rqdata.runtaskentries[tid].hash
-                newhash = bb.parse.siggen.get_taskhash(tid, self.rqdata.runtaskentries[tid].depends, self.rqdata.dataCaches)
+                newhash = ready[tid]
                 origuni = self.rqdata.runtaskentries[tid].unihash
-                newuni = bb.parse.siggen.get_unihash(tid)
+                newuni = unihashes[tid]
+
                 # FIXME, need to check it can come from sstate at all for determinism?
                 remapped = False
                 if newuni == origuni:
@@ -2587,6 +2617,15 @@ class RunQueueExecute:
                 next |= self.rqdata.runtaskentries[tid].revdeps
                 total.remove(tid)
                 next.intersection_update(total)
+            bb.event.check_for_interrupts(self.cooker.data)
+
+            if time.time() > (lasttime + 30):
+                lasttime = time.time()
+                hashequiv_logger.verbose("Rehash loop slow progress: %s in %s" % (len(total), lasttime - starttime))
+
+        endtime = time.time()
+        if (endtime-starttime > 60):
+            hashequiv_logger.verbose("Rehash loop took more than 60s: %s" % (endtime-starttime))
 
         if changed:
             for mc in self.rq.worker:
@@ -2806,13 +2845,19 @@ class RunQueueExecute:
             additional = []
             for revdep in next:
                 (mc, fn, taskname, taskfn) = split_tid_mcfn(revdep)
-                pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn]
                 deps = getsetscenedeps(revdep)
-                provides = self.rqdata.dataCaches[mc].fn_provides[taskfn]
-                taskhash = self.rqdata.runtaskentries[revdep].hash
-                unihash = self.rqdata.runtaskentries[revdep].unihash
-                hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn]
-                taskdepdata[revdep] = [pn, taskname, fn, deps, provides, taskhash, unihash, hashfn]
+
+                taskdepdata[revdep] = bb.TaskData(
+                    pn = self.rqdata.dataCaches[mc].pkg_fn[taskfn],
+                    taskname = taskname,
+                    fn = fn,
+                    deps = deps,
+                    provides = self.rqdata.dataCaches[mc].fn_provides[taskfn],
+                    taskhash = self.rqdata.runtaskentries[revdep].hash,
+                    unihash = self.rqdata.runtaskentries[revdep].unihash,
+                    hashfn = self.rqdata.dataCaches[mc].hashfn[taskfn],
+                    taskhash_deps = self.rqdata.runtaskentries[revdep].taskhash_deps,
+                )
                 for revdep2 in deps:
                     if revdep2 not in taskdepdata:
                         additional.append(revdep2)
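The runqueue loops above collect every task whose dependencies are already resolved into a ready set and resolve their unihashes with one batched query per wave, rather than one server round trip per task. A standalone sketch of that wavefront pattern, using a toy deps dict and a hypothetical batch_lookup() standing in for bb.parse.siggen.get_unihashes():

def resolve_in_waves(deps, batch_lookup):
    # deps maps tid -> set of tids it depends on (assumed acyclic)
    dealtwith = set()
    todeal = set(deps)
    results = {}
    while todeal:
        # Everything whose dependencies are already handled is ready now
        ready = {tid for tid in todeal if not (deps[tid] - dealtwith)}
        # One batched lookup per wave amortizes the server round trip
        results.update(batch_lookup(ready))
        dealtwith |= ready
        todeal -= ready
    return results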
diff --git a/poky/bitbake/lib/bb/siggen.py b/poky/bitbake/lib/bb/siggen.py
index 8ab08ec961..65ca0811d5 100644
--- a/poky/bitbake/lib/bb/siggen.py
+++ b/poky/bitbake/lib/bb/siggen.py
@@ -381,7 +381,7 @@ class SignatureGeneratorBasic(SignatureGenerator):
             self.taints[tid] = taint
             logger.warning("%s is tainted from a forced run" % tid)
 
-        return
+        return set(dep for _, dep in self.runtaskdeps[tid])
 
     def get_taskhash(self, tid, deps, dataCaches):
 
@@ -726,10 +726,13 @@ class SignatureGeneratorUniHashMixIn(object):
             return result
 
         if self.max_parallel <= 1 or len(queries) <= 1:
-            # No parallelism required. Make the query serially with the single client
+            # No parallelism required. Make the query using a single client
            with self.client() as client:
-                for tid, args in queries.items():
-                    query_result[tid] = client.get_unihash(*args)
+                keys = list(queries.keys())
+                unihashes = client.get_unihash_batch(queries[k] for k in keys)
+
+                for idx, k in enumerate(keys):
+                    query_result[k] = unihashes[idx]
         else:
             with self.client_pool() as client_pool:
                 query_result = client_pool.get_unihashes(queries)
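The serial path in siggen now issues a single batched request instead of looping over per-task lookups. A sketch of the zip-back bookkeeping, assuming a client object exposing get_unihash_batch() as in the hashserv changes below:

def query_unihashes(client, queries):
    # queries maps tid -> (method, taskhash) argument tuple; a stable key
    # order lets the positional reply list be zipped back onto tids
    keys = list(queries.keys())
    unihashes = client.get_unihash_batch(queries[k] for k in keys)
    return dict(zip(keys, unihashes))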
diff --git a/poky/bitbake/lib/bb/tests/fetch.py b/poky/bitbake/lib/bb/tests/fetch.py
index 85c1f79ff3..33cc9bcac6 100644
--- a/poky/bitbake/lib/bb/tests/fetch.py
+++ b/poky/bitbake/lib/bb/tests/fetch.py
@@ -1421,7 +1421,7 @@ class FetchLatestVersionTest(FetcherTest):
         # combination version pattern
         ("sysprof", "git://gitlab.gnome.org/GNOME/sysprof.git;protocol=https;branch=master", "cd44ee6644c3641507fb53b8a2a69137f2971219", "", "")
             : "1.2.0",
-        ("u-boot-mkimage", "git://git.denx.de/u-boot.git;branch=master;protocol=git", "62c175fbb8a0f9a926c88294ea9f7e88eb898f6c", "", "")
+        ("u-boot-mkimage", "git://source.denx.de/u-boot/u-boot.git;branch=master;protocol=https", "62c175fbb8a0f9a926c88294ea9f7e88eb898f6c", "", "")
             : "2014.01",
         # version pattern "yyyymmdd"
         ("mobile-broadband-provider-info", "git://gitlab.gnome.org/GNOME/mobile-broadband-provider-info.git;protocol=https;branch=master", "4ed19e11c2975105b71b956440acdb25d46a347d", "", "")
@@ -1511,7 +1511,7 @@ class FetchLatestVersionTest(FetcherTest):
 
     def test_wget_latest_versionstring(self):
         testdata = os.path.dirname(os.path.abspath(__file__)) + "/fetch-testdata"
-        server = HTTPService(testdata)
+        server = HTTPService(testdata, host="127.0.0.1")
         server.start()
         port = server.port
         try:
@@ -1519,10 +1519,10 @@ class FetchLatestVersionTest(FetcherTest):
                 self.d.setVar("PN", k[0])
                 checkuri = ""
                 if k[2]:
-                    checkuri = "http://localhost:%s/" % port + k[2]
+                    checkuri = "http://127.0.0.1:%s/" % port + k[2]
                 self.d.setVar("UPSTREAM_CHECK_URI", checkuri)
                 self.d.setVar("UPSTREAM_CHECK_REGEX", k[3])
-                url = "http://localhost:%s/" % port + k[1]
+                url = "http://127.0.0.1:%s/" % port + k[1]
                 ud = bb.fetch2.FetchData(url, self.d)
                 pupver = ud.method.latest_versionstring(ud, self.d)
                 verstring = pupver[0]
diff --git a/poky/bitbake/lib/hashserv/client.py b/poky/bitbake/lib/hashserv/client.py
index 0b254beddd..775faf935a 100644
--- a/poky/bitbake/lib/hashserv/client.py
+++ b/poky/bitbake/lib/hashserv/client.py
@@ -5,6 +5,7 @@
 
 import logging
 import socket
+import asyncio
 import bb.asyncrpc
 import json
 from . import create_async_client
@@ -13,6 +14,66 @@ from . import create_async_client
 logger = logging.getLogger("hashserv.client")
 
 
+class Batch(object):
+    def __init__(self):
+        self.done = False
+        self.cond = asyncio.Condition()
+        self.pending = []
+        self.results = []
+        self.sent_count = 0
+
+    async def recv(self, socket):
+        while True:
+            async with self.cond:
+                await self.cond.wait_for(lambda: self.pending or self.done)
+
+                if not self.pending:
+                    if self.done:
+                        return
+                    continue
+
+            r = await socket.recv()
+            self.results.append(r)
+
+            async with self.cond:
+                self.pending.pop(0)
+
+    async def send(self, socket, msgs):
+        try:
+            # In the event of a restart due to a reconnect, all in-flight
+            # messages need to be resent first to keep the result count in sync
+            for m in self.pending:
+                await socket.send(m)
+
+            for m in msgs:
+                # Add the message to the pending list before attempting to send
+                # it so that if the send fails it will be retried
+                async with self.cond:
+                    self.pending.append(m)
+                    self.cond.notify()
+                    self.sent_count += 1
+
+                await socket.send(m)
+
+        finally:
+            async with self.cond:
+                self.done = True
+                self.cond.notify()
+
+    async def process(self, socket, msgs):
+        await asyncio.gather(
+            self.recv(socket),
+            self.send(socket, msgs),
+        )
+
+        if len(self.results) != self.sent_count:
+            raise ValueError(
+                f"Expected result count {len(self.results)}. Expected {self.sent_count}"
+            )
+
+        return self.results
+
+
 class AsyncClient(bb.asyncrpc.AsyncClient):
     MODE_NORMAL = 0
     MODE_GET_STREAM = 1
@@ -36,11 +97,27 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
         if become:
             await self.become_user(become)
 
-    async def send_stream(self, mode, msg):
+    async def send_stream_batch(self, mode, msgs):
+        """
+        Does a "batch" process of stream messages. This sends the query
+        messages as fast as possible, and simultaneously attempts to read the
+        messages back. This helps to mitigate the effects of latency to the
+        hash equivalence server by allowing multiple queries to be "in-flight"
+        at once
+
+        The implementation does more complicated tracking using a count of sent
+        messages so that `msgs` can be a generator function (i.e. its length is
+        unknown)
+
+        """
+
+        b = Batch()
+
         async def proc():
+            nonlocal b
+
             await self._set_mode(mode)
-            await self.socket.send(msg)
-            return await self.socket.recv()
+            return await b.process(self.socket, msgs)
 
         return await self._send_wrapper(proc)
 
@@ -89,10 +166,15 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
             self.mode = new_mode
 
     async def get_unihash(self, method, taskhash):
-        r = await self.send_stream(self.MODE_GET_STREAM, "%s %s" % (method, taskhash))
-        if not r:
-            return None
-        return r
+        r = await self.get_unihash_batch([(method, taskhash)])
+        return r[0]
+
+    async def get_unihash_batch(self, args):
+        result = await self.send_stream_batch(
+            self.MODE_GET_STREAM,
+            (f"{method} {taskhash}" for method, taskhash in args),
+        )
+        return [r if r else None for r in result]
 
     async def report_unihash(self, taskhash, method, outhash, unihash, extra={}):
         m = extra.copy()
@@ -115,8 +197,12 @@ class AsyncClient(bb.asyncrpc.AsyncClient):
         )
 
     async def unihash_exists(self, unihash):
-        r = await self.send_stream(self.MODE_EXIST_STREAM, unihash)
-        return r == "true"
+        r = await self.unihash_exists_batch([unihash])
+        return r[0]
+
+    async def unihash_exists_batch(self, unihashes):
+        result = await self.send_stream_batch(self.MODE_EXIST_STREAM, unihashes)
+        return [r == "true" for r in result]
 
     async def get_outhash(self, method, outhash, taskhash, with_unihash=True):
         return await self.invoke(
@@ -237,10 +323,12 @@ class Client(bb.asyncrpc.Client):
             "connect_tcp",
             "connect_websocket",
             "get_unihash",
+            "get_unihash_batch",
             "report_unihash",
             "report_unihash_equiv",
             "get_taskhash",
             "unihash_exists",
+            "unihash_exists_batch",
             "get_outhash",
             "get_stats",
             "reset_stats",
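The Batch class above overlaps sends and receives on one stream so that many queries are in flight at once, and it counts sent messages so msgs may be a generator of unknown length. A self-contained sketch of the same pipelining idea using plain asyncio primitives; send_one and recv_one are stand-ins, not the hashserv socket API:

import asyncio

async def pipeline(send_one, recv_one, msgs):
    # Overlap sending and receiving so multiple requests are in flight at
    # once; a None sentinel tells the receiver no more replies are coming
    pending = asyncio.Queue()
    results = []

    async def sender():
        count = 0
        for m in msgs:
            await pending.put(m)  # enqueue before sending so the reply is expected
            await send_one(m)
            count += 1
        await pending.put(None)
        return count

    async def receiver():
        while await pending.get() is not None:
            results.append(await recv_one())

    sent, _ = await asyncio.gather(sender(), receiver())
    if len(results) != sent:
        raise ValueError(f"expected {sent} results, got {len(results)}")
    return results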
diff --git a/poky/bitbake/lib/hashserv/tests.py b/poky/bitbake/lib/hashserv/tests.py
index 0809453cf8..5349cd5867 100644
--- a/poky/bitbake/lib/hashserv/tests.py
+++ b/poky/bitbake/lib/hashserv/tests.py
@@ -594,6 +594,43 @@ class HashEquivalenceCommonTests(object):
             7: None,
         })
 
+    def test_get_unihash_batch(self):
+        TEST_INPUT = (
+            # taskhash                                   outhash                                                            unihash
+            ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
+            # Duplicated taskhash with multiple output hashes and unihashes.
+            ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'),
+            # Equivalent hash
+            ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"),
+            ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"),
+            ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'),
+            ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'),
+            ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'),
+        )
+        EXTRA_QUERIES = (
+            "6b6be7a84ab179b4240c4302518dc3f6",
+        )
+
+        for taskhash, outhash, unihash in TEST_INPUT:
+            self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
+
+        result = self.client.get_unihash_batch(
+            [(self.METHOD, data[0]) for data in TEST_INPUT] +
+            [(self.METHOD, e) for e in EXTRA_QUERIES]
+        )
+
+        self.assertListEqual(result, [
+            "218e57509998197d570e2c98512d0105985dffc9",
+            "218e57509998197d570e2c98512d0105985dffc9",
+            "218e57509998197d570e2c98512d0105985dffc9",
+            "3b5d3d83f07f259e9086fcb422c855286e18a57d",
+            "f46d3fbb439bd9b921095da657a4de906510d2cd",
+            "f46d3fbb439bd9b921095da657a4de906510d2cd",
+            "05d2a63c81e32f0a36542ca677e8ad852365c538",
+            None,
+        ])
+
     def test_client_pool_unihash_exists(self):
         TEST_INPUT = (
             # taskhash                                   outhash                                                            unihash
@@ -636,6 +673,44 @@ class HashEquivalenceCommonTests(object):
             result = client_pool.unihashes_exist(query)
             self.assertDictEqual(result, expected)
 
+    def test_unihash_exists_batch(self):
+        TEST_INPUT = (
+            # taskhash                                   outhash                                                            unihash
+            ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', 'afe240a439959ce86f5e322f8c208e1fedefea9e813f2140c81af866cc9edf7e','218e57509998197d570e2c98512d0105985dffc9'),
+            # Duplicated taskhash with multiple output hashes and unihashes.
+            ('8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a', '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', 'ae9a7d252735f0dafcdb10e2e02561ca3a47314c'),
+            # Equivalent hash
+            ("044c2ec8aaf480685a00ff6ff49e6162e6ad34e1", '0904a7fe3dc712d9fd8a74a616ddca2a825a8ee97adf0bd3fc86082c7639914d', "def64766090d28f627e816454ed46894bb3aab36"),
+            ("e3da00593d6a7fb435c7e2114976c59c5fd6d561", "1cf8713e645f491eb9c959d20b5cae1c47133a292626dda9b10709857cbe688a", "3b5d3d83f07f259e9086fcb422c855286e18a57d"),
+            ('35788efcb8dfb0a02659d81cf2bfd695fb30faf9', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2cd'),
+            ('35788efcb8dfb0a02659d81cf2bfd695fb30fafa', '2765d4a5884be49b28601445c2760c5f21e7e5c0ee2b7e3fce98fd7e5970796f', 'f46d3fbb439bd9b921095da657a4de906510d2ce'),
+            ('9d81d76242cc7cfaf7bf74b94b9cd2e29324ed74', '8470d56547eea6236d7c81a644ce74670ca0bbda998e13c629ef6bb3f0d60b69', '05d2a63c81e32f0a36542ca677e8ad852365c538'),
+        )
+        EXTRA_QUERIES = (
+            "6b6be7a84ab179b4240c4302518dc3f6",
+        )
+
+        result_unihashes = set()
+
+        for taskhash, outhash, unihash in TEST_INPUT:
+            result = self.client.report_unihash(taskhash, self.METHOD, outhash, unihash)
+            result_unihashes.add(result["unihash"])
+
+        query = []
+        expected = []
+
+        for _, _, unihash in TEST_INPUT:
+            query.append(unihash)
+            expected.append(unihash in result_unihashes)
+
+        for unihash in EXTRA_QUERIES:
+            query.append(unihash)
+            expected.append(False)
+
+        result = self.client.unihash_exists_batch(query)
+        self.assertListEqual(result, expected)
+
     def test_auth_read_perms(self):
         admin_client = self.start_auth_server()
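A hedged usage sketch of the new batch calls exercised by the tests above; the server address, method name, and hashes are placeholders:

import hashserv

with hashserv.create_client("unix:///tmp/hashserve.sock") as client:
    # One round trip resolves many (method, taskhash) pairs; misses come
    # back as None in the matching position
    unihashes = client.get_unihash_batch([
        ("TestMethod", "8aa96fcffb5831b3c2c0cb75f0431e3f8b20554a"),
        ("TestMethod", "6b6be7a84ab179b4240c4302518dc3f6"),
    ])

    # Existence checks batch the same way, returning booleans in order
    exists = client.unihash_exists_batch([
        "218e57509998197d570e2c98512d0105985dffc9",
    ])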