Created
June 30, 2022 15:29
-
-
Save stephenreid/7ce2bdfd10b6ff9d8c6c099c1d9c9923 to your computer and use it in GitHub Desktop.
RefreshIdentifiesService.rb
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Analytics::RefreshIdentifiesService | |
# This service is explicitly written to send identify calls for users | |
# that have incorrect attributes in Braze. The flow for this service is as | |
# follows: | |
# | |
# 1. Find all users that have been updated in the last hour | |
# 2. For each user, pull the corresponding user attributes for that user | |
# from Braze using the Braze REST API (via the Braze-Ruby gem) | |
# 3. If the traits in the user's identify_data is different than what comes | |
# from Braze (that is, there are either new attributes or changed attributes), | |
# then: | |
# 4. Issue an identify call to Segment for the changed attributes | |
attr_reader :within, :batch_size | |
def initialize(within: 2.hours, batch_size: 50) | |
@within = within | |
if batch_size.nil? || batch_size > 50 || batch_size < 1 | |
raise ArgumentError, 'batch_size must be between 1 and 50' | |
end | |
@batch_size = batch_size | |
end | |
def perform | |
updated_users = | |
ActiveRecord::Base.connected_to(role: :reading_replica) do | |
User | |
.not_deleted | |
.where( | |
User.arel_table[:updated_at].gt(within.ago).and( | |
User.arel_table[:identify_data].not_eq(nil) | |
) | |
) | |
.pluck(:id, :identify_data) | |
end | |
updated_users.each_slice(batch_size).each do |users| | |
process_users(users) | |
end | |
end | |
private | |
def process_users(users) | |
user_data = Hash[users] | |
return if user_data.empty? | |
braze_users = fetch_braze_users(user_data.keys) | |
return if braze_users.empty? | |
braze_users.each do |braze_user| | |
user_id = braze_user['external_id'].to_i | |
braze_traits = normalize_braze_traits(braze_user) | |
next unless user_data[user_id] | |
current_traits = normalize_user_traits(user_data[user_id]['traits']) | |
next unless current_traits.present? | |
changed_traits = traits_diff(current_traits, braze_traits) | |
next unless changed_traits.present? | |
send_identify(user_id, changed_traits) | |
end | |
segment_client.flush | |
end | |
def remove_nils(hash) | |
hash.delete_if { |_, v| v.nil? } | |
end | |
def fetch_braze_users(user_ids) | |
success, data = braze_client.export_users( | |
external_ids: user_ids, | |
fields_to_export: %w( | |
external_id | |
first_name | |
last_name | |
custom_attributes | |
) | |
) | |
if success && data['users'] && data['message'] == 'success' | |
data['users'] | |
end | |
end | |
def normalize_braze_traits(braze_user) | |
traits = {} | |
traits.merge!(braze_user['custom_attributes']) if braze_user['custom_attributes'] | |
traits['email'] = braze_user['email'] if braze_user['email'] | |
if braze_user['first_name'] || braze_user['last_name'] | |
traits['name'] = build_name(braze_user['first_name'], braze_user['last_name']) | |
traits['first_name'] = braze_user['first_name'].to_s.strip | |
traits['last_name'] = braze_user['last_name'].to_s.strip | |
end | |
normalize_timestamps(traits) | |
traits | |
end | |
def normalize_user_traits(traits) | |
remove_nils(traits) | |
traits['name'] = traits['name'].strip if traits['name'] | |
traits['first_name'] = traits['first_name'].to_s.strip if traits['first_name'] | |
traits['last_name'] = traits['last_name'].to_s.strip if traits['last_name'] | |
normalize_timestamps(traits) | |
traits | |
end | |
def normalize_timestamps(traits) | |
%w( | |
created_at | |
last_login | |
current_period_end | |
current_period_start | |
subscription_ended_at | |
subscription_canceled_at | |
).each do |attr| | |
traits[attr] = DateTime.parse(traits[attr]).utc.iso8601 if traits[attr] | |
end | |
end | |
def build_name(first_name, last_name) | |
"#{first_name} #{last_name}".strip | |
end | |
def traits_diff(current, braze) | |
(current.to_a - braze.to_a).to_h | |
end | |
def send_identify(user_id, traits) | |
Rails.logger.debug "[#{self.class}] Identifying user (refresh): #{user_id}, #{traits.inspect}" | |
segment_client.identify(build_identify(user_id, traits)) | |
end | |
def build_identify(user_id, traits) | |
{ | |
user_id: user_id, | |
traits: traits, | |
integrations: Analytics::IntegrationsService.identify_options(user_id) | |
}.as_json | |
end | |
def braze_client | |
@braze_client ||= Analytics::Api::BrazeService.new | |
end | |
def segment_client | |
@segment_client ||= Segment::Analytics.new( | |
write_key: Settings.segment.write_key, | |
batch_size: batch_size | |
) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment