Created
December 18, 2021 03:17
-
-
Save lsferreira42/249e818d8a8e41592fd2ef14a9d8bce7 to your computer and use it in GitHub Desktop.
Threaded socket testing tool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque | |
from queue import Queue | |
from threading import Thread | |
import threading | |
from time import sleep | |
from collections import Counter | |
import os | |
import socket | |
import sys | |
import argparse | |
import resource | |
#import requests | |
NUM_RUNNING = 0 | |
NUM_SUCCESS = 0 | |
LIST_FAIL = [] | |
sem = threading.Semaphore() | |
NUM_THREADS = 2000 | |
SOCKET_TIMEOUT = 30 | |
DEBUG = os.environ.get('DEBUG', False) | |
class Worker(Thread): | |
def __init__(self, tasks): | |
Thread.__init__(self) | |
self.tasks = tasks | |
self.daemon = True | |
self.start() | |
def run(self): | |
while True: | |
func, args, kargs = self.tasks.get() | |
try: | |
func(*args, **kargs) | |
except Exception as error: | |
print("Error: ", error) | |
finally: | |
self.tasks.task_done() | |
class ThreadPool(object): | |
def __init__(self, num_threads): | |
self.tasks = Queue(num_threads) | |
for _ in range(num_threads): | |
Worker(self.tasks) | |
def add_task(self, func, *args, **kargs): | |
self.tasks.put((func, args, kargs)) | |
def wait_completion(self): | |
self.tasks.join() | |
def print_debug(msg): | |
if DEBUG: | |
print("\r\n\nDEBUG: {}\n".format(msg)) | |
def connect_socket(host, port, timeout, path, message="\0", sleep_before=0, sleep_after=0): | |
global NUM_RUNNING | |
global NUM_SUCCESS | |
global LIST_FAIL | |
global sem | |
sem.acquire() | |
NUM_RUNNING = NUM_RUNNING + 1 | |
print('\r' + 'Url: {} - Running: {} - Errors: {} - Ok: {}'.format(host, NUM_RUNNING, len(LIST_FAIL), NUM_SUCCESS), end='') | |
sem.release() | |
sleep(sleep_before) | |
try: | |
socket.setdefaulttimeout(timeout) | |
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | |
s.settimeout(timeout) | |
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: | |
s.connect((host , port)) | |
get_request = message.format(path, host) | |
get_request = bytes(get_request, 'utf-8') | |
s.sendall(get_request) | |
ret = s.recv(128) | |
print_debug(ret) | |
sleep(sleep_after) | |
s.close() | |
NUM_SUCCESS = NUM_SUCCESS + 1 | |
except Exception as error: | |
LIST_FAIL.append(str(error)) | |
return | |
def connect_http(url, timeout): | |
global NUM_RUNNING | |
global NUM_SUCCESS | |
global LIST_FAIL | |
global sem | |
sem.acquire() | |
NUM_RUNNING = NUM_RUNNING + 1 | |
print('\r' + 'Url: {} - Running: {} - Errors: {} - Ok: {}'.format(url, NUM_RUNNING, len(LIST_FAIL), NUM_SUCCESS), end='') | |
sem.release() | |
sleep(5) | |
try: | |
requests.get(url, timeout=timeout) | |
return True | |
except Exception as error: | |
return False | |
def get_args(): | |
parser = argparse.ArgumentParser(description='Para o modo de debug, sete a variavel de ambiente DEBUG=True') | |
parser.add_argument("-n", "--num-threads", | |
required=False, | |
default=50, | |
type=int, | |
help="Numero de threads") | |
parser.add_argument("-t", "--timeout", | |
required=False, | |
default=30.0, | |
type=float, | |
help="Timeout do socket") | |
parser.add_argument("-r", "--requests", | |
required=False, | |
default=50, | |
type=int, | |
help="Numero de requests") | |
parser.add_argument("-p", "--port", | |
required=False, | |
default=80, | |
type=int, | |
help="Porta do socket") | |
parser.add_argument("-u", "--url", | |
required=True, | |
type=str, | |
help="Url do socket") | |
parser.add_argument("-P", "--path", | |
required=False, | |
type=str, | |
default="/", | |
help="Path da requisicaoo http") | |
parser.add_argument("-m", "--message", | |
required=False, | |
type=str, | |
default="\0", | |
help="Mensagem para ser enviada no socket") | |
parser.add_argument("-s", "--sleep-before", | |
required=False, | |
type=int, | |
default=0, | |
help="Tempo de sleep antes de enviar a requisicao") | |
parser.add_argument("-S", "--sleep-after", | |
required=False, | |
type=int, | |
default=0, | |
help="Tempo de sleep depois de enviar a requisicao") | |
return parser.parse_args() | |
def main(): | |
args = get_args() | |
if args.requests < args.num_threads: | |
args.num_threads = args.requests | |
if args.requests < 1: | |
print("O numero de requests deve ser maior que 0") | |
exit(1) | |
if args.num_threads < 1: | |
print("O numero de threads deve ser maior que 0") | |
exit(1) | |
# fix rlimits | |
limit_procs = args.num_threads | |
resource.setrlimit(resource.RLIMIT_NOFILE, (65536, 65536)) | |
resource.setrlimit(resource.RLIMIT_NPROC, (limit_procs, limit_procs)) | |
pool = ThreadPool(args.num_threads) | |
for i in range(args.requests): | |
pool.add_task(connect_socket, args.url, args.port, float(args.timeout), args.path, args.message, args.sleep_before, args.sleep_after) | |
pool.wait_completion() | |
if DEBUG: | |
print("DEBUG:\n") | |
print("Erros: {}".format(LIST_FAIL)) | |
print("Ok: {}".format(NUM_SUCCESS)) | |
print("Total: {}\n\n".format(NUM_SUCCESS + len(LIST_FAIL))) | |
print("Args: {}\n\n".format(args)) | |
print('\r' + 'Url: {} - Running: {} - Errors: {} - Ok: {}'.format(args.url, NUM_RUNNING, len(LIST_FAIL), NUM_SUCCESS)) | |
if len(LIST_FAIL) > 0: | |
print("\nErrors: {}".format(Counter(LIST_FAIL))) | |
if __name__ == "__main__": | |
sys.exit( | |
main() | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment