Last active
July 11, 2022 21:10
-
-
Save pedramamini/4486421 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Joe Sandbox API wrapper. | |
# REQUIRES: python-requests http://docs.python-requests.org/en/latest/ | |
import sys | |
import time | |
import random | |
import getpass | |
import requests | |
try: | |
import simplejson as json | |
except ImportError: | |
import json | |
# API URL. | |
API_URL = "https://joesandbox.joesecurity.org/index.php/api/" | |
# credentials. | |
JOE_USER = "" | |
JOE_PASS = "" | |
JOE_TAC = False | |
######################################################################################################################## | |
class joe_api: | |
""" | |
Joe Sandbox API wrapper. | |
@see: https://joesandbox.joesecurity.org/index.php/download/Joe%20Sandbox%20WEB%20API.pdf | |
""" | |
#################################################################################################################### | |
def __init__ (self, username, password, verify_ssl=False): | |
""" | |
Initialize the interface to Joe Sandbox API with username and password. | |
""" | |
self.username = username | |
self.password = password | |
self.verify_ssl = verify_ssl | |
#################################################################################################################### | |
def __API (self, api, params=None, files=None): | |
""" | |
Robustness wrapper. Tries up to 3 times to dance with the Joe Sandbox API. | |
@type api: str | |
@param api: API to call. | |
@type params: dict | |
@param params: Optional parameters for API. | |
@type files: dict | |
@param files: Optional dictionary of files for multipart post. | |
@rtype: requests.response. | |
@return: Response object. | |
@raise Exception: If all attempts failed. | |
""" | |
# default query parameters. | |
default_params = \ | |
{ | |
"username" : self.username, | |
"password" : self.password, | |
} | |
# interpolate default and supplied API params. | |
if params is None: | |
params = default_params | |
else: | |
params.update(default_params) | |
# make up to three attempts to dance with the API, use a jittered exponential back-off delay | |
for i in xrange(3): | |
try: | |
return requests.post(API_URL + api, data=params, files=files, verify=self.verify_ssl) | |
# 0.4, 1.6, 6.4, 25.6, ... | |
except: | |
time.sleep(random.uniform(0, 4 ** i * 100 / 1000.0)) | |
raise Exception("exceeded 3 attempts with Joe sandbox API.") | |
#################################################################################################################### | |
def analyses (self): | |
""" | |
Retrieve a list of analyzed samples. | |
@rtype: list | |
@return: List of objects referencing each analyzed file. | |
""" | |
# dance with the API. | |
response = self.__API("analysis/list") | |
# returns a list, we assign it into a dictionary. | |
content = '{"analyses":' + response.content + '}' | |
# parse the JSON into a container and return the systems list we just created above. | |
return json.loads(content)["analyses"] | |
#################################################################################################################### | |
def analyze (self, handle, systems="xp", inet=True, scae=False, comments=""): | |
""" | |
Submit a file for analysis. | |
@type handle: File handle | |
@param handle: Handle to file to upload for analysis. | |
@type systems: str | |
@param systems: Comma separated list of systems to run sample on. | |
@type inet: bool | |
@param inet: Raise this flag to allow Internet connectivity for sample. | |
@type scae: bool | |
@param scae: Raise this flag to enable Static Code Analysis Engine. | |
@type comments: str | |
@param comments: Comments to store with sample entry. | |
@rtype: dict | |
@return: Dictionary of system identifier and associated webid. | |
""" | |
if inet: | |
inet = "1" | |
else: | |
inet = "0" | |
if scae: | |
scae = "1" | |
else: | |
scae = "0" | |
# query parameters. | |
params = \ | |
{ | |
"tandc" : "1", | |
"inet" : inet, | |
"scae" : scae, | |
"type" : "file", | |
"comments" : comments, | |
} | |
# multipart post files. | |
files = { "sample" : (handle.name, handle) } | |
# keep a list of webids per system we send the sample to. | |
webids = {} | |
# a separate API request must be made per system we wish to analyze on. | |
for system in systems.split(","): | |
# ensure the handle is at offset 0. | |
handle.seek(0) | |
# set the system and dance with the API. | |
params[system] = "1" | |
response = self.__API("analysis", params, files) | |
print response.content | |
# save the webid for the current system. | |
try: | |
webids[system] = json.loads(response.content)["webid"] | |
except: | |
webids[system] = "FAIL" | |
# clear out the current system. | |
del params[system] | |
# return the webid list. | |
return webids | |
#################################################################################################################### | |
def is_available (self): | |
""" | |
Determine if the Joe Sandbox API servers are alive or in maintenance mode. | |
@rtype: bool | |
@return: True if service is available, False otherwise. | |
""" | |
# dance with the API. | |
response = self.__API("server/available") | |
try: | |
if response.content == "1": | |
return True | |
except: | |
pass | |
return False | |
#################################################################################################################### | |
def delete (self, webid): | |
""" | |
Delete the reports associated with the given webid. | |
@type webid: int | |
@param webid: Report ID to delete. | |
@rtype: bool | |
@return: True on success, False otherwise. | |
""" | |
# dance with the API. | |
response = self.__API("analysis/delete", {"webid" : webid}) | |
try: | |
if response.content == "1": | |
return True | |
except: | |
pass | |
return False | |
#################################################################################################################### | |
def queue_size (self): | |
""" | |
Determine Joe sandbox queue length. | |
@rtype: int | |
@return: Number of submissions in sandbox queue. | |
""" | |
# dance with the API. | |
response = self.__API("queue/size") | |
return int(response.content) | |
#################################################################################################################### | |
def report (self, webid, resource="json", run=0): | |
""" | |
Retrieves the specified report for the analyzed item, referenced by webid. Available resource types include: | |
html, xml, json, bins, pcap, shoots, memstrings, binstrings, memdumps, sample, and cookbook. | |
@type webid: int | |
@param webid: Report ID to draw from. | |
@type resource: str | |
@param resource: Resource type. | |
@type run: int | |
@param run: Index into list of supplied systems to retrieve report from. | |
@rtype: dict | |
@return: Dictionary representing the JSON parsed data or raw, for other formats / JSON parsing failure. | |
""" | |
resource = resource.lower() | |
params = \ | |
{ | |
"webid" : webid, | |
"type" : resource, | |
"run" : run, | |
} | |
# dance with the API. | |
response = self.__API("analysis/download", params) | |
# if resource is JSON, convert to container and return the head reference "analysis". | |
if resource == "json": | |
try: | |
return json.loads(response.content)["analysis"] | |
except: | |
None | |
# otherwise, return the raw content. | |
return response.content | |
#################################################################################################################### | |
def search (self, query): | |
""" | |
Retrieve a list of available systems. | |
@type query: str | |
@param query: Search query. | |
@rtype: list | |
@return: List of objects describing available analysis systems. | |
""" | |
# dance with the API. | |
response = self.__API("analysis/search", {"regex" : "/" + query + "/"}) | |
# returns a list, we assign it into a dictionary. | |
content = '{"results":' + response.content + '}' | |
# parse the JSON into a container and return the result list we just created above. | |
return json.loads(content)["results"] | |
#################################################################################################################### | |
def systems (self): | |
""" | |
Retrieve a list of available systems. | |
@rtype: list | |
@return: List of objects describing available analysis systems. | |
""" | |
# dance with the API. | |
response = self.__API("server/systems") | |
# returns a list, we assign it into a dictionary. | |
content = '{"systems":' + response.content + '}' | |
# parse the JSON into a container and return the systems list we just created above. | |
return json.loads(content)["systems"] | |
######################################################################################################################## | |
if __name__ == "__main__": | |
def USAGE (): | |
msg = "%s: <analyses | analyze <fh> | available | delete <id> | queue | report <id> | search <term> | systems>" | |
print msg % sys.argv[0] | |
sys.exit(1) | |
if len(sys.argv) == 2: | |
cmd = sys.argv.pop().lower() | |
arg = None | |
elif len(sys.argv) == 3: | |
arg = sys.argv.pop().lower() | |
cmd = sys.argv.pop().lower() | |
else: | |
USAGE() | |
# read in username, password and aggree to terms and conditions. | |
if JOE_USER: | |
username = JOE_USER | |
else: | |
username = raw_input("Joe Sandbox Username: ") | |
if JOE_PASS: | |
password = JOE_PASS | |
else: | |
password = getpass.getpass("Joe Sandbox Password: ") | |
if JOE_TAC: | |
tac = "yes" | |
else: | |
tac = raw_input("Do you agree to the terms and conditions (yes/no)? ") | |
if tac.lower() != "yes": | |
sys.exit(1) | |
# instantiate Joe Sandbox API interface. | |
joe = joe_api(username, password) | |
# process command line arguments. | |
if "analyses" in cmd: | |
for a in joe.analyses(): | |
print a["webid"], a["status"], a["systems"], a["filename"], a["detections"] | |
elif "analyze" in cmd: | |
if arg is None: | |
USAGE() | |
else: | |
with open(arg, "rb") as handle: | |
print joe.analyze(handle, systems="xp,w7") | |
elif "available" in cmd: | |
print joe.is_available() | |
elif "delete" in cmd: | |
if arg is None: | |
USAGE() | |
else: | |
print joe.delete(arg) | |
elif "queue" in cmd: | |
print joe.queue_size() | |
elif "report" in cmd: | |
if arg is None: | |
USAGE() | |
else: | |
print joe.report(arg) | |
elif "search" in cmd: | |
if arg is None: | |
USAGE() | |
else: | |
for r in joe.search(arg): | |
print r | |
elif "system" in cmd: | |
print ",".join([s["name"] for s in joe.systems()]) | |
else: | |
USAGE() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment