Usage example for ANSI.py from the pexpect package

E

Eric Myers

Hello folks:

(This message is also posted on the help forum at the pexpect
sourceforge page, but all indentation in the code got stripped away
when I submitted the post.)

For some time I've wanted to make use of the ANSI.py module in the
pexpect package to handle screen-based telnet sessions in Python, but I
could never break the ice with the thing. After reading an article by
Greg Jorgenson where he recounts using the package to help him solve a
problem for a customer, I wrote him asking if he could give me any tips
to help me get started. Greg very graciously sent me some code to use
as an example, and I've finally been able to make some headway, thanks
to him. I'm at least sputtering along now, so I wanted to take the
time to pass along a little code in case anyone else out there would
like to tinker with this tool but has a little trouble getting off the
dime...

The code I'm posting below is a script that establishes a telnet
connection with my Linux machine, starts the vim text editor, reads
some text from the splash screen over a range of coordinates, and moves
the cursor around. On my machine, the raw data sent over the telnet
stream after issuing the "vim" command is this:

..[1;24r.[27m.[24m.[0m.(B.[H.[J.[2;1H.[1m~
(...and then a series of "~" symbols with line breaks-- takes up a lot
of screen space... :))
~.[0m.(B.[24;63H0,0-1.[9CAll.[6;32HVIM - Vi IMproved.[8;34Hversion
6.3.54.[9;29Hby Bram Moolenaar et al..[10;24HModified by
..[1m<[email protected]>.[0m.(B.[11;19HVim is open source and freely
distributable.[13;26HHelp poor children in Uganda!.[14;18Htype :help
iccf.[1m<Enter>.[0m.(B.[7Cfor information.[16;18Htype
:q.[1m<Enter>.[0m.(B.[15Cto exit.[17;18Htype :help.[1m<Enter>.[0m.(B
or .[1m<F1>.[0m.(B for on-line help.[18;18Htype :help
version6.[1m<Enter>.[0m.(B for version info.[1;1H

The script is used to process the raw stream and read characters 25
through 37 on row 18, and it correctly reports "help version6" as the
result.

A great deal of the code that follows is either blatantly
cut-and-pasted from the sample Greg sent me, or has been modified only
slightly. I have included some code of my own which makes use of the
set_option_negotiation_callback method of the Telnet class, which can
be used to establish an agreement between the client and server on how
the session will proceed. Learning how to use that method was a hurdle
for me when I did it, maybe someone down the line will find an example
useful. Beware, though-like I said, it's my own code, and I'm a
rookie. There's probably a lot of room for improvement in what I came
up with.

One thing that I think is worth noting-none of the code below depends
on the pexpect module itself. It just uses the standard telnetlib to
establish the connection, and then uses the ANSI, screen, and FSM
modules from the pexpect package to interpret the output of a
screen-based application. The upshot of this is that the pty module is
not required, so you can use this with the native Windows version of
Python. Cygwin is not required.

Finally, I'd like to express my thanks to Mr. Jorgensen one more time.
Greg, you really helped out when I was ready to become frustrated. It's
people like you that make the 'net a nicer place to work and play!

OK, without further ado, here's the script-

#! /usr/bin/env python
""" ansi_py_usage: example for usage of the ANSI.py module """

from telnetlib import Telnet
from telnetlib import DO, DONT, WILL, WONT, theNULL, TTYPE, IAC, SB, \
SE, ECHO
import ANSI

global host_address,username,password

host_address='address_of_the_machine_you_want_to_connect_to'
username='your_username' # P.S.-- it's very bad practice to hard-code
password='your_password' # in a hostname, username, and password!
terminal_type='vt100'


class AnsiPyTestImplementation:
def __init__(self):
self.conn = None
self.crt = ANSI.ANSI(25,80)

def login(self, host, username, password):
try:
# empty the read buffer and check the connection
self.conn.read_eager()
except:
self.conn = None
tries = 45
while tries > 0:
print "logging in..."
try:
if self.conn is None:
#
# The following is an example of how to use the
# set_option_negotiation_callback method of the
# Telnet function. When any sub-negotiation
# requests are received from the server during the
# client/server hand-shaking, a user-defined
# function can be called to handle the situation.
#
self.conn = Telnet()
self.conn.set_option_negotiation_callback(self.neg)
self.conn.open(host)
self.waitfor("login: ", 5)
self.sendln(username)
self.waitfor("Password: ", 2)
self.sendln(password)
self.sendln("");
self.waitfor("$ ", 2)
return
except:
tries = tries - 1
self.conn = None
print "timed out"
raise IOError, "timed out (login)"

def logout(self):
if self.conn:
try:
self.sendln("exit")
self.conn.close()
except:
pass
self.conn = None
print"logged out"
else:
print"not logged in"

def neg(self, sock, command, option):
#
# Here's a function I came up with to handle sub-negotiation.
# During session negotiation, the server can ask a series of
# "will you or won't you" questions of the client. One of
# those questions happens to be:
# "Will you tell me what terminal type you are?"
# This question is the only one out of the possible list of
# such questions that I respond with "Yes, I will." Then later
# the function reports that the terminal type is a DEC VT-100.
# If you don't do the sub-negotiation and the server demands
# to know the terminal type, the Telnet function will report
# that the terminal type is simply "network". No server will
# recognize this, and some will refuse to even start a session
# with you using some default terminal type.
# A couple of good links--
# http://www.cs.cf.ac.uk/Dave/Internet/node136.html
# http://www.scit.wlv.ac.uk/rfc/rfc8xx/RFC854.html
#
negotiation_list=[
['BINARY',WONT,'WONT'],
['ECHO',WONT,'WONT'],
['RCP',WONT,'WONT'],
['SGA',WONT,'WONT'],
['NAMS',WONT,'WONT'],
['STATUS',WONT,'WONT'],
['TM',WONT,'WONT'],
['RCTE',WONT,'WONT'],
['NAOL',WONT,'WONT'],
['NAOP',WONT,'WONT'],
['NAOCRD',WONT,'WONT'],
['NAOHTS',WONT,'WONT'],
['NAOHTD',WONT,'WONT'],
['NAOFFD',WONT,'WONT'],
['NAOVTS',WONT,'WONT'],
['NAOVTD',WONT,'WONT'],
['NAOLFD',WONT,'WONT'],
['XASCII',WONT,'WONT'],
['LOGOUT',WONT,'WONT'],
['BM',WONT,'WONT'],
['DET',WONT,'WONT'],
['SUPDUP',WONT,'WONT'],
['SUPDUPOUTPUT',WONT,'WONT'],
['SNDLOC',WONT,'WONT'],
['TTYPE',WILL,'WILL'],
['EOR',WONT,'WONT'],
['TUID',WONT,'WONT'],
['OUTMRK',WONT,'WONT'],
['TTYLOC',WONT,'WONT'],
['VT3270REGIME',WONT,'WONT'],
['X3PAD',WONT,'WONT'],
['NAWS',WONT,'WONT'],
['TSPEED',WONT,'WONT'],
['LFLOW',WONT,'WONT'],
['LINEMODE',WONT,'WONT'],
['XDISPLOC',WONT,'WONT'],
['OLD_ENVIRON',WONT,'WONT'],
['AUTHENTICATION',WONT,'WONT'],
['ENCRYPT',WONT,'WONT'],
['NEW_ENVIRON',WONT,'WONT']
]
if ord(option)<40:
received_option=negotiation_list[ord(option)][0]
response=negotiation_list[ord(option)][1]
print_response=negotiation_list[ord(option)][2]
else:
received_option='unrecognised'
response=WONT
print_response='WONT'
if command==DO:
print "Received request to DO %s, sending %s" % \
(received_option,print_response)
sock.sendall("%s%s%s" % (IAC, response, option))
elif command==DONT:
print 'Received the DONT command'
elif command==WILL:
print 'Received the WILL command'
elif command==WONT:
print 'Received the WONT command'
elif command==theNULL:
print 'Received the NULL command'
elif command==SB:
print 'Received the SB command'
print ord(option)
print self.conn.read_sb_data()
elif command==SE:
print 'Received the SE command'
print repr(self.conn.read_sb_data())
sock.sendall("%s%s%s%sDEC-VT100%s%s" % \
(IAC,SB,TTYPE,chr(0),IAC,SE))
print 'Sent all'
else:
print 'Received something, don''t know what.'
print ord(option)
return

def getcrt(self, row, colstart, colend):
return self.crt.get_region(row, colstart, row, colend)[0]

def send(self, s):
self.conn.write(s)

def sendln(self, s):
self.send("%s\r" % s)

def updatescreen(self, buf):
self.crt.process_list(buf)

def waitfor(self, match, timeout=None):
s = self.conn.read_until(match, timeout)
if timeout is not None:
n = len(match)
if s[-n:] != match:
raise IOError, "timed out (waitfor)"
self.updatescreen(s)
return s

def DoIt(self):
#
# This is the method called by main(), which in turn uses
# methods from ANSI.py and screen.py to process the raw data
# from a screen-based application (vim in this case)
#
try:
self.login(host_address,username,password)
except:
print "Couldn't log in!"
try:
self.sendln("vim")
print "vim started"

# [1;1H is the last bit of the vim splash screen that my
# server sends me. The self.waitfor method is used to look
# for this sequence of characters. Once it arrives, we
# know the buffer is ready to be processed.
raw_telnet_data=self.waitfor("[1;1H",3)

print "\nThe raw telnet data sent over the network was:\n"
print raw_telnet_data + "\n"

# The vim splash screen lists the help version on row 18,
# characters 25 through 37. We can locate that text in the
# processed buffer.
help_version = self.getcrt(18,25,37).strip()
print "Text found on row 18, columns 25 through 37, was: ",
print help_version + "\n"

# The following shows you a screen-shot of the processed
# buffer.
print "ANSI-fication of the raw telnet data received",
print "looks like this:\n"
print str(self.crt)

except:
print "vim failed to start"
raise IOError, "service unavailable (start)"
try:
# ANSI.py can keep up with where the cursor is...
if self.crt:
print "Right now the cursor is at row " + \
repr(self.crt.cur_r)+ ", column " + \
repr(self.crt.cur_c)+"\n"
print "So now we'll send some keystrokes to change",
print "that, and then report back the new position..."
self.send("i")
self.send("\t")
print"Done\n"
self.waitfor("[1;9H",3)
print "Now the cursor is at row " + \
repr(self.crt.cur_r)+ ", column " + \
repr(self.crt.cur_c)+"\n"
else:
print "cursor is at 0,0"
except:
print "Couldn't get cursor position!"

self.logout()

if __name__=='__main__':
w = AnsiPyTestImplementation()
w.DoIt()
 

Ask a Question

Want to reply to this thread or ask your own question?

You'll need to choose a username for the site, which only take a couple of moments. After that, you can post your question and our members will help you out.

Ask a Question

Members online

Forum statistics

Threads
473,969
Messages
2,570,161
Members
46,705
Latest member
Stefkari24

Latest Threads

Top