Skip to content

Instantly share code, notes, and snippets.

@LaurentiuGabriel
Created March 4, 2024 15:14
Show Gist options
  • Save LaurentiuGabriel/99a4a6b0f52a11774b2f534b8119ec49 to your computer and use it in GitHub Desktop.
Save LaurentiuGabriel/99a4a6b0f52a11774b2f534b8119ec49 to your computer and use it in GitHub Desktop.
Threat Hunting on Email Servers with GPT-4
import json
import os
from datetime import datetime, timedelta

import openai
from elasticsearch import Elasticsearch
# Configure the Elasticsearch connection. The endpoint and credentials are read
# from the environment so real secrets never have to be written into this
# script; the original placeholder values remain as fallbacks.
es = Elasticsearch(
    [os.environ.get('ELASTICSEARCH_URL', 'http://localhost:9200')],
    http_auth=(
        os.environ.get('ELASTICSEARCH_USER', 'user'),
        os.environ.get('ELASTICSEARCH_PASSWORD', 'password'),
    ),
)
# Configure the OpenAI API key the same way (OPENAI_API_KEY, else placeholder).
openai.api_key = os.environ.get('OPENAI_API_KEY', 'your-api-key')
# Function to query Elasticsearch for recent email logs
def query_email_logs(hours_ago=24, index="your-log-index", size=10):
    """Search Elasticsearch for email log entries from the recent past.

    Args:
        hours_ago: How many hours back the search window starts.
        index: Name of the index (or index pattern) to search.
        size: Maximum number of hits to return. Elasticsearch returns at
            most 10 hits when no size is given, so the default keeps that
            behaviour while letting callers raise the cap explicitly.

    Returns:
        The raw search response returned by ``es.search``.
    """
    # Both ends of the range are rounded down to the hour ("/h"), so entries
    # from the current, still-incomplete hour are not matched.
    window = {"gte": f"now-{hours_ago}h/h", "lt": "now/h"}
    query = {
        "size": size,
        "query": {
            "bool": {
                "must": [
                    {"match": {"log_type": "email"}},
                    {"range": {"@timestamp": window}},
                ]
            }
        },
    }
    return es.search(index=index, body=query)
# Function to analyze logs with GPT-4
def analyze_logs_with_gpt4(logs):
    """Ask GPT-4 to assess each email in a search response and print the result.

    Args:
        logs: Elasticsearch search response. Each entry of
            ``logs['hits']['hits']`` is expected to carry the email text
            under ``_source.email_content``.

    Hits without ``email_content`` are reported and skipped so that one
    malformed log entry does not abort the analysis of the remaining ones.
    """
    for log in logs['hits']['hits']:
        email_content = log.get('_source', {}).get('email_content')
        if not email_content:
            print(f"Skipping log {log.get('_id', '<unknown>')}: no email_content field.")
            continue
        prompt = f"Analyze this email for potential threats or anomalies:\n\n{email_content}"
        # GPT-4 models are chat models: they are served by the chat
        # completions endpoint, not the plain completions endpoint.
        # NOTE(review): this is the pre-1.0 `openai` SDK call style, matching
        # the module-level usage in the rest of this file; SDK >= 1.0 spells
        # it `openai.chat.completions.create` - confirm the installed version.
        response = openai.ChatCompletion.create(
            model="gpt-4-0125-preview",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=150,
        )
        analysis = response.choices[0].message.content
        print(f"Email: {email_content}\nAnalysis: {analysis}\n")
# Main function
def main():
    """Fetch the last 24 hours of email logs and run them through GPT-4.

    Returns:
        0 when the run completed (including when no logs were found),
        1 when any step raised an exception.
    """
    try:
        print("Querying Elasticsearch for recent email logs...")
        logs = query_email_logs(24)  # Query last 24 hours logs
        if logs['hits']['total']['value'] > 0:
            print("Analyzing logs with GPT-4...")
            analyze_logs_with_gpt4(logs)
        else:
            print("No recent email logs found.")
    except Exception as e:
        # Top-level boundary: report the failure and signal it through the
        # return value so a scheduler or shell can tell the run failed.
        print(f"An error occurred: {e}")
        return 1
    return 0


if __name__ == "__main__":
    # Propagate main()'s status as the process exit code.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment