#!/usr/bin/env python # # netinfoclient.py # # (c) 2022 Todd Shadburn # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. # import sys import base64 import requests class NetinfoApiClient(object): def __init__(self, hostname=None, port=80, *args, **kwargs): if not hostname: raise ValueError('hostname and apikey are required arguments') self.hostname = hostname self.port = int(port) self.username = None self.password = None self.scheme = 'http' self.api_key = None self.api_version = 'v1' self.request_kwargs = {} if 'apikey' in kwargs: self.api_key = kwargs['apikey'] if 'username' in kwargs: self.username = kwargs['username'] if 'password' in kwargs: self.password = kwargs['password'] if 'scheme' in kwargs: self.scheme = kwargs['scheme'] if 'apiversion' in kwargs: self.api_version = kwargs['apiversion'] if 'cacert' in kwargs: self.request_kwargs['verify'] = kwargs['cacert'] if 'client_cert' in kwargs and 'client_key' in kwargs: client_keypair = ( kwargs['client_cert'], kwargs['client_key'] ) self.request_kwargs['cert'] = client_keypair #if self.username and self.password: # token = base64.b64encode('%s:%s' % (self.username,self.password)) # self.postdata_headers['Authorization'] = 'Basic %s' % (token) self.postdata_headers = { 'NETINFO-API-KEY': self.api_key, 'Content-Type': 'application/json', } self.url = "%s://%s:%d/api/%s" %(self.scheme, self.hostname, self.port, self.api_version) return def _params_to_url_kvpairs(self, params={}): # convert a dictionary to URL parameters param_list = [] for k in params.keys(): param_list.append('='.join([k,params[k]])) q_str = '' if len(param_list): q_str = '?' + '&'.join(param_list) return q_str def get(self, uri=None, params={}, *args, **kwargs): # Perform a GET on the API # Dont pollute the params dictionary parms = dict(params) url = ''.join([self.url,'/',uri,'/',self._params_to_url_kvpairs(parms)]) res = requests.get( url, headers=self.postdata_headers, **self.request_kwargs ) return res def post(self, uri=None, params={}, body=None, *args, **kwargs): # Perform a POST on the API # Dont pollute the params dictionary parms = dict(params) url = ''.join([self.url,'/',uri,'/',self._params_to_url_kvpairs(parms)]) #print(url) #print(self.request_kwargs) res = requests.post( url=url, headers=self.postdata_headers, data=body, **self.request_kwargs ) return res def put(self, uri=None, params={}, body=None, *args, **kwargs): # Perform a POST on the API # Dont pollute the params dictionary parms = dict(params) url = ''.join([self.url,'/',uri,'/',self._params_to_url_kvpairs(parms)]) res = requests.put( url=url, headers=self.postdata_headers, data=body, **self.request_kwargs ) return res #if __name__ == '__main__': #if len(sys.argv) < 2: # print 'Usage: netinfoclient.py environment' # sys.exit(2) ## Simple connection #c = NetinfoApiClient(hostname=NETINFO_HOSTNAME, apikey='YOUR_API_KEY') ## Connect using client keypair and private CA #c = NetinfoApiClient( # hostname=NETINFO_HOSTNAME, # apikey='YOUR_API_KEY' # cacert='/path/to/cacert.pem' # client_cert='/path/to/client.cert' # client_key='/path/to/client.key' #) ### Get the alerts #res = c.get('alerts') #data = res.json() #for item in data: # print(item)