Rev 4743: Start adding a more precise synchronization between the various test threads. in file:///home/vila/src/bzr/bugs/392127-thread-leak/
Vincent Ladeuil
v.ladeuil+lp at free.fr
Sat Oct 10 00:15:22 BST 2009
At file:///home/vila/src/bzr/bugs/392127-thread-leak/
------------------------------------------------------------
revno: 4743
revision-id: v.ladeuil+lp at free.fr-20091009231522-09cs7tqrzi2splv7
parent: v.ladeuil+lp at free.fr-20091009121020-9zlxqc2hz3esbd61
committer: Vincent Ladeuil <v.ladeuil+lp at free.fr>
branch nick: 392127-thread-leak
timestamp: Sat 2009-10-10 01:15:22 +0200
message:
Start adding a more precise synchronization between the various test threads.
* bzrlib/tests/https_server.py:
(TestingHTTPSServerMixin): Ensure the server is still active
before ssl-wrapping the socket.
* bzrlib/tests/http_server.py:
(TestingHTTPServerMixin): Drill down to address the thread
synchronization bugs. Introduce a threading.Event 'serving' used
to properly handle the last connection.
(TestingHTTPServerMixin.serve): Use the 'serving' event, delegate
the exception handling to handle_request().
(TestingHTTPServerMixin.join_thread): Catch the hung threads
instead of hanging ourselves.
(TestingThreadingHTTPServer.process_request_thread): Use the
received 'started' event to synchronize with the spawner.
(TestingThreadingHTTPServer.process_request): Add the thread to
the client data and wait for it to really start.
(TestingThreadingHTTPServer.shutdown_client): Join the client
thread after shutting down the socket.
(HttpServer._http_start): Add a 'started' event set when the
server is ready to accept connections.
(HttpServer.setUp): Use a threading.Event 'started' to ensure the
server is ready to accept connections.
* bzrlib/tests/__init__.py:
(CommandFailed): Not used anymore and I'm not sure it was ever
used.
-------------- next part --------------
=== modified file 'bzrlib/tests/__init__.py'
--- a/bzrlib/tests/__init__.py 2009-10-08 17:19:38 +0000
+++ b/bzrlib/tests/__init__.py 2009-10-09 23:15:22 +0000
@@ -713,11 +713,6 @@
The feature should be used to construct the exception.
"""
-
-class CommandFailed(Exception):
- pass
-
-
class StringIOWrapper(object):
"""A wrapper around cStringIO which just adds an encoding attribute.
=== modified file 'bzrlib/tests/http_server.py'
--- a/bzrlib/tests/http_server.py 2009-10-09 12:10:20 +0000
+++ b/bzrlib/tests/http_server.py 2009-10-09 23:15:22 +0000
@@ -324,8 +324,44 @@
# the tests cases.
self.test_case_server = test_case_server
self._home_dir = test_case_server._home_dir
- self.serving = False
+ self.serving = None
self.is_shut_down = threading.Event()
+ # We collect the sockets/threads used by the clients so we can
+ # close/join them when shutting down
+ self.clients = []
+
+ def get_request (self):
+ """Get the request and client address from the socket.
+ """
+ sock, addr = self._get_request()
+ self.clients.append([sock, addr])
+ return sock, addr
+
+ def verify_request(self, request, client_address):
+ """Verify the request.
+
+ Return True if we should proceed with this request, False if we should
+ not even touch a single byte in the socket !
+ """
+ return self.serving is not None and self.serving.isSet()
+
+ def handle_request(self):
+ request, client_address = self.get_request()
+ try:
+ if self.verify_request(request, client_address):
+ self.process_request(request, client_address)
+ except:
+ if self.serving is not None and self.serving.isSet():
+ self.handle_error(request, client_address)
+ else:
+ # Exceptions raised while we shut down are just noise, but feel
+ # free to put a breakpoint here if you suspect something
+ # else. Such an example is the SSL handshake: it's automatic
+ # once we start processing the request but the last connection
+ # will close immediately and will not be able to correctly
+ # reply.
+ pass
+ self.close_request(request)
def server_bind(self):
# The following has been fixed in 2.5 so we need to provide it for
@@ -334,13 +370,14 @@
self.server_address = self.socket.getsockname()
def serve(self):
- self.serving = True
+ self.serving = threading.Event()
+ self.serving.set()
self.is_shut_down.clear()
# Ensure that we will not stay blocked in listen()
self.socket.settimeout(1)
if 'threads' in tests.selftest_debug_flags:
print 'Starting %r' % (self.server_address,)
- while self.serving:
+ while self.serving.isSet():
try:
# Really a connection but the python framework is generic and
# call them requests
@@ -348,18 +385,12 @@
except socket.timeout:
# So we can check if we're asked to stop
pass
- except (socket.error, select.error), e:
- if e[0] == errno.EBADF:
- # Starting with python-2.6, handle_request may raise socket
- # or select exceptions when the server is shut down as we
- # do.
- pass
- else:
- raise
if 'threads' in tests.selftest_debug_flags:
print 'Closing %r' % (self.server_address,)
# Let's close the listening socket
self.server_close()
+ if 'threads' in tests.selftest_debug_flags:
+ print 'Closed %r' % (self.server_address,)
self.is_shut_down.set()
def connect_socket(self):
@@ -377,18 +408,33 @@
sock.close()
raise socket.error, msg
+ def join_thread(self, thread, timeout=2):
+ thread.join(timeout)
+ if thread.isAlive():
+ # The timeout expired without joining the thread, the thread is
+ # therefore stuck and that's a failure as far as the test is
+ # concerned. We used to hang here.
+ raise AssertionError('thread %s hung' % (thread.name,))
+
def shutdown(self):
"""Stops the serve() loop.
Blocks until the loop has finished. This must be called while serve()
is running in another thread, or it will deadlock.
"""
- if not self.serving:
+ if self.serving is None:
+ # If the server wasn't properly started, there is nothing to
+ # shutdown.
return
- self.serving = False
+ # As soon as we stop serving, no more connection are accepted except
+ # one to get out of the blocking listen.
+ self.serving.clear()
# The server is listening for a last connection, let's give it:
try:
fake_conn = self.connect_socket()
+ # But close it immediately without trying to use it. The server
+ # will not process a single byte on that socket to avoid
+ # complications (SSL starts with a handshake for example).
fake_conn.close()
except socket.error, e:
# But ignore connection errors as the point is to unblock the
@@ -397,10 +443,19 @@
# leading to self.setUp() *not* being called but self.tearDown()
# still being called)
pass
+ # We don't have to wait for the server to shut down to start shutting
+ # the clients, so let's start now.
+ for c in self.clients:
+ self.shutdown_client(c)
+ self.clients = []
+ # Now we wait for the thread running serve() to finish
self.is_shut_down.wait()
+ def shutdown_client(self, client):
+ sock, addr = client[:2]
+ self.shutdown_client_socket(sock)
- def shutdown_client(self, client_socket):
+ def shutdown_client_socket(self, sock):
"""Properly shutdown a client socket.
Under some circumstances (as in bug #383920), we need to force the
@@ -413,7 +468,7 @@
try:
# The request process has been completed, the thread is about to
# die, let's shutdown the socket if we can.
- client_socket.shutdown(socket.SHUT_RDWR)
+ sock.shutdown(socket.SHUT_RDWR)
except (socket.error, select.error), e:
if e[0] in (errno.EBADF, errno.ENOTCONN):
# Right, the socket is already down
@@ -430,6 +485,9 @@
SocketServer.TCPServer.__init__(self, server_address,
request_handler_class)
+ def _get_request (self):
+ return SocketServer.TCPServer.get_request(self)
+
def server_bind(self):
SocketServer.TCPServer.server_bind(self)
TestingHTTPServerMixin.server_bind(self)
@@ -454,26 +512,41 @@
# process. This is prophylactic as we should not leave the threads
# lying around.
self.daemon_threads = True
- # We collect the sockets/threads used by the clients so we can
- # close/join them when shutting down
- self.clients = []
-
- def process_request_thread(self, request, client_address):
- self.clients.append((request, threading.currentThread()))
+
+ def _get_request (self):
+ return SocketServer.ThreadingTCPServer.get_request(self)
+
+ def process_request_thread(self, started, request, client_address):
+ started.set()
SocketServer.ThreadingTCPServer.process_request_thread(
self, request, client_address)
# Shutdown the socket as soon as possible, the thread will be joined
# later if needed during server shutdown thread.
- self.shutdown_client(request)
-
- def shutdown(self):
- TestingHTTPServerMixin.shutdown(self)
- # Let's close all our pending clients too
- for sock, thread in self.clients:
- self.shutdown_client(sock)
- thread.join()
- del thread
- self.clients = []
+ self.shutdown_client_socket(request)
+
+ def process_request(self, request, client_address):
+ """Start a new thread to process the request."""
+ client = self.clients.pop()
+ started = threading.Event()
+ t = threading.Thread(target = self.process_request_thread,
+ args = (started, request, client_address))
+ t.name = '%s -> %s' % (client_address, self.server_address)
+ client.append(t)
+ self.clients.append(client)
+ if self.daemon_threads:
+ t.setDaemon (1)
+ t.start()
+ started.wait()
+
+ def shutdown_client(self, client):
+ TestingHTTPServerMixin.shutdown_client(self, client)
+ if len(client) == 3:
+ # The thread has been created only if the request is processed but
+ # after the connection is inited. This could happen when the server
+ # is shut down.
+ sock, addr, thread = client
+ if thread.isAlive():
+ self.join_thread(thread)
def server_bind(self):
SocketServer.ThreadingTCPServer.server_bind(self)
@@ -546,7 +619,7 @@
host, self.port = self._httpd.server_address
return self._httpd
- def _http_start(self):
+ def _http_start(self, started):
"""Server thread main entry point. """
server = None
try:
@@ -562,7 +635,7 @@
finally:
# Release the lock or the main thread will block and the whole
# process will hang.
- self._http_starting.release()
+ started.set()
# From now on, exceptions are taken care of by the
# SocketServer.BaseServer or the request handler.
@@ -604,40 +677,31 @@
self._http_base_url = None
# Create the server thread
- self._http_starting = threading.Lock()
- self._http_starting.acquire()
- self._http_thread = threading.Thread(target=self._http_start)
+ started = threading.Event()
+ self._http_thread = threading.Thread(target=self._http_start,
+ args = (started,))
self._http_thread.setDaemon(True)
self._http_exception = None
self._http_thread.start()
- if 'threads' in tests.selftest_debug_flags:
- print 'Thread started: %s' % (self._http_thread.ident,)
-
# Wait for the server thread to start (i.e release the lock)
- self._http_starting.acquire()
+ started.wait()
+ self._http_thread.name = self._http_base_url
+ if 'threads' in tests.selftest_debug_flags:
+ print 'Thread started: %s' % (self._http_thread.name,)
+
if self._http_exception is not None:
# Something went wrong during server start
exc_class, exc_value, exc_tb = self._http_exception
raise exc_class, exc_value, exc_tb
- self._http_starting.release()
self.logs = []
def tearDown(self):
"""See bzrlib.transport.Server.tearDown."""
self._httpd.shutdown()
- self._http_thread.join(5.0)
- if self._http_thread.is_alive():
- # The timeout expired without joining the thread, the server thread
- # is therefore stucked and that's a failure as far as the test is
- # concerned, we used to hang here, but that wasn't very productive.
- raise AssertionError('http server at %r hanged'
- % (self._httpd.server_address,))
-
+ self._httpd.join_thread(self._http_thread)
if 'threads' in tests.selftest_debug_flags:
- print 'Thread joined: %s' % (self._http_thread.ident,)
- del self._http_thread
- self._http_thread = None
+ print 'Thread joined: %s' % (self._http_thread.name,)
def get_url(self):
"""See bzrlib.transport.Server.get_url."""
=== modified file 'bzrlib/tests/https_server.py'
--- a/bzrlib/tests/https_server.py 2009-03-23 14:59:43 +0000
+++ b/bzrlib/tests/https_server.py 2009-10-09 23:15:22 +0000
@@ -30,18 +30,24 @@
self.key_file = key_file
self.cert_file = cert_file
- def get_request (self):
- """Get the request and client address from the socket.
-
- This is called in response to a connection issued to the server, we
- wrap the socket with SSL.
+ def _get_ssl_request (self, sock, addr):
+ """Wrap the socket with SSL"""
+ ssl_sock = ssl.wrap_socket(sock, server_side=True,
+ keyfile=self.key_file,
+ certfile=self.cert_file,
+ do_handshake_on_connect=False)
+ return ssl_sock, addr
+
+ def verify_request(self, request, client_address):
+ """Verify the request.
+
+ Return True if we should proceed with this request, False if we should
+ not even touch a single byte in the socket !
"""
- sock, addr = self.socket.accept()
- sslconn = ssl.wrap_socket(sock, server_side=True,
- keyfile=self.key_file,
- certfile=self.cert_file)
- return sslconn, addr
-
+ serving = self.serving is not None and self.serving.isSet()
+ if serving:
+ request.do_handshake()
+ return serving
class TestingHTTPSServer(TestingHTTPSServerMixin,
http_server.TestingHTTPServer):
@@ -52,6 +58,10 @@
http_server.TestingHTTPServer.__init__(
self, server_address, request_handler_class, test_case_server)
+ def _get_request (self):
+ sock, addr = http_server.TestingHTTPServer._get_request(self)
+ return self._get_ssl_request(sock, addr)
+
class TestingThreadingHTTPSServer(TestingHTTPSServerMixin,
http_server.TestingThreadingHTTPServer):
@@ -62,6 +72,10 @@
http_server.TestingThreadingHTTPServer.__init__(
self, server_address, request_handler_class, test_case_server)
+ def _get_request (self):
+ sock, addr = http_server.TestingThreadingHTTPServer._get_request(self)
+ return self._get_ssl_request(sock, addr)
+
class HTTPSServer(http_server.HttpServer):
@@ -73,7 +87,7 @@
}
# Provides usable defaults since an https server requires both a
- # private key and certificate to work.
+ # private key and a certificate to work.
def __init__(self, request_handler=http_server.TestingHTTPRequestHandler,
protocol_version=None,
key_file=ssl_certs.build_path('server_without_pass.key'),
More information about the bazaar-commits
mailing list