-
-
Save zorteran/3fa09f98ab9480a722b7a634b8868e15 to your computer and use it in GitHub Desktop.
Python script for use in a CI/CD pipeline to update custom detection rules in the Elastic Stack
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pipeline: validate the detection rules on every push, then push the custom
# rules to Kibana from the main branch only.
stages:
  - test
  - deploy

# Hidden template job holding the setup both jobs share: same image, same
# virtualenv bootstrap.  before_script runs in the same shell as script, so
# the activated virtualenv is still active when the job's own commands run.
.python_venv:
  image: python:3.8
  before_script:
    - python -V  # Print out python version for debugging
    - pip install virtualenv
    - virtualenv venv
    - source venv/bin/activate
    - pip install -r requirements.txt

test_rules:
  extends: .python_venv
  stage: test
  script:
    - python -m detection_rules test

deploy_rules:
  extends: .python_venv
  stage: deploy
  only:
    - main
  script:
    - python update_custom_rules.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
import time
from uuid import uuid4

import requests
import toml
# Kibana basic-auth credentials come from the environment; a missing variable
# raises KeyError immediately, before any request is made.
kbnuser, kbnpwd = (os.environ[name] for name in ("kibana_usr", "kibana_pwd"))
def _rule_creation_failed(item):
    """Return True when one entry of the bulk-create response reports an error.

    Two shapes are recognised: a top-level ``statusCode`` (the key the
    original check read) and a nested ``error`` object.
    NOTE(review): the nested ``error.status_code`` key is assumed from the
    Kibana detections API; confirm against the deployed Kibana version.
    """
    if not isinstance(item, dict):
        # Anything that is not a JSON object is an unexpected answer.
        return True
    status = item.get("statusCode")
    error = item.get("error")
    if status is None and isinstance(error, dict):
        status = error.get("status_code")
    if status is None:
        # No status but an error payload is still a failure; neither means
        # the entry is a created rule.
        return bool(error)
    return isinstance(status, int) and 400 <= status <= 599


def create_rules(createbody, kbnuser, kbnpwd):
    """Bulk-create detection rules in Kibana and raise if any of them fails.

    Args:
        createbody: List of rule definitions sent as the JSON request body.
        kbnuser: User name for HTTP basic authentication.
        kbnpwd: Password for ``kbnuser``.

    Raises:
        ValueError: If the request as a whole, or any single rule in it, is
            reported as failed.
        KeyError: If the ``kibana_url`` environment variable is not set.
    """
    resp = requests.post(
        url="https://{}/api/detection_engine/rules/_bulk_create".format(os.environ["kibana_url"]),
        json=createbody,
        headers={
            "Content-Type": "application/json",
            # requests only accepts str/bytes header values, so the UUID
            # object has to be converted explicitly.
            "kbn-xsrf": str(uuid4()),
        },
        auth=(kbnuser, kbnpwd),
        # NOTE(review): TLS certificate verification is disabled here, which
        # exposes the credentials to interception; prefer a CA bundle.
        verify=False,
    )
    results = resp.json()
    # A request rejected as a whole comes back as one object instead of a
    # list of per-rule results; wrap it so it is checked like any other entry
    # rather than iterated key by key.
    entries = [results] if isinstance(results, dict) else results
    for response in entries:
        if _rule_creation_failed(response):
            print(results)
            print("=====================================================================")
            print(createbody)
            print(response)
            raise ValueError("Failed to create rule")
# Walk rules/ recursively and keep the path of every file whose name starts
# with the custom prefix.
custom_rules = [
    os.path.join(dirpath, filename)
    for dirpath, _subdirs, filenames in os.walk("rules/")
    for filename in filenames
    if filename.startswith("custom_prefix_")
]
# Parse every collected rule file as TOML.  A file that cannot be read or
# parsed is reported and skipped so that one broken rule does not block the
# others; it is no longer dropped silently.
toml_rules = []
for rulefile in custom_rules:
    try:
        # TOML documents are UTF-8 by specification, so do not depend on the
        # locale's default encoding.
        with open(rulefile, "r", encoding="utf-8") as f:
            toml_rules.append(toml.loads(f.read()))
    except Exception as err:
        print("Failed to parse {} with error: {}".format(rulefile, err))
# Request body for the bulk update: the "rule" table of every parsed file,
# leaving out rules that carry no rule_id.
updatebody = [
    parsed["rule"] for parsed in toml_rules if "rule_id" in parsed["rule"]
]
# Send every collected rule to Kibana in one bulk-update request.
resp = requests.put(
    url="https://{}/api/detection_engine/rules/_bulk_update".format(os.environ["kibana_url"]),
    json=updatebody,
    headers={
        "Content-Type": "application/json",
        # requests only accepts str/bytes header values, so the UUID object
        # has to be converted explicitly.
        "kbn-xsrf": str(uuid4()),
    },
    auth=(kbnuser, kbnpwd),
    # NOTE(review): TLS certificate verification is disabled here, which
    # exposes the credentials to interception; prefer a CA bundle.
    verify=False,
)
response = resp.json()
# A single object carrying "error" (instead of a list of per-rule results)
# means the request as a whole was rejected: report it and fail the job.
if isinstance(response, dict) and "error" in response:
    print(response.get("message", response))
    raise SystemExit(1)
# Every rule the bulk update reported as "not found" does not exist yet and
# is queued for creation instead.
createbody = []
for result in resp.json():
    try:
        if "error" not in result:
            continue
        message = result["error"]["message"]
        if "not found" not in message:
            continue
        print(message)
        # The error message names the missing rule_id; use it to look the
        # rule definition up in the update body.
        for candidate in updatebody:
            if candidate["rule_id"] in message:
                createbody.append(candidate)
    except TypeError:
        # Entry did not have the expected shape; show it as-is.
        print(result)
# Create the rules Kibana did not know yet.  Attempts are bounded and spaced
# out: retrying forever on a permanent failure (bad credentials, an invalid
# rule) would hang the CI job without any output.
MAX_CREATE_ATTEMPTS = 5
# With nothing queued there is no request to make at all.
created = not createbody
attempt = 0
while not created:
    attempt += 1
    try:
        create_rules(createbody, kbnuser, kbnpwd)
    except Exception as err:
        print("Create attempt {}/{} failed: {}".format(attempt, MAX_CREATE_ATTEMPTS, err))
        if attempt >= MAX_CREATE_ATTEMPTS:
            # Give up and fail the job so the pipeline shows the problem.
            raise SystemExit(1)
        # Back off a little longer after each failure, capped at 30 seconds.
        time.sleep(min(2 ** attempt, 30))
    else:
        created = True
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment