Created
April 8, 2021 20:36
-
-
Save leehinman/154d02b8b72506f1b418d62631eeda54 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import argparse | |
import json | |
import os | |
import pprint | |
import difflib | |
def parse_args(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument("beats_dir", | |
help="Full path to beats directory") | |
parser.add_argument("integrations_dir", | |
help="Full path to integrations directory") | |
return parser.parse_args() | |
def get_data_streams(integrations_dir): | |
zeek_dir = os.path.join(integrations_dir, "packages/zeek/data_stream") | |
return [name for name in os.listdir(zeek_dir) if os.path.isdir(os.path.join(zeek_dir, name))] | |
def get_beats_expected(beats_dir, data_stream): | |
expected_filename = "{0}-json.log-expected.json".format(data_stream) | |
full_path = os.path.join(beats_dir, "x-pack/filebeat/module/zeek", data_stream, "test", expected_filename) | |
if os.path.isfile(full_path): | |
return full_path | |
else: | |
return "" | |
def get_integrations_expected(integrations_dir, data_stream): | |
expected_filename = "test-{0}.log-expected.json".format(data_stream) | |
full_path = os.path.join(integrations_dir, "packages/zeek/data_stream", data_stream, "_dev/test/pipeline", expected_filename) | |
if os.path.isfile(full_path): | |
return full_path | |
else: | |
return "" | |
def get_beats_json(filename): | |
if not filename: | |
return | |
data = "" | |
with open(filename) as f_handle: | |
data = json.load(f_handle) | |
return json.dumps(data, sort_keys=True, indent=2) | |
def get_integrations_json(filename): | |
if not filename: | |
return | |
data = "" | |
with open(filename) as f_handle: | |
data = json.load(f_handle) | |
results = [] | |
for o in data['expected']: | |
results.append(flatten_object(o, [])) | |
return json.dumps(results, sort_keys=True, indent=2) | |
def flatten_object(obj, dict_fields, prefix=""): | |
result = {} | |
for key, value in obj.items(): | |
if isinstance(value, dict) and prefix + key not in dict_fields: | |
new_prefix = prefix + key + "." | |
result.update(flatten_object(value, dict_fields, new_prefix)) | |
else: | |
result[prefix + key] = value | |
return result | |
if __name__ == '__main__': | |
args = parse_args() | |
for ds in get_data_streams(args.integrations_dir): | |
print("Diff in {0}".format(ds)) | |
beats_expected = get_beats_json(get_beats_expected(args.beats_dir, ds)) | |
if beats_expected is None: | |
print("No beats expected file") | |
continue | |
integrations_expected = get_integrations_json(get_integrations_expected(args.integrations_dir, ds)) | |
if integrations_expected is None: | |
print("No integrations expected file") | |
continue | |
d = difflib.Differ() | |
result = list(d.compare(beats_expected.splitlines(keepends=True), | |
integrations_expected.splitlines(keepends=True))) | |
pprint.pprint(result) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment