Paul L. Du Bois said:
Has anyone written a Queue.Queue replacement that avoids busy-waiting?
It doesn't matter if it uses OS-specific APIs (e.g.
WaitForMultipleObjects). I did some googling around and haven't found
anything so far.
This isn't a Queue.Queue replacement, but it implements a buffer
intended for inter-thread transmission, so it could be adjusted to
mimic Queue semantics fairly easily. Internally it already keeps write
chunks in a list until they are read (for better performance), so
removing the coalesce step would be the first change.
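
For instance, a Queue-like retrieval could pop whole chunks instead of
coalescing. This is only a rough sketch of such a method for the Buffer
class included below; the get_chunk name is made up here, and it assumes
callers use only put() and this method (not the coalescing get()):

    def get_chunk(self, timeout=None):
        # Hypothetical sketch: return one whole chunk as written by put(),
        # Queue.get() style, or '' if nothing arrives within 'timeout'.
        self.lock.acquire()
        if not self.buflist:
            self.lock.release()
            if timeout is None:
                win_timeout = we.INFINITE
            else:
                win_timeout = int(timeout * 1000)
            we.WaitForSingleObject(self.has_data, win_timeout)
            self.lock.acquire()
        if self.buflist:
            result = self.buflist.pop(0)
        else:
            result = ''
        if not self.buflist:
            # Only clear the event once no chunks remain pending
            we.ResetEvent(self.has_data)
        self.lock.release()
        return result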
The buffer was written specifically to minimize latency (a significant
issue with the polling loop in the normal Python Queue implementation)
and CPU usage, in support of a higher-level Win32-specific serial I/O
class, so it uses Win32 events for the signaling when waiting.
The fundamental issue with the native Python lock is that, to be
minimalistic in what it requires from each OS, it doesn't provide a
way to wait on an event signal. A timed blocking wait on some
signalable construct is the key thing you need to make these
operations efficient, and that is what I use the Win32 Event for.
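
To illustrate the core idea, a timed blocking wait on a Win32 event via
pywin32 looks roughly like this minimal sketch:

    import win32event as we

    evt = we.CreateEvent(None, 1, 0, None)  # manual-reset event, initially unsignaled
    # ... a producer thread calls we.SetEvent(evt) when data becomes available ...
    rc = we.WaitForSingleObject(evt, 500)   # block up to 500 ms without spinning
    if rc == we.WAIT_OBJECT_0:
        pass   # signaled: data is ready
    elif rc == we.WAIT_TIMEOUT:
        pass   # nothing arrived within the timeout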
-- David
- - - - - - - - - - - - - - - - - - - - - - - - -
import thread
import win32event as we

class Buffer:
    """A thread safe unidirectional data buffer used to represent data
    traveling to or from the application and serial port handling threads.
    This class is used as an underlying implementation mechanism by SerialIO.
    Application code should not typically need to access this directly, but
    can handle I/O through SerialIO.

    Note that we use Windows event objects rather than Python's because
    Python's OS-independent versions are not very efficient with timed waits,
    imposing internal latencies and CPU usage due to looping around a basic
    non-blocking construct.  We also use the lower layer thread lock rather
    than threading's to minimize overhead.
    """

    def __init__(self, notify=None):
        self.lock = thread.allocate_lock()
        self.has_data = we.CreateEvent(None, 1, 0, None)
        self.clear()
        self.notify = notify

    def _coalesce(self):
        if self.buflist:
            self.buffer += ''.join(self.buflist)
            self.buflist = []

    def __len__(self):
        self.lock.acquire()
        self._coalesce()
        result = len(self.buffer)
        self.lock.release()
        return result

    def clear(self):
        self.lock.acquire()
        self.buffer = ''
        self.buflist = []
        self.lock.release()

    def get(self, size=0, timeout=None):
        """Retrieve data from the buffer, up to 'size' bytes (unlimited if
        0), but potentially less based on what is available.  If no
        data is currently available, it will wait up to 'timeout' seconds
        (forever if None, no blocking if 0) for some data to arrive"""
        self.lock.acquire()
        self._coalesce()
        if not self.buffer:
            # Nothing buffered, wait until something shows up (timeout
            # rules match that of threading.Event)
            self.lock.release()
            if timeout is None:
                win_timeout = we.INFINITE
            else:
                win_timeout = int(timeout * 1000)
            rc = we.WaitForSingleObject(self.has_data, win_timeout)
            self.lock.acquire()
            self._coalesce()
        if not size:
            size = len(self.buffer)
        result_len = min(size, len(self.buffer))
        result = self.buffer[:result_len]
        self.buffer = self.buffer[result_len:]
        we.ResetEvent(self.has_data)
        self.lock.release()
        return result

    def put_back(self, data):
        self.lock.acquire()
        self.buffer = data + self.buffer
        self.lock.release()
        we.SetEvent(self.has_data)
        if self.notify:
            self.notify()

    def put(self, data):
        self.lock.acquire()
        self.buflist.append(data)
        self.lock.release()
        we.SetEvent(self.has_data)
        if self.notify:
            self.notify()
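
As a rough usage sketch (assuming the Buffer class above; the producer
function here is purely illustrative, not part of the original code):

import sys, thread, time

buf = Buffer()

def producer():
    # Illustrative producer: pushes a few chunks, then stops.
    for i in range(5):
        buf.put('chunk %d\n' % i)   # SetEvent wakes any blocked get()
        time.sleep(0.1)

thread.start_new_thread(producer, ())

while 1:
    data = buf.get(timeout=1.0)     # blocks on the Win32 event, no polling
    if not data:
        break                       # timed out with nothing pending
    sys.stdout.write(data)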