Skip to content

Instantly share code, notes, and snippets.

@aparakian
Created August 25, 2016 23:25
Show Gist options
  • Save aparakian/51ec9f667ec10fe39b8a8be30d60e3c2 to your computer and use it in GitHub Desktop.
Save aparakian/51ec9f667ec10fe39b8a8be30d60e3c2 to your computer and use it in GitHub Desktop.
from collections import Counter
from ads_app.api import AdsApi
from ads_app.api.objects import AdLabel
from sjp_app.errors import CampaignWarning, CampaignError
from sjp_app.utils import stats
class BaseFacebookCampaignHandler(object):
"""
This is the base class for all objects that modify facebook Campaigns.
It relies on a SJP campaign and provides a bunch of generic attributes / methods.
"""
def __init__(self, sjp_campaign):
super(BaseFacebookCampaignHandler, self).__init__()
self.api = AdsApi()
self.sjp_campaign = sjp_campaign
self.campaign_id = str(sjp_campaign.id)
self.fb_campaign = sjp_campaign.fb_campaign
def create_image_adlabels(self, images):
"""
Shortcut to create image adlabels. Needed both in the pusher and the
updater. This method uses a batch.
:param images: a list of SjpImage instance
:return: a list of dictionary {"image": image, "adlabel": adlabel}
"""
batch = self.api.new_batch()
image_adlabels = []
for image in images:
adlabel = AdLabel(parent_id=self.api.account_id)
adlabel.remote_create(params={
AdLabel.Field.name: AdLabel.IMAGE_LABEL_PREFIX % image.google_id
}, batch=batch)
image_adlabels.append({"image": image, "adlabel": adlabel})
batch.execute()
return image_adlabels
def check_errors_and_alert(self, change, title="", text="", tags=None, *object_list):
"""
"""
successes, errors = self._compute_errors(*object_list)
if title:
text = self._process_alert_text(text, successes, errors)
stats._send_ads_sjp_event_to_datadog(self.sjp_campaign, title, text, tags)
if len(errors) > len(successes): # We raise if there is more errors than successes
raise CampaignError(
"Error while \"%s\" for %s: %s" % (change, self.sjp_campaign, [e.error for e in errors])
)
if errors:
raise CampaignWarning(
"Warning while \"%s\" for %s" % (change, self.sjp_campaign),
[e.error for e in errors]
)
def _process_alert_text(self, text, successes, errors):
"""
"""
success_summary = Counter([s.__class__.__name__ for s in successes])
error_summary = Counter([e.__class__.__name__ for e in errors])
text += "\nSuccesses: %s" % ('%s %s,' % (v, k) for k, v in success_summary)
text += "\nErrors: %s" % ('%s %s,' % (v, k) for k, v in error_summary)
return text
def _compute_errors(self, *object_list):
"""
"""
successes = []
errors = []
for o in object_list:
if isinstance(o, list):
s, e = self._compute_errors(o)
successes.extend(s)
errors.extend(e)
elif o.has_error:
errors.append(o)
else:
successes.append(o)
return successes, errors
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment