#!/usr/bin/python
#
# urlmonitord.py
#
# Copyright 2015,2016,2018 Todd Shadburn
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
#

import sys
import time
import datetime
import threading
import urllib2
import ssl
import re
import json
from Queue import Queue

import redis
import netinfo

try:
    # Client() is used below for 'SOAPCall' work items; the suds SOAP client
    # is assumed here, since the original source did not include this import.
    from suds.client import Client
except ImportError:
    Client = None

FLAG_SKIP_PERF_EVENT = 1


class InternalResult(object):
    # Minimal stand-in for a urllib2 response object, used to carry an
    # error code/string through the same code path as a real response.
    def __init__(self, code):
        self.code = code
        return
    def getcode(self):
        return self.code
    def read(self):
        return ''


def page_content_perform_tests(url=None, content=None, tests=None):
    # Run the configured positive/negative regex tests against the page body.
    if url is None or content is None or tests is None:
        raise ValueError('page_content_perform_tests(): Required argument missing')
    results = {'positive': [], 'negative': []}
    if 'positive' in tests:
        for t in tests['positive']:
            m = re.search(t, content)
            if m:
                results['positive'].append('P')
            else:
                results['positive'].append('F')
    if 'negative' in tests:
        for t in tests['negative']:
            m = re.search(t, content)
            if m:
                results['negative'].append('F')
            else:
                results['negative'].append('P')
    return results


def url_load_times(opener, url, tests=None):
    # Fetch a single URL, recording start/end timestamps and the HTTP status
    # (or an error token) plus optional content test results.
    global timeout
    global test_direct
    res = None
    data = {'url': url, 'tests': tests}
    data['start'] = datetime.datetime.now()
    try:
        res = opener.open(url, None, timeout)
    except urllib2.HTTPError as e:
        res = InternalResult(e.code)
    except urllib2.URLError as e:
        estr = str(e.reason)
        res = InternalResult(estr.replace(' ', '_'))
    except ssl.SSLError as e:
        estr = str(e)
        res = InternalResult(estr.replace(' ', '_'))
    except:
        estr = str(sys.exc_info()[0])
        res = InternalResult(estr.replace(' ', '_'))
    data['end'] = datetime.datetime.now()
    data['rc'] = res.getcode()
    data['opt'] = ''
    if tests:
        content = ''
        try:
            content = res.read()
            data['opt'] = page_content_perform_tests(url=url, content=content, tests=tests)
        except ssl.SSLError as e:
            estr = str(e)
            res = InternalResult(estr.replace(' ', '_'))
            data['rc'] = res.getcode()
        except:
            estr = str(sys.exc_info()[0])
            res = InternalResult(estr.replace(' ', '_'))
            data['rc'] = res.getcode()
    return data


def soap_load_times(c, wsdl, func, args):
    # Call a single SOAP function and record start/end timestamps.
    global timeout
    data = {'url': '%s %s' % (wsdl, func)}
    res = None
    data['start'] = datetime.datetime.now()
    try:
        f = getattr(c.service, func)
        res = f(*args)
        res = 'Success'
    except ValueError:
        res = 'Value Error'
    except:
        res = 'Exception'
    data['end'] = datetime.datetime.now()
    data['rc'] = res    #TODO: get status code
    return data


def log_thread(logq):
    # Consumes results from logq, updates the per-URL state in redis and
    # emits Netinfo state-change and performance events.
    global test_direct
    global redis_host
    global key_prefix
    global site
    global site_hostname
    global global_stats
    global site_retest_list
    global urlq
    rfd = None
    nc = None
    #config_keybase = ':'.join([key_prefix,'config'])
    result_keybase = ':'.join([key_prefix,'lastcheck'])
    rfd = redis.Redis(host=redis_host, port=6379)
    while True:
        try:
            res = logq.get(True, 45)
        except:
            # close the connection if no items recently
            if nc:
                #print 'DEBUG Closing netinfo connection after timeout.'
                nc.close()
                nc = None
            continue

        opt = ''
        # If a timeout occurred, reschedule the check once
        if not type(res['rc']) is int and ('timed_out' in res['rc'] or 'timeout' in res['rc']):
            if not res['url'] in site_retest_list:
                # Resubmit it one time
                print 'DEBUG Resubmitting because of timeout: %s' % (res['url'])
                urlq.put( (res['url'],res['tests']) )
                site_retest_list[res['url']] = res
                continue
            else:
                # Subsequent attempt failed as well, so it's a real issue
                print 'DEBUG Resubmission resulted in another timeout: %s' % (res['url'])
                del site_retest_list[res['url']]
                res['status'] = FLAG_SKIP_PERF_EVENT
                pass

        url = res['url']
        diff = res['end'] - res['start']
        global_stats['resp_agg'] += diff.seconds + (diff.microseconds/1000000.0)
        global_stats['resp_count'] += 1
        if type(res['rc']) is int:
            res['rc'] = str(res['rc'])
        if 'opt' in res:
            if type(res['opt']) is dict:
                # Complex result type
                opt = 'positive=%-10s negative=%-10s' % (''.join(res['opt']['positive']), ''.join(res['opt']['negative']))
            else:
                # simple result type
                opt = res['opt']
        #sys.stdout.write('%-27s %-56s time=%s rc=(%s) %s\n' % (str(res['start'].isoformat()), url, str(diff), res['rc'], opt))

        # Prepare result structure
        d = {}
        d['url'] = url
        d['starttime'] = str(res['start'].isoformat())
        d['elapsedtime'] = str(diff)
        d['rc'] = res['rc']
        # Guarded: SOAP results carry no 'opt' key
        if 'opt' in res and type(res['opt']) is dict:
            d['positive'] = ''.join(res['opt']['positive'])
            d['negative'] = ''.join(res['opt']['negative'])
        else:
            d['positive'] = ''
            d['negative'] = ''

        # Prepare netinfo event data
        loadtime = float(d['elapsedtime'].split(':')[2])
        msg = '%s Load time %0.6f seconds' % (d['url'],loadtime)
        perf = 'gauge=%0.6f;0.0;0.0;0.0;0.0' % (loadtime)
        data = str('|'.join([msg,perf]))
        obj = str('SITE-'+d['url'])

        # Load config data
        #ckey = ':'.join([config_keybase, url])
        #jcdata = rfd.get(ckey)
        #cdata = json.loads(jcdata)

        # Load last result data
        rdata = None
        rkey = ':'.join([result_keybase, url])
        jrdata = rfd.get(rkey)
        if jrdata:
            rdata = json.loads(jrdata)
        else:
            # First time for this URL
            rdata = d

        # Send netinfo event(s)
        flags = 0
        nerc = 0
        ne_list = []
        #print 'url=%s res[rc]=%s' % (res['url'],res['rc'])
        if res['rc'] != '200':
            nerc = 2
        if 'F' in d['positive'] or 'F' in d['negative']:
            nerc = 2
            res['rc'] = '%s (w/ content failure)' % (res['rc'])
            d['rc'] = res['rc']
        if res['rc'] != rdata['rc']:
            # state change, append oneshot event
            if nerc != 0:
                # Transition from success to failure
                m = '%s Error occurred while loading site: %s' % (res['url'],res['rc'])
            else:
                # transition from failure to success
                m = msg
            #print 'state event: %s' % (str(m))
            ne_list.append(netinfo.NetinfoEvent(
                hostname=site_hostname,
                object=str(obj),
                flags=1,
                rc=nerc,
                data=str(m)))
        # Append the performance event
        if not 'status' in res or res['status'] != FLAG_SKIP_PERF_EVENT:
            #print 'perf event: %s' % (str(data))
            ne_list.append(netinfo.NetinfoEvent(
                hostname=site_hostname,
                object=str(obj),
                flags=0,
                rc=nerc,
                data=str(data)))

        # Emit the Netinfo events
        if not nc:
            nc = netinfo.NetinfoClient(
                host='mon.home.opencomputingsolutions.com',
                cert='/etc/netinfo/netinfod.cert',
                key='/etc/netinfo/netinfod.key')
        if nc:
            for ev in ne_list:
                try:
                    nc.send_event(ev)
                    #print 'DEBUG would have submitted nievent obj=%s data=%s' % (str(obj),str(data))
                    #print 'ev.rc=%d' % (ev.rc)
                except:
                    print str(ev)
                    print sys.exc_info()[0]
                    print sys.exc_info()[1]
                    print sys.exc_info()[2]
                    sys.exit(2)
        ne_list = None

        # Update the result key with the latest data
        rfd.set(rkey, json.dumps(d))
        logq.task_done()
    return


def worker_thread(urlq, logq):
    # Pulls (url, tests) work items from urlq, runs the check and pushes
    # the timing result onto logq for the log thread to record.
    soapc_cache = {}
    opener = urllib2.build_opener(
        urllib2.HTTPHandler,
        urllib2.HTTPDefaultErrorHandler,
        urllib2.HTTPRedirectHandler,
        urllib2.HTTPErrorProcessor)
    while True:
        res = None
        qi = urlq.get()
        url = None
        tests = None
        if len(qi) == 2:
            url = qi[0]
            tests = qi[1]
        else:
            url = qi
        if 'SOAPCall ' in url:
            (fid,wsdl,func,args) = url.split(' ',3)
            if wsdl in soapc_cache:
                c = soapc_cache[wsdl]
            else:
                c = Client(wsdl)
                soapc_cache[wsdl] = c
                print 'DEBUG: Built SOAP cache item'
            argtup = args.split(',')
            res = soap_load_times(c, wsdl, func, argtup)
        else:
            res = url_load_times(opener, url, tests=tests)
        logq.put(res)
        urlq.task_done()
    return


def load_urltester_file(filename=None):
    # Load and parse the JSON configuration file given via --infile.
    if filename is None:
        raise ValueError('load_urltester_file(): missing argument')
    ostruct = {}
    try:
        fd = open(filename, 'r')
        ostruct = json.loads(fd.read())
        fd.close()
    except:
        raise ValueError('load_urltester_file(): Unable to read/parse the given file.')
    return ostruct


# Parse and validate arguments
argmod = ''
infiles = None
outfile = None
redis_host = None
key_prefix = 'urlmonitor'
num_threads = 1
timeout = 30
delay = 300
#redis_host = 'localhost'
site = 'Unordered'
site_hostname = 'websites'
avg_resp_warn = 0.800
avg_resp_crit = 1.000
global_resp_object = 'AVERAGE_RESPONSE'
# Dictionary of sites under retest (b/c of timeouts, etc)
site_retest_list = {}

try:
    import argparse
    argmod = 'argparse'
except:
    try:
        from optparse import OptionParser
        argmod = 'optparse'
    except:
        print 'No argparse or optparse modules!'
        sys.exit(2)

if argmod == 'argparse':
    parser = argparse.ArgumentParser(description='Utility for bulk testing of URL load times.')
    parser.add_argument('--infile', nargs=1, help='(Mandatory) Input file containing URLs to test.')
    parser.add_argument('--redis', nargs=1, help='(Mandatory) Redis host/cluster to connect to.')
    parser.add_argument('--prefix', nargs=1, help='(Optional) Redis keyname prefix to use.')
    parser.add_argument('--outfile', nargs=1, help='(Mandatory) File to output results to.')
    parser.add_argument('--timeout', nargs=1, help='(Optional) Connection timeout in seconds (default=30).')
    parser.add_argument('--threads', nargs=1, help='(Optional) Number of threads to use for requests (default=1).')
    parser.add_argument('--delay', nargs=1, help='(Optional) Number of seconds of delay between iterations (default=300).')
    parser.add_argument('--sitehostname', nargs=1, help='(Optional) Hostname reported to Netinfo when sending events.')
    parser.add_argument('--grobjname', nargs=1, help='(Optional) Global response time object name.')
    args = parser.parse_args()
    infiles = args.infile
    if not args.redis is None:
        redis_host = args.redis[0]
    if not args.prefix is None:
        key_prefix = args.prefix[0]
    if not args.timeout is None:
        timeout = int(args.timeout[0])
    if not args.threads is None:
        num_threads = int(args.threads[0])
    if not args.delay is None:
        delay = int(args.delay[0])
        if delay < 5:
            delay = 5
    if not args.sitehostname is None:
        site_hostname = args.sitehostname[0]
    if not args.grobjname is None:
        global_resp_object = args.grobjname[0]

if infiles is None:
    print "You must supply at least one --infile argument."
    sys.exit(2)
if redis_host is None:
    print "You must supply at least one --redis argument."
    sys.exit(2)

global_stats = {
    'resp_agg': 0,
    'resp_count': 0,
    'state': 0,
    }

urlq = Queue(0)
logq = Queue(0)
#eventq = Queue(0)

# Start the log/recording thread
worker = threading.Thread(target=log_thread, args=(logq,))
worker.setDaemon(True)
worker.start()

#worker = threading.Thread(target=netinfo_event_thread, args=(eventq,))
#worker.setDaemon(True)
#worker.start()

# Start the URL checker threads
for i in range(num_threads):
    worker = threading.Thread(target=worker_thread, args=(urlq,logq,))
    worker.setDaemon(True)
    worker.start()

# main loop
while True:
    # Queue URL checks
    st = time.time()
    ustruct = load_urltester_file(infiles[0])
    if 'hostname' in ustruct:
        site_hostname = str(ustruct['hostname'])
    if 'responsetime_warn' in ustruct:
        avg_resp_warn = float(ustruct['responsetime_warn'])
    if 'responsetime_crit' in ustruct:
        avg_resp_crit = float(ustruct['responsetime_crit'])
    for item in ustruct['urls']:
        urlq.put( (item['url'],item['tests']) )
    et = time.time()

    # Sleep
    sleeptime = delay-(et-st)
    if sleeptime < 0:
        print 'INFO: sleeptime=%d, changing it to 0' % (sleeptime)
        sleeptime = 0
    time.sleep(sleeptime)

    # Calculate the global response time for the last run
    # After the sleep, so we have all the responses accounted for
    ne_list = []
    nerc = 0
    if global_stats['resp_count'] > 0:
        resptime = global_stats['resp_agg']/global_stats['resp_count']
        msg = 'Average global site response time is %0.6f seconds' % (resptime)
        perf = 'gauge=%0.6f;0.0;0.0;%0.6f;%0.6f' % (resptime,avg_resp_warn,avg_resp_crit)
        data = str('|'.join([msg,perf]))
        if resptime >= avg_resp_crit:
            nerc = 2
        elif resptime >= avg_resp_warn:
            nerc = 1
        if nerc != global_stats['state']:
            # State change, append oneshot event
            ne_list.append(netinfo.NetinfoEvent(
                hostname=site_hostname,
                object=str(global_resp_object),
                flags=1,
                rc=nerc,
                data=str(data)))
            global_stats['state'] = nerc
        # Append the performance event
        ne_list.append(netinfo.NetinfoEvent(
            hostname=site_hostname,
            object=str(global_resp_object),
            flags=0,
            rc=nerc,
            data=str(data)))
        nc = netinfo.NetinfoClient(
            host='mon.home.opencomputingsolutions.com',
            cert='/etc/netinfo/netinfod.cert',
            key='/etc/netinfo/netinfod.key')
        for ne in ne_list:
            nc.send_event(ne)
            ne = None
        ne_list = None
        nc.close()
        nc = None
        # Reset the aggregate counters for the next run
        global_stats['resp_agg'] = 0.0
        global_stats['resp_count'] = 0

# Wait for all the queue items to be processed
urlq.join()
#logq.join()

sys.exit(0)
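
# The main loop reads its work list from the JSON document given via --infile.
# A minimal sketch of that document is shown below, inferred from the keys the
# code reads ('hostname', 'responsetime_warn', 'responsetime_crit', 'urls',
# and each item's 'url' and 'tests' regex lists); the URL and patterns are
# illustrative placeholders only, not values shipped with this program:
#
# {
#     "hostname": "websites",
#     "responsetime_warn": 0.800,
#     "responsetime_crit": 1.000,
#     "urls": [
#         {
#             "url": "https://www.example.com/",
#             "tests": {
#                 "positive": ["Welcome to Example"],
#                 "negative": ["Internal Server Error"]
#             }
#         }
#     ]
# }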