testproto.py: use argparse module to parse command line arguments, full PEP8 compliance

This commit is contained in:
Sebastien Helleu 2013-11-03 13:44:43 +01:00
parent 0e4ce9c967
commit 1cf51dd211

View File

@ -21,185 +21,219 @@
# along with QWeeChat. If not, see <http://www.gnu.org/licenses/>. # along with QWeeChat. If not, see <http://www.gnu.org/licenses/>.
# #
# from __future__ import print_function
# Usage: python testproto.py [-h] [-v] [-6] <hostname> <port>
# import argparse
# With initial commands: echo "init password=xxxx" | python testproto.py localhost 5000 import os
# python testproto.py localhost 5000 < commands.txt import select
# import shlex
# Return code: import socket
# 0: OK import struct
# 1: missing/invalid arguments (hostname or port) import sys
# 2: connection to WeeChat/relay failed import time
# 3: I/O error with WeeChat/relay import traceback
#
import os, sys, socket, select, struct, time
import protocol # WeeChat/relay protocol import protocol # WeeChat/relay protocol
options = { 'h': 0, 'v': 0, '6': 0 }
hostname = None
port = None
def usage(): class TestProto:
"""Display usage."""
print('\nSyntax: python %s [-h] [-v] [-6] <hostname> <port>\n' % sys.argv[0])
print(' -h display this help')
print(' -v verbose mode: long objects view (two -v: display raw messages)')
print(' -6 connect using IPv6')
print(' hostname, port hostname (or IP address) and port of machine running WeeChat relay')
print('')
print('Some commands can be piped to the script, for example:')
print(' echo "init password=xxxx" | python %s localhost 5000' % sys.argv[0])
print(' python %s localhost 5000 < commands.txt' % sys.argv[0])
print('')
def connect(address, ipv6): def __init__(self, args):
"""Connect to WeeChat/relay.""" self.args = args
inet = socket.AF_INET6 if ipv6 else socket.AF_INET self.sock = None
sock = None self.has_quit = False
try: self.address = '{self.args.hostname}/{self.args.port} ' \
sock = socket.socket(inet, socket.SOCK_STREAM) '(IPv{0})'.format(6 if self.args.ipv6 else 4, self=self)
sock.connect(address)
except:
if sock:
sock.close()
print('Failed to connect to %s/%d using %s' % (address[0], address[1],
'IPv4' if inet == socket.AF_INET else 'IPv6'))
return (None, None)
print('Connected to %s/%d (%s)' % (hostname, port,
'IPv4' if inet == socket.AF_INET else 'IPv6'))
return (sock, inet)
def send(sock, messages): def connect(self):
"""Send a text message to WeeChat/relay.""" """
has_quit = False Connect to WeeChat/relay.
try: Return True if OK, False if error.
for msg in messages.split('\n'): """
if msg == 'quit': inet = socket.AF_INET6 if self.args.ipv6 else socket.AF_INET
has_quit = True try:
sock.sendall(msg + '\n') self.sock = socket.socket(inet, socket.SOCK_STREAM)
print('\x1b[33m<-- %s\x1b[0m' % msg) self.sock.connect((self.args.hostname, self.args.port))
except: except:
print('Failed to send message') if self.sock:
return (False, has_quit) self.sock.close()
return (True, has_quit) print('Failed to connect to', self.address)
return False
print('Connected to', self.address)
return True
def decode(message): def send(self, messages):
"""Decode a binary message received from WeeChat/relay.""" """
global options Send a text message to WeeChat/relay.
try: Return True if OK, False if error.
proto = protocol.Protocol() """
message = proto.decode(message, separator='\n' if options['v'] else ', ') try:
print('') for msg in messages.split('\n'):
if options['v'] >= 2 and message.uncompressed: if msg == 'quit':
# display raw message self.has_quit = True
print('\x1b[32m--> message uncompressed (%d bytes):\n%s\x1b[0m' self.sock.sendall(msg + '\n')
% (message.size_uncompressed, print('\x1b[33m<-- ' + msg + '\x1b[0m')
protocol.hex_and_ascii(message.uncompressed, 20))) except:
# display decoded message traceback.print_exc()
print('\x1b[32m--> %s\x1b[0m' % message) print('Failed to send message')
except: return False
print('Error while decoding message from WeeChat') return True
return False
return True
def mainloop(sock): def decode(self, message):
"""Main loop: read keyboard, send commands, read socket and decode and display received binary messages.""" """
message = '' Decode a binary message received from WeeChat/relay.
recvbuf = '' Return True if OK, False if error.
prompt = '\x1b[36mrelay> \x1b[0m' """
sys.stdout.write(prompt) try:
sys.stdout.flush() proto = protocol.Protocol()
try: msgd = proto.decode(message,
while True: separator='\n' if self.args.verbose > 0
inr, outr, exceptr = select.select([sys.stdin, sock], [], [], 1) else ', ')
for fd in inr: print('')
if fd == sys.stdin: if self.args.verbose >= 2 and msgd.uncompressed:
buf = os.read(fd.fileno(), 4096) # display raw message
if buf: print('\x1b[32m--> message uncompressed ({0} bytes):\n'
message += buf '{1}\x1b[0m'
if '\n' in message: ''.format(msgd.size_uncompressed,
messages = message.split('\n') protocol.hex_and_ascii(msgd.uncompressed, 20)))
msgsent = '\n'.join(messages[:-1]) # display decoded message
if msgsent: print('\x1b[32m--> {0}\x1b[0m'.format(msgd))
(send_ok, has_quit) = send(sock, msgsent) except:
if not send_ok: traceback.print_exc()
return 3 print('Error while decoding message from WeeChat')
if has_quit: return False
return 0 return True
message = messages[-1]
def send_stdin(self):
"""
Send commands from standard input if some data is available.
Return True if OK (it's OK if stdin has no commands),
False if error.
"""
inr, outr, exceptr = select.select([sys.stdin], [], [], 0)
if inr:
data = os.read(sys.stdin.fileno(), 4096)
if data:
if not test.send(data.strip()):
#self.sock.close()
return False
# open stdin to read user commands
sys.stdin = open('/dev/tty')
return True
def mainloop(self):
"""
Main loop: read keyboard, send commands, read socket,
decode/display binary messages received from WeeChat/relay.
Return 0 if OK, 4 if send error, 5 if decode error.
"""
if self.has_quit:
return 0
message = ''
recvbuf = ''
prompt = '\x1b[36mrelay> \x1b[0m'
sys.stdout.write(prompt)
sys.stdout.flush()
try:
while not self.has_quit:
inr, outr, exceptr = select.select([sys.stdin, self.sock],
[], [], 1)
for fd in inr:
if fd == sys.stdin:
buf = os.read(fd.fileno(), 4096)
if buf:
message += buf
if '\n' in message:
messages = message.split('\n')
msgsent = '\n'.join(messages[:-1])
if msgsent and not self.send(msgsent):
return 4
message = messages[-1]
sys.stdout.write(prompt + message)
sys.stdout.flush()
else:
buf = fd.recv(4096)
if buf:
recvbuf += buf
while len(recvbuf) >= 4:
remainder = None
length = struct.unpack('>i', recvbuf[0:4])[0]
if len(recvbuf) < length:
# partial message, just wait for the
# end of message
break
# more than one message?
if length < len(recvbuf):
# save beginning of another message
remainder = recvbuf[length:]
recvbuf = recvbuf[0:length]
if not self.decode(recvbuf):
return 5
if remainder:
recvbuf = remainder
else:
recvbuf = ''
sys.stdout.write(prompt + message) sys.stdout.write(prompt + message)
sys.stdout.flush() sys.stdout.flush()
else: except:
buf = fd.recv(4096) traceback.print_exc()
if buf: self.send('quit')
recvbuf += buf return 0
while len(recvbuf) >= 4:
remainder = None
length = struct.unpack('>i', recvbuf[0:4])[0]
if len(recvbuf) < length:
# partial message, just wait for end of message
break
# more than one message?
if length < len(recvbuf):
# save beginning of another message
remainder = recvbuf[length:]
recvbuf = recvbuf[0:length]
if not decode(recvbuf):
return 3
if remainder:
recvbuf = remainder
else:
recvbuf = ''
sys.stdout.write(prompt + message)
sys.stdout.flush()
except:
send(sock, 'quit')
# display help if arguments are missing def __del__(self):
if len(sys.argv) < 3: print('Closing connection with', self.address)
usage() time.sleep(0.5)
sys.exit(1) self.sock.close()
# read command line arguments
try:
for arg in sys.argv[1:]:
if arg[0] == '-':
for opt in arg[1:]:
options[opt] = options.get(opt, 0) + 1
elif hostname:
port = int(arg)
else:
hostname = arg
except:
print('Invalid arguments')
sys.exit(1)
if options['h']: if __name__ == "__main__":
usage() # parse command line arguments
sys.exit(0) parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
fromfile_prefix_chars='@',
description='Command-line program for testing protocol WeeChat/relay.',
epilog='''
Environment variable "TESTPROTO_OPTIONS" can be set with default options.
Argument "@file.txt" can be used to read default options in a file.
# connect to WeeChat/relay Some commands can be piped to the script, for example:
(sock, inet) = connect((hostname, port), options['6']) echo "init password=xxxx" | python {0} localhost 5000
if not sock: python {0} localhost 5000 < commands.txt
sys.exit(2)
# send commands from standard input if some data is available The script returns:
has_quit = False 0: OK
inr, outr, exceptr = select.select([sys.stdin], [], [], 0) 2: wrong arguments (command line)
if inr: 3: connection error
data = os.read(sys.stdin.fileno(), 4096) 4: send error (message sent to WeeChat)
if data: 5: decode error (message received from WeeChat)
(send_ok, has_quit) = send(sock, data.strip()) '''.format(sys.argv[0]))
if not send_ok: parser.add_argument('-6', '--ipv6', action='store_true',
sock.close() help='connect using IPv6')
sys.exit(3) parser.add_argument('-v', '--verbose', action='count', default=0,
# open stdin to read user commands help='verbose mode: long objects view '
sys.stdin = open('/dev/tty') '(-vv: display raw messages)')
parser.add_argument('hostname',
help='hostname (or IP address) of machine running '
'WeeChat/relay')
parser.add_argument('port', type=int,
help='port of machine running WeeChat/relay')
if len(sys.argv) == 1:
parser.print_help()
sys.exit(0)
args = parser.parse_args(
shlex.split(os.getenv('TESTPROTO_OPTIONS') or '') + sys.argv[1:])
# main loop (wait commands, display messages received) test = TestProto(args)
if not has_quit:
mainloop(sock) # connect to WeeChat/relay
time.sleep(0.5) if not test.connect():
sock.close() sys.exit(3)
# send commands from standard input if some data is available
if not test.send_stdin():
sys.exit(4)
# main loop (wait commands, display messages received)
rc = test.mainloop()
del test
sys.exit(rc)