// Code:
/*******************
* <removed>
* Operating Systems
* Project #3
* 11/15/2012
* Dr. Katangur
*
*******************/
#include <math.h>
#include <pthread.h>
#include <sched.h>
#include <semaphore.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <cstdlib>
#include <iostream>
#include <string>
#include <vector>

#include "buffer.h"
using namespace std;
/* Forward declarations for the functions defined in this file.
 * The prime test is defined and called as IsPrime (capital I); the
 * prototype now matches that spelling, and getCommands is declared too. */
void *pConsumer(void*);
void *pProducer(void*);
void interface(char*);
void getCommands(char *argv[]);
void report();
void runOnce();
void getBuffer();
void printBuffer();
bool IsPrime(int);
/******************************************
* STRUCTS
*
* userinput: command-line settings
* --------------------------------
* -int run: seconds main sleeps while the threads work.
* -int sleep: upper bound (seconds) of each thread's random nap.
* -int produce: number of producer threads.
* -int consume: number of consumer threads.
*
* -bool snapshot: print the buffer state after each action.
*
*******************************************/
struct userinput
{
    int run;
    int sleep;
    int produce;
    int consume;
    bool snapshot;
};

/* the one shared copy of the settings, filled in by getCommands() */
userinput input;

/* sleep descriptors: delay1 is used by main, delay2 by the producers,
 * delay3 by the consumers */
