Last active
February 13, 2020 06:28
-
-
Save mgarg1/0847293b517d85bf668f17602a7d7582 to your computer and use it in GitHub Desktop.
Throttler with threads: thread t1 does the throttling, thread t2 only collects input (it tails a file), and the main thread controls the influx.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// https://onlinegdb.com/r1y2ERAfL | |
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <fstream>
#include <iostream>
#include <limits>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
static const int THROTTLE_VAL = 100;     // upper bound on values emitted per output line
static const int THROTTLE_PERIOD = 1000; // throttle period in milliseconds (1 sec)
using Millis = std::chrono::milliseconds;

/**
 * Test helper: validates throttled output captured in a string stream.
 *
 * The text in `ss` is read line by line; every line is expected to hold
 * whitespace-separated integers. The following is asserted:
 *   - inside one line each integer is exactly one greater than the one before it
 *     (the first integer of a line is not compared against the previous line);
 *   - no line holds more than min(inputRate, THROTTLE_VAL) integers;
 *   - the number of integers over all lines is exactly `msgCnt`.
 *
 * Failures are reported through assert(), so the checks vanish when NDEBUG is defined.
 *
 * @param ss        stream holding the captured output; it is not modified
 * @param msgCnt    total number of integers expected in the stream
 * @param inputRate rate at which the integers were produced (values per period)
 */
void validateMessages(const std::stringstream &ss,size_t msgCnt,const size_t inputRate){
    // Parse a private copy of the text so the caller's stream is left untouched.
    std::istringstream lines(ss.str());
    // Same unsigned type on both sides: no signed/unsigned comparison.
    const size_t maxIntsPerLine = std::min(inputRate, static_cast<size_t>(THROTTLE_VAL));

    std::string line;
    while(std::getline(lines,line)){
        std::istringstream ints(line);
        size_t intsInLine = 0;
        int previous = 0;
        int current = 0;
        while(ints >> current){
            // The first value of a line has no predecessor to compare with. The comparison
            // is widened to long long so that INT_MIN / INT_MAX cannot overflow.
            assert(intsInLine == 0 ||
                   static_cast<long long>(current) == static_cast<long long>(previous) + 1);
            ++intsInLine;
            previous = current;
        }
        assert(intsInLine <= maxIntsPerLine);
        // More values than expected: fail here instead of letting the unsigned counter wrap.
        assert(intsInLine <= msgCnt);
        msgCnt -= intsInLine;
    }
    // Every expected value must have been seen.
    assert(msgCnt == 0);
}
int main() | |
{ | |
//reader thread | |
std::queue<int> q1; | |
std::stringstream str_strm; | |
// std::ostream &outStream = std::cout; | |
std::ostream &outStream = str_strm; | |
bool keepAlive = true; | |
//consumer thread - just popping out the queue | |
std::thread consumerThread([&q1,&outStream,&keepAlive](){ | |
while(keepAlive){ | |
size_t elementsToPop = q1.size(); | |
if(elementsToPop){ | |
elementsToPop = elementsToPop > THROTTLE_VAL ? THROTTLE_VAL : elementsToPop; | |
// std::cout << "popping elements - \n"; | |
// outStream << elementsToPop << std::endl; | |
while(elementsToPop--){ | |
outStream << q1.front() << " "; | |
q1.pop(); | |
} | |
outStream << std::endl; | |
} | |
// timer1.reset(); | |
std::this_thread::sleep_for(Millis(THROTTLE_PERIOD)); | |
} | |
}); | |
int last_position=0; | |
//reader thread - similar to tail -f | |
std::thread fileTailThread([&q1,&last_position,&keepAlive](){ | |
// std::this_thread::sleep_for(Millis(300)); | |
std::ifstream ifs ("buffer.txt", std::ifstream::in); | |
while(keepAlive){ | |
if(ifs.good() && ifs.peek() != EOF){ | |
int temp; | |
ifs >> temp; | |
ifs.ignore(std::numeric_limits<std::streamsize>::max(),'\n'); | |
last_position = ifs.tellg(); | |
// std::cout << temp << std::endl; | |
q1.push(temp); | |
}else { | |
ifs.clear(); | |
ifs.seekg(last_position,std::ios_base::beg); | |
} | |
// std::this_thread::sleep_for(Millis(10)); | |
} | |
ifs.close(); | |
}); | |
auto writeConsIntsToFileWithRate = [](const int wordsPerSecond,const int counter=200,const char *outFile="buffer.txt"){ | |
std::ofstream ofs (outFile, std::ofstream::out | std::ofstream::trunc); | |
int cntr = 0; | |
while(cntr < counter){ | |
ofs << cntr << std::endl; | |
cntr++; | |
//1000/10 = 100 messages per second | |
std::this_thread::sleep_for(Millis(int(1000/wordsPerSecond))); | |
} | |
ofs.close(); | |
}; | |
auto truncateFile = [](const char *outFile="buffer.txt"){ | |
std::ofstream ofs2 (outFile, std::ofstream::out | std::ofstream::trunc); | |
ofs2.close(); | |
}; | |
truncateFile(); | |
// TestCase 1 - messages are coming at high frequency(> THROTTLE_VAL), so expected throughtput should be THROTTLE_VAL | |
{ | |
std::cout << "start of TEST 1\n"; | |
const size_t msgsPerSecond = 100; | |
const size_t msgCnt = 2000; | |
writeConsIntsToFileWithRate(msgsPerSecond,msgCnt); //100 ints per second | |
std::this_thread::sleep_for(Millis(THROTTLE_PERIOD) + Millis(2000)); | |
assert(q1.size() == 0); | |
assert(!str_strm.str().empty()); | |
validateMessages(str_strm,msgCnt,msgsPerSecond); | |
std::cout << "end of TEST 1\n"; | |
//cleanup | |
truncateFile(); | |
str_strm.str(std::string()); | |
last_position=0; | |
} | |
//TestCase 2 - messages are coming at low frequency(< THROTTLE_VAL), so expected throughtput should be ~10 | |
{ | |
std::cout << "start of TEST 2\n"; | |
const size_t msgsPerSecond = 10; | |
const size_t msgCnt = 200; | |
writeConsIntsToFileWithRate(msgsPerSecond,msgCnt); //10 ints per second | |
std::this_thread::sleep_for(Millis(THROTTLE_PERIOD) + Millis(1000)); | |
assert(q1.size() == 0); | |
assert(!str_strm.str().empty()); | |
validateMessages(str_strm,msgCnt,msgsPerSecond); | |
std::cout << "end of TEST 2\n"; | |
//cleanup | |
truncateFile(); | |
str_strm.str(std::string()); | |
last_position=0; | |
} | |
keepAlive = false; | |
consumerThread.join(); | |
fileTailThread.join(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment