Bounded Buffer is hanging, tried all I can. (C++)

 
# 1  
Old 11-16-2012

1. The problem statement, all variables and given/known data:
My problem is that when I create my producers and consumers, the program hangs at the pthread_join() for-loops in main unless I create an equal number of both (i.e., it hangs with more producers than consumers, or with more consumers than producers).

Assignment requirements:
-Use a buffer of size 5
-Create any number of producers and consumers
-Display the current state of the buffer per thread
a) including the current read/write position of the thread
b) if the value consumed is prime
-A final report output of the entire simulation.

2. Relevant commands, code, scripts, algorithms:
-sem_init( &mutex, 0, 1 );
-sem_init( &empty, 0, BUFFER_SIZE );
-sem_init( &full, 0, 0 );

3. The attempts at a solution (include all code and scripts):
I have tried moving my locks around in each thread, not using joins, not using my vectors, joining in different orders, and checking for segfaults in the loops.

I have also added tons of cout statements.

Code:
            
/*******************
* <removed>
* Operating Systems
* Project #3
* 11/15/2012
* Dr. Katangur
*
*******************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>   /* strcmp */
#include <unistd.h>   /* sleep */
#include <iostream>
#include <string>
#include <cstdlib>
#include <pthread.h>
#include <semaphore.h>
#include "buffer.h"
#include <vector>
#include <time.h>
#include <math.h>

using namespace std;

void *pConsumer(void*);
void *pProducer(void*);
void interface(char*);
void report();
void runOnce();
void getBuffer();
void printBuffer();
bool IsPrime(int);

/******************************************
* STRUCTS
*
* userinput: input
* ----------------
*  -int run: time for main to sleep.
*  -int sleep: time for threads to sleep.
*  -int produce: number of producer threads.
*  -int consume: number of consumer threads.
*  
*  -bool snapshot: display buffer states.
*
*******************************************/
typedef struct
{
        int run;
        int sleep;
        int produce;
        int consume;
        bool snapshot;
}userinput;
userinput input;

struct timespec delay;

/***********************************************************
* VECTORS
*  
*  -int indivConsumeCount: elements consumed by each thread.
*  -int indivProduceCount: elements produced by each thread.
*  
************************************************************/
vector<int> indivConsumeCount;
vector<int> indivProduceCount;
 
/************************************************************
* GLOBALS
*
*  -BUFFER_SIZE: defined max size of buffer.
*
*  -int buffer[]: array used as the buffer.
*  -int produceCount: total number of elements produced.
*  -int consumeCount: total number of elements consumed.
*  -int bufferEmpty: times the buffer was empty.
*  -int bufferFull: times the buffer was full.
*  -int bufferCount: elements currently in the buffer.
*  -int bufferCount2: empty elements currently in the buffer.
*  -int tempConsume: a temporary value passed from threads.
*  -int tempProduce: a temporary value passed from threads. 
*  -int producePos: position producer is in the buffer.
*  -int consumePos: position consumer is in the buffer.
*  -int currentWrite: set position of W in string array.
*  -int currentRead: set position of R in string array.
*
*  -string readWrite[]: used for RW output.
*
*  -bool flag: will tell all threads to end do-whiles.
*
*************************************************************/
int buffer[BUFFER_SIZE];
int produceCount = 0;
int consumeCount = 0;
int bufferEmpty = 0;
int bufferFull = 0;
int bufferCount = 0;
int bufferCount2 = 0;
int tempConsume = 0;
int tempProduce = 0;
int producePos = 0;
int consumePos = 0;
int currentWrite = 11;
int currentRead = 12;
int tempVal = 0;
string readWrite[35];
bool flag = true;

/***********************************************************************
* SEMAPHORES
*
*  -sem_t mutex: control multiple producer or consumer access to buffer.
*  -sem_t full: control both producer and consumer access to buffer.
*  -sem_t empty: control both producer and consumer access to buffer.
*
************************************************************************/
sem_t mutex;
sem_t full;
sem_t empty;

/*******************************************************
* VOID GETCOMMANDS ( )
*
* description
* -----------   
*  -This function will take parameters from command line
*   and write them to the input struct for global use.
*
********************************************************/
void getCommands(char *argv[])
{
        /*********
        * run time
        **********/
        input.run = atoi(argv[1]);

        /***********
        * sleep time
        ************/
        input.sleep = atoi(argv[2]);

        /*****************
        * num of producers
        ******************/
        if( atoi(argv[3]) < 1 )
        {
                input.produce = 1;
        }
        else
        {
                input.produce = atoi(argv[3]);
        }
        
        /*****************
        * num of consumers
        ******************/
        if( atoi(argv[4]) < 1 )
        {   
                input.consume = 1;
        }
        else
        {
                input.consume = atoi(argv[4]);
        }

        /************************
        * 'yes' or 'no' snapshots 
        *************************/
        if( strcmp(argv[5], "yes") == 0 )
        {
                input.snapshot = true;
        }
        
        else if( strcmp(argv[5], "no") == 0 )
        {
                input.snapshot = false;
        }
        else
        {
                printf("error: require a 'yes' or 'no' for snapshot parameter\n");
                exit(0);
        }
}

/***********************************************
* VOID REPORT ( )
*
* description
* -----------
*  -This function will output the final
*   results of the producer/consumer simulation.
*
************************************************/
void report()
{
        cout << "\nPRODUCER / CONSUMER SIMULATION COMPLETE" << endl;
        cout << "=======================================" << endl;
        
        printf("Simulation Time:                        %d\n", input.run);
        printf("Maximum Thread Sleep Time:              %d\n", input.sleep);
        printf("Number of Producer Threads:             %d\n", input.produce);
        printf("Number of Consumer Threads:             %d\n", input.consume);
        printf("Size of Buffer:                         %d\n", BUFFER_SIZE);
         
        printf("\nTotal Number of Items Produced:         %d\n", produceCount);

        for(int i=0; i < input.produce; i++)
        {
                printf("   Thread %d:                            %d\n", i, indivProduceCount[i]);
        }

        printf("\nTotal number of Items Consumed:         %d\n", consumeCount);

        for(int j=0; j < input.consume; j++)
        {
                printf("   Thread %d:                            %d\n", j, indivConsumeCount[j]);
        }
        
        printf("\nNumber of Items Remaining in Buffer:    %d\n", bufferCount);
        printf("Number of Times Buffer Was Full:        %d\n", bufferFull); 
        printf("Number of Times Buffer Was Empty:       %d\n", bufferEmpty);  
}

/*******************************************************
* VOID RUNONCE ( )
*
* description
* -----------
*  -This function will display start of threads
*   and call to display current state of buffer
*   if snapshots are enabled.
*
********************************************************/
void runOnce()
{
                /**************
                * snapshot info
                ***************/
                if( input.snapshot == 1 )
                {
                        printf("\nStarting Threads...\n");
 
                        printBuffer();
                }
}

/*********************************************
* PRINTBUFFER ( )
*  
* description
* -----------
*  -This function will get and output current
*   state of buffer if snapshots are enabled.
*
**********************************************/
void printBuffer()
{
        printf("    buffers occupied: %2d\n", bufferCount);

        printf("buffers:   %2d   %2d   %2d   %2d   %2d\n",
                buffer[0], buffer[1], buffer[2], buffer[3], buffer[4]);
        printf("          ---- ---- ---- ---- ----\n");

        /*************************
        * RW string array function
        **************************/
        readWrite[currentWrite] = "W";
        readWrite[currentRead] = "R";

        for(int k=0; k < 33; k++)
        {
                cout << readWrite[k]; 
        }
        cout << endl;

        readWrite[currentWrite] = " ";
        readWrite[currentRead] = " ";
}

/************************************************
* VOID GETBUFFER( )
*
* description           
* -----------
*  -This function will read through the buffer,
*   counting the current elements inside, along
*   with how many times it becomes full or empty.
*
*************************************************/
void getBuffer()
{
        /*****************************
        * get count of buffer elements
        ******************************/
        for(int j=0; j < BUFFER_SIZE; j++)
        {
                if( buffer[j] != -1 )
                {
                        bufferCount++;
                }

                if( buffer[j] == -1 )
                {
                        bufferCount2++;
                }
        }

        /***************************
        * how many times buffer full
        ****************************/
        if( bufferCount == BUFFER_SIZE )
        {
                bufferFull++;
        }
        
        /****************************
        * how many times buffer empty
        *****************************/
        if( bufferCount2 == BUFFER_SIZE )
        {
                bufferEmpty++;
        }
}

/**********************************************
* BOOL ISPRIME ( )
*
* description
* -----------
*  -This function will determine if the value
*   being consumed in the buffer is a prime.
* 
* variable
* -------- 
*  -int input: is passed the buffer value
*              the consumer is reading.
*
***********************************************/
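bool IsPrime(int input)
{
        /* 1, 0, negatives (including the -1 empty marker) are not prime */
        if(input < 2) return false;
        if(input == 2) return true;
        if(input % 2 == 0) return false;
        
        /* n * n <= input includes the square root itself, so 9 and 25 are rejected */
        for(int n = 3; n * n <= input; n += 2)
        {
                if(input % n == 0) return false;
        }
        
        return true;
}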

/*******************************************************
* INT MAIN ( )
*
* variables
* ---------
*  -int placeholder: is placeholder for thread parameter.
*
********************************************************/
int main(int argc, char *argv[])
{
        int placeholder = 0;
         
        /***********************
        * get program parameters
        ************************/
        getCommands(argv);

        /******************
        * initialize buffer
        *******************/
        for(int i=0;i<5;i++)
        {
                buffer[i] = -1;
        }
        
        /********************
        * initialize RW array
        *********************/
        for(int i=0;i<33;i++)
        {
                readWrite[i] = " ";
        }
        
        /********************** 
        * initialize semaphores  
        ***********************/
        sem_init( &mutex, 0, 1 );
        sem_init( &empty, 0, BUFFER_SIZE );
        sem_init( &full, 0, 0 );
        
        pthread_t produce[input.produce];
        pthread_t consume[input.consume];
                
        /*******************
        * print first buffer
        ********************/
        runOnce();
        
        /*****************************
        * create producers & consumers
        ******************************/
        for(int j = 0; j < input.produce; j++)
        {
                pthread_create(&produce[j], NULL, pProducer, &placeholder);
        }
        
        for(int k = 0; k < input.consume; k++)
        {
                pthread_create(&consume[k], NULL, pConsumer, &placeholder);
        }

        /***********
        * sleep main
        ************/
        sleep( input.run ); 
        
        /*************
        * stop threads
        **************/
        flag = false;
        
        /***************************
        * join producers & consumers
        ****************************/
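        for(int m = 0; m < input.produce; m++)
        {
                /* join into a pointer-sized variable, then narrow to int */
                void *result = NULL;
                pthread_join(produce[m], &result);
                tempProduce = (int)(long) result;
                indivProduceCount.push_back( tempProduce );
        }

        for(int n = 0; n < input.consume; n++)
        {
                void *result = NULL;
                pthread_join(consume[n], &result);
                tempConsume = (int)(long) result;
                indivConsumeCount.push_back( tempConsume );
        }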
        
        /*****************
        * print statistics
        ******************/
        report();
        
        return 0;
}

/*******************************************************
* VOID pPRODUCER ( )
*
* description
* -----------
* -This thread will write a random element to the
* buffer, increment RW output, display if buffer full,
* and then sleep for a random time (per thread).
*
* variables
* ---------
*  -int thisProduceCount: will count how many elements
*                         each thread produces.
*  -int random: will generate a random value for buffer.
*  -int random2: will generate a random value to sleep.
*
********************************************************/
void *pProducer(void* placeholder)
{
        int thisProduceCount = 0;
        int random;
        int random2;
 
        do   
        {
                /*****************************************
                * generate random value for buffer & sleep
                ******************************************/
                random = rand() % 100 + 1;
                random2 = rand() % (input.sleep + 1);

                /**************
                * time to sleep
                ***************/
                delay.tv_sec = random2;
                nanosleep( &delay, NULL );

                /******
                * locks
                *******/
                sem_wait(&empty);
                sem_wait(&mutex);
 
                /***************
                * reset counters
                ****************/
                bufferCount = 0;
                bufferCount2 = 0;
                producePos = producePos % 5;
                
                /****************
                * produce element
                *****************/
                buffer[producePos] = random;
                
                getBuffer();

                /**************
                * snapshot info
                ***************/
                if( input.snapshot == 1 )
                {
                        if( bufferCount != BUFFER_SIZE )
                        {
                                printf("\nProducer %d writes %d\n", pthread_self() * -1, random);
                                printBuffer();
                        }
                
                
                        if( bufferCount == BUFFER_SIZE )
                        {
                                printf("\nAll buffers full. Producer %d waits.\n", pthread_self()*-1);
                        }
                }
                
                /*******************
                * reset RW increment
                ********************/
                if(currentWrite == 31)
                {
                        currentWrite = 11;
                }
                
                /**************************
                * increment buffer position
                ***************************/
                producePos++;
                currentWrite = currentWrite + 5;
                
                /***************************************
                * how many elements this thread produced
                ****************************************/
                thisProduceCount++;

                /*********************************
                * how many elements produced total
                **********************************/   
                produceCount++;
                
                /*******
                * unlock
                ********/
                sem_post(&mutex);
                sem_post(&full);

        } while( flag );
                
        pthread_exit( (void*)thisProduceCount );
}               
                
/*******************************************************
* VOID pCONSUMER ( )
*
* description
* -----------
* -This thread will remove an element from the
* buffer, increment RW output, display if buffer full,
* and then sleep for a random time (per thread).
*               
* variables
* ---------
*  -int thisConsumeCount: will count how many elements
*                         each thread consumes.
*  -int random: will generate a random value to sleep.
*               
********************************************************/
void *pConsumer(void* placeholder)
{
        int thisConsumeCount = 0;
        int random;

        do {
                /********************************
                * generate random value for sleep
                *********************************/
                random = rand() % (input.sleep + 1);

                /**************
                * time to sleep
                ***************/
                delay.tv_sec = random;
                nanosleep( &delay, NULL );

                /******
                * locks
                *******/
                sem_wait(&full);  
                sem_wait(&mutex);
        
                /***************
                * reset counters
                ****************/
                bufferCount = 0;
                bufferCount2 = 0;
                consumePos = consumePos % 5;
                
                getBuffer();

                /**************
                * snapshot info 
                ***************/
                if( input.snapshot == 1 ) 
                {
                        if( bufferCount != BUFFER_SIZE )
                        {
                                /*********************************************************
                                * will output prime if the current buffer element is prime
                                **********************************************************/
                                if( IsPrime(buffer[consumePos]) )
                                {
                                        printf("\nConsumer %d reads %2d  * * * PRIME * * *\n",
                                        pthread_self() * -1, buffer[consumePos]);
                                }
                                else
                                {
                                        printf("\nConsumer %d reads %2d\n", pthread_self() * -1, buffer[consumePos]);
                                }
                
                                printBuffer();
                
                        }
                
                        if( bufferCount == BUFFER_SIZE )
                        {
                                printf("\nAll buffers full. Consumer %d waits.\n", pthread_self()*-1);
                        }
                }

                /****************
                * consume element
                ****************/  
                buffer[consumePos] = -1;
                
                /*******************
                * reset RW increment
                ********************/
                if(currentRead == 32)
                {
                        currentRead = 12;
                }
                
                /**************************
                * increment buffer position
                ***************************/
                consumePos++;
                currentRead = currentRead + 5;

                /***************************************
                * how many elements this thread consumed
                ****************************************/
                thisConsumeCount++;
                
                /*********************************
                * how many elements consumed total
                **********************************/
                consumeCount++;
                
                /*******
                * unlock
                ********/
                sem_post(&mutex);
                sem_post(&empty);
                
                
        } while( flag );
                
        pthread_exit( (void*)thisConsumeCount );
}

4. Complete Name of School (University), City (State), Country, Name of Professor, and Course Number (Link to Course):

Texas A&M University Corpus Christi
Corpus Christi, TX, USA
Dr. Ajay Katangur
falcon.tamucc.edu/~akatangur/fa12/COSC3346/
# 2  
Old 11-16-2012
One exclusive lock on the single container seems like enough: nobody can add or remove except while holding that lock. A container manager that services the incoming work queue and feeds a small queue for each consumer might reduce contention, since no party has to deal with anyone but the manager, which sits on the other end of a ring buffer or some similar low-contention mechanism. The manager can give precedence to output over input and serve everyone in rotation on each side. Even with threads, IPC-style mechanisms are nice. You could also do it with the old message queue mechanism; since everyone shares one memory space, the messages only need to carry item pointers.
# 3  
Old 11-16-2012
Quote:
Originally Posted by DGPickett
One exclusive lock on the single container seems like enough: nobody can add or remove except while holding that lock. A container manager that services the incoming work queue and feeds a small queue for each consumer might reduce contention, since no party has to deal with anyone but the manager, which sits on the other end of a ring buffer or some similar low-contention mechanism. The manager can give precedence to output over input and serve everyone in rotation on each side. Even with threads, IPC-style mechanisms are nice. You could also do it with the old message queue mechanism; since everyone shares one memory space, the messages only need to carry item pointers.
I'm sorry but I just don't know what you're trying to say.

I'm still very new to threads and locks.
# 4  
Old 11-20-2012
So no one has any idea?
# 5  
Old 11-20-2012
If a reader takes the global lock first and then fails to get data, it stays blocked, still holding the lock, until the writer adds more, which the writer can't do because the global lock is stuck locked.

For single-reader and single-writer, just two semaphores should do -- one for read, one for write. Keep the read and write separate and they shouldn't stomp on each other.

Procedure for writing:
1) Wait on write-sem
2) Add data to array
3) Increment and wrap write position
4) Post on read-sem

Procedure for reading:
1) Wait on read-sem
2) Copy element from array
3) Increment and wrap read position
4) Post on write-sem

If you want these to work with multiple readers or multiple writers, you will need a global mutex around items 2 and 3, but only lock the global mutex after you've waited on the read or write sem! Otherwise you'll deadlock (the reader blocks waiting for more data, and the writer can't add more data because the global mutex is locked).
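
The two procedures above, with the global mutex wrapped around items 2 and 3, can be sketched roughly as follows. This is a minimal sketch rather than the original poster's code: `BoundedBuffer`, `bufferInit`, `bufferPut`, `bufferGet` and `kCapacity` are hypothetical names chosen for illustration, and it assumes a platform where unnamed POSIX semaphores (`sem_init`) are available, such as Linux.

```cpp
#include <cassert>
#include <pthread.h>
#include <semaphore.h>

const int kCapacity = 5;          // hypothetical name; the assignment asks for 5

struct BoundedBuffer {
    int items[kCapacity];
    int writePos;
    int readPos;
    sem_t freeSlots;              // the "write-sem": counts empty slots
    sem_t usedSlots;              // the "read-sem": counts filled slots
    pthread_mutex_t lock;         // guards items, writePos and readPos
};

// Returns true if both semaphores and the mutex were initialized.
bool bufferInit(BoundedBuffer* b) {
    assert(b != NULL);
    b->writePos = 0;
    b->readPos = 0;
    return sem_init(&b->freeSlots, 0, kCapacity) == 0
        && sem_init(&b->usedSlots, 0, 0) == 0
        && pthread_mutex_init(&b->lock, NULL) == 0;
}

void bufferPut(BoundedBuffer* b, int value) {
    sem_wait(&b->freeSlots);                      // 1) wait on write-sem
    pthread_mutex_lock(&b->lock);                 //    lock only after the wait
    b->items[b->writePos] = value;                // 2) add data to array
    b->writePos = (b->writePos + 1) % kCapacity;  // 3) increment and wrap
    pthread_mutex_unlock(&b->lock);
    sem_post(&b->usedSlots);                      // 4) post on read-sem
}

int bufferGet(BoundedBuffer* b) {
    sem_wait(&b->usedSlots);                      // 1) wait on read-sem
    pthread_mutex_lock(&b->lock);
    int value = b->items[b->readPos];             // 2) copy element from array
    b->readPos = (b->readPos + 1) % kCapacity;    // 3) increment and wrap
    pthread_mutex_unlock(&b->lock);
    sem_post(&b->freeSlots);                      // 4) post on write-sem
    return value;
}
```

The mutex is held only for the array access and the position update, so a thread that has to sleep always does so in `sem_wait`, never while holding the lock.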
# 6  
Old 11-20-2012
Quote:
Originally Posted by Corona688
If a reader takes the global lock first and then fails to get data, it stays blocked, still holding the lock, until the writer adds more, which the writer can't do because the global lock is stuck locked.

For single-reader and single-writer, just two semaphores should do -- one for read, one for write. Keep the read and write separate and they shouldn't stomp on each other.

Procedure for writing:
1) Wait on write-sem
2) Add data to array
3) Increment and wrap write position
4) Post on read-sem

Procedure for reading:
1) Wait on read-sem
2) Copy element from array
3) Increment and wrap read position
4) Post on write-sem

If you want these to work with multiple readers or multiple writers, you will need a global mutex around items 2 and 3, but only lock the global mutex after you've waited on the read or write sem! Otherwise you'll deadlock (the reader blocks waiting for more data, and the writer can't add more data because the global mutex is locked).
I thought I was doing that essentially?

My producer's mutex lock comes after its wait, and my consumer's mutex lock comes after its wait. Both unlock before the post.

Is the type of semaphore I'm using wrong? I've got empty initialized to BUFFER_SIZE, which is 5, and full initialized to zero.

(Although, I'm still not clear why this should work if I have more than 5 threads of each going. That would leave the extras in limbo or something?)
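
On the question of extra threads: a thread that cannot get a slot is not lost, it just sleeps inside sem_wait() until another thread posts that semaphore. That also points at a likely cause of the hang. In the posted loops, flag is only checked at the bottom of each do-while, so a thread that is already asleep in sem_wait(&empty) or sem_wait(&full) never sees flag = false. With more producers than consumers, the consumers exit, the buffer fills, and the remaining producers appear to wait on empty forever; the reverse happens with more consumers. One possible way around this is sketched below, assuming the waiting threads are woken at shutdown and re-check the flag. The names `slotsFree`, `slotsUsed`, `running`, `waitUnlessStopping`, `requestStop` and `consumerLoop` are hypothetical, and `std::atomic` assumes a C++11 compiler.

```cpp
#include <atomic>
#include <cassert>
#include <pthread.h>
#include <semaphore.h>

sem_t slotsFree;                    // plays the role of `empty` in the post
sem_t slotsUsed;                    // plays the role of `full` in the post
std::atomic<bool> running(true);    // plays the role of `flag`

// Blocks on the semaphore. Returns true if the caller may proceed,
// false if shutdown was requested before or while it was waiting.
bool waitUnlessStopping(sem_t* sem) {
    assert(sem != NULL);
    sem_wait(sem);
    if (!running.load()) {
        sem_post(sem);   // pass the wake-up on to the next blocked thread
        return false;
    }
    return true;
}

// Called by main in place of only clearing the flag.
void requestStop() {
    running.store(false);
    sem_post(&slotsFree);   // wakes one blocked producer, which wakes the next
    sem_post(&slotsUsed);   // wakes one blocked consumer, which wakes the next
}

// Stand-in consumer loop; the real buffer work would go where noted.
void* consumerLoop(void*) {
    while (waitUnlessStopping(&slotsUsed)) {
        // ... lock the mutex, remove one item, unlock the mutex ...
        sem_post(&slotsFree);
    }
    return NULL;
}
```

Because each woken thread re-posts the semaphore before it exits, a single post per semaphore in `requestStop` is enough to release every waiter, however many producers or consumers were created.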
# 7  
Old 11-20-2012
Quote:
Although, I'm still not clear why this should work if I have more than 5 threads of each going. That would leave the extras in limbo or something?
The simplest model is with a mutex - like pthread_mutex_t
pseudo code
Code:
// DO NOT dawdle here just read and get the heck out of the mutex
trylock - if busy do something else or loop back so get back into the wait queue
// now you have the mutex
rc= read ()
unlock mutex
// now you do not have the mutex
check return code for < 0 handle error
return number of bytes read or recv() 'd

Do the exact same thing with write or send. Only keep the mutex during the read or write. Do nothing else when you own the mutex.
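
The pseudo code above could look something like this with a real pthread_mutex_t. It is only a sketch under assumptions: `ioLock`, `sharedValue` and `tryReadShared` are hypothetical names, and a plain int copy stands in for the read() call.

```cpp
#include <cassert>
#include <pthread.h>

pthread_mutex_t ioLock = PTHREAD_MUTEX_INITIALIZER;
int sharedValue = 0;    // stand-in for whatever the read would fetch

// Returns true and copies the value if the mutex was free.
// Returns false right away if another holder has it, so the caller
// can do something else or loop back and try again.
bool tryReadShared(int* out) {
    assert(out != NULL);
    if (pthread_mutex_trylock(&ioLock) != 0) {
        return false;               // busy (EBUSY) or error: nothing was read
    }
    *out = sharedValue;             // keep the locked region as short as possible
    pthread_mutex_unlock(&ioLock);
    return true;                    // any processing happens after the unlock
}
```

Unlike pthread_mutex_lock(), pthread_mutex_trylock() never sleeps, so a caller that gets `false` back decides for itself whether to retry or move on to other work.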

This model is fine for the situation where you have some processing to do with data, and you read/write and process using threads. I/O is always slower than other operations.

If you have almost no processing to do then threads are not going to gain you much if anything. Your app will always be I/O bound whether it is threaded or not. The idea with threads is to do data crunching or whatever in parallel. When parallelism gains you nothing, skip threading. So if you are misusing threading, then other threads are in fact superfluous.

To avoid hanging on pthread_join, make the threads detached (not joinable). Then, when they return or call
Code:
pthread_exit(NULL)

they simply go away.
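
A thread can be created detached through its attributes, roughly as sketched here. `startDetached` and `worker` are hypothetical names. Two caveats apply: a detached thread cannot hand a value back through pthread_join, so per-thread counts would have to be stored somewhere shared, and the whole process still ends when main returns, whether or not detached threads have finished.

```cpp
#include <cassert>
#include <pthread.h>

// Trivial thread body used only to demonstrate the call.
void* worker(void*) {
    return NULL;
}

// Starts fn(arg) in a detached thread. Returns 0 on success,
// otherwise the error number reported by the pthread call that failed.
int startDetached(void* (*fn)(void*), void* arg) {
    assert(fn != NULL);
    pthread_attr_t attr;
    int rc = pthread_attr_init(&attr);
    if (rc != 0) return rc;
    rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (rc == 0) {
        pthread_t tid;              // not kept: a detached thread is never joined
        rc = pthread_create(&tid, &attr, fn, arg);
    }
    pthread_attr_destroy(&attr);
    return rc;
}
```

Calling pthread_detach() on an already running thread has the same effect as creating it with PTHREAD_CREATE_DETACHED.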