|
import math
import os
import platform
import subprocess
import sys
import threading
import time

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
|
|
|
# True when "-v" appears anywhere on the command line; the download code
# checks this flag before printing its diagnostic messages.
verbose = '-v' in sys.argv
|
|
|
def downloadFile(u, hostname):
    """Stream ``u`` to the null device and report transfer statistics.

    NOTE: this module defines ``downloadFile`` a second time further down;
    that later definition replaces this one when the module is loaded.

    Args:
        u: URL of the file to download.
        hostname: Host passed to ``get_latency`` for the ping samples taken
            while the transfer is running.

    Returns:
        ``(avg, elapsed_time, avg_latency, max_jitter, packet_loss)`` on
        success, or ``None`` when the server does not answer 200 or any
        exception is raised. ``avg`` is the mean of the running throughput
        samples (bytes per second of wall-clock time) and ``packet_loss`` is
        the percentage of the advertised content length that never arrived.
    """
    try:
        if verbose:
            print(f"Attempting to download from {u}...")

        # Context managers make sure the connection pool and the streamed
        # response are released even when the transfer fails half-way.
        with requests.Session() as session:
            retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
            adapter = HTTPAdapter(max_retries=retry)
            session.mount('http://', adapter)
            session.mount('https://', adapter)

            headers = {'User-Agent': 'Mozilla/5.0'}

            if verbose:
                print("Session initialized. Sending GET request...")

            with session.get(u, headers=headers, stream=True, timeout=10) as r:
                if r.status_code != 200:
                    print(f"GET request failed with status code {r.status_code}")
                    return None
                if verbose:
                    print("GET request successful. Streaming content...")

                # A missing header yields 0, which disables the progress bar
                # and the loss figure instead of raising on int(None).
                tl = int(r.headers.get('content-length') or 0)
                if verbose:
                    print(f"Content length: {tl} bytes.")

                latency_check_interval = 5  # seconds between ping samples
                speed_total = 0.0
                speed_samples = 0
                latencies = []
                dl = 0
                shown_percent = -1

                start_time = time.time()
                # Wall-clock timer: process_time() only counts CPU time and
                # therefore ignores the time spent waiting on the network.
                st = time.perf_counter()
                last_latency_check = st

                with open(os.devnull, 'wb') as f:
                    for c in r.iter_content(1024):
                        dl += len(c)
                        f.write(c)

                        now = time.perf_counter()
                        time_diff = now - st
                        if time_diff > 0:  # prevent division by zero
                            speed_total += dl // time_diff
                            speed_samples += 1

                        # Ping at most once per interval; spawning a ping for
                        # every chunk would dominate the measurement.
                        if now - last_latency_check > latency_check_interval:
                            sample = get_latency(hostname)
                            if sample is not None:
                                latencies.append(sample)
                            last_latency_check = now

                        if tl:
                            percent = min(100, int(100 * dl / tl))
                            if percent != shown_percent:
                                p = percent // 2
                                sys.stdout.write(f"\r[{'#' * p}{'.' * (50 - p)}] {percent}%")
                                sys.stdout.flush()
                                shown_percent = percent

                elapsed_time = time.time() - start_time

                avg = round(speed_total / speed_samples, 2) if speed_samples else 0
                avg_latency = round(sum(latencies) / len(latencies), 2) if latencies else 0
                max_jitter = max(latencies) - min(latencies) if latencies else 0
                packet_loss = max(0.0, (1 - dl / tl) * 100) if tl else 0

                if verbose:
                    print(f"{time.time()-start_time:.2f}s: Download completed.")

                return avg, elapsed_time, avg_latency, max_jitter, packet_loss

    except Exception as e:
        # Best-effort boundary: the caller treats None as "skip this host".
        print(f"An error occurred: {e}")
        return None
|
|
|
# Start-up banner reading "Hetzner" / "SpeedTest" in the figlet "banner" style.
intro_art = '''
#     #
#     # ######  ##### ###### #    # ###### #####
#     # #         #       #  ##   # #      #    #
####### #####     #      #   # #  # #####  #    #
#     # #         #     #    #  # # #      #####
#     # #         #    #     #   ## #      #   #
#     # ######    #   ###### #    # ###### #    #

 #####                              #######
#     # #####  ###### ###### #####     #    ######  ####   #####
#       #    # #      #      #    #    #    #      #         #
 #####  #    # #####  #####  #    #    #    #####   ####     #
      # #####  #      #      #    #    #    #           #    #
#     # #      #      #      #    #    #    #      #    #    #
 #####  #      ###### ###### #####     #    ######  ####     #
'''

