Skip to content

Instantly share code, notes, and snippets.

@mdrakiburrahman
Last active July 30, 2024 08:28
Show Gist options
  • Save mdrakiburrahman/2de0fffbdf71b9022b3346a20f0f88cd to your computer and use it in GitHub Desktop.
Save mdrakiburrahman/2de0fffbdf71b9022b3346a20f0f88cd to your computer and use it in GitHub Desktop.
Querying various Azure Purview REST APIs to gather Insights data into Pandas DataFrames
import os
import requests
import json
import jmespath
import pandas as pd
from pprint import pprint
def azuread_auth(tenant_id: str, client_id: str, client_secret: str, resource_url: str):
"""
Authenticates Service Principal to the provided Resource URL, and returns the OAuth Access Token
"""
url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
payload= f'grant_type=client_credentials&client_id={client_id}&client_secret={client_secret}&resource={resource_url}'
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
response = requests.request("POST", url, headers=headers, data=payload)
access_token = json.loads(response.text)['access_token']
return access_token
def top_classifications_for_tables_and_files(data_catalog_name: str, azuread_access_token: str):
url = f"https://{data_catalog_name}.guardian.purview.azure.com/graphql"
headers = {
'Authorization': f'Bearer {azuread_access_token}',
'Content-Type': 'application/json'
}
payload="""{
"variables":{
"startTime":"Fri, 12 Feb 2021 05:00:00 GMT",
"endTime":"Fri, 12 Mar 2021 12:25:35 GMT"
},
"query":"query ($startTime: DateTime!, $endTime: DateTime!) {\n classificationQuery {\n files: classifications(sortBy: {fieldsToSort: {fieldName: FILESCOUNT, sortDirection: DESC}}, paginationOptions: {offset: 0, limit: 5}, timeRequestOptions: {startTime: $startTime, endTime: $endTime}) {\n totalCount\n values {\n name\n filesCount\n __typename\n }\n __typename\n }\n tables: classifications(sortBy: {fieldsToSort: {fieldName: TABLESCOUNT, sortDirection: DESC}}, paginationOptions: {offset: 0, limit: 5}, timeRequestOptions: {startTime: $startTime, endTime: $endTime}) {\n totalCount\n values {\n name\n tablesCount\n __typename\n }\n __typename\n }\n __typename\n }\n}\n"
}
"""
response = json.loads((requests.request("POST", url, headers=headers, data=payload)).text)
tables_data = jmespath.search("data.classificationQuery.tables.values[].[name, tablesCount]", response)
tables_df = pd.DataFrame(tables_data, columns=['name','tablesCount'],dtype=float)
files_data = jmespath.search("data.classificationQuery.files.values[].[name, filesCount]", response)
files_df = pd.DataFrame(files_data, columns=['name','filesCount'],dtype=float)
return tables_df, files_df
def top_classification_categories_by_sources(data_catalog_name: str, azuread_access_token: str):
url = f"https://{data_catalog_name}.guardian.purview.azure.com/graphql"
headers = {
'Authorization': f'Bearer {azuread_access_token}',
'Content-Type': 'application/json'
}
payload="""{
"variables":{
},
"query":"{\n classificationQuery {\n classificationCategories(paginationOptions: {offset: 0, limit: 5}) {\n values {\n count\n name\n __typename\n }\n __typename\n }\n __typename\n }\n}\n"
}
"""
response = json.loads((requests.request("POST", url, headers=headers, data=payload)).text)
data = jmespath.search("data.classificationQuery.classificationCategories.values[].[name, count]", response)
df = pd.DataFrame(data, columns=['name','count'],dtype=float)
# Convert name to Title style from all caps
df['name'] = df['name'].apply(lambda x: x.title())
return df
def top_file_extensions(data_catalog_name: str, azuread_access_token: str):
url = f"https://{data_catalog_name}.guardian.purview.azure.com/reports/fileExtensions"
headers = {
'Authorization': f'Bearer {azuread_access_token}',
'Content-Type': 'application/json'
}
payload="""{
"Query":{
"StartTime":"2021-02-12T05:00:00.000Z",
"EndTime":"2021-03-12T18:49:00.000Z",
"takeTopCount":30,
"assetTypes":[
]
}
}
"""
response = json.loads((requests.request("POST", url, headers=headers, data=payload)).text)
data = jmespath.search("fileExtensionDetails[].[fileExtension, assets, subscriptions, count]", response)
df = pd.DataFrame(data, columns=['fileExtension', 'assets', 'subscriptions', 'count'],dtype=float)
return df
def asset_count_per_source_type(data_catalog_name: str, azuread_access_token: str):
url = f"https://{data_catalog_name}.guardian.purview.azure.com/mapanddiscover/reports/asset2/assetDistributionByDataSource"
headers = {
'Authorization': f'Bearer {azuread_access_token}',
'Content-Type': 'application/json'
}
payload="""{"registeredSourceGroup":""}
"""
response = json.loads((requests.request("POST", url, headers=headers, data=payload)).text)
data = jmespath.search("countsByDataSource[].[dataSource, assetCount]", response)
df = pd.DataFrame(data, columns=['dataSource', 'assetCount'],dtype=float)
return df
# ==========
# Service Principal with "Purview Data Source Administrator" permissions on Purview
tenant_id = "your-tenant-id"
client_id = "service-principal-client-id"
client_secret = "service-principal-client-secret"
resource_url = "https://purview.azure.net"
data_catalog_name = "your-purview-service-name"
# Retrieve authentication objects
azuread_access_token = azuread_auth(tenant_id, client_id, client_secret, resource_url)
# ==========
[topClassificationsForTables, topClassificationsForFiles] = top_classifications_for_tables_and_files(data_catalog_name, azuread_access_token)
topClassificationCategoriesBySources = top_classification_categories_by_sources(data_catalog_name, azuread_access_token)
topFileExtensions = top_file_extensions(data_catalog_name, azuread_access_token)
assetCountPerSourceType = asset_count_per_source_type(data_catalog_name, azuread_access_token)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment