Skip to content

Instantly share code, notes, and snippets.

@BitTheByte
Last active April 16, 2021 00:32
Show Gist options
  • Save BitTheByte/b88e35864848ab1abeb52742e85e7cb5 to your computer and use it in GitHub Desktop.
Save BitTheByte/b88e35864848ab1abeb52742e85e7cb5 to your computer and use it in GitHub Desktop.
import json
import flask
import requests
import time
import threading
import os
import random
requests.packages.urllib3.disable_warnings()
acunetix_host = "127.0.0.1"
acunetix_port = 3443
acunetix_token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
max_active_scans = 10
pending_targets = set()
pending_file_name = "pending.scans.list"
API_BASE = "/api/v1/"
API_SCAN = API_BASE + "scans"
API_TARGET = API_BASE + "targets"
target_criticality_list = {
"critical": "10",
"high": "20",
"normal": "10",
"low": "0",
}
target_criticality_allowed = list(target_criticality_list.keys())
scan_profiles_list = {
"full_scan": "11111111-1111-1111-1111-111111111111",
"high_risk_vuln": "11111111-1111-1111-1111-111111111112",
"xss_vuln": "11111111-1111-1111-1111-111111111116",
"sql_injection_vuln": "11111111-1111-1111-1111-111111111113",
"weak_passwords": "11111111-1111-1111-1111-111111111115",
"crawl_only": "11111111-1111-1111-1111-111111111117",
}
scan_profiles_allowed = list(scan_profiles_list.keys())
class AXException(Exception):
HTTP_ERROR = "httpError"
AUTH_ERROR = "authError"
SERVER_RESOURCE = "serverResource"
NOT_ALLOWED_CRITICYLITY_PROFILE = "Criticallity not found"
NOT_ALLOWED_SCAN_PROFILE = "Scan Profile not found"
JSON_PARSING_ERROR = "Decoding JSON has failed"
def __init__(self, key, message):
Exception.__init__(self, message)
self.key = key
class Acunetix(object):
def __init__(self, host=None, api=None, timeout=20):
self.apikey = api
self.host = str(
"{}{}".format("https://" if "https://" not in host else "", host)
)
self.timeout = timeout
self.headers = {
"X-Auth": self.apikey,
"content-type": "application/json",
"User-Agent": "Acunetix",
}
self.authenticated = self.__is_connected()
if not self.authenticated:
raise AXException("AUTH_ERROR", "Wrong API Key !")
def __json_return(self, data):
try:
return json.loads(data)
except Exception as e:
raise AXException("JSON_PARSING_ERROR", f"Json Parsing has occured: {e}")
def __send_request(self, method="get", endpoint="", data=None):
request_call = getattr(requests, method)
url = str("{}{}".format(self.host, endpoint if endpoint else "/"))
try:
request = request_call(
url,
headers=self.headers,
timeout=self.timeout,
data=json.dumps(data),
verify=False,
)
if request.status_code == 403:
raise AXException("HTTP_ERROR", f"HTTP ERROR OCCURED: {request.text}")
return self.__json_return(request.text)
except Exception as e:
raise AXException("HTTP_ERROR", f"HTTP ERROR OCCURED: {e}")
def __is_connected(self):
return False if 'Unauthorized' in str(self.info()) else True
def info(self):
return self.__send_request(method="get", endpoint="/api/v1/info")
def targets(self):
return self.__send_request(
method="get", endpoint=f"{API_TARGET}?pagination=200"
)
def add_target(self, target="", criticality="normal"):
if criticality not in target_criticality_allowed:
raise AXException("NOT_ALLOWED_CRITICYLITY_PROFILE",
"Criticallity not found allowed values {}".format(str(list(target_criticality_allowed))))
target_address = (
target if "http://" in target or "https://" in target else "http://{}".format(target)
)
data = {
"address": str(target_address),
"description": "Sent from Acunetix-Python",
"criticality": target_criticality_list[criticality],
}
return self.__send_request(method="post", endpoint=API_TARGET, data=data)
def delete_target(self, target_id):
try:
return self.__send_request(
method="delete", endpoint=f"{API_TARGET}/{target_id}"
)
except:
pass
def delete_all_targets(self):
while True:
targets = self.targets()
if len(targets["targets"]):
for target in targets["targets"]:
self.delete_target(target["target_id"])
else:
break
def running_scans(self):
return self.__send_request(method="get", endpoint=f"{API_SCAN}?l=100&q=status:processing;")
def starting_scans(self):
return self.__send_request(method="get", endpoint=f"{API_SCAN}?l=100&q=status:starting;")
def start_scan(self, address=None, target_id=None, scan_profile="full_scan"):
if scan_profile not in scan_profiles_allowed:
raise AXException("NOT_ALLOWED_SCAN_PROFILE",
"Scan Profile not found allowed values {}".format(str(list(scan_profiles_allowed))))
if address and not target_id:
target_id = self.add_target(target=address)["target_id"]
scan_payload = {
"target_id": str(target_id),
"profile_id": scan_profiles_list[scan_profile],
"schedule": {"disable": False, "start_date": None, "time_sensitive": False},
}
return target_id, scan_profile, self.__send_request(method="post", endpoint=API_SCAN, data=scan_payload)
app = flask.Flask(import_name='')
@app.route("/add_target/<host>")
def add_interface(host):
global pending_targets
pending_targets.add(host)
return f"Added {host} to pending targets queue. len = {len(pending_targets)}"
@app.route("/show_targets")
def target_interface():
global pending_targets
return '\n'.join(pending_targets)
def populate_from_file(name, _list):
data = [target.strip() for target in open(name, "r").read().split("\n") if target.strip()]
for item in data:
_list.add(item)
return _list
threading.Thread(target=lambda: app.run(host="0.0.0.0", port=80)).start()
acunetix = Acunetix(host="%s:%i" % (acunetix_host, acunetix_port), api=acunetix_token)
if not os.path.isfile(pending_file_name):
open(pending_file_name, "w").write("")
while 1:
try:
time.sleep(5)
populate_from_file(pending_file_name, pending_targets)
print(f"[INFO] number of targets in qeueue = {len(pending_targets)}")
n_running_scans = len(acunetix.running_scans()['scans'])
n_starting_scans = len(acunetix.starting_scans()['scans'])
if n_running_scans >= max_active_scans or n_starting_scans >= max_active_scans or len(pending_targets) == 0:
print(f"[WARNING] max_active_scans of {max_active_scans} scan(s) is reached or no pending targets")
continue
temp = pending_targets.copy()
try:
target = pending_targets.pop()
target_id, scan_id, response = acunetix.start_scan(target)
print(f"[INFO] started new scan for {target} with target_id={target_id}, scan_id={scan_id}")
open(pending_file_name, "w").write('\n'.join(pending_targets))
except Exception as e :
print(e)
random.shuffle(temp)
pending_targets = temp
except Exception as e:
print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment