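#!/usr/bin/env python
#
# testes.py
#
# Copyright 2020 Todd Shadburn
#
# Licensed under the GNU GPL version 2
#
# print_function keeps the script valid on both Python 2 and Python 3; the
# original used Python 2 print statements, which are a SyntaxError when the
# `python` on PATH is Python 3.
from __future__ import print_function

from datetime import datetime, timedelta

from elasticsearch import Elasticsearch

# NOTE(review): the cluster URL and http_auth pair are placeholders. Real
# values belong in the environment or a secrets store, not in this file
# (anything committed here ends up in version-control history).
es = Elasticsearch(['http://YOUR_ELASTICSEACH_CLUSTER:9200'],
                   http_auth=('USERNAME', 'PASSWORD'))

# Earlier ad-hoc experiments with the client, kept for reference.
#doc = { 'author': 'todd', 'text': 'This is a document.', '@timestamp': datetime.now(), 'xfer-time': 42349567, 'result-code': 302}
#res = es.index(index='syslog', doc_type='_doc', id=4, body=doc)
#q = {
#    "query": {
#        "match_all": {}
#    }
#}
#res = es.search(index="syslog", body={"query": {"match_all": {}}})
#print('Count: %d' % (len(res['hits']['hits'])))
#print(res)
#print es.search(index='syslog', q='author:todd AND xfer-time:[10000000 TO 30000000]')
#print es.count(index='syslog', q='author:todd AND xfer-time:[10000000 TO 50000000] AND @timestamp:[2020-03-02T15:00:00 TO 2020-03-02T22:00:00]')

# rule framework tests

# Trailing unit letter of a searchtime string -> timedelta keyword argument.
_SEARCHTIME_UNITS = {
    's': 'seconds',
    'm': 'minutes',
    'h': 'hours',
    'd': 'days',
    'w': 'weeks',
}

# Timestamp layout used on both ends of the Lucene range clause.
_LUCENE_TS_FMT = '%Y-%m-%dT%H:%M:%S'


def searchtime_to_lucene(searchtime):
    """Return a Lucene range clause on @timestamp covering the last *searchtime*.

    searchtime: a string such as '30s', '5m', '2h', '1d' or '1w' -- an
        integer immediately followed by one unit letter from
        _SEARCHTIME_UNITS.

    Returns: '@timestamp:[<start> TO <now>]' where both ends are UTC
        (datetime.utcnow()) formatted as %Y-%m-%dT%H:%M:%S.

    Raises: ValueError if *searchtime* is empty, ends in an unknown unit
        letter, or its numeric part is not an integer.  The original
        silently produced a zero-width window for an unknown unit, which
        would make every rule using it never fire.
    """
    if not searchtime:
        raise ValueError('searchtime must not be empty')
    unit = searchtime[-1]
    if unit not in _SEARCHTIME_UNITS:
        raise ValueError('unknown searchtime unit %r in %r' % (unit, searchtime))
    amount = int(searchtime[:-1])  # ValueError on a non-integer prefix
    ts = datetime.utcnow()
    ots = ts - timedelta(**{_SEARCHTIME_UNITS[unit]: amount})
    return '@timestamp:[%s TO %s]' % (ots.strftime(_LUCENE_TS_FMT),
                                     ts.strftime(_LUCENE_TS_FMT))


def _print_alert(rule, fields):
    """Print the rule's optional 'alert_body' template filled from *fields*.

    The template is passed through str.format(**fields), so it may reference
    any key of *fields* (the es.count response for 'count' rules, the first
    hit's _source for 'query' rules).  Rule definitions are local to this
    script; str.format templates can reach object attributes ({x.__class__}),
    so templates must not be taken from an untrusted source.
    """
    if 'alert_body' in rule:
        print(rule['alert_body'].format(**fields))


def run_rule(rule):
    """Evaluate one alert rule against the 'syslog' index.

    rule: dict with keys
        'type'        - 'count' (uses es.count) or 'query' (uses es.search)
        'query'       - Lucene query string; it is parenthesised and ANDed
                        with the @timestamp window, and is treated as
                        trusted query syntax from local config
        'searchtime'  - window string accepted by searchtime_to_lucene
        'num_events'  - threshold; the rule fires when matches >= this
        'alert_body'  - optional str.format template printed when the rule
                        fires (see _print_alert)

    Returns: True if the rule fired, False otherwise.  The composed query
        is printed before it is sent, as in the original.

    Raises: KeyError if a required key is missing; ValueError for an
        unknown 'type' (the original silently returned False, so a typo in
        a rule's type disabled it without any indication).
    """
    q = '(%s) AND %s' % (rule['query'], searchtime_to_lucene(rule['searchtime']))
    print(q)
    rule_type = rule['type']
    threshold = rule['num_events']

    if rule_type == 'count':
        res = es.count(index='syslog', q=q)
        if res['count'] < threshold:
            return False
        _print_alert(rule, res)
        return True

    if rule_type == 'query':
        res = es.search(index='syslog', q=q)
        hits = res['hits']['hits']
        if len(hits) < threshold:
            return False
        # With num_events == 0 the rule fires on an empty hit list; the
        # original then indexed hits[0] and raised IndexError.
        if hits:
            _print_alert(rule, hits[0]['_source'])
        return True

    raise ValueError('unknown rule type %r' % (rule_type,))


rule = {
    'description': 'Test Rule 1',
    'type': 'query',
    'query': 'author:todd AND xfer-time:[10000000 TO 50000000]',
    'num_events': 2,
    #'alert_body': 'Warning, {count} events were found that matched the rule',
    'alert_body': 'Warning, events found that had {author} as the author, thats wrong.',
    'searchtime': '1d',
}

#print searchtime_to_lucene('1s')
#print searchtime_to_lucene('1m')
#print searchtime_to_lucene('1h')
#print searchtime_to_lucene('1d')

# Guarded so that importing this module (e.g. to reuse searchtime_to_lucene)
# does not fire a query at the cluster; running it as a script is unchanged.
if __name__ == '__main__':
    print(run_rule(rule))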