Bounded Buffer is hanging, tried all I can. (C++)

 
# 8  
Old 11-20-2012
The separate write and read pointers do make the array into a ring buffer with simultaneous read and write. I like to use a power-of-two size array and just mask off the low bits of the counters to get the index. You can write if write_cnt - read_cnt < size, and you can read if write_cnt > read_cnt. The writer can zero the read count and then the write count if the buffer is empty, but due to cache issues it is sometimes safer to just use long long counters and never zero them. This works for bytes, pointers, and arrays of whatever. You can even have the writer double the buffer size when full, copying the bottom half to the top half, and then widen the mask by one bit. I have a version that recorded the last three relative maxima in usage and shrank the buffer by halves if space was underutilized. It was a structure with a long long write counter, a long long read counter, a size-1 mask, a pointer to the current space (realloc()able), and stats.
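
A minimal sketch of the counter-and-mask scheme described above, assuming exactly one writer thread and one reader thread. The class name `SpscRing` and the use of C++11 `std::atomic` counters are assumptions made for illustration; they are not taken from the post, and the resizing and statistics features are left out.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical single-producer / single-consumer ring buffer.
// Counters only ever grow; the slot index is the counter masked to the low bits.
template <typename T>
class SpscRing {
public:
    explicit SpscRing(std::size_t capacity)
        : slots_(capacity), mask_(capacity - 1), write_cnt_(0), read_cnt_(0) {
        // Masking only works when the capacity is a power of two.
        assert(capacity != 0 && (capacity & (capacity - 1)) == 0);
    }

    // Writer side: allowed while write_cnt - read_cnt < size.
    bool push(const T& value) {
        unsigned long long w = write_cnt_.load(std::memory_order_relaxed);
        unsigned long long r = read_cnt_.load(std::memory_order_acquire);
        if (w - r >= slots_.size()) return false;  // full
        slots_[w & mask_] = value;
        // Publish the slot only after it has been written.
        write_cnt_.store(w + 1, std::memory_order_release);
        return true;
    }

    // Reader side: allowed while write_cnt > read_cnt.
    bool pop(T& out) {
        unsigned long long r = read_cnt_.load(std::memory_order_relaxed);
        unsigned long long w = write_cnt_.load(std::memory_order_acquire);
        if (w == r) return false;  // empty
        out = slots_[r & mask_];
        // Release the slot only after it has been copied out.
        read_cnt_.store(r + 1, std::memory_order_release);
        return true;
    }

private:
    std::vector<T> slots_;
    std::size_t mask_;  // size - 1
    std::atomic<unsigned long long> write_cnt_;
    std::atomic<unsigned long long> read_cnt_;
};
```

Because the counters are never reset, a reader that sees a slightly stale `write_cnt_` only underestimates how much there is to read, and a writer that sees a stale `read_cnt_` only underestimates the free space. Neither case corrupts data.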

Now if you have a manager thread for writing and another for reading, you can use multiple input buffers, one per writer, and have the manager consolidate them into the central container. A read manager thread can fill output buffers for multiple consumers. If service times are highly variable, output buffers should be shallow (depth 2) so that one thread does not end up lagging due to bad luck when all the others are done. Extra output threads are more effective than deep buffers dedicated to one thread.

However, one global lock can work, as long as dry reads lead to sufficient sleep to allow writers access. If the readers check the depth before locking to read, even that is not a problem. Writers should check available space before locking to write. It is OK if readers underestimate how much is to be read, and it is OK for writers to underestimate how much space there is to write, for a moment until variables are updated and cache catches up. If they check before the lock, they need not sleep, but a little sleep is nice so the CPUs are shared around.
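
One way to picture the check-before-lock idea with a single global lock, as a rough sketch. `GuardedQueue`, `tryPush`, and `tryPop` are made-up names, and the atomic `depth_` hint is an assumption about how to make the unlocked peek well-defined in C++11; the post does not say how the depth is published.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical bounded queue protected by one lock, with an unlocked depth hint.
class GuardedQueue {
public:
    explicit GuardedQueue(std::size_t capacity) : capacity_(capacity), depth_(0) {
        assert(capacity > 0);
    }

    // Writer: peek at the depth first, and only take the lock if there seems to be room.
    bool tryPush(int value) {
        if (depth_.load(std::memory_order_acquire) >= capacity_) return false;
        std::lock_guard<std::mutex> guard(lock_);
        if (items_.size() >= capacity_) return false;  // re-check under the lock
        items_.push_back(value);
        depth_.store(items_.size(), std::memory_order_release);
        return true;
    }

    // Reader: a "dry" check costs no lock traffic at all.
    bool tryPop(int& out) {
        if (depth_.load(std::memory_order_acquire) == 0) return false;
        std::lock_guard<std::mutex> guard(lock_);
        if (items_.empty()) return false;  // re-check under the lock
        out = items_.front();
        items_.pop_front();
        depth_.store(items_.size(), std::memory_order_release);
        return true;
    }

private:
    const std::size_t capacity_;
    std::atomic<std::size_t> depth_;  // hint only; the deque is the truth
    std::mutex lock_;
    std::deque<int> items_;
};
```

A caller that gets `false` back can sleep briefly and retry. A stale hint only makes a thread skip one attempt; the re-check under the lock keeps the queue itself consistent.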

Last edited by DGPickett; 11-20-2012 at 04:28 PM..
# 9  
Old 11-21-2012
Thanks all for your help!

After much trial/error, here's my final code: (it's not the prettiest)

>_> forgot to remove that <sched.h>...

Code:
/*******************
* <removed>
* Operating Systems
* Project #3
* 11/15/2012
* Dr. Katangur
*
*******************/

#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <cstdlib>
#include <pthread.h>
#include <semaphore.h>
#include "buffer.h"
#include <vector>
#include <string>
#include <string.h>
#include <time.h>
#include <math.h>
#include <sched.h>

using namespace std;

void *pConsumer(void*);
void *pProducer(void*);
void interface(char*);
void report();
void runOnce();
void getBuffer();
void printBuffer();
bool IsPrime(int);

/******************************************
* STRUCTS
* 
* userinput: input
* ----------------
*  -int run: time for main to sleep.
*  -int sleep: time for threads to sleep.
*  -int produce: number of producer threads.
*  -int consume: number of consumer threads.
*
*  -bool snapshot: display buffer states.
*
*******************************************/
typedef struct
{
	int run;
	int sleep;
	int produce;
	int consume;
	bool snapshot;
}userinput;
userinput input;

struct timespec delay1, delay2, delay3, delay4;

/***********************************************************
* VECTORS 
*
*  -int indivConsumeCount: elements consumed by each thread.
*  -int indivProduceCount: elements produced by each thread. 
*
************************************************************/ 
vector<int> indivConsumeCount;
vector<int> indivProduceCount;

/*********************************************************
* GLOBALS
*
*  -BUFFER_SIZE: defined max size of buffer.
*  
*  -buffer_item: array used as the buffer.
*  -int produceCount: total number of elements produced.
*  -int consumeCount: total number of elements consumed.
*  -int bufferEmpty: times the buffer was empty.
*  -int bufferFull: times the buffer was full.
*  -int bufferCount: elements currently in the buffer.
*  -int bufferCount2: elements currently in the buffer.
*  -int tempConsume: a temporary value passed from threads.
*  -int tempProduce: a temporary value passed from threads.
*  -int producePos: position producer is in the buffer.
*  -int consumePos: position consumer is in the buffer.
*  -int currentWrite: set position of W in string array.
*  -int currentRead: set position of R in string array.
*
*  -string readWrite[]: used for RW output.
*  
*  -bool flag: will tell all threads to end whiles.
*
**********************************************************/ 
buffer_item buffer[BUFFER_SIZE];
int produceCount = 0;
int consumeCount = 0;
int bufferEmpty = 0;
int bufferFull = 0;
int bufferCount = 0;
int bufferCount2 = 0;
int tempConsume = 0;
int tempProduce = 0;
int producePos = 0;
int consumePos = 0;
int currentWrite = 11;
int currentRead = 12;
int tempVal = 0;
string readWrite[35];
bool flag = true;

/***********************************************************************
* SEMAPHORES
*
*  -pthread_mutex_t mutex: serializes access to the buffer and counters. 
*  -sem_t canRead: counts filled slots; consumers wait on it.
*  -sem_t canWrite: counts free slots; producers wait on it.
*
************************************************************************/
pthread_mutex_t mutex;
sem_t canWrite;
sem_t canRead;

/*******************************************************
* VOID GETCOMMANDS ( ) 
* 
* description 
* -----------
*  -This function will take parameters from command line 
*   and write them to the input struct for global use.
*
********************************************************/ 
void getCommands(char *argv[]) 
{
	/*********
	* run time
	**********/
	input.run = atoi(argv[1]);
	
	/***********
	* sleep time
	************/        
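	input.sleep = atoi(argv[2]);
	/* the threads compute rand() % input.sleep, so it must be at least 1 */
	if( input.sleep < 1 )
	{
		input.sleep = 1;
	}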
		
	/*****************
	* num of producers
	******************/	
	if( atoi(argv[3]) < 1 )
	{
		input.produce = 1;
	} 
	else
	{
		input.produce = atoi(argv[3]);
        }
	
	/*****************
	* num of consumers
	******************/	
	if( atoi(argv[4]) < 1 )
        {
                input.consume = 1;  
        }
        else
        {
                input.consume = atoi(argv[4]);
        }
	
	/************************
	* 'yes' or 'no' snapshots
	*************************/        
	if( strcmp(argv[5], "yes") == 0 )
        {
                input.snapshot = true;
        }
        
        else if( strcmp(argv[5], "no") == 0 )
        {
                input.snapshot = false;
        }
	else
	{
		printf("error: require a 'yes' or 'no' for snapshot parameter\n");
		exit(1);
	}
}

/***********************************************
* VOID REPORT ( )
*
* description
* -----------
*  -This function will output the final 
*   results of the producer/consumer simulation.
*************************************************/
void report()
{	
	getBuffer();

	cout << "\nPRODUCER / CONSUMER SIMULATION COMPLETE" << endl;
	cout << "=======================================" << endl;

	printf("Simulation Time:                        %d\n", input.run);
	printf("Maximum Thread Sleep Time:              %d\n", input.sleep);
	printf("Number of Producer Threads:             %d\n", input.produce);
	printf("Number of Consumer Threads:             %d\n", input.consume);
	printf("Size of Buffer:                         %d\n", BUFFER_SIZE);  	

	printf("\nTotal Number of Items Produced:         %d\n", produceCount);
	
	for(int i=0; i < input.produce; i++)
        {
                printf("   Thread %d:                            %d\n", i, indivProduceCount[i]); 
        }

	printf("\nTotal number of Items Consumed:         %d\n", consumeCount);
	
	for(int j=0; j < input.consume; j++)
	{
		printf("   Thread %d:                            %d\n", j, indivConsumeCount[j]);              
        }
	
	printf("\nNumber of Items Remaining in Buffer:    %d\n", bufferCount);
	printf("Number of Times Buffer Was Full:        %d\n", bufferFull);
	printf("Number of Times Buffer Was Empty:       %d\n", bufferEmpty); 
}

/**********************************************
* VOID RUNONCE ( )
*
* description
* -----------
*  -This function will display start of threads 
*   and call to display current state of buffer 
*   if snapshots are enabled.
**
***********************************************/
void runOnce()
{
                /**************
                * snapshot info
                ***************/
                if( input.snapshot == 1 )
                {
                        printf("\nStarting Threads...\n");

                        printBuffer();			
                }
}

/*********************************************
* PRINTBUFFER ( )
* 
* description
* -----------
*  -This function will get and output current 
*   state of buffer if snapshots are enabled.
*  
**********************************************/
void printBuffer()
{
	printf("    (buffers occupied: %2d)\n", bufferCount);
                        
        printf("buffers:   %2d   %2d   %2d   %2d   %2d\n",
                buffer[0], buffer[1], buffer[2], buffer[3], buffer[4]);
        printf("          ---- ---- ---- ---- ----\n");
	
        /*************************
        * RW string array function
        **************************/
        readWrite[currentWrite] = "W";
        readWrite[currentRead] = "R";
        
        for(int k=0; k < 33; k++)
        {
        	cout << readWrite[k];
        }
        cout << endl;
                
        readWrite[currentWrite] = " ";
        readWrite[currentRead] = " ";
}

/************************************************
* VOID GETBUFFER( )
* 
* description
* -----------
*  -This function will read through the buffer, 
*   counting the current elements inside, along 
*   with how many times it becomes full or empty.
*
*************************************************/
void getBuffer()
{
	bufferCount2 = 0;
        bufferCount = 0; 

	/*****************************
	* get count of buffer elements
        ******************************/

	for(int j=0; j < BUFFER_SIZE; j++)
        {
       		if( buffer[j] != -1 )
       		{
             		bufferCount++;
              	}

		if( buffer[j] == -1 )
		{
			bufferCount2++;
		}
       	}

	/***************************
	* how many times buffer full
	****************************/	
	if( bufferCount == BUFFER_SIZE )
	{
		bufferFull++;
	}

	/****************************  
        * how many times buffer empty
        *****************************/
        if( bufferCount2 == BUFFER_SIZE )
       	{
        	bufferEmpty++;
        }
}

/**********************************************
* BOOL ISPRIME ( )
*
* description
* -----------
*  -This function will determine if the value
*   being consumed in the buffer is a prime.
*
* variable
* --------
*  -int input: is passed the buffer value 
*              the consumer is reading.
*
***********************************************/
bool IsPrime(int input)
{
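        /* values below 2 (including the -1 "empty" marker) are not prime */
        if(input < 2) return false;
        if(input == 2) return true;
        if(input % 2 == 0) return false;

        /* test odd divisors up to and including the square root */
        for(int n = 3; n * n <= input; n += 2)
        {
                if(input % n == 0) return false;
        }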
        return true;
}

/******************************************************* 
* INT MAIN ( ) 
* 
* variables 
* --------- 
* -int placeholder: is placeholder for thread parameter. 
* 
********************************************************/ 
int main(int argc, char *argv[]) 
{
	int placeholder = 0;
	
	/***********************
	* get program parameters
	************************/
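	if( argc < 6 )
	{
		printf("usage: %s <run time> <max sleep> <producers> <consumers> <yes|no>\n", argv[0]);
		return 1;
	}
	getCommands(argv);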
	
	/******************
	* initialize buffer
	*******************/	
	for(int i=0;i<BUFFER_SIZE;i++)
	{
		buffer[i] = -1;
	}
	
	/********************
	* initialize RW array
	*********************/	
	for(int i=0;i<33;i++)
	{
		readWrite[i] = " ";
	}
	
	/**********************
	* initialize semaphores
	***********************/
	pthread_mutex_init(&mutex, NULL);
	sem_init( &canWrite, 0, BUFFER_SIZE );	
	sem_init( &canRead, 0, 0 );
	
	/***************
	* set thread IDs
	****************/
	pthread_t produce[input.produce];
	pthread_t consume[input.consume];
	
	/*********************************
	* implicitly make threads joinable
	**********************************/
	pthread_attr_t attr;
  	pthread_attr_init(&attr);
  	pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
	
	/*******************
	* print first buffer
	********************/
	runOnce();
		
	/*****************************
	* create producers & consumers
	******************************/		
	for(int j = 0; j < input.produce; j++)
	{
		pthread_create(&produce[j], &attr, pProducer, &placeholder);
	}	
	
	for(int k = 0; k < input.consume; k++)
	{
                pthread_create(&consume[k], &attr, pConsumer, &placeholder);
	}	

	/***********
	* sleep main
	************/
	delay1.tv_sec = input.run;
	nanosleep( &delay1, NULL );

	/*************
	* stop threads
	**************/
	flag = false;

	/***************************
	* join producers & consumers
	****************************/		
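	for(int m = 0; m < input.produce; m++)
	{
		/* receive the result as a pointer-sized value, then narrow it */
		void *result = NULL;
		pthread_join(produce[m], &result);
		tempProduce = (int)(long)result;
	
		indivProduceCount.push_back( tempProduce );
	}		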
		
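	for(int n = 0; n < input.consume; n++)
	{
		/* receive the result as a pointer-sized value, then narrow it */
		void *result = NULL;
		pthread_join(consume[n], &result);
		tempConsume = (int)(long)result;

                indivConsumeCount.push_back( tempConsume );
	}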
	
	/*****************
	* print statistics
	******************/	
	report();
	
	return 0;
}

/******************************************************* 
* VOID pPRODUCER ( ) 
* 
* description 
* ----------- 
* -This thread will write a random element to the 
* buffer, increment RW output, display if buffer full, 
* and then sleep for a random time (per thread). 
*
* variables
* ---------
*  -int thisProduceCount: will count how many elements 
*                         each thread produces.
*  -int random: will generate a random value for buffer.
*
********************************************************/ 
void *pProducer(void* placeholder) 
{
	int thisProduceCount = 0;
	int random;

	while(flag)
	{
 		/**************
                * time to sleep
                ***************/
		delay2.tv_sec = rand() % input.sleep + 1;
		nanosleep( &delay2, NULL );

		/*********************************
		* generate random value for buffer 
		*********************************/
		random = rand() % 100 + 1;

		/*****************
		* get buffer state
		******************/
		getBuffer();						
		
		if( bufferCount < BUFFER_SIZE )
		{	
			/******
                	* locks
                	*******/
			sem_wait(&canWrite);
             		pthread_mutex_lock(&mutex);					

			/***************
			* reset counters
			****************/
			producePos = producePos % BUFFER_SIZE;

			/*******************
			* reset RW increment
			********************/	
			if(currentWrite == 31)   
        		{       
        			currentWrite = 6;
        		}		

			currentWrite = currentWrite + 5;		

			/****************
			* produce element
			*****************/	
			buffer[producePos] = random;

			/*****************
			* get buffer state
			******************/
			getBuffer();				
				
			/************** 
                	* snapshot info
                	***************/
			if( input.snapshot == 1 )
			{	
		               	/* pthread_t is opaque; the cast assumes an integer type, as on Linux */
		               	printf("\nProducer %lu writes %d\n", (unsigned long)pthread_self(), random);		

				printBuffer();
			}				

			/************************** 
                	* increment buffer position
                	***************************/
			producePos++;
 		
			/*************************************** 
                	* how many elements this thread produced
                	****************************************/
			thisProduceCount++;
		
			/********************************* 
                	* how many elements produced total
                	**********************************/
			produceCount++;

			/******* 
        	        * unlock
                	********/
			pthread_mutex_unlock(&mutex);
			sem_post(&canRead);		
		}//if
		
		else
		{
			if( input.snapshot == 1 )
			{
				printf("\nAll buffers full. Producer %lu waits.\n", (unsigned long)pthread_self());
			}
		}

	}//while
	
	pthread_exit( (void*)(long)thisProduceCount );
}

/******************************************************* 
* VOID pCONSUMER ( )
*
* description
* -----------
* -This thread will remove an element from the
* buffer, increment RW output, display if buffer full,
* and then sleep for a random time (per thread).
*               
* variables
* ---------
*  -int thisConsumeCount: will count how many elements
*                         each thread consumes.
*
********************************************************/
void *pConsumer(void* placeholder)
{	
	int thisConsumeCount = 0;
	int test = 0;

	while(flag)
	{	
 		/**************
                * time to sleep
                ***************/
		delay3.tv_sec = rand() % input.sleep + 1;
		nanosleep( &delay3, NULL );	
		
		if( (bufferCount2 < BUFFER_SIZE && bufferCount2 > 0) || bufferCount == BUFFER_SIZE )
		{ 	
			/******  
        		* locks
			*******/
			sem_wait(&canRead);
			pthread_mutex_lock(&mutex);	
			
			/***************
               		* reset counters
                	****************/
			consumePos = consumePos % BUFFER_SIZE;

			/*******************
        		* reset RW increment
        		********************/		
			if(currentRead == 32)
       			{       
        			currentRead = 7;
        		}					

			currentRead = currentRead + 5;

			/************** 
                	* snapshot info
                	***************/
			if( input.snapshot == 1 )   
                	{	
				/*********************************************************
				* will output prime if the current buffer element is prime
				**********************************************************/
 				bufferCount--;
				if( IsPrime( buffer[consumePos] ))
                               	{
                                       	printf("\nConsumer %lu reads %2d  * * * PRIME * * *\n",
                                       	(unsigned long)pthread_self(), buffer[consumePos]);
					printBuffer();
                               	}
  
	                      	else
                               	{   
                                       	printf("\nConsumer %lu reads %2d\n", (unsigned long)pthread_self(), buffer[consumePos]);
					printBuffer();
                               	}
                       	}

			/****************
			* consume element
			*****************/
			buffer[consumePos] = -1;			

			/************************** 
                	* increment buffer position
                	***************************/
			consumePos++;

			/***************************************
                	* how many elements this thread consumed
                	****************************************/
                	thisConsumeCount++;
                
                	/*********************************
                	* how many elements consumed total
                	**********************************/
               	 	consumeCount++;

			/******* 
        	        * unlock
                	********/	
			pthread_mutex_unlock(&mutex);
			sem_post(&canWrite);
			
		}//if

		else
		{		
			if( input.snapshot == 1 )
			{
                		printf("\nAll buffers empty. Consumer %lu waits.\n", (unsigned long)pthread_self());
			}
		}
		
		/****************************************
		* workaround; ensures no hangs in the 
		* case of more consumers than producers. 
		*
		* will remove the thread if it wasn't 
		* lucky enough to enter the if-statement.
		*
		* -likely cause is canWrite's post is 
		*  too small for canRead's wait, ergo.		 
		*****************************************/		
		if( input.consume > input.produce )
		{
			pthread_exit( (void*)(long)thisConsumeCount );   
		}

	}//while
	
	pthread_exit( (void*)(long)thisConsumeCount );
}


Last edited by digitalbrainiac; 11-21-2012 at 12:41 AM..
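
The posted program avoids blocking forever in sem_wait by checking the buffer counts before waiting, and by letting surplus consumers exit early. A different way to get a clean shutdown, sketched here under stated assumptions, is to make the buffer itself closable so that every blocked thread wakes up when main is done. This swaps the sem_t pair for a C++11 `std::mutex` with two `std::condition_variable`s; `ClosableBuffer`, `put`, `take`, and `close` are hypothetical names, not part of the assignment or of buffer.h.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical bounded buffer that can be closed to release all waiting threads.
class ClosableBuffer {
public:
    explicit ClosableBuffer(std::size_t capacity)
        : capacity_(capacity), closed_(false) {
        assert(capacity > 0);
    }

    // Blocks while the buffer is full. Returns false if the buffer was closed.
    bool put(int item) {
        std::unique_lock<std::mutex> lock(mutex_);
        notFull_.wait(lock, [this] { return closed_ || items_.size() < capacity_; });
        if (closed_) return false;
        items_.push_back(item);
        notEmpty_.notify_one();
        return true;
    }

    // Blocks while the buffer is empty. Returns false once closed and drained.
    bool take(int& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        notEmpty_.wait(lock, [this] { return closed_ || !items_.empty(); });
        if (items_.empty()) return false;
        out = items_.front();
        items_.pop_front();
        notFull_.notify_one();
        return true;
    }

    // Called once by the controlling thread; wakes every waiting producer and consumer.
    void close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            closed_ = true;
        }
        notFull_.notify_all();
        notEmpty_.notify_all();
    }

private:
    const std::size_t capacity_;
    bool closed_;  // guarded by mutex_
    std::mutex mutex_;
    std::condition_variable notFull_;
    std::condition_variable notEmpty_;
    std::deque<int> items_;
};
```

With something like this, producers would loop on `while (buf.put(x))` and consumers on `while (buf.take(x))`, and main would call `close()` after its sleep and then join every thread. No thread is left waiting, regardless of how many producers and consumers there are.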
# 10  
Old 11-21-2012
Does it work?
# 11  
Old 11-21-2012
yes it does. give it a try.

the command line arguments are:

<run duration> <delay per thread> <num of producers> < num of consumer > <yes/no>

yes/no is snapshots of buffer.
# 12  
Old 11-21-2012
Great! Welcome to the land of parallel processing, concurrent threads and locks!

In one of my past projects, I was cooking up a container object that took in data from very many concurrent threads, fast, by having many input containers so they could just bypass locked ones, and internal threads to merge the data, order not guaranteed, and feed it to multiple output containers as evenly as possible but avoiding locks, so it would be as delay-free a container as possible. The containers were single linked lists, so they could easily be emptied and merged. One can imagine this used with a multi-thread sorting container to sort data as it arrived such that, when the last item arrived, the sorted output could commence immediately.
# 13  
Old 11-21-2012
That seems a bit beyond me DGPickett, lol. But I do see how multi-threading would make searching multiple things a lot faster.

I'm planning on using multiple threads for my upcoming Digital Forensics assignment. I need to write a program to crack a password that is entered (no encryption).

What I'm going to do is have one thread per character of password entered, go through the ascii table on each, writing 8 characters to a string and compare that to the string entered.