print(intro_art)
|
|
|
# Download URLs per mirror hostname, keyed by the size argument accepted on
# the command line ("sm", "md", "lg").
hosts = {
    "fsn1-speed.hetzner.com": {
        "sm": "https://fsn1-speed.hetzner.com/100MB.bin",
        "md": "https://fsn1-speed.hetzner.com/1GB.bin",
        "lg": "https://fsn1-speed.hetzner.com/10GB.bin",
    },
    "hel1-speed.hetzner.com": {
        "sm": "https://hel1-speed.hetzner.com/100MB.bin",
        "md": "https://hel1-speed.hetzner.com/1GB.bin",
        "lg": "https://hel1-speed.hetzner.com/10GB.bin",
    },
    "speed.hetzner.de": {
        "sm": "https://speed.hetzner.de/100MB.bin",
        "md": "https://speed.hetzner.de/1GB.bin",
        "lg": "https://speed.hetzner.de/10GB.bin",
    },
    "ash.icmp.hetzner.com": {
        "sm": "http://ash.icmp.hetzner.com/100MB.bin",
        "md": "http://ash.icmp.hetzner.com/1GB.bin",
        "lg": "http://ash.icmp.hetzner.com/10GB.bin",
    },
    "hil.icmp.hetzner.com": {
        "sm": "http://hil.icmp.hetzner.com/100MB.bin",
        "md": "http://hil.icmp.hetzner.com/1GB.bin",
        "lg": "http://hil.icmp.hetzner.com/10GB.bin",
    },
}

# Human-readable location shown in the report for each mirror hostname.
loc_map = {
    "fsn1-speed.hetzner.com": "Falkenstein, Germany",
    "hel1-speed.hetzner.com": "Helsinki, Finland",
    "speed.hetzner.de": "Nuremberg, Germany",
    "ash.icmp.hetzner.com": "Ashburn, Virginia, USA",
    "hil.icmp.hetzner.com": "Hillsboro, Oregon, USA",
}
|
|
|
def convert_size(s):
    """Format a byte count as a short string with a binary (1024-based) unit.

    Args:
        s: Size in bytes (int or float).

    Returns:
        ``"0B"`` for zero or negative input; otherwise the value scaled to
        the largest unit that keeps it at or above 1, rounded to two
        decimals, followed by the unit name (for example ``"1.5KB"``).
        Values below one byte stay in ``B`` and values beyond the last unit
        stay in ``YB``.
    """
    if s <= 0:
        return "0B"

    units = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    value = float(s)
    index = 0
    # Stepping by division keeps the index inside the table: a logarithm
    # gives -1 for fractional sizes and runs past the end for huge ones.
    while value >= 1024 and index < len(units) - 1:
        value /= 1024
        index += 1
    return f"{round(value, 2)}{units[index]}"
|
|
|
def get_latency(h):
    """Return the ``time=`` value reported by a single ping to ``h``.

    Args:
        h: Hostname or address handed to the system ``ping`` command.

    Returns:
        The number following ``time=`` in the ping output as a float
        (presumably milliseconds, as callers print it with an "ms" suffix).
        Returns 0 on Windows, where the probe is skipped, and whenever ping
        cannot be run, fails, times out, or prints no parsable ``time=``
        field.
    """
    if platform.system() == "Windows":
        return 0

    try:
        output = subprocess.check_output(
            ["ping", "-c", "1", h],
            stderr=subprocess.DEVNULL,  # keep ping errors out of the progress bar
            timeout=10,  # never let an unanswered ping stall the caller
        ).decode("utf-8", errors="replace")
    except (subprocess.SubprocessError, OSError, ValueError, TypeError):
        # Explicit exception list instead of a bare except, so Ctrl-C still
        # propagates while every probe failure keeps mapping to 0.
        return 0

    for line in output.splitlines():
        if "time=" in line:
            try:
                return float(line.split("time=")[1].split(" ")[0])
            except ValueError:
                return 0

    # No "time=" line at all: report 0 rather than falling through to None.
    return 0
