Skip to content

Instantly share code, notes, and snippets.

@jonuwz
Created February 1, 2022 10:47
Show Gist options
  • Save jonuwz/58f48b0b17ab2b6a0029b1e8d193927f to your computer and use it in GitHub Desktop.
Save jonuwz/58f48b0b17ab2b6a0029b1e8d193927f to your computer and use it in GitHub Desktop.
sample api server for splunk ingest
#!/usr/bin/env python3
"""
This mimics an api where you fetch audit logs for a particular time range.
The /audit path accepts 2 query parameters, 'start' and 'end'
These are the start and end times for the query, expressed as an epoch
start defaults to 0
end defaults to now
10 random logs are returned between these times
exmple output:
{
"messages": [
{
"audittime": "1643663011",
"message": "hello from 2022-01-31 21:03:31"
},
{
"audittime": "1643665571",
"message": "hello from 2022-01-31 21:46:11"
},
{
"audittime": "1643674525",
"message": "hello from 2022-02-01 00:15:25"
},
{
"audittime": "1643685747",
"message": "hello from 2022-02-01 03:22:27"
},
{
"audittime": "1643687352",
"message": "hello from 2022-02-01 03:49:12"
},
{
"audittime": "1643690199",
"message": "hello from 2022-02-01 04:36:39"
},
{
"audittime": "1643691663",
"message": "hello from 2022-02-01 05:01:03"
},
{
"audittime": "1643703145",
"message": "hello from 2022-02-01 08:12:25"
},
{
"audittime": "1643709682",
"message": "hello from 2022-02-01 10:01:22"
},
{
"audittime": "1643711166",
"message": "hello from 2022-02-01 10:26:06"
}
],
"range": {
"start": "1643663011",
"end": "1643711166"
}
}
"""
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs
import json
import time
from random import randint
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
now = int(time.time())
path = urlparse(self.path).path
# if not /audit, return 404
if path != '/audit':
self.send_response(404)
self.end_headers()
return
# get the query param for 'start' and 'end'
qp = parse_qs(urlparse(self.path).query)
try:
start_time = int(qp.get("start",["0"])[0])
end_time = int(qp.get("end",[time.time()])[0])
except:
self.send_response(500)
self.end_headers()
return
# rudimentary sanity checks
try:
assert(start_time<=end_time)
assert(start_time>=0)
assert(end_time>=0)
except:
self.send_response(400)
self.end_headers()
return
print([start_time,end_time])
times = []
# 10 random times from start_date -> now
for _ in range(10):
times.append(randint(start_time,end_time))
print(times)
# create the message array
messages = []
for t in sorted(times):
messages.append(
{
'audittime': str(t),
'message': 'hello from {}'.format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t)))
})
# get the earliest and latest times for the range component
timerange = {
'start': messages[0]['audittime'],
'end': messages[-1]['audittime']
}
# create the body
body = {
'messages': messages,
'range': timerange
}
self.send_response(200)
self.send_header('Content-type', 'application/json; charset=utf-8')
self.end_headers()
self.wfile.write(json.dumps(body, indent=2).encode('utf-8'))
httpd = HTTPServer(('localhost', 8002), SimpleHTTPRequestHandler)
httpd.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment