Skip to content

Instantly share code, notes, and snippets.

@carun
Created November 20, 2012 12:24
Show Gist options
  • Save carun/4117631 to your computer and use it in GitHub Desktop.
Save carun/4117631 to your computer and use it in GitHub Desktop.
Multithreaded application to process files in a directory
#include <errno.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <dirent.h>
#include <pthread.h>
#define NAME_MAX (256)
/* #define MAX_THREADS (99) */
#define SLEEP_DUR (1)
/*
 * Work item handed to one worker thread.
 * main() allocates it and fills file_name with "<dir>/<entry>";
 * process_file() frees it once the file has been handled.
 */
struct thread_data {
    char file_name[NAME_MAX + 1];   /* NUL-terminated path of the file to process */
};
/* Worker-thread counter: incremented by main() after it creates a worker and
 * decremented by process_file() just before the worker finishes.  Both of
 * those updates are made while holding count_mutex. */
static int num_threads;
/* Signalled by process_file() each time it decrements num_threads; main()
 * waits on it when num_threads has reached the configured maximum. */
static pthread_cond_t cond;
/* Mutex paired with cond; held around the num_threads updates and around the
 * directory rescan in main(). */
static pthread_mutex_t count_mutex;
/*
 * scandir() filter: accept every directory entry except "." and "..".
 *
 * The parameter is const-qualified so the function matches the filter
 * prototype that POSIX scandir() expects, int (*)(const struct dirent *).
 *
 * Returns 1 to keep the entry, 0 to drop it.
 */
int select_filter( const struct dirent *entry )
{
    const char *name = entry->d_name;

    /* Filesystem differences with EXT4 and JFS2 - Unable to use d_type member
     * to compare DT_REG to check for the filetype, so only the two special
     * directory names are filtered out here. */
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0)
        return 0;

    return 1;
}
/*
 * Worker thread entry point.
 *
 * arg is a heap-allocated struct thread_data; this function takes ownership
 * of it and frees it.  The "processing" consists of logging the file name,
 * unlinking the file and spinning through a fixed delay loop.  Before
 * finishing, the worker decrements num_threads under count_mutex and signals
 * cond so that main() can start another worker.
 *
 * Always returns NULL.
 */
void *process_file( void *arg )
{
    struct thread_data *data_t = (struct thread_data *)arg;
    unsigned int i, j;
    /* NOTE(review): the cast assumes pthread_t is an arithmetic type on the
     * target platform (as the original code did); it is used for logging only. */
    unsigned long self_id = (unsigned long)pthread_self();

    /* Do some processing with data_t */
    fprintf( stdout, "%lu: Processing file %s\n", self_id, data_t->file_name );
    if (unlink( data_t->file_name ) != 0) {
        /* Report the failure instead of silently ignoring it */
        fprintf( stdout, "%lu: Failed to remove %s: %s\n",
                 self_id, data_t->file_name, strerror( errno ) );
    }

    /* When done processing, free the resources */
    free( data_t );

    /* Fixed-length empty loop standing in for real work */
    for (i = 0; i < 9999; i++) {
        for (j = 0; j < 9999; j++) {
            /* intentionally empty */
        }
    }

    pthread_mutex_lock( &count_mutex );
    /* Critical section: give the slot back and wake the dispatcher */
    num_threads--;
    pthread_cond_signal( &cond );
    fprintf( stdout, "%lu: Exiting. num_threads: %d\n", self_id, num_threads );
    pthread_mutex_unlock( &count_mutex );

    /* Returning from the start routine ends the thread, same as pthread_exit(NULL) */
    return NULL;
}
/*
 * Dispatcher: repeatedly scans dir_name and hands every entry to a detached
 * worker thread running process_file(), keeping at most <no-of-threads>
 * workers alive at any time.
 *
 * argv[1] is the maximum number of concurrent workers (1..99).
 *
 * The directory is rescanned only when the current listing has been fully
 * dispatched and no worker is running; if it is empty the dispatcher sleeps
 * SLEEP_DUR seconds and tries again.  The loop never terminates on its own
 * (except in DEBUG builds, which exit once the directory is empty and idle).
 *
 * Exit status: EXIT_FAILURE for bad arguments, 1 for scandir / allocation /
 * thread-creation failures.
 */
