#!/usr/bin/env python ########################################################################## # paloclient.py - Client and helpers for Palo Alto API interactions # ########################################################################## # Copyright 2017 Todd Shadburn # # Licensed under the GNU GPL version 2 # ########################################################################## from __future__ import print_function import sys import logging import ssl import pan.xapi # Palo Alto XML formats for their API palo_xpath_separator = '/' palo_format_xpath_device_base = '/config/devices/entry[@name=\'localhost.localdomain\']' palo_format_xml_member_tag = '{tag}' palo_format_xml_userid_ip_action = """ update <{action}> {member_tag_line} """ palo_format_xml_address_object = '{ip}{member_tag_line}' palo_format_xml_user_commit = """ excludedexcluded{username} """ def palo_xml_userid_address(action='register', ip=None, persistent=0, tag_list=[]): if not ip: return '' if not action in ['register','unregister']: return '' member_tag_line = '' for tag in tag_list: member_tag_line += palo_format_xml_member_tag.format(**locals()) return palo_format_xml_userid_ip_action.format(**locals()) def palo_xml_address_object(action='', name=None, ip=None, location='', tag_list=[]): if not ip: return '' if not name: return '' member_tag_line = '' for tag in tag_list: member_tag_line += palo_format_xml_member_tag.format(**locals()) return palo_format_xml_address_object.format(**locals()) def palo_xpath_devicegroup_address(devicegroup=None, addrname=None): if not devicegroup: return None if not addrname: return None xpath = palo_xpath_separator.join([ palo_format_xpath_device_base, 'device-group', 'entry[@name=\'{name}\']'.format(name=devicegroup), 'address', 'entry[@name=\'{addrname}\']'.format(addrname=addrname), ]) return xpath def palo_xpath_devicegroup_tag(devicegroup=None, tagname=None): if not devicegroup: return None if not tagname: return None xpath = palo_xpath_separator.join([ palo_format_xpath_device_base, 'device-group', 'entry[@name=\'{name}\']'.format(name=devicegroup), 'tag', 'entry[@name=\'{tagname}\']'.format(tagname=tagname), ]) return xpath def palo_xml_user_commit(action='', username=None): if not username: return '' return palo_format_xml_user_commit.format(**locals()) class PaloClient(object): def __init__(self, *args, **kwargs): return def connect(self, host=None, username=None, password=None, cacert=None, *args, **kwargs): self.client = None self.host = host self.username = username self.password = password try: ssl_context = self.create_ssl_context(cacert, None) self.client = pan.xapi.PanXapi(hostname=host, api_username=username, api_password=password, ssl_context=ssl_context) self.api_key = self.client.keygen() except pan.xapi.PanXapiError: logging.error('Failed to connect/login to the device') raise ValueError('pan.xapi.PanXapiError') return def create_ssl_context(self, cafile, capath): """ The Panorama and Palo Alto firewalls (currently) use self-signed certificates. Newer versions of openssl have certificate validation enabled by default. So, we need to setup a context where cert validation is disabled. If we switch to CA-signed certs, we can re-enable validations at that point. This function is called prior to the Palo XAPI library setup. """ context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) # Disable SSLv2/v3, leaving only TLS1.x context.options |= ssl.OP_NO_SSLv2 context.options |= ssl.OP_NO_SSLv3 # Do we require a valid cert on the server side? #context.verify_mode = ssl.CERT_REQUIRED #context.verify_mode = ssl.CERT_OPTIONAL context.verify_mode = ssl.CERT_NONE try: context.load_verify_locations(cafile=cafile) except Exception as e: logging.error('cafile or capath invalid: %s' % e) raise ValueError('Error creating SSL context') return context def get_address_object(self, dgname, addrname): if not self.client: raise ValueError('No client specified.') if not self.client.api_username: raise ValueError('The client API username is not set.') # Get the address obejct xpath = palo_xpath_devicegroup_address(devicegroup=dgname, addrname=addrname) logging.debug(xpath) try: self.client.get(xpath=xpath) except pan.xapi.PanXapiError: logging.error('xapi get() returned %s', self.client.status) raise ValueError('pan.xapi.PanXapiError') logging.info('get() called successfully') return self.client.xml_result() def set_address_object(self, dgname, addrname, addrip, taglist=[]): if not self.client: raise ValueError('No client specified.') if not self.client.api_username: raise ValueError('The client API username is not set.') # Add the address obejct xpath = palo_xpath_devicegroup_address(devicegroup=dgname, addrname=addrname) addrxml = palo_xml_address_object(name=addrname, ip=addrip, location=dgname, tag_list=taglist) logging.debug(xpath) logging.debug(addrxml) try: self.client.set(xpath=xpath, element=addrxml) except pan.xapi.PanXapiError: logging.error('xapi set() returned %s', self.client.status) raise ValueError('pan.xapi.PanXapiError') logging.info('set() called successfully') return def delete_address_object(self, dgname, addrname): if not self.client: raise ValueError('No client specified.') if not self.client.api_username: raise ValueError('The client API username is not set.') # Add the address obejct xpath = palo_xpath_devicegroup_address(devicegroup=dgname, addrname=addrname) logging.debug(xpath) try: self.client.delete(xpath=xpath) except pan.xapi.PanXapiError: logging.error('xapi delete() returned %s', self.client.status) raise ValueError('pan.xapi.PanXapiError') logging.info('delete() called successfully') return def get_tag_object(self, dgname, tagname): if not self.client: raise ValueError('No client specified.') if not self.client.api_username: raise ValueError('The client API username is not set.') xpath = palo_xpath_devicegroup_tag(devicegroup=dgname, tagname=tagname) logging.debug(xpath) try: self.client.get(xpath=xpath) except pan.xapi.PanXapiError: logging.error('xapi get() returned %s', self.client.status) raise ValueError('pan.xapi.PanXapiError') logging.info('get() called successfully') return self.client.xml_result() def set_tag_object(self, dgname, tagname): if not self.client: raise ValueError('No client specified.') if not self.client.api_username: raise ValueError('The client API username is not set.') xpath = palo_xpath_devicegroup_tag(devicegroup=dgname, tagname=tagname) logging.debug(xpath) try: self.client.set(xpath=xpath, element='') except pan.xapi.PanXapiError: logging.error('xapi set() returned %s', self.client.status) raise ValueError('pan.xapi.PanXapiError') logging.info('set() called successfully') return def delete_tag_object(self, dgname, tagname): if not self.client: raise ValueError('No client specified.') if not self.client.api_username: raise ValueError('The client API username is not set.') xpath = palo_xpath_devicegroup_tag(devicegroup=dgname, tagname=tagname) logging.debug(xpath) try: self.client.delete(xpath=xpath) except pan.xapi.PanXapiError: logging.error('xapi delete() returned %s', self.client.status) raise ValueError('pan.xapi.PanXapiError') logging.info('delete() called successfully') return def register_userid_ip(self, addrip, taglist=[]): if not self.client: raise ValueError('No client specified.') if not self.client.api_username: raise ValueError('The client API username is not set.') xml = palo_xml_userid_address(action='register', ip=addrip, persistent=1, tag_list=taglist) logging.debug(xml) try: self.client.user_id(cmd=xml) except pan.xapi.PanXapiError: logging.error('xapi set() returned %s', self.client.status) raise ValueError('pan.xapi.PanXapiError') logging.info('userid register called successfully') return def unregister_userid_ip(self, addrip, taglist=[]): if not self.client: raise ValueError('No client specified.') if not self.client.api_username: raise ValueError('The client API username is not set.') xml = palo_xml_userid_address(action='unregister', ip=addrip, persistent=0, tag_list=taglist) logging.debug(xml) try: self.client.user_id(cmd=xml) except pan.xapi.PanXapiError: logging.error('xapi delete() returned %s', self.client.status) raise ValueError('pan.xapi.PanXapiError') logging.info('userid unregister called successfully') return def commit_changes_by_user(self): if not self.client: raise ValueError('No client specified.') if not self.client.api_username: raise ValueError('The client API username is not set.') commitxml = palo_xml_user_commit(username=self.client.api_username) logging.debug(commitxml) self.client.commit(commitxml) if self.client.status != 'success': logging.error('xapi commit() returned %s', self.client.status) raise ValueError('pan.xapi.PanXapiError') logging.info('commit() called successfully') return