Threading with Socket Server

T

Hello all, I am writing a Windows service that needs to 1) act as a
TCP server and write to a shelve file, and 2) read that same shelve
file every x seconds. My thought is to create two separate
classes (one for the socket server and writing to the shelve file, and
another to read the file and perform other requested actions).
However, I need to make sure they both run concurrently. I'm
sure threading is involved, but my experience with it has been
minimal, so any help steering me in the right direction is greatly
appreciated.
 
Boris FELD

I'm not sure I understand what you want. Is it your server process
that will read the shelve file? More information about your problem
would help, but I can give you some hints.

In order to create a TCP server, you can use SocketServer, which is in
the standard library
(http://docs.python.org/library/socketserver.html). I'm not sure
how shelve manages concurrency, but in my opinion you'll need a
single process or thread that manages the read and write
operations in order to avoid race conditions.
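
One way to sketch the single-thread idea in Python 3, where SocketServer is named socketserver: a dedicated thread opens the shelve file and is the only code that touches it, while connection handlers hand their requests to it through a queue. The names ShelfOwner and StoreHandler, the owner attribute set on the server, and the one-line "key=value" wire format are assumptions made for illustration, not part of the original problem.

```python
import queue
import shelve
import socketserver
import threading


class ShelfOwner(threading.Thread):
    """Hypothetical helper: the only thread that ever opens the shelve file."""

    def __init__(self, path):
        super().__init__(daemon=True)
        self.path = path
        self.requests = queue.Queue()

    def run(self):
        # The shelf is opened and used on this thread only.
        with shelve.open(self.path) as db:
            while True:
                op, key, value, reply = self.requests.get()
                if op == "stop":
                    reply.put(None)
                    return
                if op == "put":
                    db[key] = value
                    db.sync()
                    reply.put(None)
                else:  # "get"
                    reply.put(db.get(key))

    def _call(self, op, key=None, value=None):
        # Queue the request, then block until the owner thread answers.
        reply = queue.Queue(maxsize=1)
        self.requests.put((op, key, value, reply))
        return reply.get()

    def put(self, key, value):
        self._call("put", key, value)

    def get(self, key):
        return self._call("get", key)

    def stop(self):
        self._call("stop")
        self.join()


class StoreHandler(socketserver.StreamRequestHandler):
    """Assumed protocol: the client sends one "key=value" line."""

    def handle(self):
        line = self.rfile.readline().decode("utf-8").strip()
        key, _, value = line.partition("=")
        # The write itself happens on the owner thread, not here.
        self.server.owner.put(key, value)
        self.wfile.write(b"OK\n")
```

To try it, start a ShelfOwner, create socketserver.ThreadingTCPServer((host, port), StoreHandler), set its owner attribute to the ShelfOwner, and call serve_forever(). A polling thread can call owner.get(key) as well, since the read is still carried out on the owner thread.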

Cheers,
Feld Boris
 
T

The server portion of the program will typically be writing to the
shelve file (there may be a few cases in which I will need it to
read), and the other part of the program will read it. Basically, I
want both of the following to go on at the same time: 1) the
server portion waits for connections and, upon connection, writes the
data received to the shelve file, and 2) the polling portion checks the
shelve file every x seconds for new entries. I had given thought to
the potential for a race condition, as you mentioned, but am not sure
how to "safely" allow each portion of the program to read/write.
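
For what it's worth, the shelve documentation states that the module does not support concurrent read/write access to shelved objects, so the two portions need to take turns. A minimal Python 3 sketch of one way to do that with threads: every access opens the file while holding a single lock, and the polling thread waits on a threading.Event so it can be stopped cleanly. SharedShelf, poll_new_entries, poller, and on_new are hypothetical names, and treating "new entries" as "keys not seen before" is an assumption.

```python
import shelve
import threading


class SharedShelf:
    """Hypothetical wrapper: each access opens the file under one lock."""

    def __init__(self, path):
        self.path = path
        self.lock = threading.Lock()

    def write(self, key, value):
        # Called by the server portion for each piece of received data.
        with self.lock, shelve.open(self.path) as db:
            db[key] = value

    def snapshot(self):
        # Copy the contents out so the lock is held only briefly.
        with self.lock, shelve.open(self.path) as db:
            return dict(db)


def poll_new_entries(shelf, seen):
    """Return entries whose keys are not in `seen`, then mark them seen."""
    current = shelf.snapshot()
    new = {k: v for k, v in current.items() if k not in seen}
    seen.update(new)
    return new


def poller(shelf, interval, stop, on_new):
    """Check the shelf every `interval` seconds until `stop` is set."""
    seen = set()
    while not stop.is_set():
        new = poll_new_entries(shelf, seen)
        if new:
            on_new(new)
        # Sleeps for `interval`, but returns early once stop is set.
        stop.wait(interval)
```

The server handlers would call shelf.write(...), and the service would start threading.Thread(target=poller, args=(shelf, x, stop, callback)) next to the server thread, then call stop.set() on shutdown.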
 
baloan

If you don't mind using the coroutine library eventlet, you can
implement a single-threaded solution. See the example below. For your
use case you need to change controller so that it loads the shelve
file and then calls eventlet.sleep(n), repeating every n seconds.
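
A rough Python 3 sketch of what such a controller might look like. The function name shelve_controller and its path, interval, sleep, and cycles parameters are hypothetical; sleep is passed in so that eventlet.sleep can be supplied when running under eventlet, and cycles exists only so the loop can be bounded when trying it out. Note that the shelve read itself is ordinary blocking file I/O, so other green threads wait while it runs.

```python
import shelve
import time


def shelve_controller(path, pubqs, interval, sleep=time.sleep, cycles=None):
    """Publish entries that appeared in the shelve file since the last cycle.

    Pass sleep=eventlet.sleep under eventlet so other green threads can run
    between cycles. cycles=None means loop forever.
    """
    seen = set()
    done = 0
    while cycles is None or done < cycles:
        # Open, read, and close in one step; nothing yields in between.
        with shelve.open(path) as db:
            fresh = [(key, db[key]) for key in db if key not in seen]
        for key, value in fresh:
            seen.add(key)
            for pubq in pubqs:
                pubq.put((key, value))
        done += 1
        sleep(interval)
```

In the demo it would be started with something like eventlet.spawn(shelve_controller, path, pubqs, n, eventlet.sleep) in place of the price-update controller.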

Regards, Andreas

# eventlet single thread demo

""" prc_publish.eventlet

Price Publisher
"""

# imports

from argparse import ArgumentParser
import eventlet
import logging
import os
import random
import sys
try:
    import cPickle as pickle  # Python 2
except ImportError:
    import pickle  # Python 3

LOG = logging.getLogger()

# definitions

def main(argv=None):
    if argv is None:
        argv = sys.argv
    LOG.info("starting '%s %s'", os.path.basename(argv[0]),
             " ".join(argv[1:]))
    # parse options and arguments
    parser = ArgumentParser(description="Price Publisher")
    parser.add_argument("-f", "--file", dest="filename",
                        help="read configuration from %(dest)s")
    parser.add_argument("-p", "--port", default=8001, type=int,
                        help="server port [default: %(default)s]")
    args = parser.parse_args(argv[1:])
    print(args)
    # create product dict, keyed by product name (AB0000 .. AB0009)
    prds = {}
    pubqs = []
    for n in range(10):
        key = "AB" + "{:04}".format(n)
        prds[key] = Pricer(key)
    # start one green thread for price changes
    eventlet.spawn(controller, prds, pubqs)
    # listen on the port given with --port (was hard-coded to 8010)
    address = ('localhost', args.port)
    eventlet.spawn(listener, address, pubqs)
    # main thread runs eventlet loop
    while True:
        eventlet.sleep(10)


def listener(address, pubqs):
    sock = eventlet.listen(address)
    while True:
        LOG.info('waiting for connection on %s', address)
        cx, remote = sock.accept()
        LOG.info("accepting connection from %s", remote)
        # one queue per client; the controller fills it, the publisher drains it
        inq = eventlet.queue.Queue()
        pubqs.append(inq)
        eventlet.spawn(receiver, cx)
        eventlet.spawn(publisher, pubqs, inq, cx)


def publisher(pubqs, inq, cx):
    LOG.info("Publisher running")
    try:
        while True:
            # what happens if client does not pick up
            # what happens if client dies during queue wait
            try:
                # wait at most one second for the next item
                with eventlet.Timeout(1):
                    item = inq.get()
                s = pickle.dumps(item, pickle.HIGHEST_PROTOCOL)
                # s = "{0[0]} {0[1]}\n\r".format(item)
                cx.send(s)
            except eventlet.Timeout:
                # raises IOError if connection lost
                cx.fileno()
    # if connection closes
    except IOError as e:
        LOG.info(e)
    # make sure to close the socket
    finally:
        cx.close()
        pubqs.remove(inq)
        LOG.info("Publisher terminated")


def receiver(cx):
    LOG.info("Receiver running")
    try:
        while True:
            # what happens if client does not pick up
            s = cx.recv(4096)
            if not s:
                # empty read: the client closed the connection
                break
            LOG.info(s)
    # if connection closes
    except IOError as e:
        LOG.info(e)
    # make sure to close the socket
    finally:
        cx.close()
        LOG.info("Receiver terminated")

def controller(prds, pubqs):
    while True:
        LOG.info("controller: price update cycle, %i pubqs",
                 len(pubqs))
        Pricer.VOLA = update_vola(Pricer.VOLA)
        for prd in prds.values():
            prd.run()
            # push the updated price to every connected client
            for pubq in pubqs:
                pubq.put((prd.name, prd.prc))
        eventlet.sleep(5)

def update_vola(old_vola):
    # random walk in steps of 0.01, never below 0.01
    new_vola = max(old_vola + random.choice((-1, +1)) * 0.01, 0.01)
    return new_vola

class Pricer(object):
    VOLA = 0.01

    def __init__(self, name):
        self.name = name
        self.prc = random.random() * 100.0

    def run(self):
        # move the price up or down by the current volatility
        self.prc += random.choice((-1, +1)) * self.prc * self.VOLA


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s.%(msecs)03i %(levelname).4s '
                               '%(funcName)10s: %(message)s',
                        datefmt='%H:%M:%S')
    main()
 