|
|
|
def _build_retrying_session():
    """Return a ``requests`` session that retries 500/502/503/504 responses."""
    session = requests.Session()
    retry = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retry)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session


def _measure_stream(r, hostname, tl):
    """Drain response ``r`` and return the statistics tuple of ``downloadFile``.

    Args:
        r: Streaming ``requests`` response whose body is consumed.
        hostname: Host pinged via ``get_latency`` during the transfer.
        tl: Advertised content length in bytes, or 0 when unknown.
    """
    latency_check_interval = 5  # seconds between ping samples
    speed_total = 0.0  # running sum, so memory stays constant for any file size
    speed_samples = 0
    latencies = []
    dl = 0
    shown_percent = -1

    # Wall-clock timer: process_time() only counts CPU time and therefore
    # ignores the time spent waiting on the network.
    st = time.perf_counter()
    last_latency_check = st

    with open(os.devnull, 'wb') as f:
        for c in r.iter_content(1024):
            dl += len(c)
            f.write(c)

            now = time.perf_counter()
            time_diff = now - st
            if time_diff > 0:  # prevent division by zero
                speed_total += dl // time_diff
                speed_samples += 1

            if now - last_latency_check > latency_check_interval:
                sample = get_latency(hostname)
                if sample is not None:
                    latencies.append(sample)
                last_latency_check = now

            # Redraw only when the percentage changes; skipped entirely when
            # the server did not advertise a length.
            if tl:
                percent = min(100, int(100 * dl / tl))
                if percent != shown_percent:
                    p = percent // 2
                    sys.stdout.write(f"\r[{'#' * p}{'.' * (50 - p)}] {percent}%")
                    sys.stdout.flush()
                    shown_percent = percent

    elapsed_time = time.perf_counter() - st

    avg = round(speed_total / speed_samples, 2) if speed_samples else 0
    avg_latency = round(sum(latencies) / len(latencies), 2) if latencies else 0
    max_jitter = max(latencies) - min(latencies) if latencies else 0
    packet_loss = max(0.0, (1 - dl / tl) * 100) if tl else 0
    return avg, elapsed_time, avg_latency, max_jitter, packet_loss


def downloadFile(u, hostname):
    """Stream ``u`` to the null device and report transfer statistics.

    Args:
        u: URL of the file to download.
        hostname: Host passed to ``get_latency`` for the ping samples taken
            while the transfer is running.

    Returns:
        ``(avg, elapsed_time, avg_latency, max_jitter, packet_loss)`` on
        success, or ``None`` when the server does not answer 200 or any
        exception is raised. ``avg`` is the mean of the running throughput
        samples (bytes per second of wall-clock time), ``elapsed_time`` is
        the duration of the streaming phase in seconds, and ``packet_loss``
        is the percentage of the advertised content length that never
        arrived.
    """
    # Bound before the try block so the error handler can always reference it.
    start_time = time.time()
    try:
        if verbose:
            print(f"{time.time()-start_time:.2f}s: Attempting to download from {u}...")

        # Context managers release the connection pool and the streamed
        # response even when the transfer fails half-way.
        with _build_retrying_session() as session:
            headers = {'User-Agent': 'Mozilla/5.0'}

            if verbose:
                print(f"{time.time()-start_time:.2f}s: Session initialized. Sending GET request...")

            with session.get(u, headers=headers, stream=True, timeout=10) as r:
                if r.status_code != 200:
                    print(f"{time.time()-start_time:.2f}s: GET request failed with status code {r.status_code}")
                    return None
                if verbose:
                    print(f"{time.time()-start_time:.2f}s: GET request successful. Streaming content...")

                # A missing header yields 0 instead of raising on int(None).
                tl = int(r.headers.get('content-length') or 0)
                if verbose:
                    print(f"{time.time()-start_time:.2f}s: Content length: {tl} bytes.")

                result = _measure_stream(r, hostname, tl)

        if verbose:
            print(f"{time.time()-start_time:.2f}s: Download completed.")
        return result

    except Exception as e:
        # Best-effort boundary: the caller treats None as "skip this host".
        print(f"{time.time()-start_time:.2f}s: An error occurred: {e}")
        return None
