Last active
September 15, 2022 12:22
-
-
Save ojmccall/f994be414d67b10c1878e62eb8004045 to your computer and use it in GitHub Desktop.
Slack DE builder.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def sfmc_api(event,context): | |
import requests | |
import json | |
from google.cloud import storage | |
from time import gmtime, strftime | |
import pandas as pd | |
import xml.etree.ElementTree as ET | |
import ast | |
import sys | |
#I use this to store my token value and API credentials via Secrets Manager | |
from os import environ | |
client_id = "" | |
client_secret = "" | |
baseURL = "" | |
slackURL = "" | |
triggerfilename = format(event["name"]) | |
bucket = format(event["bucket"]) | |
storage_client = storage.Client() | |
bucket = storage_client.get_bucket(bucket) | |
blob = bucket.blob(triggerfilename) | |
blob =blob.download_as_string() | |
body = ast.literal_eval(blob.decode("utf-8")) | |
print(body) | |
token = environ["SlackBot_Token"] | |
token = token.replace("\n","") | |
print(token) | |
if "text" in str(body): | |
slackData = body["event"]["user"] | |
text = body["event"]["text"] | |
file_id = body["event"]["files"][0]["id"] | |
url = f"https://slack.com/api/files.info?file={file_id}" | |
r = requests.get(url,headers={"Authorization": "Bearer %s" % token}) | |
r.raise_for_status | |
response = r.json() | |
print(r.content) | |
assert response["ok"] | |
filename = response["file"]["name"] | |
file_url = response["file"]["url_private"] | |
file_count = 1 | |
if "Data Extension" not in text: | |
sys.exit() | |
if "Data Extension" in text and file_count > 0: | |
messageBody = { | |
"blocks": [ | |
{ | |
"type": "header", | |
"text": { | |
"type": "plain_text", | |
"text": f"SFMC Data Extension Creation Request" | |
} | |
}, | |
{ | |
"type": "divider" | |
}, | |
{ | |
"type": "section", | |
"text": { | |
"type": "mrkdwn", | |
"text": f"\n*Thanks for your submission to create a Data Extension* <@{slackData}>" | |
} | |
}, | |
{ | |
"type": "section", | |
"text": | |
{ | |
"type": "mrkdwn", | |
"text": "\n I'll get back to you in a minute or 2 after I've processed your file :eyes:" | |
} | |
}, | |
{ | |
"type": "divider" | |
} | |
] | |
} | |
url = | |
header = {"Content-Type":"application/json"} | |
post = requests.post(url, json = messageBody ,headers = header) | |
body = post.content | |
status = post.status_code | |
#download slack file to temp memory | |
r = requests.get(file_url, headers={"Authorization": "Bearer %s" % token}) | |
file_data = r.content # get binary content | |
fullpath= "/tmp/file.csv" | |
# save file to disk | |
with open(fullpath , "w+b") as f: | |
f.write(bytearray(file_data)) | |
#Find DE data in filename | |
var = filename[5:] | |
var = var.replace(".csv","") | |
split =var.split("_") | |
DEName = split[0] | |
if len(split)>1: | |
FolderID = split[1] | |
else: | |
FolderID = "" | |
if len(FolderID) >0: | |
Folder = """<CategoryID>""" + FolderID + """</CategoryID> """ | |
else: | |
Folder = "" | |
#get csv values to create DE | |
df = pd.read_csv(fullpath,dtype="unicode") | |
df = df.fillna("") | |
df_dict = df.to_dict(orient="records") | |
print("df_array: "+str(df_dict)) | |
print("length: "+str(len(df_dict))) | |
fields = [] | |
for column in df: | |
fieldname = column | |
fieldtype = df.at[0,fieldname] | |
fieldlength = str(df.at[1,fieldname]) | |
isprimarykey = str(df.at[2,fieldname]) | |
isrequired = str(df.at[3,fieldname]) | |
fieldscale = str(df.at[4,fieldname]) | |
defaultvalue = str(df.at[5,fieldname]) | |
SOAPField = "<Field>" | |
SOAPField +="<Name>"+fieldname+"</Name>" | |
SOAPField += "<FieldType>"+fieldtype+"</FieldType>" | |
if fieldtype == "Text" or fieldtype == "Decimal": | |
SOAPField += "<MaxLength>"+fieldlength+"</MaxLength>" | |
if len(isrequired) >0: | |
SOAPField +="<IsRequired>"+isrequired+"</IsRequired>" | |
if len(isprimarykey) >0: | |
SOAPField += "<IsPrimaryKey>"+isprimarykey+"</IsPrimaryKey>" | |
if len(fieldscale) >0: | |
SOAPField += "<Scale>"+fieldscale+"</Scale>" | |
if len(defaultvalue) >0: | |
SOAPField += "<DefaultValue>"+defaultvalue+"</DefaultValue>" | |
SOAPField += "</Field>" | |
fields.append(SOAPField) | |
Fields = " ".join([str(elem) for elem in fields]) | |
print("string fields: "+Fields) | |
#remove DE config rows from data | |
datarows = df_dict[6:len(df_dict)] | |
#Set Values for DE structure | |
SOAPEndpoint = "https://"+baseURL+".soap.marketingcloudapis.com/Service.asmx" | |
#get token | |
url = "https://"+baseURL+".auth.marketingcloudapis.com/v2/Token" | |
data = {"grant_type":"client_credentials", | |
"client_id":client_id, | |
"client_secret":client_secret | |
} | |
r = requests.post(url, data=data) | |
print("posted") | |
body = json.loads(r.content) | |
token = body["access_token"] | |
TokenStatus = r.status_code | |
if TokenStatus != 200: | |
Failure = 1 | |
StatusMessage = "token request invalid" | |
else: | |
Failure = 0 | |
#compile XML | |
XMLString = ("""<?xml version="1.0" encoding="UTF-8"?><s:Envelope xmlns:s="http://www.w3.org/2003/05/soap-envelope" xmlns:a="http://schemas.xmlsoap.org/ws/2004/08/addressing" xmlns:u="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"><s:Header> <a:Action s:mustUnderstand="1">Create</a:Action><a:To s:mustUnderstand="1">"""+SOAPEndpoint+"""</a:To><fueloauth xmlns="http://exacttarget.com">"""+token+"""</fueloauth></s:Header><s:Body xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"><CreateRequest xmlns="http://exacttarget.com/wsdl/partnerAPI"><Options/><Objects xsi:type="DataExtension"><Name> """ + DEName +"""</Name> <CustomerKey>""" + DEName + """</CustomerKey>"""+Folder+"""<IsSendable>false</IsSendable> <Fields>""" + Fields + """</Fields></Objects> </CreateRequest> </s:Body> </s:Envelope>""") | |
XML = bytes(XMLString,"utf-8") | |
print("XML here: "+str(XML)) | |
#Create DE | |
header = {"Content-Type": "text/xml" } | |
post = requests.post(SOAPEndpoint, data = XML ,headers=header) | |
print("posted") | |
body = post.content | |
XMLString = body.decode("utf-8") | |
print("DE post"+ str(XMLString)) | |
root = ET.fromstring(XMLString) | |
for elem in root.iter(): | |
if "StatusMessage" in elem.tag: | |
StatusMessage = elem.text | |
print(StatusMessage) | |
if "StatusCode" in elem.tag: | |
StatusCode = elem.text | |
print(StatusCode) | |
if "RequestID" in elem.tag: | |
#print(elem.text) | |
RequestId = elem.text | |
print(RequestId) | |
if StatusCode != "OK": | |
Failure = 1 | |
else: | |
Failure = 0 | |
if Failure != 1: | |
# IF DE created then post Data to DE via Async API | |
if len(datarows) >0: | |
def chunker(seq, size): | |
return (seq[pos:pos + size] for pos in range(0, len(seq), size)) | |
for group in chunker(datarows, 1000): | |
#print(group) | |
payloadraw = {"items":group} | |
payload = json.dumps(payloadraw) | |
print(payloadraw) | |
head = {"Authorization": "Bearer " + token} | |
url = "https://"+baseURL+".rest.marketingcloudapis.com/data/v1/async/dataextensions/key:"+DEName+"/rows" | |
response = requests.post(url, json=payloadraw, headers=head) | |
#Send response back to slack based on DE build response | |
if Failure ==1: | |
messageBody = { | |
"blocks": [ | |
{ | |
"type": "header", | |
"text": { | |
"type": "plain_text", | |
"text": f"SFMC DE Creation Request" | |
} | |
}, | |
{ | |
"type": "divider" | |
}, | |
{ | |
"type": "section", | |
"text": { | |
"type": "mrkdwn", | |
"text": f"\n :rotating_light: *Your DE job failed!* <@{slackData}>" | |
} | |
}, | |
{ | |
"type": "section", | |
"text": | |
{ | |
"type": "mrkdwn", | |
"text": f"\n There was an error,\n your Data Extension: *{DEName}* wasnt built or there was an issue with the data inside it" | |
} | |
}, | |
{ | |
"type": "divider" | |
} | |
] | |
} | |
else: | |
messageBody = { | |
"blocks": [ | |
{ | |
"type": "header", | |
"text": { | |
"type": "plain_text", | |
"text": f"SFMC Data Extension Creation Request" | |
} | |
}, | |
{ | |
"type": "divider" | |
}, | |
{ | |
"type": "section", | |
"text": { | |
"type": "mrkdwn", | |
"text": f"\n :tada: *Your Data Extension is ready* <@{slackData}>" | |
} | |
}, | |
{ | |
"type": "section", | |
"text": | |
{ | |
"type": "mrkdwn", | |
"text": f"\n Thanks for choosing Oscar's DE builder,\n your Data Extension: *{DEName}* is ready to go!" | |
} | |
}, | |
{ | |
"type": "divider" | |
} | |
] | |
} | |
url = slackURL | |
header = {"Content-Type":"application/json"} | |
post = requests.post(url, json = messageBody ,headers = header) | |
@matheswarwan - The SOAP call is to build the DE only.
The DE request below uses the async rows endpoint and there's a function to
break the data frame into chunks of 1000 rows which can be adjusted. (lines 252 -267)
Ah - makes sense. Missed that. Thanks for pointing it.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Neat. Question in line #142. Since the soap call would fail for >2500 records, shouldn’t we split the loop & calls to multiple chunks?