Last active
June 18, 2024 14:39
-
-
Save Climax777/ecea9f6021647b92ead1eadc438125fe to your computer and use it in GitHub Desktop.
ASIO test files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

#include <boost/asio.hpp>
// File-wide using-directive: the functions below refer to cout/cerr/endl/vector unqualified.
using namespace std;

// Base payload unit; one message carries datasize * multiplier (order, index) byte pairs.
constexpr int datasize = 256;
// Tweak this to test smaller sizes (make counterpart in python program the same)
constexpr int multiplier = 1024;
// Number of bytes in one message (two bytes per pair). Initialised here so the value is
// already correct before main() assigns the same expression to it.
uint32_t totalsize = static_cast<uint32_t>(datasize * multiplier * 2);
// Not referenced by any function visible in this file.
std::mutex my_test_mutex;
// Count of completion handlers that have run so far.
std::atomic_int64_t counter{0};
// Sequence number handed out to each message before it is queued for sending.
std::atomic_int64_t request_order{0};
void handler(const boost::system::error_code& error, std::size_t bytes_transferred, std::shared_ptr<std::vector<char>> buffer, int64_t order) { | |
cout << "Handler called for order: " << order << endl; | |
cout.flush(); | |
if (error) { | |
cerr << "Error: " << error.message() << endl; | |
} else { | |
cout << "Bytes transferred: " << bytes_transferred << endl; | |
} | |
counter++; | |
cout << "Total completed: " << counter << endl; | |
cout.flush(); | |
} | |
// Sender loop executed by each sender thread. Never returns.
//
// socket - the connected TCP socket every sender thread writes to.
// io     - the io_context the socket belongs to; not used in this function.
//
// Each iteration takes the next sequence number from `request_order`, builds a payload of
// datasize * multiplier byte pairs -- (order % 256, index % 256) -- and queues it with
// async_write. The completion lambda owns a copy of the shared buffer so the payload stays
// alive until the write finishes, then forwards to handler().
//
// NOTE(review): several threads start async_write on the same socket without any
// serialisation. That appears to be the point of this experiment (the receiver checks for
// interleaved payloads), so it is left as is -- confirm against the Boost.Asio async_write
// requirements before reusing this pattern elsewhere.
void my_test_func(boost::asio::ip::tcp::socket* socket, boost::asio::io_context* io) {
    (void)io;
    const int pair_count = datasize * multiplier;
    while (true) {
        cout << "Starting out on thread " << std::this_thread::get_id() << endl;
        const int64_t order = ++request_order;
        // Same byte is stored in every even position, so compute it once.
        const char order_byte = static_cast<char>(order % 256);
        auto buffer = std::make_shared<std::vector<char>>(pair_count * 2, 0);
        // Even offsets carry the order byte (offset 0 is what the receiver reads as the
        // message order); odd offsets carry the pair index modulo 256.
        for (int i = 0; i < pair_count; ++i) {
            (*buffer)[i * 2] = order_byte;
            (*buffer)[i * 2 + 1] = static_cast<char>(i % 256);
        }
        std::vector<boost::asio::const_buffer> scatter_buffer;
        scatter_buffer.push_back(boost::asio::buffer(*buffer));
        boost::asio::async_write(*socket, scatter_buffer, boost::asio::transfer_exactly(totalsize),
            [buffer, order](const boost::system::error_code& error, std::size_t bytes_transferred) {
                handler(error, bytes_transferred, buffer, order);
            });
        cout << "Done on thread " << std::this_thread::get_id() << endl;
        // Change this to a smaller value (milliseconds/microseconds) to see breakage even in a single thread
        std::this_thread::sleep_for(std::chrono::seconds(100));
        // break;
    }
}
int main(int argc, char** argv) { | |
totalsize = datasize * multiplier * 2; | |
boost::asio::io_context my_test_ioservice; | |
boost::asio::ip::tcp::socket my_test_socket(my_test_ioservice); | |
boost::asio::ip::tcp::resolver my_test_tcp_resolver(my_test_ioservice); | |
// Change IP/Port here | |
boost::asio::ip::tcp::resolver::query my_test_tcp_query("localhost", "40000"); | |
boost::asio::ip::tcp::resolver::iterator my_test_tcp_iterator = my_test_tcp_resolver.resolve(my_test_tcp_query); | |
boost::asio::connect(my_test_socket, my_test_tcp_iterator); | |
my_test_ioservice.post([]() { | |
while (true) std::this_thread::sleep_for(std::chrono::seconds(1)); | |
}); | |
cout << "Starting senders for " << totalsize << " bytes" << endl; | |
cout.flush(); | |
vector<std::jthread> sender_threads; | |
// Change this to single thread/multiple threads and witness the destruction | |
for (size_t i = 0; i < 10; ++i) { // Adjust the loop count to tweak the number of sender threads. | |
sender_threads.emplace_back(my_test_func, &my_test_socket, &my_test_ioservice); | |
} | |
cout << "Starting threads" << endl; | |
cout.flush(); | |
std::vector<std::jthread> worker_threads; | |
for (size_t i = 0; i < 19; ++i) { | |
worker_threads.emplace_back([&]() { | |
while (!my_test_ioservice.stopped()) { | |
try { | |
my_test_ioservice.run(); | |
} catch (std::exception& e) { | |
cerr << "Workers: thread exception: " << e.what() << endl; | |
cerr.flush(); | |
if (my_test_ioservice.stopped()) | |
break; | |
} | |
} | |
}); | |
} | |
while (true) { | |
try { | |
my_test_ioservice.run(); | |
} catch (std::exception& e) { | |
cerr << "main: thread exception: " << e.what() << endl; | |
cerr.flush(); | |
if (my_test_ioservice.stopped()) | |
break; | |
} | |
} | |
cout << "Done" << endl; | |
cout.flush(); | |
return 0; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import socket

# Server configuration
HOST = '0.0.0.0'  # The IP address the server will bind to
PORT = 40000  # The port the server will bind to
BUFFER_SIZE = 256 * 1024 * 2  # Buffer size to match the C++ client's data size
def receive_full_data(client_socket, buffer_size):
    """Read from ``client_socket`` until ``buffer_size`` bytes have arrived.

    Args:
        client_socket: object providing ``recv(n)``; called repeatedly with the
            number of bytes still missing.
        buffer_size: total number of bytes to collect.

    Returns:
        A ``bytearray`` with the bytes received. It is shorter than
        ``buffer_size`` (possibly empty) when ``recv`` returns an empty result
        before the full amount has arrived.
    """
    data = bytearray()
    while len(data) < buffer_size:
        packet = client_socket.recv(buffer_size - len(data))
        if not packet:
            # Nothing more to read; hand back what was collected so far.
            break
        data.extend(packet)
    return data
def _payload_is_consistent(data, order_byte):
    """Check every byte pair of one message; print each mismatch found.

    Even offsets must equal ``order_byte``; the odd offset of pair ``i`` must
    equal ``i % 256``. Returns True when no mismatch was printed.
    """
    consistent = True
    for i in range(len(data) // 2):
        expected_value = i % 256
        if data[i * 2 + 1] != expected_value:
            consistent = False
            print(f"Error: Data byte {i} with value {data[i*2+1]} is not incrementing correctly. should be {expected_value}")
        if data[i * 2] != order_byte:
            consistent = False
            print(f"Error: Order byte {i} with value {data[i*2]} is not staying correct. Should be {order_byte}")
    return consistent


def start_server():
    """Accept clients on (HOST, PORT) forever and validate each message.

    For every connection, messages of exactly BUFFER_SIZE bytes are read in a
    loop. For each one the first byte is compared with the previous message's
    first byte plus one (modulo 256), and the whole payload is checked pair by
    pair. A short read ends the connection and the server waits for the next
    client. Results are reported with print(); the function does not return.
    """
    # Create a TCP/IP socket
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
        # Address reuse has to be requested before bind() for it to apply to this bind.
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # Bind the socket to the address and port
        server_socket.bind((HOST, PORT))
        # Listen for incoming connections
        server_socket.listen()
        print(f'Server listening on {HOST}:{PORT}')
        while True:
            # Wait for a connection
            client_socket, client_address = server_socket.accept()
            with client_socket:
                print(f'Connected by {client_address}')
                # Start the sequence check afresh for each client so the last order
                # byte of an earlier connection is not held against this one.
                previous_order = -1
                while True:
                    # Receive the full buffer size data
                    data = receive_full_data(client_socket, BUFFER_SIZE)
                    if len(data) != BUFFER_SIZE:
                        print(f"incorrect size {len(data)} vs {BUFFER_SIZE}")
                        break
                    order_byte = data[0]  # The first byte indicates the order
                    print(f"starting {order_byte} {(previous_order + 1) % 256} size {len(data)}")
                    if previous_order != -1 and order_byte != (previous_order + 1) % 256:
                        print(f"Error: Order byte {order_byte} did not increment correctly from {previous_order}")
                    else:
                        print(f"Order byte {order_byte} incremented correctly from {previous_order}")
                    previous_order = order_byte
                    # Check that the rest of the data is incrementing correctly
                    if _payload_is_consistent(data, order_byte):
                        print(f"Data received correctly with order byte {order_byte}")


if __name__ == '__main__':
    start_server()
Author
Climax777
commented
Jun 18, 2024
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment