diff options
Diffstat (limited to 'poky/bitbake/lib/bb/asyncrpc/serv.py')
-rw-r--r-- | poky/bitbake/lib/bb/asyncrpc/serv.py | 149 |
1 files changed, 106 insertions, 43 deletions
diff --git a/poky/bitbake/lib/bb/asyncrpc/serv.py b/poky/bitbake/lib/bb/asyncrpc/serv.py index ef20cb71d..b4cffff21 100644 --- a/poky/bitbake/lib/bb/asyncrpc/serv.py +++ b/poky/bitbake/lib/bb/asyncrpc/serv.py @@ -9,6 +9,7 @@ import os import signal import socket import sys +import multiprocessing from . import chunkify, DEFAULT_MAX_CHUNK @@ -130,53 +131,55 @@ class AsyncServerConnection(object): class AsyncServer(object): - def __init__(self, logger, loop=None): - if loop is None: - self.loop = asyncio.new_event_loop() - self.close_loop = True - else: - self.loop = loop - self.close_loop = False - + def __init__(self, logger): self._cleanup_socket = None self.logger = logger + self.start = None + self.address = None + self.loop = None def start_tcp_server(self, host, port): - self.server = self.loop.run_until_complete( - asyncio.start_server(self.handle_client, host, port, loop=self.loop) - ) - - for s in self.server.sockets: - self.logger.debug('Listening on %r' % (s.getsockname(),)) - # Newer python does this automatically. Do it manually here for - # maximum compatibility - s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) - s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) - - name = self.server.sockets[0].getsockname() - if self.server.sockets[0].family == socket.AF_INET6: - self.address = "[%s]:%d" % (name[0], name[1]) - else: - self.address = "%s:%d" % (name[0], name[1]) + def start_tcp(): + self.server = self.loop.run_until_complete( + asyncio.start_server(self.handle_client, host, port) + ) + + for s in self.server.sockets: + self.logger.debug('Listening on %r' % (s.getsockname(),)) + # Newer python does this automatically. Do it manually here for + # maximum compatibility + s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1) + s.setsockopt(socket.SOL_TCP, socket.TCP_QUICKACK, 1) + + name = self.server.sockets[0].getsockname() + if self.server.sockets[0].family == socket.AF_INET6: + self.address = "[%s]:%d" % (name[0], name[1]) + else: + self.address = "%s:%d" % (name[0], name[1]) + + self.start = start_tcp def start_unix_server(self, path): def cleanup(): os.unlink(path) - cwd = os.getcwd() - try: - # Work around path length limits in AF_UNIX - os.chdir(os.path.dirname(path)) - self.server = self.loop.run_until_complete( - asyncio.start_unix_server(self.handle_client, os.path.basename(path), loop=self.loop) - ) - finally: - os.chdir(cwd) + def start_unix(): + cwd = os.getcwd() + try: + # Work around path length limits in AF_UNIX + os.chdir(os.path.dirname(path)) + self.server = self.loop.run_until_complete( + asyncio.start_unix_server(self.handle_client, os.path.basename(path)) + ) + finally: + os.chdir(cwd) - self.logger.debug('Listening on %r' % path) + self.logger.debug('Listening on %r' % path) - self._cleanup_socket = cleanup - self.address = "unix://%s" % os.path.abspath(path) + self._cleanup_socket = cleanup + self.address = "unix://%s" % os.path.abspath(path) + + self.start = start_unix @abc.abstractmethod def accept_client(self, reader, writer): @@ -201,12 +204,13 @@ class AsyncServer(object): pass def signal_handler(self): + self.logger.debug("Got exit signal") self.loop.stop() - def serve_forever(self): - asyncio.set_event_loop(self.loop) + def _serve_forever(self): try: self.loop.add_signal_handler(signal.SIGTERM, self.signal_handler) + signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGTERM]) self.run_loop_forever() self.server.close() @@ -214,10 +218,69 @@ class AsyncServer(object): self.loop.run_until_complete(self.server.wait_closed()) self.logger.debug('Server shutting down') finally: - if self.close_loop: - if sys.version_info >= (3, 6): - self.loop.run_until_complete(self.loop.shutdown_asyncgens()) - self.loop.close() - if self._cleanup_socket is not None: self._cleanup_socket() + + def serve_forever(self): + """ + Serve requests in the current process + """ + # Create loop and override any loop that may have existed in + # a parent process. It is possible that the usecases of + # serve_forever might be constrained enough to allow using + # get_event_loop here, but better safe than sorry for now. + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + self.start() + self._serve_forever() + + def serve_as_process(self, *, prefunc=None, args=()): + """ + Serve requests in a child process + """ + def run(queue): + # Create loop and override any loop that may have existed + # in a parent process. Without doing this and instead + # using get_event_loop, at the very minimum the hashserv + # unit tests will hang when running the second test. + # This happens since get_event_loop in the spawned server + # process for the second testcase ends up with the loop + # from the hashserv client created in the unit test process + # when running the first testcase. The problem is somewhat + # more general, though, as any potential use of asyncio in + # Cooker could create a loop that needs to replaced in this + # new process. + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + try: + self.start() + finally: + queue.put(self.address) + queue.close() + + if prefunc is not None: + prefunc(self, *args) + + self._serve_forever() + + if sys.version_info >= (3, 6): + self.loop.run_until_complete(self.loop.shutdown_asyncgens()) + self.loop.close() + + queue = multiprocessing.Queue() + + # Temporarily block SIGTERM. The server process will inherit this + # block which will ensure it doesn't receive the SIGTERM until the + # handler is ready for it + mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGTERM]) + try: + self.process = multiprocessing.Process(target=run, args=(queue,)) + self.process.start() + + self.address = queue.get() + queue.close() + queue.join_thread() + + return self.process + finally: + signal.pthread_sigmask(signal.SIG_SETMASK, mask) |