struct timespec delay1;
struct timespec delay2;
struct timespec delay3;
struct timespec delay4;
/***********************************************************
* VECTORS
*
* -int indivConsumeCount: elements consumed by each consumer
*  thread; main appends one entry per consumer as it is joined.
* -int indivProduceCount: elements produced by each producer
*  thread; main appends one entry per producer as it is joined.
*
************************************************************/
vector<int> indivConsumeCount;
vector<int> indivProduceCount;
/*********************************************************
* GLOBALS
*
* -BUFFER_SIZE: defined max size of buffer.
*
* -buffer_item buffer[]: array used as the buffer; a slot
*  holding -1 is treated as empty.
* -int produceCount: total number of elements produced.
* -int consumeCount: total number of elements consumed.
* -int bufferEmpty: times getBuffer() found every slot empty.
* -int bufferFull: times getBuffer() found every slot occupied.
* -int bufferCount: occupied slots, as of the last getBuffer().
* -int bufferCount2: empty slots, as of the last getBuffer().
* -int tempConsume: receives a consumer's exit value in main.
* -int tempProduce: receives a producer's exit value in main.
* -int producePos: position producer is in the buffer.
* -int consumePos: position consumer is in the buffer.
* -int currentWrite: index of the "W" marker in readWrite.
* -int currentRead: index of the "R" marker in readWrite.
* -int tempVal: not referenced anywhere else in this file.
*
* -string readWrite[]: marker row printed under the buffer;
*  35 entries are declared, only the first 33 are
*  initialised and printed.
*
* -bool flag: loop condition of every worker thread; main
*  clears it when the run time is over.
*  NOTE(review): it is a plain bool that main writes and the
*  threads read without any synchronisation.
*
**********************************************************/
buffer_item buffer[BUFFER_SIZE];
int produceCount = 0;
int consumeCount = 0;
int bufferEmpty = 0;
int bufferFull = 0;
int bufferCount = 0;
int bufferCount2 = 0;
int tempConsume = 0;
int tempProduce = 0;
int producePos = 0;
int consumePos = 0;
int currentWrite = 11;
int currentRead = 12;
int tempVal = 0;
string readWrite[35];
bool flag = true;
/***********************************************************************
* SYNCHRONISATION OBJECTS
*
* -pthread_mutex_t mutex: held by a producer or consumer while it
*  changes the buffer and the shared counters.
* -sem_t canRead: initialised to 0 in main; producers post it after
*  writing, consumers wait on it before reading.
* -sem_t canWrite: initialised to BUFFER_SIZE in main; consumers post
*  it after reading, producers wait on it before writing.
*
************************************************************************/
pthread_mutex_t mutex;
sem_t canWrite;
sem_t canRead;
/*******************************************************
* VOID GETCOMMANDS ( )
*
* description
* -----------
* -This function will take parameters from command line
* and write them to the input struct for global use.
*
* parameters
* ----------
* -char *argv[]: the NULL-terminated argument vector of
* main; argv[1]..argv[5] are run time, max sleep time,
* producer count, consumer count and 'yes'/'no'.
*
* notes
* -----
* -Missing arguments or a bad snapshot word end the
* program with a failure status.
* -Sleep time, producer count and consumer count are
* raised to 1 when smaller, because the worker threads
* compute rand() % input.sleep and main needs at least
* one thread of each kind.
*
********************************************************/
void getCommands(char *argv[])
{
    /*********************************************
    * argv is NULL-terminated, so a short command
    * line can be detected without knowing argc
    **********************************************/
    for (int i = 0; i <= 5; i++)
    {
        if (argv == NULL || argv[i] == NULL)
        {
            printf("error: expected <run time> <sleep time> <producers> <consumers> <yes|no>\n");
            exit(EXIT_FAILURE);
        }
    }

    /*********
    * run time
    **********/
    input.run = atoi(argv[1]);

    /***********
    * sleep time (a value of 0 would make rand() % sleep divide by zero)
    ************/
    const int sleepTime = atoi(argv[2]);
    input.sleep = (sleepTime < 1) ? 1 : sleepTime;

    /*****************
    * num of producers
    ******************/
    const int producers = atoi(argv[3]);
    input.produce = (producers < 1) ? 1 : producers;

    /*****************
    * num of consumers
    ******************/
    const int consumers = atoi(argv[4]);
    input.consume = (consumers < 1) ? 1 : consumers;

    /************************
    * 'yes' or 'no' snapshots
    *************************/
    if (strcmp(argv[5], "yes") == 0)
    {
        input.snapshot = true;
    }
    else if (strcmp(argv[5], "no") == 0)
    {
        input.snapshot = false;
    }
    else
    {
        printf("error: require a 'yes' or 'no' for snapshot parameter\n");
        /* an invalid command line is a failure, not a clean exit */
        exit(EXIT_FAILURE);
    }
}
/***********************************************
* VOID REPORT ( )
*
* description
* -----------
* -Prints the closing summary of the simulation:
* the settings, the totals, the per-thread tallies
* and the buffer statistics. getBuffer() is run
* first so the remaining-items count is current.
*************************************************/
void report()
{
    getBuffer();

    std::cout << "\nPRODUCER / CONSUMER SIMULATION COMPLETE" << std::endl
              << "=======================================" << std::endl;

    /* settings the run was started with */
    printf("Simulation Time: %d\n", input.run);
    printf("Maximum Thread Sleep Time: %d\n", input.sleep);
    printf("Number of Producer Threads: %d\n", input.produce);
    printf("Number of Consumer Threads: %d\n", input.consume);
    printf("Size of Buffer: %d\n", BUFFER_SIZE);

    /* producer totals, then one line per producer thread */
    printf("\nTotal Number of Items Produced: %d\n", produceCount);
    int producer = 0;
    while (producer < input.produce)
    {
        printf(" Thread %d: %d\n", producer, indivProduceCount[producer]);
        ++producer;
    }

    /* consumer totals, then one line per consumer thread */
    printf("\nTotal number of Items Consumed: %d\n", consumeCount);
    int consumer = 0;
    while (consumer < input.consume)
    {
        printf(" Thread %d: %d\n", consumer, indivConsumeCount[consumer]);
        ++consumer;
    }

    /* buffer statistics */
    printf("\nNumber of Items Remaining in Buffer: %d\n", bufferCount);
    printf("Number of Times Buffer Was Full: %d\n", bufferFull);
    printf("Number of Times Buffer Was Empty: %d\n", bufferEmpty);
}
/**********************************************
* VOID RUNONCE ( )
*
* description
* -----------
* -When snapshots are enabled, announces that the
* threads are about to start and prints the
* initial state of the buffer. Does nothing
* otherwise.
*
***********************************************/
void runOnce()
{
    /* nothing to show unless snapshots were requested */
    if( !(input.snapshot == 1) )
    {
        return;
    }

    printf("\nStarting Threads...\n");
    printBuffer();
}
/*********************************************
* PRINTBUFFER ( )
*
* description
* -----------
* -Prints one snapshot of the buffer: the
* occupied count, the first five slots, a ruler
* line, and a marker row carrying "W" and "R" at
* the current write / read columns.
*
**********************************************/
void printBuffer()
{
    printf(" (buffers occupied: %2d)\n", bufferCount);
    printf("buffers: %2d %2d %2d %2d %2d\n",
           buffer[0], buffer[1], buffer[2], buffer[3], buffer[4]);
    printf(" ---- ---- ---- ---- ----\n");

    /* drop the markers into the row ("R" is placed last, as before) */
    readWrite[currentWrite] = "W";
    readWrite[currentRead] = "R";

    /* assemble the 33 printed columns, then emit them in one go */
    std::string markerRow;
    for (int column = 0; column != 33; ++column)
    {
        markerRow += readWrite[column];
    }
    std::cout << markerRow << std::endl;

    /* blank the two marker cells again for the next snapshot */
    readWrite[currentWrite] = " ";
    readWrite[currentRead] = " ";
}
/************************************************
* VOID GETBUFFER( )
*
* description
* -----------
* -Recounts the buffer from scratch: bufferCount
* receives the number of occupied slots and
* bufferCount2 the number of empty (-1) slots.
* When the recount finds every slot occupied,
* bufferFull is incremented; when it finds every
* slot empty, bufferEmpty is incremented.
*
*************************************************/
void getBuffer()
{
    bufferCount = 0;
    bufferCount2 = 0;

    /* each slot is either empty (-1) or occupied */
    int slot = 0;
    while (slot < BUFFER_SIZE)
    {
        if (buffer[slot] == -1)
        {
            ++bufferCount2;
        }
        else
        {
            ++bufferCount;
        }
        ++slot;
    }

    /* tally a "full" observation */
    if (bufferCount == BUFFER_SIZE)
    {
        ++bufferFull;
    }

    /* tally an "empty" observation */
    if (bufferCount2 == BUFFER_SIZE)
    {
        ++bufferEmpty;
    }
}
/**********************************************
* BOOL ISPRIME ( )
*
* description
* -----------
* -This function will determine if the value
* being consumed in the buffer is a prime.
*
* variable
* --------
* -int input: is passed the buffer value
* the consumer is reading.
*
* returns
* -------
* -true when input is a prime number; false for
* every value below 2 (including negatives such
* as the -1 empty-slot marker) and for every
* composite number.
*
***********************************************/
bool IsPrime(int input)
{
    /* 0, 1 and negative numbers are not prime */
    if (input < 2) return false;

    /* 2 is the only even prime */
    if (input % 2 == 0) return input == 2;

    /* Trial division by odd numbers up to and including the square root.
     * "n <= input / n" is the overflow-safe integer form of n*n <= input;
     * the bound must be inclusive or perfect squares (9, 25, 49) slip
     * through. */
    for (int n = 3; n <= input / n; n += 2)
    {
        if (input % n == 0) return false;
    }
    return true;
}
/*******************************************************
* INT MAIN ( )
*
* variables
* ---------
* -int placeholder: is placeholder for thread parameter.
*
********************************************************/
int main(int argc, char *argv[])
{
int placeholder = 0;
/***********************
* get program parameters
************************/
getCommands(argv);
/******************
* initialize buffer
*******************/
for(int i=0;i<5;i++)
{
buffer[i] = -1;
}
/********************
* initialize RW array
*********************/
for(int i=0;i<33;i++)
{
readWrite[i] = " ";
}
/**********************
* initialize semaphores
***********************/
pthread_mutex_init(&mutex, NULL);
sem_init( &canWrite, 0, BUFFER_SIZE );
sem_init( &canRead, 0, 0 );
/***************
* set thread IDs
****************/
pthread_t produce[input.produce];
pthread_t consume[input.consume];
/*********************************
* implicitly make threads joinable
**********************************/
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
/*******************
* print first buffer
********************/
runOnce();
/*****************************
* create producers & consumers
******************************/
for(int j = 0; j < input.produce; j++)
{
pthread_create(&produce[j], &attr, pProducer, &placeholder);
}
for(int k = 0; k < input.consume; k++)
{
pthread_create(&consume[k], &attr, pConsumer, &placeholder);
}
/***********
* sleep main
************/
delay1.tv_sec = input.run;
nanosleep( &delay1, NULL );
/*************
* stop threads
**************/
flag = false;
/***************************
* join producers & consumers
****************************/
for(int m = 0; m < input.produce; m++)
{
pthread_join(produce[m], (void**) &tempProduce);
indivProduceCount.push_back( tempProduce );
}
for(int n = 0; n < input.consume; n++)
{
pthread_join(consume[n], (void**) &tempConsume);
indivConsumeCount.push_back( tempConsume );
}
/*****************
* print statistics
******************/
report();
return 0;
}
/*******************************************************
* VOID pPRODUCER ( )
*
* description
* -----------
* -This thread will sleep for a random time, then try to
* write a random element (1..100) to the buffer,
* advance the W marker, and display the buffer or a
* "buffers full" notice when snapshots are enabled.
* It repeats until main clears the global flag.
*
* parameters
* ----------
* -void* placeholder: unused.
*
* variables
* ---------
* -int thisProduceCount: will count how many elements
* each thread produces.
* -int random: the value generated for the buffer.
*
* returns
* -------
* -thisProduceCount, carried in the void* exit value.
*
* notes
* -----
* -A free slot is claimed with sem_trywait, so the
* thread never blocks on the semaphore and always gets
* back to the flag check; a blocking wait here could
* leave main stuck in pthread_join.
* -getBuffer() is called at the same points as before
* (once before the attempt, once after a write) but now
* with the mutex held, so it no longer races with other
* threads that hold the mutex.
*
********************************************************/
void *pProducer(void* placeholder)
{
    int thisProduceCount = 0;

    /* The snapshot lines show the thread id negated and narrowed to int;
     * the explicit cast makes the argument match the %d conversion. */
    const int selfId = static_cast<int>(pthread_self() * -1);

    while (flag)
    {
        /**************
        * time to sleep
        *
        * a local timespec: a shared global would be
        * overwritten by other producers mid-sleep setup;
        * a sleep bound of 0 is treated as 1 to avoid
        * dividing by zero
        ***************/
        struct timespec nap;
        nap.tv_sec = rand() % (input.sleep != 0 ? input.sleep : 1) + 1;
        nap.tv_nsec = 0;
        nanosleep(&nap, NULL);

        /*********************************
        * generate random value for buffer
        *********************************/
        const int random = rand() % 100 + 1;

        /***********************************
        * claim a free slot without blocking
        ************************************/
        const bool slotClaimed = (sem_trywait(&canWrite) == 0);

        pthread_mutex_lock(&mutex);

        /*****************
        * get buffer state
        ******************/
        getBuffer();

        if (!slotClaimed)
        {
            pthread_mutex_unlock(&mutex);
            if (input.snapshot == 1)
            {
                printf("\nAll buffers full. Producer %d waits.\n", selfId);
            }
            continue;
        }

        /***************
        * reset counters
        ****************/
        producePos = producePos % BUFFER_SIZE;

        /*******************
        * reset RW increment
        ********************/
        if (currentWrite == 31)
        {
            currentWrite = 6;
        }
        currentWrite = currentWrite + 5;

        /****************
        * produce element
        *****************/
        buffer[producePos] = random;

        /*****************
        * get buffer state
        ******************/
        getBuffer();

        /**************
        * snapshot info
        ***************/
        if (input.snapshot == 1)
        {
            printf("\nProducer %d writes %d\n", selfId, random);
            printBuffer();
        }

        /**************************
        * increment buffer position
        ***************************/
        producePos++;

        /***************************************
        * how many elements this thread produced
        ****************************************/
        thisProduceCount++;

        /*********************************
        * how many elements produced total
        **********************************/
        produceCount++;

        /*******
        * unlock
        ********/
        pthread_mutex_unlock(&mutex);
        sem_post(&canRead);
    }//while

    /* returning from the start routine hands the value to pthread_join;
     * intptr_t keeps the int-to-pointer conversion well-defined in size */
    return reinterpret_cast<void*>(static_cast<intptr_t>(thisProduceCount));
}
/*******************************************************
* VOID pCONSUMER ( )
*
* description
* -----------
* -This thread will sleep for a random time, then try to
* remove an element from the buffer, advance the R
* marker, and display the buffer or a "buffers empty"
* notice when snapshots are enabled. It repeats until
* main clears the global flag.
*
* parameters
* ----------
* -void* placeholder: unused.
*
* variables
* ---------
* -int thisConsumeCount: will count how many elements
* each thread consumes.
*
* returns
* -------
* -thisConsumeCount, carried in the void* exit value.
*
* notes
* -----
* -An element is claimed with sem_trywait on canRead.
* The semaphore is the authoritative count of readable
* elements, and the non-blocking call means the thread
* always gets back to the flag check. This replaces the
* earlier test of bufferCount/bufferCount2 (which are
* only refreshed by getBuffer()) followed by a blocking
* wait, and with it the early-exit workaround that ended
* every consumer after one pass whenever consumers
* outnumbered producers.
*
********************************************************/
void *pConsumer(void* placeholder)
{
    int thisConsumeCount = 0;

    /* The snapshot lines show the thread id negated and narrowed to int;
     * the explicit cast makes the argument match the %d conversion. */
    const int selfId = static_cast<int>(pthread_self() * -1);

    while (flag)
    {
        /**************
        * time to sleep
        *
        * a local timespec: a shared global would be
        * overwritten by other consumers; a sleep bound
        * of 0 is treated as 1 to avoid dividing by zero
        ***************/
        struct timespec nap;
        nap.tv_sec = rand() % (input.sleep != 0 ? input.sleep : 1) + 1;
        nap.tv_nsec = 0;
        nanosleep(&nap, NULL);

        /*************************************
        * claim an element without blocking;
        * any failure is treated as "nothing
        * to read right now"
        **************************************/
        if (sem_trywait(&canRead) != 0)
        {
            if (input.snapshot == 1)
            {
                printf("\nAll buffers empty. Consumer %d waits.\n", selfId);
            }
            continue;
        }

        pthread_mutex_lock(&mutex);

        /***************
        * reset counters
        ****************/
        consumePos = consumePos % BUFFER_SIZE;

        /*******************
        * reset RW increment
        ********************/
        if (currentRead == 32)
        {
            currentRead = 7;
        }
        currentRead = currentRead + 5;

        /**************
        * snapshot info
        ***************/
        if (input.snapshot == 1)
        {
            /* the snapshot is printed before the slot is cleared, so the
             * occupied count is lowered by hand for the display */
            bufferCount--;

            /*********************************************************
            * will output prime if the current buffer element is prime
            **********************************************************/
            if (IsPrime(buffer[consumePos]))
            {
                printf("\nConsumer %d reads %2d * * * PRIME * * *\n",
                       selfId, buffer[consumePos]);
            }
            else
            {
                printf("\nConsumer %d reads %2d\n", selfId, buffer[consumePos]);
            }
            printBuffer();
        }

        /****************
        * consume element
        *****************/
        buffer[consumePos] = -1;

        /**************************
        * increment buffer position
        ***************************/
        consumePos++;

        /***************************************
        * how many elements this thread consumed
        ****************************************/
        thisConsumeCount++;

        /*********************************
        * how many elements consumed total
        **********************************/
        consumeCount++;

        /*******
        * unlock
        ********/
        pthread_mutex_unlock(&mutex);
        sem_post(&canWrite);
    }//while

    /* returning from the start routine hands the value to pthread_join;
     * intptr_t keeps the int-to-pointer conversion well-defined in size */
    return reinterpret_cast<void*>(static_cast<intptr_t>(thisConsumeCount));
}