|
|
|
def main():
    """Run the download test against every configured host and print a report.

    The test size (``sm``, ``md`` or ``lg``) is taken from the command line
    or, when absent, asked for interactively. Each entry in ``hosts`` is
    downloaded in turn; hosts whose download fails are skipped. A summary of
    the best and worst locations is printed when at least one download
    succeeded.
    """
    sizes = ("sm", "md", "lg")

    # Accept the size anywhere on the command line so that "-v sm" works the
    # same as "sm -v".
    sz = next((arg for arg in sys.argv[1:] if arg in sizes), None)

    if sz is None:
        print("Usage: python3 hetzner-speedtest.py [sm] Small 100MB")
        print(" [md] Medium 1GB")
        print(" [lg] Large 10GB")
        sz = input("Enter size (sm, md, lg): ").strip()
        if sz not in sizes:
            print(f"Unknown size {sz!r}; expected one of: sm, md, lg.")
            return

    lat_res = {}
    spd_res = {}
    time_res = {}
    max_jitter_res = {}
    packet_loss_res = {}

    for h in hosts:
        loc = loc_map.get(h, h)
        print(f"\nTesting {loc}")
        download_result = downloadFile(hosts[h][sz], h)

        if download_result is None:
            print(f"Could not complete the download for {loc}. Skipping...")
            continue

        avg_spd, elapsed_time, avg_latency, max_jitter, packet_loss = download_result
        # "or 0" keeps the value numeric even if the probe yields None, so the
        # min() comparison and ":.3f" formatting below cannot fail.
        lat_res[loc] = get_latency(h) or 0
        spd_res[loc] = avg_spd
        time_res[loc] = elapsed_time
        max_jitter_res[loc] = max_jitter
        packet_loss_res[loc] = packet_loss

        print(f"\nSpeed: {convert_size(avg_spd)}/s")
        print(f"Time: {elapsed_time:.2f} seconds")
        print(f"Latency: {lat_res[loc]}ms")
        print(f"Avg Latency: {avg_latency}ms")
        print(f"Max Jitter: {max_jitter}ms")
        print(f"Packet Loss: {packet_loss:.2f}%")
        # TODO: traceroute and DNS-resolution reporting are not implemented.

    if not lat_res:
        # Nothing to rank; stop here instead of reading undefined results.
        print("No successful streams established to analyse.")
        return

    def describe(loc):
        """Return the latency/time/speed summary for one location."""
        return (
            f"(Latency: {lat_res[loc]:.3f}ms; Time: {time_res[loc]:.2f}s; "
            f"Speed: {convert_size(spd_res[loc])}/s)"
        )

    def describe_jitter(loc):
        """Return the jitter/loss summary for one location."""
        # Jitter is a latency difference, so it is printed in ms rather than
        # run through the byte-size formatter.
        return (
            f"(Jitter: {max_jitter_res[loc]:.3f}ms; "
            f"Packet Loss: {packet_loss_res[loc]:.2f}%)"
        )

    min_lat_loc = min(lat_res, key=lat_res.get)
    max_spd_loc = max(spd_res, key=spd_res.get)
    min_time_loc = min(time_res, key=time_res.get)
    min_jitter_loc = min(max_jitter_res, key=max_jitter_res.get)
    max_jitter_loc = max(max_jitter_res, key=max_jitter_res.get)

    print("\nResults:")
    print(f"Lowest Latency: {min_lat_loc.ljust(25)}")
    print(describe(min_lat_loc))
    print(f"Highest Speed: {max_spd_loc.ljust(25)}")
    print(describe(max_spd_loc))
    print(f"Fastest Time: {min_time_loc.ljust(25)}")
    print(describe(min_time_loc))
    print(f"Lowest Jitter: {min_jitter_loc.ljust(25)} {describe_jitter(min_jitter_loc)}")
    print(f"Highest Jitter: {max_jitter_loc.ljust(25)} {describe_jitter(max_jitter_loc)}")
|
|
|
# Script entry point: run the test only when executed directly, not on import.
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C during a long download: leave quietly instead of dumping a
        # traceback (130 is the conventional exit status for SIGINT).
        print("\nInterrupted.")
        sys.exit(130)