Last active
March 11, 2023 19:31
-
-
Save minherz/c04867dc2110e9785fee62a3fc5bfb21 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BatchPayloads(CombineFn):
    '''Accumulate every element of the windowed collection into one batch list.'''

    def create_accumulator(self):
        # Each accumulator starts as an empty batch.
        return []

    def add_input(self, accumulator, input):
        accumulator.append(input)
        return accumulator

    def merge_accumulators(self, accumulators):
        # Flatten the per-worker batches into a single combined batch.
        merged = []
        for batch in accumulators:
            merged.extend(batch)
        return merged

    def extract_output(self, accumulator):
        # The batch itself is the combiner's output.
        return accumulator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class LogRedaction(DoFn):
    '''Apply DLP inspection and redaction to the textPayload field of log entries.'''

    def __init__(self, region, project_id: str):
        # region: DLP location used to build the parent resource path.
        # project_id: Google Cloud project that hosts the DLP service.
        self.project_id = project_id
        self.region = region
        self.dlp_client = None

    def _log_to_row(self, entry):
        # Make `Row` from `textPayload`. For more details on the row, please see
        # https://cloud.google.com/dlp/docs/reference/rest/v2/ContentItem#Row
        payload = entry.get('textPayload', '')
        return {'values': [{'string_value': payload}]}

    def setup(self):
        '''Initialize DLP client (once per DoFn instance).'''
        if self.dlp_client:
            return
        self.dlp_client = dlp_v2.DlpServiceClient()
        if not self.dlp_client:
            logging.error('Cannot create Google DLP Client')
            raise PipelineError('Cannot create Google DLP Client')

    def process(self, logs):
        # Nothing to redact in an empty batch; skip the pointless DLP API call.
        if not logs:
            return
        # Construct the `table`. For more details on the table schema, please see
        # https://cloud.google.com/dlp/docs/reference/rest/v2/ContentItem#Table
        table = {
            'table': {
                'headers': [{'name': 'textPayload'}],
                # Materialize the rows eagerly: a lazy `map` object is single-use
                # and opaque to later inspection, a list is safe to re-iterate.
                'rows': [self._log_to_row(log) for log in logs]
            }
        }
        response = self.dlp_client.deidentify_content(
            request={
                'parent': f'projects/{self.project_id}/locations/{self.region}',
                'inspect_config': INSPECT_CFG,
                'deidentify_config': REDACTION_CFG,
                'item': table,
            })
        # Replace each payload with its redacted version; the DLP response
        # preserves row order, so rows align with `logs` by index.
        modified_logs = []
        for index, log in enumerate(logs):
            log['textPayload'] = response.item.table.rows[index].values[0].string_value
            # you may consider changing insert ID if the project already has a copy
            # of this log (e.g. log['insertId'] = 'deid-' + log['insertId'])
            # For more details about insert ID, please see:
            # https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#FIELDS.insert_id
            modified_logs.append(log)
        yield modified_logs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PayloadAsJson(DoFn):
    '''Decode a Pub/Sub message payload as UTF-8 and emit it parsed as JSON.'''

    def process(self, element):
        decoded = element.decode('utf-8')
        yield json.loads(decoded)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Inspection configuration: the info types DLP should detect in the payload
# (here, only US Social Security numbers).
# For more details about info types format, please see
# https://cloud.google.com/dlp/docs/reference/rest/v2/InspectConfig
INSPECT_CFG = {
    'info_types': [
        {'name': 'US_SOCIAL_SECURITY_NUMBER'}
    ]
}
# De-identification configuration: mask every character of each detected
# info type with the '#' character.
# For more details about transformation format, please see
# https://cloud.google.com/dlp/docs/reference/rest/v2/projects.deidentifyTemplates#DeidentifyTemplate.InfoTypeTransformations
REDACTION_CFG = {
    'info_type_transformations': {
        'transformations': [
            {
                'primitive_transformation': {
                    'character_mask_config': {
                        'masking_character': '#'
                    }
                }
            }
        ]
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class IngestLogs(DoFn):
    '''Ingest batched log payloads into the configured destination log.'''

    def __init__(self, destination_log_name):
        self.destination_log_name = destination_log_name
        self.logger = None

    def _replace_log_name(self, entry):
        # Point the entry at the destination logger before ingestion.
        entry['logName'] = self.logger.name
        return entry

    def setup(self):
        # Create the Cloud Logging client and logger once per DoFn instance.
        if self.logger:
            return
        logging_client = logging_v2.Client()
        if not logging_client:
            logging.error('Cannot create Google Logging Client')
            raise PipelineError('Cannot create Google Logging Client')
        self.logger = logging_client.logger(self.destination_log_name)
        if not self.logger:
            logging.error('Google client library cannot create Logger object')
            raise PipelineError('Google client library cannot create Logger object')

    def process(self, element):
        # Guard: without a logger (setup not run / failed) nothing is emitted.
        if not self.logger:
            return
        logs = [self._replace_log_name(entry) for entry in element]
        self.logger.client.logging_api.write_entries(logs)
        yield logs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| 'Read log entries from Pub/Sub' >> io.ReadFromPubSub(subscription=pubsub_subscription) | |
| 'Convert log entry payload to Json' >> ParDo(PayloadAsJson()) | |
| 'Aggregate payloads in fixed time intervals' >> WindowInto(FixedWindows(window_size)) | |
# Optimize Google API consumption and avoid possible throttling | |
# by calling APIs for batched data and not per each element | |
| 'Batch aggregated payloads' >> CombineGlobally(BatchPayloads()).without_defaults() | |
| 'Redact SSN info from logs' >> ParDo(LogRedaction(region, destination_log_name.split('/')[1])) | |
| 'Ingest to output log' >> ParDo(IngestLogs(destination_log_name)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment