#!/usr/bin/python # # urltester.py # # Copyright 2015 Todd Shadburn # # Licensed under the GNU GPL version 2 # import sys import time import datetime import threading import urllib2 import ssl from Queue import Queue class InternalResult(object): def __init__(self, code): self.code = code return def getcode(self): return self.code def url_load_times(opener, url): global timeout global test_direct data = {'url': url} res = None data['start'] = datetime.datetime.now() try: res = opener.open(url, None, timeout) except urllib2.HTTPError as e: res = InternalResult(e.code) except urllib2.URLError as e: estr = str(e.reason) res = InternalResult(estr.replace(' ', '_')) except ssl.SSLError as e: estr = str(e) res = InternalResult(estr.replace(' ', '_')) except: estr = str(sys.exc_info()[0]) res = InternalResult(estr.replace(' ', '_')) data['end'] = datetime.datetime.now() data['rc'] = res.getcode() return data def log_thread(logq): global test_direct lfd = None if outfile: lfd = open(outfile, 'w+') while True: res = logq.get() url = res['url'] diff = res['end'] - res['start'] if type(res['rc']) is int: res['rc'] = str(res['rc']) if lfd: lfd.write('%s %s time=%s rc=(%s)\n' % (str(res['start'].isoformat()), url, str(diff), res['rc'])) else: sys.stdout.write('%s %s time=%s rc=(%s)\n' % (str(res['start'].isoformat()), url, str(diff), res['rc'])) logq.task_done() if lfd: lfd.close() def worker_thread(urlq, logq): opener = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPDefaultErrorHandler, urllib2.HTTPRedirectHandler, urllib2.HTTPErrorProcessor) while True: url = urlq.get() res = url_load_times(opener, url) logq.put(res) urlq.task_done() # Parse and validate arguments argmod = '' infiles = None outfile = None num_iterations = 1 num_threads = 1 timeout = 30 delay=0 try: import argparse argmod = 'argparse' except: try: from optparse import OptionParser argmod = 'optparse' except: print 'No argparse or optparse modules!' sys.exit(2) if argmod == 'argparse': parser = argparse.ArgumentParser(description='Utility for bulk testing of URL load times.') parser.add_argument('--infile', nargs=1, help='(Mandatory)Input file containing URLs to test.') parser.add_argument('--outfile', nargs=1, help='(Mandatory)File to output results to.') parser.add_argument('--timeout', nargs=1, help='(Optional)Connection timeout in seconds (default=30).') parser.add_argument('--threads', nargs=1, help='(Optional)Number of threads to use for requests (default=1).') parser.add_argument('--iterations', nargs=1, help='(Optional)Number of iterations through the URL list (default=1).') parser.add_argument('--delay', nargs=1, help='(Optional)Number of seconds of delay between interations (default=0).') args = parser.parse_args() infiles = args.infile if not args.outfile is None: outfile = args.outfile[0] if not args.timeout is None: timeout = int(args.timeout[0]) if not args.threads is None: num_threads = int(args.threads[0]) if not args.iterations is None: num_iterations = int(args.iterations[0]) if not args.delay is None: delay = int(args.delay[0]) else: parser = OptionParser() parser.add_option('--infile', dest='infiles', help='(Mandatory)Input file containing URLs to test.', metavar='FILE') parser.add_option('--outfile', dest='outfile', help='(Mandatory)File to output results to.') parser.add_option('--timeout', dest='timeout', help='(Optional)Connection timeout in seconds (default=30).') parser.add_option('--threads', dest='threads', help='(Optional)Number of threads to use for requests (default=1).') parser.add_option('--iterations', dest='iterations', help='(Optional)Number of iterations through the URL list (default=1).') parser.add_option('--delay', dest='delay', help='(Optional)Number of seconds of delay between interations (default=0).') (options, args) = parser.parse_args() if not options.infiles is None: infiles = [options.infiles] if not options.outfile is None: outfile = options.outfile if not options.timeout is None: timeout = int(options.timeout) if not options.threads is None: num_threads = int(options.threads) if not options.iterations is None: num_iterations = int(options.iterations) if not options.delay is None: delay = int(options.delay) if infiles is None: print "You must supply at least one --infile argument." sys.exit(2) #if outfile is None: # print "You must supply at least one --outfile argument." # sys.exit(2) urlq= Queue(0) logq= Queue(0) worker = threading.Thread(target=log_thread, args=(logq,)) worker.setDaemon(True) worker.start() for i in range(num_threads): #print 'DEBUG: Starting thread #%d' % (i) worker = threading.Thread(target=worker_thread, args=(urlq,logq,)) worker.setDaemon(True) worker.start() for i in range(num_iterations): #print 'DEBUG: Iteration #%d' % (i) ufd = open(infiles[0], 'r') for line in ufd: url = line.rstrip('\n') urlq.put(url) ufd.close() time.sleep(delay) # Wait for all the queue items to be processed urlq.join() #logq.join() sys.exit(0)