""" BotSense IP Proxy Unit Test Module
Author: Robin D. Knight
Email: robin.knight@roadnarrowsrobotics.com
URL: http://www.roadnarrowsrobotics.com
Date: 2007.10.32
Copyright (C) 2007. RoadNarrows LLC.
"""
import sys
import socket
import time
try:
import readline
except ImportError:
pass
BsProxyMsgIdLog = 'l'
BsProxyMsgIdVersion = 'v'
BsProxyMsgIdError = 'e'
BsProxyMsgIdLoopback = 'b'
BsProxyMsgIdProxyInfo = '?'
BsProxyMsgIdDevOpen = 'o'
BsProxyMsgIdDevClose = 'c'
BsProxyMsgIdRead = 'r'
BsProxyMsgIdWrite = 'w'
BsProxyMsgIdTrans = 't'
BsProxyMsgIdDevIoctl = 'i'
BsProxyMsgIdDevScan = 's'
BsProxyMsgIdDevCmd = '!'
BsProxyDevTypeNone = 0x00
BsProxyDevTypeI2C = 0x01
BsProxyDevTypeBPFoot = 0x02
BsProxyDevTypeBPImu = 0x03
BsProxyDevTypeBPHand = 0x04
BsProxyDevTypeBPCompass = 0x05
BsProxyDevTypeRS232 = 0x06
BsProxyDevTypeRCB3 = 0x07
BsProxyRspPass = 'P'
BsProxyRspFail = 'F'
BsProxyBPFootCmdIdGetIds = 'i'
BsProxyBPFootCmdIdGetCal = 'c'
BsProxyBPFootCmdIdGetRaw = 'r'
BsProxyBPFootCmdIdGetCooked = 'd'
class Packer:
def __init__(self):
pass
def PackMsg(self, packfmt):
msg = []
for ffmt in packfmt:
fval = ffmt['fval']
ftype = ffmt.get('ftype', 'u8')
if type(fval) == list:
for val in fval:
msg += self.PackField(val, ftype)
else:
msg += self.PackField(fval, ftype)
return msg
def PackField(self, fval, ftype):
if ftype == 'u8':
return self.PackU8(fval)
elif ftype == 'u16':
return self.PackU16(fval)
elif ftype == 'u32':
return self.PackU32(fval)
elif ftype == 'char':
return self.PackChar(fval)
elif ftype == 'zstr':
return self.PackZStr(fval)
else:
raise TypeError, ftype
def PackU8(self, fval):
if type(fval) == str:
fval = ord(fval)
return [fval & 0xff]
def PackU16(self, fval):
msg = []
msg += [((fval >> 8) & 0xff)]
msg += [(fval & 0xff)]
return msg
def PackU32(self, fval):
msg = []
msg += [((fval >> 24) & 0xff)]
msg += [((fval >> 16) & 0xff)]
msg += [((fval >> 8) & 0xff)]
msg += [(fval & 0xff)]
return msg
def PackChar(self, fval):
return ["%c" % fval]
def PackZStr(self, fval):
msg = []
for c in fval:
msg += [ord(c)]
msg += [0]
return msg
def UnpackMsg(self, msg, unpackfmt, pos=0):
unpackedVals = {}
lenMsg = len(msg)
for ffmt in unpackfmt:
if pos >= lenMsg:
self._error(unpackedVals, pos, "message too short to unpack all fields")
return unpackedVals
fname = ffmt['fname']
ftype = ffmt.get('ftype', 'u8')
fcount = ffmt.get('fcount', 1)
i = 0
while i < fcount:
val = self.UnpackField(msg, ftype, pos)
if val['_rc'] == 'error':
self._error(unpackedVals, val['_pos'], val['_error_msg'])
return unpackedVals
elif fcount > 1:
if j == 0:
unpackedVals[fname] = []
unpackedVals[fname] += [val['fval']]
else:
unpackedVals[fname] = val['fval']
pos = val['_pos']
i += 1
unpackedVals['_rc'] = 'ok'
unpackedVals['_pos'] = pos
return unpackedVals
def UnpackField(self, msg, ftype, pos=0):
if ftype == 'u8':
return self.UnpackU8(msg, pos)
elif ftype == 'u16':
return self.UnpackU16(msg, pos)
elif ftype == 'u32':
return self.UnpackU32(msg, pos)
elif ftype == 'char':
return self.UnpackChar(msg, pos)
elif ftype == 'zstr':
return self.UnpackZStr(msg, pos)
else:
raise TypeError, ftype
def UnpackU8(self, msg, pos=0):
unpackedVal = {}
if pos < len(msg):
unpackedVal['_rc'] = 'ok'
unpackedVal['_pos'] = pos+1
if type(msg[pos]) == str:
unpackedVal['fval'] = ord(msg[pos])
else:
unpackedVal['fval'] = (msg[pos] & 0xff)
else:
self._error(unpackedVal, pos, "message too short to unpack u8")
return unpackedVal
def UnpackU16(self, msg, pos=0):
unpackedVal = {}
if pos+1 < len(msg):
fval = ((ord(msg[pos]) << 8) | (ord(msg[pos+1]) & 0xff)) & 0xffff;
unpackedVal['_rc'] = 'ok'
unpackedVal['_pos'] = pos+2
unpackedVal['fval'] = fval
else:
self._error(unpackedVal, pos, "message too short to unpack u16")
return unpackedVal
def UnpackU32(self, msg, pos=0):
unpackedVal = {}
if pos+3 < len(msg):
fval = (ord(msg[pos]) << 24)
fval |= (ord(msg[pos+1]) << 16)
fval |= (ord(msg[pos+2]) << 8)
fval |= ord(msg[pos+3])
unpackedVal['_rc'] = 'ok'
unpackedVal['_pos'] = pos+4
unpackedVal['fval'] = fval
else:
self._error(unpackedVal, pos, "message too short to unpack u32")
return unpackedVal
def UnpackChar(self, msg, pos=0):
unpackedVal = {}
if pos < len(msg):
unpackedVal['_rc'] = 'ok'
unpackedVal['_pos'] = pos+1
unpackedVal['fval'] = "%c" % msg[pos]
else:
self._error(unpackedVal, pos, "message too short to unpack u8")
return unpackedVal
def UnpackZStr(self, msg, pos=0):
unpackedVal = {}
if pos < len(msg):
try:
if type(msg) == str:
i = msg.find('\0', pos)
if i < 0:
raise ValueError
else:
i = msg.index(0, pos)
except:
self._error(unpackedVal, pos, "null-terminated string not found")
return unpackedVal
fval = ''
while pos < i:
fval += "%c" % msg[pos]
pos += 1
unpackedVal['_rc'] = 'ok'
unpackedVal['_pos'] = pos+1
unpackedVal['fval'] = fval
else:
self._error(unpackedVal, pos, "message too short to unpack zstr")
return unpackedVal
def DumpBytes(self, msg):
for byte in msg:
if type(byte) == str:
byte = ord(byte)
if byte >= 0x20 and byte <= 0x7e:
print "%c" % byte,
else:
print "0x%02x" % byte,
print
def _error(self, unpackedVals, pos, error_msg):
unpackedVals['_rc'] = 'error'
unpackedVals['_pos'] = pos
unpackedVals['_error_msg'] = error_msg
return unpackedVals
class BotSenseProxyPacker(Packer):
def __init__(self):
self._tid = 0
self._lasttid = 0
self._cmd_hdr_fmt = [
{'fval':0, 'ftype':'u8'},
{'fval':0, 'ftype':'u8'},
{'fval':0, 'ftype':'u8'}]
self._rsp_hdr_fmt = [
{'fname':'msgid', 'ftype':'u8'},
{'fname':'tid', 'ftype':'u8'},
{'fname':'pf', 'ftype':'char'},
{'fname':'blen', 'ftype':'u8'}]
self._rsp_err_fmt = [{'fname':'_error_msg', 'ftype':'zstr'}]
def GenTid(self):
self._lasttid = self._tid
self._tid = (self._tid + 1) % 256
return self._lasttid
def GetLastTid(self):
return self._lasttid
def PackMsg(self, msgId, packfmt):
body = Packer.PackMsg(self, packfmt)
if type(msgId) == str:
msgId = ord(msgId)
self._cmd_hdr_fmt[0]['fval'] = msgId
self._cmd_hdr_fmt[1]['fval'] = self.GenTid()
self._cmd_hdr_fmt[2]['fval'] = len(body)
cmdhdr = Packer.PackMsg(self, self._cmd_hdr_fmt)
return cmdhdr+body
def UnpackMsg(self, msg, msgId, unpackfmt, tid=-1):
if type(msgId) == str:
msgId = ord(msgId)
rsp = Packer.UnpackMsg(self, msg, self._rsp_hdr_fmt, 0)
if rsp['_rc'] == 'error':
rsp['_error_msg'] = "bad response header: " + rsp['_error_msg']
return rsp
elif rsp['msgid'] != msgId:
return self._error(rsp, 0,
"command 0x%02x - response 0x%02x message id mismatch" % \
(msgId, rsp['msgid']))
elif tid >= 0 and rsp['tid'] != tid:
return self._error(rsp, 1,
"command %d - response %d transaction id mismatch" % \
(tid, rsp['tid']))
if rsp['pf'] == 'P':
body = Packer.UnpackMsg(self, msg, unpackfmt, 4)
else:
body = Packer.UnpackMsg(self, msg, self._rsp_err_fmt, 4)
body['_rc'] = 'error'
rsp.update(**body)
return rsp
def str2int(s):
try:
val = eval(s)
return val
except (SyntaxError, NameError, TypeError):
print >>sys.stderr, "error: %s: bad integer" % s
return None
def dumpBytes(msg):
for byte in msg:
if type(byte) == str:
byte = ord(byte)
if byte >= 0x20 and byte <= 0x7e:
print "%c" % byte,
else:
print "0x%02x" % byte,
print
def makeMsg(bytelist):
sMsg = ''
for byte in bytelist:
sMsg += "%c" % (byte)
return sMsg
def sendCmd(sock, cmd, trace=False):
if type(cmd) is list:
sCmd = makeMsg(cmd)
else:
sCmd = cmd
if trace:
dumpBytes(sCmd)
sock.sendall(sCmd)
def recvRsp(sock, trace=False):
rsp = sock.recv(1024)
if trace:
dumpBytes(rsp)
return rsp
def CmdBPFoot(args):
cmdName = 'bpfoot'
msgId = BsProxyMsgIdDevCmd
if len(args) < 2:
print >>sys.stderr, "error: %s missing argument(s)" % cmdName
return
whichfoot = args[0]
footcmd = args[1]
if whichfoot not in ['bpfoot_left', 'bpfoot_right']:
print >>sys.stderr, "error: %s %s: whichfoot foot?" % (cmdName, args[0])
return
handle = BsClientHandles.get(whichfoot)
if handle is None:
print >>sys.stderr, "error: %s: proxied device not found" % whichfoot
return
if footcmd == 'getids':
devCmdId = BsProxyBPFootCmdIdGetIds
cmd = BsPacker.PackMsg(msgId,
[ {'fval':handle, 'ftype':'u8'},
{'fval':devCmdId, 'ftype':'u8'},
])
sendCmd(BsClientSock, cmd)
rsp = recvRsp(BsClientSock)
if rsp:
vals = BsPacker.UnpackMsg(rsp, msgId,
[ {'fname':'devId', 'ftype':'u16'},
{'fname':'version', 'ftype':'u8'}
])
if vals['_rc'] == 'ok':
print "%s %s device_id=0x%04x version=0x%02x" % \
(whichfoot, cmdName, vals['devId'], vals['version'])
else:
print "error:", vals['_error_msg']
elif footcmd == 'getraw':
devCmdId = BsProxyBPFootCmdIdGetRaw
cmd = BsPacker.PackMsg(msgId,
[ {'fval':handle, 'ftype':'u8'},
{'fval':devCmdId, 'ftype':'u8'},
])
sendCmd(BsClientSock, cmd)
rsp = recvRsp(BsClientSock)
if rsp:
vals = BsPacker.UnpackMsg(rsp, msgId,
[{'fname':'press_raw', 'ftype':'u16', 'fcount':8}])
if vals['_rc'] == 'ok':
print "%s pressure_raw" % (whichfoot),
for press in vals['press_raw']:
print press,
print
else:
print "error:", vals['_error_msg']
elif footcmd == 'getcooked':
devCmdId = BsProxyBPFootCmdIdGetCooked
cmd = BsPacker.PackMsg(msgId,
[ {'fval':handle, 'ftype':'u8'},
{'fval':devCmdId, 'ftype':'u8'},
])
sendCmd(BsClientSock, cmd)
rsp = recvRsp(BsClientSock)
if rsp:
vals = BsPacker.UnpackMsg(rsp, msgId,
[{'fname':'press_cal', 'ftype':'u16', 'fcount':8}])
if vals['_rc'] == 'ok':
print "%s pressure_cal" % (whichfoot),
for press in vals['press_cal']:
print press,
print
else:
print "error:", vals['_error_msg']
else:
print >>sys.stderr, "error: %s: unknown device command" % args[1]
def CmdDevOpen(args):
cmdName = 'devadd'
msgId = BsProxyMsgIdDevOpen
if len(args) < 2:
print >>sys.stderr, "error: %s: missing argument(s)" % cmdName
return
devTypeName = args[0]
if devTypeName == 'i2c':
devType = BsProxyDevTypeI2C
elif devTypeName == 'bpfoot_left' or devType == 'bpfoot_right':
devType = BsProxyDevTypeBPFoot
elif devTypeName == 'bpimu':
devType = BsProxyDevTypeBPImu
else:
print >>sys.stderr, "error: 0x%02x: unknown type" % devTypeName
return
i2cAddr = str2int(args[1])
if i2cAddr is None:
return
if len(args) > 2:
devName = args[2]
else:
devName = '/dev/null'
cmd = BsPacker.PackMsg(msgId,
[ {'fval':devType, 'ftype':'u8'},
{'fval':i2cAddr, 'ftype':'u8'},
{'fval':devName, 'ftype':'zstr'}
])
sendCmd(BsClientSock, cmd)
rsp = recvRsp(BsClientSock)
if rsp:
vals = BsPacker.UnpackMsg(rsp, msgId,
[{'fname':'handle', 'ftype':'u8'}],
BsPacker.GetLastTid())
if vals['_rc'] == 'ok':
print "proxied device %s's handle is %d" % (devTypeName, vals['handle'])
BsClientHandles[devTypeName] = vals['handle']
else:
print "error:", vals['_error_msg']
def CmdDevClose(args):
cmdName = 'devdel'
msgId = BsProxyMsgIdDevClose
if len(args) < 1:
print >>sys.stderr, "error: %s missing argument(s)" % cmdName
return None
handle = str2int(args[0])
if handle is None:
return
cmd = BsPacker.PackMsg(msgId, [{'fval':handle, 'ftype':'u8'}])
sendCmd(BsClientSock, cmd)
rsp = recvRsp(BsClientSock)
if rsp:
vals = BsPacker.UnpackMsg(rsp, msgId, [{'fname':'handle', 'ftype':'u8'}])
if vals['_rc'] == 'ok':
for k,v in BsClientHandles.iteritems():
if v == handle:
del BsClientHandles[k]
break
else:
print "error:", vals['_error_msg']
def CmdOpen(args):
cmdName = 'open'
if len(args) > 0:
addr = args[0]
else:
addr = '127.0.0.1'
if len(args) > 1:
port = str2int(args[1])
else:
port = 9195
if port is None:
return
rc = BsClientSock.connect_ex((addr, port))
if rc != 0:
print "Failed to connect, rc=%d (%s)" % \
(rc, errno.errorcode.get(rc, 'UNKNOWN'))
return
BsClientHandles.clear()
print "Connected to bsproxy @ %s:%d" % (addr, port)
def CmdClose():
global BsClientSock
BsClientSock.close()
BsClientSock = socket.socket()
BsClientHandles.clear()
def CmdLoopback(args):
cmdName = 'loopback'
msgId = BsProxyMsgIdLoopback
if len(args) > 0:
max = str2int(args[0])
else:
max = None
if len(args) > 1:
s = args[1]
else:
s = 'from ut.py'
cnt = 0
while True:
loopmsg = "loopback "+str(cnt)+" "+s+"\n"
cmd = BsPacker.PackMsg(msgId, [{'fval':loopmsg, 'ftype':'zstr'}])
sendCmd(BsClientSock, cmd)
rsp = recvRsp(BsClientSock)
if rsp:
vals = BsPacker.UnpackMsg(rsp, msgId,
[{'fname':'loopmsg', 'ftype':'zstr'}],
BsPacker.GetLastTid())
if vals['_rc'] == 'ok':
print vals['loopmsg'],
else:
print "error:", vals['_error_msg']
cnt = cnt + 1
if max is not None and cnt > max:
return
time.sleep(1)
def CmdVersion():
cmdName = 'version'
msgId = BsProxyMsgIdVersion
cmd = BsPacker.PackMsg(msgId, [])
sendCmd(BsClientSock, cmd)
rsp = recvRsp(BsClientSock)
if rsp:
vals = BsPacker.UnpackMsg(rsp, msgId, [{'fname':'version', 'ftype':'zstr'}])
if vals['_rc'] == 'ok':
print 'BotSense Server Version: %s' % vals['version']
else:
print "error:", vals['_error_msg']
def CmdInfo():
cmdName = 'info'
msgId = BsProxyMsgIdProxyInfo
cmd = BsPacker.PackMsg(msgId, [])
sendCmd(BsClientSock, cmd)
rsp = recvRsp(BsClientSock)
if rsp:
vals = BsPacker.UnpackMsg(rsp, msgId, [])
if vals['_rc'] == 'ok':
print 'Server proxied devices:',
n = vals['blen']
k = vals['_pos']
while n > 0:
print '0x%02x' % ord(rsp[k]),
n -= 1
k += 1
print
else:
print "error:", vals['_error_msg']
def CmdLog(args):
cmdName = 'log'
msgId = BsProxyMsgIdLog
if len(args) < 1:
print >>sys.stderr, "error: %s: missing argument(s)" % cmdName
return
level = str2int(args[0])
if level is None:
return
cmd = BsPacker.PackMsg(msgId, [{'fval':level, 'ftype':'u8'}])
sendCmd(BsClientSock, cmd)
rsp = recvRsp(BsClientSock)
if rsp:
vals = BsPacker.UnpackMsg(rsp, msgId, [{'fname':'level', 'ftype':'u8'}])
if vals['_rc'] == 'ok':
print 'Server logging level: %d' % vals['level']
else:
print "error:", vals['_error_msg']
def CmdI2CScan(args):
cmdName = 'i2cscan'
msgId = BsProxyMsgIdDevScan
if len(args) < 1:
print >>sys.stderr, "error: %s: missing argument(s)" % cmdName
return
handle = str2int(args[0])
if handle is None:
return
cmd = BsPacker.PackMsg(msgId, [{'fval':handle, 'ftype':'u8'}])
sendCmd(BsClientSock, cmd)
rsp = recvRsp(BsClientSock)
if rsp:
vals = BsPacker.UnpackMsg(rsp, msgId, [{'fname':'nScanned', 'ftype':'u8'}])
if vals['_rc'] == 'ok':
nScanned = vals['nScanned']
print "Scanned %d I2C devices" % nScanned
for byte in rsp[5:]:
if nScanned <= 0:
break
print "0x%02x" % ord(byte),
nScanned -= 1
else:
print "error:", vals['_error_msg']
BsPacker = BotSenseProxyPacker()
BsClientSock = socket.socket()
BsClientHandles = {}
while True:
cmd = raw_input("bsclient$ ")
args = cmd.split()
if not args:
pass
elif args[0] == 'quit':
break
elif args[0] == 'open':
CmdOpen(args[1:])
elif args[0] == 'close':
CmdClose()
elif args[0] == 'loopback':
CmdLoopback(args[1:])
elif args[0] == 'version':
CmdVersion()
elif args[0] == 'info':
CmdInfo()
elif args[0] == 'log':
CmdLog(args[1:])
elif args[0] == 'i2cscan':
CmdI2CScan(args[1:])
elif args[0] == 'devopen':
handle = CmdDevOpen(args[1:])
elif args[0] == 'devclose':
CmdDevClose(args[1:])
elif args[0] == 'bpfoot_left' or args[0] == 'bpfoot_right':
CmdBPFoot(args)
elif args[0] == 'help':
print >>sys.stderr, """\
BotSense Server UnitTest Help
-----------------------------
*BrainPack Foot Commands
Two BP Feet (left and right) are supported. Use command 'devadd' to add each
foot to the bsproxy proxied devices.
General Foot Command Format:
bpfoot_left footcmd [byte...]
bpfoot_right footcmd [byte...]
footcmd - Foot-specific command.
byte... - Foot-specific command bytes, if any.
Specific Foot Commands:
... getids - Get BrainPack foot identifiers.
... getraw - Get foot raw sensor data.
... getcooked - Get foot cooked sensor data.
*BotSense Proxy Connection Commands
Opens, closes, and tests bsproxy connection.
Commands:
open [addr [port]] - Open connection on IP address and port.
addr - IP address. Default: 127.0.0.1
port - IP port. Default: 9195
close - Close connection.
loopback [cnt] - Loopback cnt times.
cnt - Number of loops. Default is forever.
version - Server version.
info - List of server supported proxied device types.
log - Set server log level.
i2cscan handle - Scan the I2C Bus for available I2C attached devices.
handle - allocated proxied device handle
*Proxied Device General Commands
Opens, closes, and lists client's proxied devices.
Commands:
devopen type i2caddr [devname]
- Open proxied device.
type - One of: i2c bpfoot_left bpfoot_right bpimu.
i2caddr - I2C address [0x01 - 0x7f].
devname - device name. Default: /dev/null
devclose handle - Close proxied device.
handle - allocated proxied device handle
*BotSense UnitTest Commands
help - Print this help.
quit - Quit UnitTest shell.
"""
else:
print >>sys.stderr, "Error:", args[0], "; unknown command"