Queue limitations?

M

mateom

Hello,

I'm using Queue to send images from one thread to another, and some of
the images are not appearing in the consumer thread....maybe 1 in 3
arrive. I've tried passing the image data in both string form and as a
PIL Image object, with the same result.

It does work, however, if I use zlib to compress the image string
before passing it to Queue and then decompress it in the consumer
thread. So, my question: Does Queue have some capacity limitation?
(Uncompressed, my images are 786432 long... 512x512x3)

Here's a bit of the code:

# Producer --------
img = rayCaster.ReadTexture()
iq.put(img)
#iq.put(zlib.compress(img)) #this works fine

# End producer

# Consumer --------
class imageQueue(threading.Thread):

def __init__(self):
threading.Thread.__init__(self)
self.filenum = 0
self.theQueue = Queue.Queue(-1)

def run(self):
while 1:
# for testing, do something simple with images
img = self.theQueue.get()
imgt = Image.frombuffer('RGB',(512,512), img)
#imgt = Image.frombuffer('RGB',(512,512),
zlib.decompress(img)) #this one works
imgt.save( "./imgs_out/%i.png" % self.filenum, "PNG")
self.filenum += 1

def SetQueue(self, q_):
self.theQueue = q_
####
Thanks,
Matt
 
M

mateom

the queue holds references to the images, not the images themselves,
so the size should be completely irrelevant.I use one instance of imageQueue.

hmmm.. true. And it also fails when I use PIL Image objects instead of
arrays. Any idea why compressing the string helps?

I'm using one instance of imageQueue.

-Thanks
 
F

Fredrik Lundh

hmmm.. true. And it also fails when I use PIL Image objects instead of
arrays. Any idea why compressing the string helps?

compression and decompression takes time. sounds like you have a timing
problem, and it's probably not in the imageQueue thread. adding a couple
of print statements to strategic locations (e.g push image, pop image, write
image to disk using this filename, etc) might help you sort this one out.

</F>
 
M

mateom

I should be able to add to the queue as fast as I want, right? I tried
adding time.sleep(.05) right after put(image) in the producer, and that
fixes it. There is only one thread producing and one thread consuming.

Thanks for the help.
 
S

simonwittber

hmmm.. true. And it also fails when I use PIL Image objects instead of
arrays. Any idea why compressing the string helps?


Compressing into a string converts the image intro a string type, which
is immutable. When you uncompress it, you've got a copy of the image,
rather than a reference to the original image. This might give you a
hint as to what is going wrong.

-Sw.
 
F

Fredrik Lundh

I should be able to add to the queue as fast as I want, right?

absolutely.

but if you slow the producer down, and you're only using one producer
and one consumer, the chance increases that the producer and the
consumer runs in perfect lockstep. you might as well *call* the con-
sumer from the producer...
I tried adding time.sleep(.05) right after put(image) in the producer,
and that fixes it. There is only one thread producing and one thread
consuming.

are you sure that the producer is creating new images, rather than
just pushing out references to the same data buffer ?

the frombuffer operation you're using in the client creates an image
object based on data in an external object; it does not copy the pixel
data. if the data in that buffer is changed by the producer, the image
seen by the consumer will change too.

</F>
 
M

mateom

I am creating the images by reading from texture memory using
glGetTexImage(). As an experiment, I tried calling glGetTexImage() only
once and letting imagQueue use that same reference over and over, but
the images were all the same.

So, this leads me to believe that the references returned by
glGetTexImage() are to unique buffers, right? ( documentation for
glGetTexImage() doesn't seem to disagree )

Perhaps, as an OpenGL program, something strange is happening with the
references when my producer method goes out of scope. I guess for now I
can be content to just pass immutable types across the thread boundary.
 
M

Metalone

This example is missing a few initialization details although they can
possibly be inferred.
For example is
iq = imageQueue()
but imageQueue does not have a put() method.
Is SetQueue() called?
Is iq.start() called?

I like to see small, fully complete and tested examples.
The following works using strings as images.
It might prove interesting to modify this test case to use your image
objects instead of strings.

import Queue
import threading
import unittest

class imageQueue(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.filenum = 0
self.theQueue = Queue.Queue(-1)
self.done = False
self.results = []

def run(self):
while not self.done:
# for testing, do something simple with images
try:
img = self.theQueue.get(True, 1)
except:
pass
else:
self.results.append(img)
self.filenum += 1

def SetQueue(self, q_):
self.theQueue = q_

def stop(self):
self.done = True


class Tester(unittest.TestCase):
def setUp(self):
pass

def tearDown(self):
pass

def test_1(self):
# initialize
q = Queue.Queue()
iq = imageQueue()
iq.SetQueue(q)
iq.start()

# produce images
images = ["123", "456", "789"]
for x in images:
q.put(x)

# wait till all images consumed
while iq.filenum != 3:
pass

# stop the thread
iq.stop()

# assert that the consumer consumed what was produced
msg = "%s != %s" % (images, iq.results)
self.assert_(images == iq.results, msg)

if __name__ == '__main__':
unittest.main()
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

Forum statistics

Threads
473,995
Messages
2,570,230
Members
46,817
Latest member
DicWeils

Latest Threads

Top