Last active
September 6, 2022 13:20
-
-
Save artemrys/e5da6d92205e4fffa65cfd4cf2e929ee to your computer and use it in GitHub Desktop.
Splunk Add-on for Cisco Meraki custom rest handler with input validation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# SPDX-FileCopyrightText: 2021 Splunk, Inc. <sales@splunk.com> | |
# SPDX-License-Identifier: LicenseRef-Splunk-8-2021 | |
# | |
# | |
import import_declare_test # noqa: F401 # isort: skip | |
import logging | |
from splunk_ta_cisco_meraki_organization_validation import organization_validation | |
from splunktaucclib.rest_handler import admin_external, util | |
from splunktaucclib.rest_handler.admin_external import AdminExternalHandler | |
from splunktaucclib.rest_handler.endpoint import ( | |
RestModel, | |
SingleModel, | |
field, | |
validator, | |
) | |
util.remove_http_proxy_env_vars() | |
fields = [ | |
field.RestField( | |
"region", | |
required=True, | |
encrypted=False, | |
default=None, | |
validator=validator.Enum( | |
values={"global", "china"}, | |
), | |
), | |
field.RestField( | |
"organization_id", | |
required=True, | |
encrypted=False, | |
default=None, | |
validator=validator.AllOf( | |
validator.String( | |
max_len=50, | |
min_len=1, | |
), | |
validator.Pattern( | |
regex=r"""^\d+$""", | |
), | |
), | |
), | |
field.RestField( | |
"organization_api_key", | |
required=True, | |
encrypted=True, | |
default=None, | |
validator=validator.AllOf( | |
validator.String( | |
max_len=50, | |
min_len=1, | |
), | |
validator.Pattern( | |
regex=r"""^[a-z0-9]+$""", | |
), | |
), | |
), | |
] | |
model = RestModel(fields, name=None) | |
endpoint = SingleModel( | |
"splunk_ta_cisco_meraki_organization", model, config_name="organization" | |
) | |
class CiscoMerakiOrganizationExternalHandler(AdminExternalHandler): | |
def __init__(self, *args, **kwargs): | |
AdminExternalHandler.__init__(self, *args, **kwargs) | |
def handleList(self, confInfo): | |
AdminExternalHandler.handleList(self, confInfo) | |
def handleEdit(self, confInfo): | |
organization_validation( | |
self.payload.get("region"), | |
self.payload.get("organization_id"), | |
self.payload.get("organization_api_key"), | |
self.getSessionKey(), | |
) | |
AdminExternalHandler.handleEdit(self, confInfo) | |
def handleCreate(self, confInfo): | |
organization_validation( | |
self.payload.get("region"), | |
self.payload.get("organization_id"), | |
self.payload.get("organization_api_key"), | |
self.getSessionKey(), | |
) | |
AdminExternalHandler.handleCreate(self, confInfo) | |
def handleRemove(self, confInfo): | |
AdminExternalHandler.handleRemove(self, confInfo) | |
if __name__ == "__main__": | |
logging.getLogger().addHandler(logging.NullHandler()) | |
admin_external.handle( | |
endpoint, | |
handler=CiscoMerakiOrganizationExternalHandler, | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# SPDX-FileCopyrightText: 2021 Splunk, Inc. <sales@splunk.com> | |
# SPDX-License-Identifier: LicenseRef-Splunk-8-2021 | |
# | |
# | |
""" | |
This module validates organization being saved by the user | |
""" | |
import import_declare_test # noqa: F401 # isort: skip | |
import traceback | |
import cisco_meraki_utils as utils | |
from splunktaucclib.rest_handler.error import RestError | |
def organization_validation(region, organization_id, organization_api_key, session_key): | |
""" | |
This method verifies the credentials by making an API call | |
""" | |
logger = utils.set_logger( | |
session_key, "splunk_ta_cisco_meraki_organization_validation" | |
) | |
logger.info( | |
"Verifying API key for the organization id {} ({} region)".format( | |
organization_id, region | |
) | |
) | |
if not organization_id or not organization_api_key: | |
raise RestError( | |
400, | |
"Provide all necessary arguments: " | |
"organization_id and organization_api_key.", | |
) | |
try: | |
proxy_settings = utils.get_proxy_settings(logger, session_key) | |
dashboard = utils.build_dashboard_api( | |
region, organization_api_key, proxy_settings | |
) | |
organizations = dashboard.organizations.getOrganizations() | |
valid_organization_id = False | |
for organization in organizations: | |
if str(organization["id"]) == str(organization_id): | |
valid_organization_id = True | |
break | |
if not valid_organization_id: | |
msg = "Failed to validate organization id: {} ({} region)".format( | |
organization_id, region | |
) | |
logger.error(msg) | |
raise RestError(400, msg) | |
except Exception: | |
logger.error( | |
"Failed to connect to Meraki for organization id: {} ({} region). {}".format( | |
organization_id, region, traceback.format_exc() | |
) | |
) | |
msg = ( | |
"Could not connect to Meraki for organization id: {} ({} region). " | |
"Check configuration and network settings".format(organization_id, region) | |
) | |
raise RestError(400, msg) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment