Skip to content

Instantly share code, notes, and snippets.

@mgarg1
Last active February 13, 2020 06:28
Show Gist options
  • Save mgarg1/0847293b517d85bf668f17602a7d7582 to your computer and use it in GitHub Desktop.
Save mgarg1/0847293b517d85bf668f17602a7d7582 to your computer and use it in GitHub Desktop.
Throttler with threads: t1 is the throttling thread, t2 only fetches the input (tailing a file), and the main thread controls the influx.
// https://onlinegdb.com/r1y2ERAfL
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <fstream>
#include <iostream>
#include <limits>
#include <mutex>
#include <queue>
#include <sstream>
#include <string>
#include <thread>
// Throttle configuration shared by the consumer thread and the output validator.
// THROTTLE_VAL: upper bound on how many ints are emitted per consumer wake-up.
static constexpr int THROTTLE_VAL{100};
// THROTTLE_PERIOD: pause between consumer wake-ups, in milliseconds (1 second).
static constexpr int THROTTLE_PERIOD{1000};
// Shorthand for the millisecond duration type used for every sleep in this file.
using Millis = std::chrono::milliseconds;
/**
 * Test helper: asserts that the throttler output captured in `ss` is well formed.
 *
 * Each line of `ss` is treated as one batch of whitespace-separated ints.
 * The function asserts that
 *   - the ints inside a batch are consecutive (each one is the previous + 1),
 *   - no batch holds more ints than min(inputRate, THROTTLE_VAL),
 *   - the total number of ints over all batches equals `msgCnt`.
 *
 * @param ss        captured output; it is copied, the caller's stream is not modified
 * @param msgCnt    number of ints expected in total
 * @param inputRate rate at which the ints were produced (ints per second)
 *
 * All checks use assert(), so they are compiled out when NDEBUG is defined.
 */
void validateMessages(const std::stringstream &ss,size_t msgCnt,const size_t inputRate){
    // Parse a private copy of the text (redundant, but acceptable for a test function).
    std::istringstream batches(ss.str());
    const size_t perLineCap = inputRate > THROTTLE_VAL ? THROTTLE_VAL : inputRate;

    for (std::string batch; std::getline(batches, batch); ) {
        std::istringstream numbers(batch);
        size_t seen = 0;
        int previous = 0;
        for (int current = 0; numbers >> current; previous = current) {
            // The first int of a batch has no predecessor to compare against.
            if (++seen > 1) {
                assert(previous + 1 == current);
            }
        }
        assert(seen <= perLineCap);
        msgCnt -= seen;
    }

    // Every expected int must have been accounted for exactly once.
    assert(msgCnt == 0);
}
int main()
{
//reader thread
std::queue<int> q1;
std::stringstream str_strm;
// std::ostream &outStream = std::cout;
std::ostream &outStream = str_strm;
bool keepAlive = true;
//consumer thread - just popping out the queue
std::thread consumerThread([&q1,&outStream,&keepAlive](){
while(keepAlive){
size_t elementsToPop = q1.size();
if(elementsToPop){
elementsToPop = elementsToPop > THROTTLE_VAL ? THROTTLE_VAL : elementsToPop;
// std::cout << "popping elements - \n";
// outStream << elementsToPop << std::endl;
while(elementsToPop--){
outStream << q1.front() << " ";
q1.pop();
}
outStream << std::endl;
}
// timer1.reset();
std::this_thread::sleep_for(Millis(THROTTLE_PERIOD));
}
});
int last_position=0;
//reader thread - similar to tail -f
std::thread fileTailThread([&q1,&last_position,&keepAlive](){
// std::this_thread::sleep_for(Millis(300));
std::ifstream ifs ("buffer.txt", std::ifstream::in);
while(keepAlive){
if(ifs.good() && ifs.peek() != EOF){
int temp;
ifs >> temp;
ifs.ignore(std::numeric_limits<std::streamsize>::max(),'\n');
last_position = ifs.tellg();
// std::cout << temp << std::endl;
q1.push(temp);
}else {
ifs.clear();
ifs.seekg(last_position,std::ios_base::beg);
}
// std::this_thread::sleep_for(Millis(10));
}
ifs.close();
});
auto writeConsIntsToFileWithRate = [](const int wordsPerSecond,const int counter=200,const char *outFile="buffer.txt"){
std::ofstream ofs (outFile, std::ofstream::out | std::ofstream::trunc);
int cntr = 0;
while(cntr < counter){
ofs << cntr << std::endl;
cntr++;
//1000/10 = 100 messages per second
std::this_thread::sleep_for(Millis(int(1000/wordsPerSecond)));
}
ofs.close();
};
auto truncateFile = [](const char *outFile="buffer.txt"){
std::ofstream ofs2 (outFile, std::ofstream::out | std::ofstream::trunc);
ofs2.close();
};
truncateFile();
// TestCase 1 - messages are coming at high frequency(> THROTTLE_VAL), so expected throughtput should be THROTTLE_VAL
{
std::cout << "start of TEST 1\n";
const size_t msgsPerSecond = 100;
const size_t msgCnt = 2000;
writeConsIntsToFileWithRate(msgsPerSecond,msgCnt); //100 ints per second
std::this_thread::sleep_for(Millis(THROTTLE_PERIOD) + Millis(2000));
assert(q1.size() == 0);
assert(!str_strm.str().empty());
validateMessages(str_strm,msgCnt,msgsPerSecond);
std::cout << "end of TEST 1\n";
//cleanup
truncateFile();
str_strm.str(std::string());
last_position=0;
}
//TestCase 2 - messages are coming at low frequency(< THROTTLE_VAL), so expected throughtput should be ~10
{
std::cout << "start of TEST 2\n";
const size_t msgsPerSecond = 10;
const size_t msgCnt = 200;
writeConsIntsToFileWithRate(msgsPerSecond,msgCnt); //10 ints per second
std::this_thread::sleep_for(Millis(THROTTLE_PERIOD) + Millis(1000));
assert(q1.size() == 0);
assert(!str_strm.str().empty());
validateMessages(str_strm,msgCnt,msgsPerSecond);
std::cout << "end of TEST 2\n";
//cleanup
truncateFile();
str_strm.str(std::string());
last_position=0;
}
keepAlive = false;
consumerThread.join();
fileTailThread.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment