Created
October 5, 2021 13:19
-
-
Save eagleEggs/db4208301e6045af9341f48003c3d65c to your computer and use it in GitHub Desktop.
Testing upload of mock data for MySQL / Django Script Repository API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import mysql.connector | |
from random import random | |
from random import randrange | |
from random import choice | |
import json | |
class Database(object):
    """Wrap one MySQL connection and insert randomly generated mock rows.

    Attributes:
        username, password, dbhost, databaseport: the connection settings
            passed to the constructor.
        database: the open ``mysql.connector`` connection (not the schema
            name; the name is only forwarded to ``connect``).
        myCursor: a buffered cursor on that connection.
    """

    # Six %s placeholders: name, downloads, userdown, userup, payload, exe.
    # The bare ``id`` token and the NOW() calls are evaluated by the server.
    # NOTE(review): the column order of table ``ax`` is not visible here;
    # an explicit column list would make this statement safer - confirm schema.
    _INSERT_SQL = ("INSERT INTO ax VALUES(id, %s ,%s, NOW(), NOW(), %s, %s, "
                   "%s, %s, NOW())")

    def __init__(self, username, dbhost, database, password, databaseport):
        """Open the connection and create the cursor used for inserts.

        Args:
            username: MySQL user name.
            dbhost: host name or address of the server.
            database: name of the schema to connect to.
            password: password for ``username``.
            databaseport: TCP port of the server.
        """
        self.username = username
        self.password = password
        self.dbhost = dbhost
        self.databaseport = databaseport
        self.database = mysql.connector.connect(user=username,
                                                database=database,
                                                passwd=password,
                                                port=databaseport,
                                                host=dbhost)
        # Enable autocommit once per connection rather than sending
        # "SET AUTOCOMMIT=1" before every single insert.
        self.database.autocommit = True
        self.myCursor = self.database.cursor(buffered=True)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def close(self):
        """Close the cursor and then the underlying connection."""
        self.myCursor.close()
        self.database.close()

    def create_randdata(self, wad2post):
        """Insert one row made of random sample values plus ``wad2post``.

        The randomly chosen values are also stored on the instance
        (``name``, ``downloads``, ``userdown``, ``userup``, ``exe``) and
        ``created`` / ``lastmod`` are reset to empty strings.

        Args:
            wad2post: payload stored as the fifth bound parameter of the
                INSERT statement; it is printed before the insert runs.
        """
        self.name = choice(["Turnip", "Bob", "Silent Jay", "Corn Pop",
                            "Nacho Libre"])
        self.downloads = choice(["5", "1", "15", "33", "69"])
        self.created = ""
        self.lastmod = ""
        self.userdown = choice(["bob", "georgeo", "fran", "lily", "franky"])
        self.userup = choice(["jill", "billy", "todd", "matt", "sam"])
        self.exe = choice(["this.exe", "that.exe", "None"])

        print(wad2post)
        self.myCursor.execute(self._INSERT_SQL,
                              [self.name, self.downloads, self.userdown,
                               self.userup, wad2post, self.exe])