int main(int argc, const char *argv[])
{
    struct dirent **file_list = NULL;
    struct dirent *entry = NULL;
    int no_of_files = 0;            /* scandir() returns int; do not narrow it */
    const char *dir_name = "/u/home/raddanki/Arun/Threads/data";
    int idx = 0;
    struct thread_data *data_t = NULL;
    int MAX_THREADS = 0;
    long requested = 0;
    char *end = NULL;
    int path_len = 0;
    int active = 0;
    int idle = 0;
    pthread_t tid;
    pthread_attr_t attr;

    /* Get the arguments */
    if (argc < 2) {
        printf( "Usage: %s <no-of-threads>\n", argv[0] );
        exit(EXIT_FAILURE);
    }

    /* strtol lets us reject non-numeric and out-of-range input */
    errno = 0;
    requested = strtol( argv[1], &end, 10 );
    if (errno != 0 || end == argv[1] || *end != '\0' || requested < 1 || requested > 99) {
        printf( "Max threads should be between 1 and 99. Provided: %s\n", argv[1] );
        exit(EXIT_FAILURE);
    }
    MAX_THREADS = (int)requested;

    /* Initialize thread attributes */
    pthread_cond_init( &cond, NULL );
    pthread_mutex_init( &count_mutex, NULL );
    pthread_attr_init( &attr );
    pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_DETACHED );

    while (1) {
        while (no_of_files == 0) {
            /* Rescan only when no worker is still busy with the old listing */
            pthread_mutex_lock( &count_mutex );
            idle = (num_threads == 0);
            if (idle) {
                /* Entries were freed one by one as they were dispatched */
                free( file_list );
                file_list = NULL;
                no_of_files = scandir( dir_name, &file_list, select_filter, alphasort );
                if (no_of_files == -1) {
                    fprintf( stdout, "Failed to open dir: %s\n", dir_name );
                    /* need to wait for threads to complete and then exit */
                    exit (1);
                }
            }
            pthread_mutex_unlock( &count_mutex );

            /* Entries are consumed from the end of the sorted list */
            idx = no_of_files - 1;
            if (!no_of_files) {
#ifdef DEBUG
                if (idle) {
                    free( file_list );
                    pthread_attr_destroy( &attr );
                    pthread_mutex_destroy( &count_mutex );
                    pthread_cond_destroy( &cond );
                    pthread_exit(0);
                }
#endif
                fprintf( stdout, "Going to sleep now\n" );
                sleep(SLEEP_DUR);
            }
        }

        /* Wait for a free worker slot.  The predicate is re-checked in a loop
         * because pthread_cond_wait() may wake up spuriously. */
        pthread_mutex_lock( &count_mutex );
        while (num_threads >= MAX_THREADS) {
            /* Let the threads finish */
            pthread_cond_wait( &cond, &count_mutex );
            fprintf( stdout, "MAIN: Waking up from condition\n" );
        }
        pthread_mutex_unlock( &count_mutex );
        /* Only this thread increments num_threads, so the slot found above
         * is still free when it is claimed below. */

        if (((data_t = calloc( 1, sizeof(*data_t) )) == NULL)) {
            fprintf( stdout, "Failed to allocate memory\n" );
            exit(1);
        }

        /* Build the full path and release the directory entry */
        entry = file_list[idx--];
        path_len = snprintf( data_t->file_name, sizeof(data_t->file_name), "%s/%s", dir_name, entry->d_name );
        if (path_len < 0 || (size_t)path_len >= sizeof(data_t->file_name)) {
            /* A truncated path would name a different file; never hand it
             * to a worker that is going to unlink it. */
            fprintf( stdout, "MAIN: Path too long, skipping: %s/%s\n", dir_name, entry->d_name );
            free( entry );
            free( data_t );
            no_of_files--;
            continue;
        }
        free( entry );
        no_of_files--;
        fprintf( stdout, "MAIN: No of files: %d\n", no_of_files );

        /* Claim the slot BEFORE starting the worker so that its decrement can
         * never run ahead of our increment. */
        pthread_mutex_lock( &count_mutex );
        active = ++num_threads;
        pthread_mutex_unlock( &count_mutex );

        /* Manage the thread stuff */
        if (pthread_create( &tid, &attr, process_file, (void *)data_t ) != 0) {
            fprintf( stdout, "Failed to allocate memory for thread\n" );
            exit(1);
        }
        /* NOTE(review): cast assumes pthread_t is an arithmetic type; logging only */
        fprintf( stdout, "Created thread: %lu\nMAIN: Locking Mutex: %d\n", (unsigned long)tid, active );
    }
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment