UPnP client

N

Nikos Fotoulis

Hi.

Recently I needed some code to be able to listen on the public IP
address outside my modem router. Eventually, I came up with a
minimal UPnP implementation, and because it seems to work and I'm
happy with it, I've decided to post it here at clpy in case
anybody else has a use for it. You never know...

------------
# NAT Traversal via UPnP Port Mapping
# Written by Nikos Fotoulis <[email protected]>
# This code is public domain.
#
# Tested on Thomson TG585v7 modem router.
# UPnP is hairy. May not work with other routers
# Feedback is welcome.

import re, thread, socket, traceback as tb, random
from time import sleep
from urlparse import urlparse
from urllib import urlopen
import urllib2

# Diagnostic switches: VERBOSE reports each discovered gateway, VVERBOSE also
# prints a "+" for every SSDP reply processed by DiscoverUPnP ().
VERBOSE = VVERBOSE = False

# Discovery state, filled in by DiscoverUPnP ().  UPNPS stays None until a
# discovery has been attempted.  The DEFAULT* names are defined up front so
# that code reading them before a gateway has been selected sees None rather
# than failing with a NameError.
DEFAULT_ADDR = UPNPS = None
DEFAULTGW = DEFAULTIFACE = None

# regexes
# Finds the WAN connection service (WANIPConnection:N or WANPPPConnection:N)
# named on the ST: header line of an SSDP reply.
rWANIP = re.compile (r"ST:[^\n]*(WAN(IP|PPP)Connection:\d+)", re.I).search
# Finds the rest of the LOCATION: header line of an SSDP reply.
rLOCATION = re.compile (r"LoCaTiON:([^\n]+)", re.I).search


def rTAG (t):
    """Return a compiled regex whose group 1 is the content of element <t>.

    Matching is case-insensitive and spans newlines.  The opening tag may
    carry attributes (e.g. <NewExternalIPAddress dt:dt="string">), and the
    tag name is escaped so it is always matched literally.
    """
    t = re.escape (t)
    return re.compile (r"<%s(?:\s[^>]*)?>(.+?)</%s>" % (t, t), re.I | re.DOTALL)


# Returns the content of every <service> element in a device description.
rSERVICE = rTAG ("service").findall


def _extractor (tag):
    """Build a function returning the stripped content of <tag>, or None."""
    search = rTAG (tag).search

    def extract (txt):
        found = search (txt)
        if found:
            return found.group (1).strip ()
        return None

    return extract


# Generate one module-level extractor per tag: rCONTROLURL, rURLBASE,
# rEXTERNALIPADDRESS, rLEASEDURATION, rPROTOCOL, rINTERNALCLIENT,
# rEXTERNALPORT and rINTERNALPORT.  A leading "New" is dropped from the name.
for tag in ["controlURL", "URLBase", "NewExternalIPAddress", "NewLeaseDuration", "NewProtocol",
            "NewInternalClient", "NewExternalPort", "NewInternalPort"]:
    f = _extractor (tag)
    if tag.startswith ("New"):
        tag = tag [3:]
    globals () ["r" + tag.upper ()] = f