class FakeClass(Database):
    """Database client that inserts random mock script records when created.

    Constructing an instance connects to the database (via ``Database``),
    fills the sample-value pools below, inserts ``iterations`` random
    records, writes the first sample script to ``script_test.py`` and prints
    ``ok``.
    """

    def __init__(self, iterations, *args):
        """Set up the sample pools and immediately generate the mock rows.

        Args:
            iterations: number of mock records to insert.
            *args: forwarded positionally to ``Database.__init__``
                (username, dbhost, database, password, databaseport).
        """
        super(FakeClass, self).__init__(*args)
        self.iterations = iterations

        # Pools of sample values; one entry per field is picked at random
        # for every generated record.
        self.titles = ['aaa.py', 'bbb.ps1', 'ccc.bat', 'ddd.sh', 'eee.py',
                       'fff.c', 'ggg.pl', 'hhh.ok', 'iii.hi']
        self.authors = ['bob', 'jay', 'me', 'you', 'us', 'them', 'who?']
        self.platforms = ['windows10', 'windows7', 'linux', 'all', 'ws2012']
        self.languages = ['python', 'powershell', 'bash', 'batch', 'perl',
                          'ruby']
        self.infrastructures = ['vmware6', 'hyperv', 'wdomain']
        self.binary_option = ['y', 'n']
        self.cross_platform = ['y', 'n']
        self.abstract = ['y', 'n']
        self.gui = ['y', 'n']
        self.cli = ['y', 'n']
        self.risk = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10']
        self.dependencies = ['pscore6', 'net35', 'net45', 'wkhtml']
        self.description = ['Does this thing thats really cool',
                            'Reverts Space-Time',
                            'Opens Gate to Helena', 'Fixes Unfixable Stuff']
        self.hash = ['5ee1b75ee625a51ce5f7436b4f99bc2a',
                     'zzz1b75ee625a51ce5f7436b4f99bfff',
                     'rre1b75ee625a51cettt436b4f99boop']
        self.binary = ['none']

        # Sample script bodies used as the 'script' field. They are plain
        # text payloads and are never executed by this module.
        self.script = ['''class Instance(Engine):
    def __init__(self, *args):
        logging.info("Initiating Instance")
        super(Instance, self).__init__(*args)
    # no args = use inherited param > else use hard param
    def open_site(self, site=None):
        if site is None:
            logging.info("Opening Browser Session")
            self.engine.get(self.siteaddress)
        else:
            logging.info("Opening Custom Browser Session")
            self.engine.get(site)''', '''class TestCycle(Instance):
    def __init__(self, order, thread_max, cycles, brake, instance, scripts,
                 *args):
        logging.info("Initiating Test Cycle")
        super(Instance, self).__init__(*args)
        self.order = order
        self.instance = instance
        self.scripts = scripts
        self.time = "1"
        self.successes = 0
        self.failures = 0
        if self.order == "repository":
            self.script_total = os.listdir(self.scripts)
        self.thread_total = 0
        self.thread_max = thread_max
        self.script_max = len(self.script_total)
        self.running = True
        self.running_thread = True
        self.cycles = cycles
        self.brake = brake
        global report_hook
        self.UUID = uuid
        report_hook = TA_Report_Template.Report(self.UUID, self.appName)
        logging.info("Test Cycle Starting")
        logging.info("Order: {}".format(self.order))
        logging.info("Max Cycles: {}".format(self.thread_max))
        logging.info("Test Cycle: Scripts: {}".format(self.scripts))
        if self.order == "single":
            logging.info("Running Single Test")
            Sg.Print("\nRunning Single Test:\n{}".format(self.scripts))
            thread.start_new_thread(
                self.runonce, (self.instance, self.scripts))
        if self.order == "repository" and thread_max > 0:
            logging.info("Running Repository Cycles")
            Sg.Print("\nTotal: {} Tests:\n{}"
                     .format(self.script_total, self.scripts))
            # self.run_repository(self.instance, self.scripts)
            thread.start_new_thread(
                self.run_repository, (self.instance, self.scripts,
                                      report_hook))
            while self.thread_total <= self.thread_max and \
                    self.script_max != 0:
                pass
        if self.order == "repository" and thread_max == 0:
            thread.start_new_thread(self.run_repository_inc, (self.instance,
                                                              self.scripts,
                                                              report_hook))''',
                       '''    def wait_for_element_xy_loop(self, image, timesample, x1, y1, x2, y2,
                                 precision=0.8):
        pos = self.get_element_from_xy(image, x1, y1, x2, y2, precision)
        while pos[0] == -1:
            time.sleep(timesample)
            pos = self.get_element_from_xy(image, x1, y1, x2, y2, precision)
        return pos
    def count_element(self, image, precision=0.9):
        img_rgb = self.engine.screenshot()
        img_rgb = np.array(img_rgb)
        img_gray = cv2.cvtColor(img_rgb, cv2.COLOR_BGR2GRAY)
        template = cv2.imread(image, 0)
        w, h = template.shape[::-1]
        res = cv2.matchTemplate(img_gray, template, cv2.TM_CCOEFF_NORMED)
        loc = np.where(res >= precision)
        count = 0
        for pt in zip(*loc[::-1]): # Swap columns and rows
            # cv2.rectangle(img_rgb, pt, (pt[0] + w, pt[1] + h), (0, 0, 255),
            # 2) // Uncomment to draw boxes around found occurances
            count = count + 1
        # cv2.imwrite('result.png', img_rgb) // Uncomment to write output
        # image with boxes drawn around occurances
        return count
    def r(self, num, rand):
        return num + rand * random.random()
    # compare_images takes two required images as inputs
    def compare_images(self, image_one, image_two):
        first_image = cv2.imread(image_one)
        second_image = cv2.imread(image_two)
        try:
            x1 = cv2.cvtColor(first_image, cv2.COLOR_BGR2GRAY)
            x2 = cv2.cvtColor(second_image, cv2.COLOR_BGR2GRAY)
            absdiff = cv2.absdiff(first_image, second_image)
            cv2.imwrite("images/absdiff.png", absdiff)
            diff = cv2.subtract(first_image, second_image)
            b, g, r = cv2.split(diff)
            if cv2.countNonZero(b) == 0 and cv2.countNonZero(g) == 0 and \
                    cv2.countNonZero(r) == 0:
                return True
            else:
                return False''']

        self.fakeit()

        # TEST formatting from SQL works:
        with open('script_test.py', 'w') as f:
            f.write(self.script[0])
        print("ok")

    def _random_wad(self):
        """Return one mock record with a random pick from every pool."""
        return {
            'title': choice(self.titles),
            'author': choice(self.authors),
            'platform': choice(self.platforms),
            'language': choice(self.languages),
            'target_infrastructure': choice(self.infrastructures),
            'binary_option': choice(self.binary_option),
            'cross_platform': choice(self.cross_platform),
            'abstract': choice(self.abstract),
            'gui': choice(self.gui),
            'cli': choice(self.cli),
            'risk': choice(self.risk),
            'dependencies': choice(self.dependencies),
            'description': choice(self.description),
            'hash': choice(self.hash),
            'binary': choice(self.binary),
            'script': choice(self.script),
        }

    def fakeit(self):
        """Insert exactly ``self.iterations`` random records.

        Each record is handed to ``create_randdata`` as the text produced by
        formatting the dict with ``"{}".format``.
        """
        # range(self.iterations) yields exactly the requested number of rows;
        # the previous range(0, iterations + 1) inserted one row too many.
        for _ in range(self.iterations):
            # NOTE(review): this stores the Python dict repr (single-quoted),
            # not JSON - confirm which format the consumer expects before
            # switching to json.dumps.
            self.create_randdata("{}".format(self._random_wad()))
# Build the mock-data loader; constructing it runs the inserts immediately.
# The arguments after the row count are forwarded positionally to
# Database.__init__: username, dbhost, database, password, databaseport.
x = FakeClass(
    300,
    "me",
    "ip",
    "aaas",
    "pw",
    3306
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment