#!/usr/bin/python # # urltester.py # # Copyright 2015,2016,2018 Todd Shadburn # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. # import sys import time import datetime import threading import urllib2 import ssl import re import json from Queue import Queue class InternalResult(object): def __init__(self, code): self.code = code return def getcode(self): return self.code def read(self): return '' def page_content_perform_tests(url=None, content=None, tests=None): if url is None or content is None or tests is None: raise ValueError('page_content_perform_tests(): Required argument missing') results = { 'positive': [], 'negative': [] } if 'positive' in tests: for t in tests['positive']: m = re.search(t, content) if m: results['positive'].append('P') else: results['positive'].append('F') if 'negative' in tests: for t in tests['negative']: m = re.search(t, content) if m: results['negative'].append('F') else: results['negative'].append('P') return results def url_load_times(opener, url, tests=None): global timeout global test_direct data = {'url': url} res = None data['start'] = datetime.datetime.now() try: res = opener.open(url, None, timeout) except urllib2.HTTPError as e: res = InternalResult(e.code) except urllib2.URLError as e: estr = str(e.reason) res = InternalResult(estr.replace(' ', '_')) except ssl.SSLError as e: estr = str(e) res = InternalResult(estr.replace(' ', '_')) except: estr = str(sys.exc_info()[0]) res = InternalResult(estr.replace(' ', '_')) data['end'] = datetime.datetime.now() data['rc'] = res.getcode() data['opt'] = '' if tests: content = res.read() data['opt'] = page_content_perform_tests(url=url, content=content, tests=tests) return data def soap_load_times(c, wsdl, func, args): global timeout data = {'url': '%s %s' % (wsdl, func)} res = None data['start'] = datetime.datetime.now() try: f = getattr(c.service, func) res = f(*args) res = 'Success' except ValueError: res = 'Value Error' except: res = 'Exception' data['end'] = datetime.datetime.now() data['rc'] = res #TODO: get status code return data def log_thread(logq): global test_direct lfd = None if outfile: lfd = open(outfile, 'w+') while True: res = logq.get() opt = '' url = res['url'] diff = res['end'] - res['start'] if type(res['rc']) is int: res['rc'] = str(res['rc']) if 'opt' in res: if type(res['opt']) is dict: # Complex result type opt = 'positive=%-10s negative=%-10s' % (''.join(res['opt']['positive']), ''.join(res['opt']['negative'])) else: # simple result type opt = res['opt'] if lfd: lfd.write('%s %s time=%s rc=(%s) %s\n' % (str(res['start'].isoformat()), url, str(diff), res['rc'], opt)) else: sys.stdout.write('%-27s %-56s time=%s rc=(%s) %s\n' % (str(res['start'].isoformat()), url, str(diff), res['rc'], opt)) logq.task_done() if lfd: lfd.close() def worker_thread(urlq, logq): soapc_cache = {} opener = urllib2.build_opener(urllib2.HTTPHandler, urllib2.HTTPDefaultErrorHandler, urllib2.HTTPRedirectHandler, urllib2.HTTPErrorProcessor) while True: res = None qi = urlq.get() url = None tests = None if len(qi) == 2: url = qi[0] tests = qi[1] else: url = qi if 'SOAPCall ' in url: (fid,wsdl,func,args) = url.split(' ',3) if wsdl in soapc_cache: c = soapc_cache[wsdl] else: c = Client(wsdl) soapc_cache[wsdl] = c print 'DEBUG: Built SOAP cache item' argtup = args.split(',') res = soap_load_times(c, wsdl, func, argtup) else: res = url_load_times(opener, url, tests=tests) logq.put(res) urlq.task_done() def load_urltester_file(filename=None): if filename is None: raise ValueError('load_urltester_file(): missing argument') ostruct = {} try: fd = open(filename, 'r') ostruct = json.loads(fd.read()) fd.close() except: raise ValueError('load_urltester_file(): Unable to read/parse the given file.') return ostruct # Parse and validate arguments argmod = '' infiles = None outfile = None num_iterations = 1 num_threads = 1 timeout = 30 delay=0 try: import argparse argmod = 'argparse' except: try: from optparse import OptionParser argmod = 'optparse' except: print 'No argparse or optparse modules!' sys.exit(2) if argmod == 'argparse': parser = argparse.ArgumentParser(description='Utility for bulk testing of URL load times.') parser.add_argument('--infile', nargs=1, help='(Mandatory)Input file containing URLs to test.') parser.add_argument('--outfile', nargs=1, help='(Mandatory)File to output results to.') parser.add_argument('--timeout', nargs=1, help='(Optional)Connection timeout in seconds (default=30).') parser.add_argument('--threads', nargs=1, help='(Optional)Number of threads to use for requests (default=1).') parser.add_argument('--iterations', nargs=1, help='(Optional)Number of iterations through the URL list (default=1).') parser.add_argument('--delay', nargs=1, help='(Optional)Number of seconds of delay between interations (default=0).') args = parser.parse_args() infiles = args.infile if not args.outfile is None: outfile = args.outfile[0] if not args.timeout is None: timeout = int(args.timeout[0]) if not args.threads is None: num_threads = int(args.threads[0]) if not args.iterations is None: num_iterations = int(args.iterations[0]) if not args.delay is None: delay = int(args.delay[0]) else: parser = OptionParser() parser.add_option('--infile', dest='infiles', help='(Mandatory)Input file containing URLs to test.', metavar='FILE') parser.add_option('--outfile', dest='outfile', help='(Mandatory)File to output results to.') parser.add_option('--timeout', dest='timeout', help='(Optional)Connection timeout in seconds (default=30).') parser.add_option('--threads', dest='threads', help='(Optional)Number of threads to use for requests (default=1).') parser.add_option('--iterations', dest='iterations', help='(Optional)Number of iterations through the URL list (default=1).') parser.add_option('--delay', dest='delay', help='(Optional)Number of seconds of delay between interations (default=0).') (options, args) = parser.parse_args() if not options.infiles is None: infiles = [options.infiles] if not options.outfile is None: outfile = options.outfile if not options.timeout is None: timeout = int(options.timeout) if not options.threads is None: num_threads = int(options.threads) if not options.iterations is None: num_iterations = int(options.iterations) if not options.delay is None: delay = int(options.delay) if infiles is None: print "You must supply at least one --infile argument." sys.exit(2) #if outfile is None: # print "You must supply at least one --outfile argument." # sys.exit(2) urlq= Queue(0) logq= Queue(0) worker = threading.Thread(target=log_thread, args=(logq,)) worker.setDaemon(True) worker.start() for i in range(num_threads): worker = threading.Thread(target=worker_thread, args=(urlq,logq,)) worker.setDaemon(True) worker.start() for i in range(num_iterations): ustruct = load_urltester_file(infiles[0]) for item in ustruct['urls']: urlq.put( (item['url'],item['tests']) ) time.sleep(delay) # Wait for all the queue items to be processed urlq.join() #logq.join() sys.exit(0)