# multicast and discover UPnP gateways
def DiscoverUPnP ():
    """Discover UPnP gateways offering a WAN connection service.

    Multicasts an SSDP M-SEARCH and collects the replies that advertise a
    WANIPConnection or WANPPPConnection service.  For each such reply the
    device description is downloaded, the control URL of the service is
    looked up, and the gateway is asked for its external IP address.

    The module-level UPNPS dictionary is rebuilt.  Its keys are the gateway
    hosts taken from the description URLs and its values are tuples
    (external_ip, service, control_url, local_address), where local_address
    is the address of this machine on the route to the gateway.

    When exactly one gateway is found it becomes the default (DEFAULT_ADDR,
    DEFAULTGW, DEFAULTIFACE); otherwise those three are reset to None.

    Returns UPNPS, or None when the search request could not be sent.
    """
    global UPNPS, DEFAULTGW, DEFAULTIFACE, DEFAULT_ADDR
    # Imported here so this function does not depend on the file's import block
    from urlparse import urljoin

    S = {}
    UPNPS = {}
    DEFAULT_ADDR = DEFAULTGW = DEFAULTIFACE = None

    s = socket.socket (socket.AF_INET, socket.SOCK_DGRAM)
    try:
        s.setsockopt (socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
        #s.setsockopt (socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        R = "M-SEARCH * HTTP/1.1\r\nHOST: 239.255.255.250:1900\r\nMAN: ssdp:discover\r\nMX: 10\r\nST: ssdp:all\r\n\r\n"
        try:
            s.sendto (R, ("239.255.255.250", 1900))
        except socket.error:
            print ("UPnP gateways unreachable")
            return

        # Wait up to 5 seconds for the first reply; the wait is halved after
        # every reply, so collection ends soon after the replies stop coming.
        timeout = 5
        while 1:
            s.settimeout (timeout)
            try:
                data, addr = s.recvfrom (4096)
            except socket.error:
                # a timeout (or any other socket failure) ends the collection
                break
            timeout = max (timeout * 0.5, 0.01)
            r = rWANIP (data)
            if r:
                service = r.groups ()[0]
                r = rLOCATION (data)
                if r:
                    location = r.groups () [0].strip ()
                    if VERBOSE:
                        print ("server: %s supports %s at %s" % (addr, service, location))
                    S [addr] = service, location
            if VVERBOSE:
                print ("+")
    finally:
        # the original never closed the discovery socket
        s.close ()

    for service, location in S.values ():
        up = urlparse (location)
        netloc = up.netloc
        if ":" in netloc:
            server, _, port = netloc.partition (":")
        else:
            server, port = netloc, "80"
        try:
            data = urlopen (location).read ()
        except IOError:
            # one unreachable description must not abort the whole discovery
            print ("Couldn't fetch device description from %s" % location)
            continue

        URLBase = rURLBASE (data) or "http://%s:%s" % (server, port)
        controlURL = None
        for x in rSERVICE (data):
            if service in x:
                controlURL = rCONTROLURL (x)
                break
        if controlURL:
            # urljoin copes with absolute control URLs and with a leading or
            # missing "/", where plain concatenation builds a broken URL.
            URL = urljoin (URLBase, controlURL)
            addr = GetExternalIP (service, URL)
            if addr:
                # Connect to the gateway only to learn which local address
                # is used to reach it.
                probe = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
                try:
                    probe.connect ((server, int (port)))
                    thishost = probe.getsockname () [0]
                except (socket.error, ValueError):
                    print ("Couldn't connect to server: %s" % server)
                    continue
                finally:
                    probe.close ()
                UPNPS [server] = addr, service, URL, thishost
                if VERBOSE:
                    print ("for server: %s controlURL: %s" % (server, controlURL))
        else:
            print ("No controlURL found for server: %s" % server)

    # set defaults
    if len (UPNPS) == 1:
        gw, entry = list (UPNPS.items ()) [0]
        DEFAULT_ADDR, DEFAULTGW, DEFAULTIFACE = entry [0], gw, entry [3]
    elif UPNPS:
        # the original also printed this when no gateway at all was found
        print ("Multiple UPnP gateways!")

    return UPNPS


# generic request POST data
def envelope (request, service, **kw):
    """Build the SOAP envelope for a UPnP action.

    request -- action name, e.g. "AddPortMapping"
    service -- service type suffix, e.g. "WANIPConnection:1"
    kw      -- action arguments; each becomes a <name>value</name> element,
               emitted in the iteration order of the keyword dictionary

    Argument values are XML-escaped, so "&", "<" and ">" in a value (for
    example in a mapping description) cannot corrupt the document.

    Returns the envelope as a string.
    """
    # Imported here so this function does not depend on the file's import block
    from xml.sax.saxutils import escape

    arguments = "\n".join (["<%s>%s</%s>" % (k, escape ("%s" % (v,)), k)
                            for k, v in kw.items ()])
    return """<?xml version="1.0"?>
<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">
<s:Body>
<u:%s xmlns:u="urn:schemas-upnp-org:service:%s">
""" % (request, service) + arguments + """ </u:%s>
</s:Body>
</s:Envelope>""" % request

def Request (service, URL, request, **kw):
    """POST a SOAP action to a UPnP control URL.

    service -- service type suffix, e.g. "WANIPConnection:1"
    URL     -- control URL of the service
    request -- action name
    kw      -- action arguments, passed on to envelope ()

    Returns the response body, or None when the request fails for any
    reason (including an HTTP error status).  Callers rely on the false
    return value to detect failure, so errors are deliberately not raised.
    """
    req = urllib2.Request (URL)
    req.add_header ("content-type", 'text/xml; charset="utf-8"')
    req.add_header ("SOAPACTION", '"urn:schemas-upnp-org:service:%s#%s"' % (service, request))
    req.add_data (envelope (request, service, **kw))
    # "except Exception" instead of a bare except: still best-effort, but
    # KeyboardInterrupt and SystemExit are no longer swallowed.
    try:
        response = urllib2.build_opener ().open (req)
    except Exception:
        return None
    try:
        return response.read ()
    except Exception:
        return None
    finally:
        # the original never closed the response
        response.close ()

def GetExternalIP (service, URL):
    """Ask the gateway for its external IP address.

    Sends the GetExternalIPAddress action to the control URL and extracts
    the address from the reply.  Prints a message and returns a false value
    when the request fails or the reply holds no address.
    """
    reply = Request (service, URL, "GetExternalIPAddress")

    if reply:
        addr = rEXTERNALIPADDRESS (reply)
    else:
        addr = reply
    if not addr:
        print ("Couldn't get external IP address!")
    return addr

## The 3 basic actions of UPnP : list entries, add a mapping, delete a mapping
## Notes (tested on Thomson TG585v7):
## - Sometimes AddMapping returns a fail code (500) even though the
## mapping *is* done, as can be seen by listing the entries (?!)
## So, the only way to be sure is to: list entries, add mapping, list entries
## again and see the difference.
## - Returned LeaseDuration seems to be in deci-seconds

def getEntries (service, URL):
    """Yield the raw reply for each port mapping entry of the gateway.

    Entries are requested by index, starting at 0, with the
    GetGenericPortMappingEntry action; iteration stops at the first
    request that returns no answer.
    """
    index = 0
    reply = Request (service, URL, "GetGenericPortMappingEntry", NewPortMappingIndex=index)
    while reply:
        yield reply
        index += 1
        reply = Request (service, URL, "GetGenericPortMappingEntry", NewPortMappingIndex=index)

def listMappings (gw=None):
    """List the TCP port mappings that point at this host.

    gw -- key of the gateway in UPNPS; the default gateway when omitted

    Returns a list of (external_port, internal_port, lease) tuples, where
    lease is the reported NewLeaseDuration divided by 10.  Entries for
    another protocol or another internal client, and entries whose port or
    lease fields are missing or not numeric, are printed as strange and
    left out of the result.
    """
    _, service, URL, iface = UPNPS [gw or DEFAULTGW]
    L = []
    for a in getEntries (service, URL):
        if rPROTOCOL (a) == "TCP" and rINTERNALCLIENT (a) == iface:
            try:
                L.append ((int (rEXTERNALPORT (a)), int (rINTERNALPORT (a)),
                           int (rLEASEDURATION (a)) / 10.0))
            except (TypeError, ValueError):
                # A missing field yields None and a garbled one a non-number;
                # report the entry instead of aborting the whole listing.
                print ("strange entry response! %s" % a)
        else:
            # NOTE(review): mappings of other hosts or protocols land here too
            print ("strange entry response! %s" % a)
    return L

def addMapping (local_port, public_port, ttl, gw=None):
    """Map external TCP port public_port to local_port on this host.

    local_port  -- port on this machine
    public_port -- port on the gateway's external side
    ttl         -- lease duration sent as NewLeaseDuration
    gw          -- key of the gateway in UPNPS; the default gateway when omitted

    Returns True when the mapping is in place, None otherwise.
    """
    _, service, URL, iface = UPNPS [gw or DEFAULTGW]

    # Refuse when the external port already leads to another local port.
    # The result of AddPortMapping is unreliable, so check beforehand.
    in_use = any (ext == public_port and loc != local_port
                  for ext, loc, _lease in listMappings (gw))
    if in_use:
        return None

    accepted = Request (service, URL, "AddPortMapping",
                        NewEnabled="1", NewRemoteHost="", NewLeaseDuration=ttl,
                        NewInternalPort=local_port, NewExternalPort=public_port,
                        NewProtocol="TCP", NewInternalClient=iface,
                        NewPortMappingDescription="IndependNet")
    if accepted:
        return True

    # The gateway may report failure although the mapping was made, so look
    # the mapping up before giving up.
    present = any (ext == public_port and loc == local_port
                   for ext, loc, _lease in listMappings (gw))
    if present:
        return True
    return None

def delMapping (public_port, gw=None):
    """Remove a TCP port mapping from the gateway.

    public_port -- external port to release, or the string "all" to release
                   every mapping returned by listMappings ()
    gw          -- key of the gateway in UPNPS; the default gateway when omitted
    """
    _, service, URL, _ = UPNPS [gw or DEFAULTGW]
    if public_port == "all":
        targets = [ext for ext, _loc, _lease in listMappings (gw)]
    else:
        targets = [public_port]
    for ext in targets:
        Request (service, URL, "DeletePortMapping",
                 NewRemoteHost="", NewExternalPort=ext, NewProtocol="TCP")

##
## Socket compatible interface for accepting connections on an external port.
## Does mapping keepalive every 60sec to make sure the mapping is not kept
## indefinitely if our application crashes and doesn't manage to remove it.
##

# Lease duration requested whenever the mapping is created or renewed
LEASE_DURATION = 60

def Accept (port):
    """Listen for connections arriving on an external port of the gateway.

    port -- external port to map; a false value picks a random port between
            2000 and 60000

    Runs gateway discovery if none has been done yet, binds a local socket
    to a free port on the default interface and maps the external port to it
    with a lease of LEASE_DURATION.

    Returns an Acceptor wrapping the listening socket.
    Raises UPnPError when no gateway is available, no default gateway has
    been selected, or the port mapping fails.
    """
    if not port:
        port = random.randint (2000, 60000)

    if UPNPS is None:
        DiscoverUPnP ()

    if not UPNPS:
        raise UPnPError ("No UPnP gateway found. Can't listen ouside the modem")

    # The default interface is only assigned when discovery found exactly one
    # gateway; look it up defensively so that its absence is reported as a
    # UPnPError.
    iface = globals ().get ("DEFAULTIFACE")
    if not iface:
        raise UPnPError ("No default UPnP gateway selected")

    s = socket.socket ()
    try:
        s.bind ((iface, 0))
        inport = s.getsockname ()[1]
        if not addMapping (inport, port, LEASE_DURATION):
            raise UPnPError ("Port Mapping to external port %i Failed" % port)
        s.listen (2)
    except:
        # do not leak the socket on any failure; the error is re-raised
        s.close ()
        raise
    return Acceptor (s, port, inport)


class UPnPError (Exception):
    """Raised by Accept when no gateway is usable or a port mapping fails."""
    pass

class Acceptor:
    """Listening socket whose external port mapping is kept alive.

    Iterating over the object yields what sock.accept () returns for every
    incoming connection.  A background thread renews the port mapping until
    the object is closed or the mapping can no longer be established.

    sock  -- the listening socket
    eport -- external port on the gateway (also available as .port)
    iport -- local port the external port is mapped to
    """

    def __init__ (self, sock, eport, iport):
        self.sock, self.eport, self.iport = sock, eport, iport
        self.port = eport
        self.active = True
        self._closed = False
        thread.start_new_thread (self.keepalive, ())

    def __iter__ (self):
        while self.active:
            try:
                conn = self.sock.accept ()
            except socket.error:
                # accept () fails once the socket has been closed; that is
                # the normal way out after close (), not an error.
                if not self.active:
                    break
                raise
            yield conn

    def keepalive (self):
        """Renew the port mapping shortly before its lease runs out."""
        while 1:
            # ttl stays None unless *our* mapping is listed.  The original
            # reused the loop variable, so a list holding only other
            # mappings left the lease time of the last one in ttl.
            ttl = None
            for eport, iport, lease in listMappings ():
                if eport == self.eport and iport == self.iport:
                    ttl = lease
                    break
            st = 0.1
            if ttl is not None:
                st = max (ttl - 0.1, 0.1)
            sleep (st)
            if not self.active:
                break
            if not addMapping (self.iport, self.eport, LEASE_DURATION):
                if ttl is None:
                    # mapping is gone and cannot be re-created: give up
                    self.active = False
                print ("Failed to Keepalive the lease")

    def close (self):
        """Stop the keepalive, close the socket and remove the mapping.

        The keepalive thread holds a reference to this object, so __del__
        cannot be relied upon to run while the thread is alive; call close ()
        explicitly.  Calling it more than once is harmless.
        """
        if getattr (self, "_closed", False):
            return
        self._closed = True
        self.active = False
        try:
            self.sock.close ()
        finally:
            delMapping (self.eport)

    def __del__ (self):
        # A destructor must not raise; cleanup here is best-effort.
        try:
            self.close ()
        except Exception:
            pass

## main. UPnP manager & testing

USAGE = """UPnP NAT Traversal (port mapping) test
Usage: python upnp.py [-gw gw] {list|bind|del} <arguments>

upnp list
list mappings
upnp bind internal-port external-port time-to-live
map public port to local port for some time
upnp del external-port|"all"
remove a port mapping
upnp
discover gateways and external IP addresses

Common options:
-gw : select UPnP gateway (if more than one -- NOT IMPLEMENTED)
"""

# Command line driver: discover the gateways, print what was found, then run
# the requested sub-command (list, bind or del).
if __name__ == "__main__":
    import sys
    args = sys.argv [1:]
    if "--help" in args:
        print (USAGE)
        sys.exit ()

    VERBOSE = True
    print ("Discovering UPnP gateways...")
    DiscoverUPnP ()
    for gw, v in UPNPS.items ():
        ip, service, URL, iface = v
        print ("External IP: %s" % ip)
        print ("\tgateway: %s" % gw)
        print ("\tservice: %s" % service)
        print ("\tcontrol URL: %s" % URL)
        print ("\tinterface: %s" % iface)

    if not UPNPS:
        sys.exit ("No UPnP gateway found")
    if not args:
        sys.exit ()

    cmd = args.pop (0)
    # gateway selection (-gw) is not implemented; always use the default
    gw = None

    if cmd == "list":
        print ("Port Mappings:")
        for eport, iport, ttl in listMappings (gw):
            print ("\t%i <- %i (ttl=%i)" % (iport, eport, ttl))
    elif cmd == "bind":
        # A wrong argument count and a non-numeric argument both raise
        # ValueError; show the usage text instead of a traceback.
        try:
            iport, eport, ttl = [int (a) for a in args]
        except ValueError:
            sys.exit (USAGE)
        if addMapping (iport, eport, ttl, gw):
            print ("OK")
        else:
            print ("Failed. Port already used, or implementation error")
    elif cmd == "del":
        if len (args) != 1:
            sys.exit (USAGE)
        delMapping (args [0], gw)
    else:
        print (USAGE)
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only takes a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

No members online now.

Forum statistics

Threads
473,954
Messages
2,570,116
Members
46,704
Latest member
BernadineF

Latest Threads

Top