blob: 7ead483091e66f9cdc79906dfd99eebc02f36dc9 [file] [log] [blame]
# Copyright (C) 2011 Google Inc. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""Module for handling messages and concurrency for run-webkit-tests.
This module implements a message broker that connects the manager
(TestRunner2) to the workers: it provides a messaging abstraction and
message loops (building on top of message_broker2), and handles starting
workers by launching threads and/or processes depending on the
requested configuration.
There are a lot of classes and objects involved in a fully connected system.
They interact more or less like:
TestRunner2 --> _InlineManager ---> _InlineWorker <-> Worker
^ \ / ^
| v v |
\-------------------- MessageBroker -------------/
"""
import logging
import optparse
import printing
import Queue
import sys
import thread
import threading
import time
# Handle Python < 2.6 where multiprocessing isn't available.
try:
import multiprocessing
except ImportError:
multiprocessing = None
from webkitpy.common.system import stack_utils
from webkitpy.layout_tests import port
from webkitpy.layout_tests.layout_package import message_broker2
_log = logging.getLogger(__name__)
#
# Topic names for Manager <-> Worker messaging
#
MANAGER_TOPIC = 'managers'
ANY_WORKER_TOPIC = 'workers'
def runtime_options():
"""Return a list of optparse.Option objects for any runtime values used
by this module."""
options = [
optparse.make_option("--worker-model", action="store",
help=("controls worker model. Valid values are "
"'inline', 'threads', and 'processes'.")),
]
return options
def get(port, options, client, worker_class):
"""Return a connection to a manager/worker message_broker
Args:
port - handle to layout_tests/port object for port-specific stuff
options - optparse argument for command-line options
client - message_broker2.BrokerClient implementation to dispatch
replies to.
worker_class - type of workers to create. This class must implement
the methods in AbstractWorker.
Returns:
A handle to an object that will talk to a message broker configured
for the normal manager/worker communication.
"""
worker_model = options.worker_model
if worker_model == 'inline':
queue_class = Queue.Queue
manager_class = _InlineManager
elif worker_model == 'threads':
queue_class = Queue.Queue
manager_class = _ThreadedManager
elif worker_model == 'processes' and multiprocessing:
queue_class = multiprocessing.Queue
manager_class = _MultiProcessManager
else:
raise ValueError("unsupported value for --worker-model: %s" %
worker_model)
broker = message_broker2.Broker(options, queue_class)
return manager_class(broker, port, options, client, worker_class)
class AbstractWorker(message_broker2.BrokerClient):
def __init__(self, broker_connection, worker_number, options):
"""The constructor should be used to do any simple initialization
necessary, but should not do anything that creates data structures
that cannot be Pickled or sent across processes (like opening
files or sockets). Complex initialization should be done at the
start of the run() call.
Args:
broker_connection - handle to the BrokerConnection object creating
the worker and that can be used for messaging.
worker_number - identifier for this particular worker
options - command-line argument object from optparse"""
raise NotImplementedError
def run(self, port):
"""Callback for the worker to start executing. Typically does any
remaining initialization and then calls broker_connection.run_message_loop()."""
raise NotImplementedError
def cancel(self):
"""Called when possible to indicate to the worker to stop processing
messages and shut down. Note that workers may be stopped without this
method being called, so clients should not rely solely on this."""
raise NotImplementedError
class _ManagerConnection(message_broker2.BrokerConnection):
def __init__(self, broker, options, client, worker_class):
"""Base initialization for all Manager objects.
Args:
broker: handle to the message_broker2 object
options: command line options object
client: callback object (the caller)
worker_class: class object to use to create workers.
"""
message_broker2.BrokerConnection.__init__(self, broker, client,
MANAGER_TOPIC, ANY_WORKER_TOPIC)
self._options = options
self._worker_class = worker_class
def start_worker(self, worker_number):
raise NotImplementedError
class _InlineManager(_ManagerConnection):
def __init__(self, broker, port, options, client, worker_class):
_ManagerConnection.__init__(self, broker, options, client, worker_class)
self._port = port
self._inline_worker = None
def start_worker(self, worker_number):
self._inline_worker = _InlineWorkerConnection(self._broker, self._port,
self._client, self._worker_class, worker_number)
return self._inline_worker
def run_message_loop(self, delay_secs=None):
# Note that delay_secs is ignored in this case since we can't easily
# implement it.
self._inline_worker.run()
self._broker.run_all_pending(MANAGER_TOPIC, self._client)
class _ThreadedManager(_ManagerConnection):
def __init__(self, broker, port, options, client, worker_class):
_ManagerConnection.__init__(self, broker, options, client, worker_class)
self._port = port
def start_worker(self, worker_number):
worker_connection = _ThreadedWorkerConnection(self._broker, self._port,
self._worker_class, worker_number)
worker_connection.start()
return worker_connection
class _MultiProcessManager(_ManagerConnection):
def __init__(self, broker, port, options, client, worker_class):
# Note that this class does not keep a handle to the actual port
# object, because it isn't Picklable. Instead it keeps the port
# name and recreates the port in the child process from the name
# and options.
_ManagerConnection.__init__(self, broker, options, client, worker_class)
self._platform_name = port.real_name()
def start_worker(self, worker_number):
worker_connection = _MultiProcessWorkerConnection(self._broker, self._platform_name,
self._worker_class, worker_number, self._options)
worker_connection.start()
return worker_connection
class _WorkerConnection(message_broker2.BrokerConnection):
def __init__(self, broker, worker_class, worker_number, options):
self._client = worker_class(self, worker_number, options)
self.name = self._client.name()
message_broker2.BrokerConnection.__init__(self, broker, self._client,
ANY_WORKER_TOPIC, MANAGER_TOPIC)
def cancel(self):
raise NotImplementedError
def is_alive(self):
raise NotImplementedError
def join(self, timeout):
raise NotImplementedError
def log_wedged_worker(self, test_name):
raise NotImplementedError
def yield_to_broker(self):
pass
class _InlineWorkerConnection(_WorkerConnection):
def __init__(self, broker, port, manager_client, worker_class, worker_number):
_WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
self._alive = False
self._port = port
self._manager_client = manager_client
def cancel(self):
self._client.cancel()
def is_alive(self):
return self._alive
def join(self, timeout):
assert not self._alive
def log_wedged_worker(self, test_name):
assert False, "_InlineWorkerConnection.log_wedged_worker() called"
def run(self):
self._alive = True
self._client.run(self._port)
self._alive = False
def yield_to_broker(self):
self._broker.run_all_pending(MANAGER_TOPIC, self._manager_client)
class _Thread(threading.Thread):
def __init__(self, worker_connection, port, client):
threading.Thread.__init__(self)
self._worker_connection = worker_connection
self._port = port
self._client = client
def cancel(self):
return self._client.cancel()
def log_wedged_worker(self, test_name):
stack_utils.log_thread_state(_log.error, self._client.name(), self.ident, " is wedged on test %s" % test_name)
def run(self):
# FIXME: We can remove this once everyone is on 2.6.
if not hasattr(self, 'ident'):
self.ident = thread.get_ident()
self._client.run(self._port)
class _ThreadedWorkerConnection(_WorkerConnection):
def __init__(self, broker, port, worker_class, worker_number):
_WorkerConnection.__init__(self, broker, worker_class, worker_number, port._options)
self._thread = _Thread(self, port, self._client)
def cancel(self):
return self._thread.cancel()
def is_alive(self):
# FIXME: Change this to is_alive once everyone is on 2.6.
return self._thread.isAlive()
def join(self, timeout):
return self._thread.join(timeout)
def log_wedged_worker(self, test_name):
return self._thread.log_wedged_worker(test_name)
def start(self):
self._thread.start()
if multiprocessing:
class _Process(multiprocessing.Process):
def __init__(self, worker_connection, platform_name, options, client):
multiprocessing.Process.__init__(self)
self._worker_connection = worker_connection
self._platform_name = platform_name
self._options = options
self._client = client
def log_wedged_worker(self, test_name):
_log.error("%s (pid %d) is wedged on test %s" % (self.name, self.pid, test_name))
def run(self):
options = self._options
port_obj = port.get(self._platform_name, options)
# FIXME: this won't work if the calling process is logging
# somewhere other than sys.stderr and sys.stdout, but I'm not sure
# if this will be an issue in practice.
printer = printing.Printer(port_obj, options, sys.stderr, sys.stdout,
int(options.child_processes), options.experimental_fully_parallel)
self._client.run(port_obj)
printer.cleanup()
class _MultiProcessWorkerConnection(_WorkerConnection):
def __init__(self, broker, platform_name, worker_class, worker_number, options):
_WorkerConnection.__init__(self, broker, worker_class, worker_number, options)
self._proc = _Process(self, platform_name, options, self._client)
def cancel(self):
return self._proc.terminate()
def is_alive(self):
return self._proc.is_alive()
def join(self, timeout):
return self._proc.join(timeout)
def log_wedged_worker(self, test_name):
return self._proc.log_wedged_worker(test_name)
def start(self):
self._proc.start()