Could anyone please suggest how to solve this problem?
When I compile the program and execute it as follows, it works fine with different inputs such as:
./main -r 300 -w 3 -s 100
./main -r 600 -w 3 -s 1000
./main -r 10 -w 3 -s 100
But it fails when I try to give the input as follows:
./main -r 10 -w 4 -s 100
./main -r 1 -w 4 -s 100
The only difference between the two sets of inputs (successful and failing) is that I have increased the number of writer threads from 3 to 4.
Reason of failure:
I can't run this program with 4 or more "writer threads". I have attached the debug output (from dbx).
Point where it fails:
It fails in two places (functions):
a) In the main function (see also the attached code). Below is the code snippet showing exactly where it fails:
if((shmid[j] = shmget(IPC_PRIVATE, Size, SHMEM_MODE)) < 0)
perror("Shmptr Shmget Failed");
if((long)(shmptr[i] = shmat(shmid[j], 0, 0)) == -1)
perror("Shmat Failed Here *** : Shared Memory Attach Error ");
(********** It is failing on above line in main function **************)
}
writer_th = (pthread_t *) malloc((size_t) (num_writers \
* sizeof(pthread_t)));
b) In the reader function (see the attached code). Below is the code snippet:
if(pthread_mutex_unlock(&mutex_r[num_w]))
perror("Can't Release Read Lock");
for(i = 0; i < buffer_size; i++)
cksum_r += *ptr++;
(************* It fails on above line in Reader function *************)
Code :
/****************************************************/
/********************** header.h ********************/
/****************************************************/
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/signal.h>
/********************* User Defined Data ***************/
/* Upper bounds enforced on the command-line values (exclusive). */
#define MAX_THREAD_NUMBER 5000
#define MAX_WRITER_NUMBER 100
#define MAX_READER_NUMBER 400
/* Values used when the corresponding option is not given. */
#define DEFAULT_NUM_READERS 2
#define DEFAULT_NUM_WRITERS 2
/* Permission flags passed to shmget(): owner read + owner write. */
#define SHMEM_MODE (SHM_R | SHM_W)
#define DEFAULT_SHMEM_SIZE 20000
#define MB (1024*1024)
/* NOTE(review): not referenced by the code in this file set. */
#define MAX_SHMEM_NUMBER 11
/*
 * printf-style usage text; takes the program name as its single argument.
 * Every piece is a complete string literal so the macro survives line
 * wrapping: a literal may not contain a raw newline.
 */
#define USAGE "\nUsage: %s [-r num_readers] [-w num_writers] [-s shmem_size]\n\n" \
"\t-r num_readers number of reader thread to create\n" \
"\t-w num_writers number of writer thread to create\n" \
"\t-s buffer_size size of shmem segments in bytes\n" \
"\t must be less than 256MB\n\n"
/************ Global Functions ****************/
/* Parses the command line into num_readers / num_writers / buffer_size. */
int parse_args(int argc, char **argv);
/*
 * Thread entry points.  They must have the exact type pthread_create()
 * expects, void *(*)(void *), and must agree with their definitions.
 */
void *reader(void *parm);
void *writer(void *parm);
/* Prints an error message tagged with a line number and exits. */
int error(const char *msg, int line);
/* Destroys the synchronisation objects and removes the shared segments. */
int release(void);
/* Like error(), but appends the text for the current errno to msg. */
int sys_error(const char *msg, int line);
/************ Global Variables ****************/
/* Thread handles; main() allocates num_writers entries for writers and
 * num_writers * num_readers entries for readers. */
pthread_t *writer_th;
pthread_t *reader_th;
/* Per-writer gate: set to 1 by main(), cleared by the writer once its buffer
 * is filled; readers wait on cond_var[] while it is non-zero. */
int thread_hold[MAX_WRITER_NUMBER];
/* Mutex paired with cond_var[] to protect thread_hold[]. */
pthread_mutex_t cond_mutex[MAX_WRITER_NUMBER];
/* Mutex taken around updates of *read_count[]. */
pthread_mutex_t mutex_r[MAX_WRITER_NUMBER];
/* Condition variable broadcast by the writer when thread_hold[] is cleared. */
pthread_cond_t cond_var[MAX_WRITER_NUMBER];
/* Per-writer counter living in its own shared memory segment; readers
 * increment it before and decrement it after summing the buffer. */
int *read_count[MAX_WRITER_NUMBER];
/* Per-writer data buffer (buffer_size bytes) in shared memory. */
char *shmptr[MAX_WRITER_NUMBER];
/* Per-writer shared array sized for num_readers checksum values. */
unsigned long *checksum[MAX_WRITER_NUMBER];
/* Checksum computed by each writer over the data it wrote. */
unsigned long cksum[MAX_WRITER_NUMBER];
/* NOTE(review): not referenced by the code shown here; buffer_size is used instead. */
int shmem_size = DEFAULT_SHMEM_SIZE;
/* NOTE(review): not referenced by the code shown here. */
pid_t parent_pid;
/* Run parameters; overridden by the -r, -s and -w options in parse_args(). */
int num_readers = DEFAULT_NUM_READERS;
int buffer_size = DEFAULT_SHMEM_SIZE;
int num_writers = DEFAULT_NUM_WRITERS;
/* Shared memory ids; writer i uses indices 3*i (read_count), 3*i+1
 * (checksum) and 3*i+2 (data buffer). */
int shmid[MAX_THREAD_NUMBER + MAX_WRITER_NUMBER];
/****************************************************/
/********************** main.c **********************/
/****************************************************/
#include "header.h"
/*
 * Report a failed pthread call.  pthread functions return the error number
 * and do not set errno, so perror() would print an unrelated message.
 */
static void main_pthread_report(const char *msg, int rc)
{
    fprintf(stderr, "%s: %s\n", msg, strerror(rc));
}

/* Remove the first `count` shared memory segments recorded in shmid[]. */
static void main_remove_segments(int count)
{
    int n;

    for (n = 0; n < count; n++) {
        if (shmctl(shmid[n], IPC_RMID, 0) < 0)
            perror("shmctl IPC_RMID failed");
    }
}

/* Report a fatal pthread error, remove `segments` segments and exit. */
static void main_pthread_fatal(const char *msg, int rc, int segments)
{
    main_pthread_report(msg, rc);
    main_remove_segments(segments);
    exit(EXIT_FAILURE);
}

/*
 * Create a private segment of `size` bytes, record its id in shmid[slot]
 * and attach it.  Slots are filled in increasing order, so on failure every
 * segment created so far is removed before the process exits.  Carrying on
 * after a failed attach would make the threads dereference (void *) -1.
 */
static void *main_new_segment(size_t size, int slot,
                              const char *get_msg, const char *at_msg)
{
    void *addr;

    shmid[slot] = shmget(IPC_PRIVATE, size, SHMEM_MODE);
    if (shmid[slot] < 0) {
        perror(get_msg);
        main_remove_segments(slot);
        exit(EXIT_FAILURE);
    }

    /* shmat() fails (e.g. EMFILE) when the per-process limit on attached
     * segments is exceeded; three segments are attached per writer. */
    addr = shmat(shmid[slot], NULL, 0);
    if (addr == (void *) -1) {
        perror(at_msg);
        main_remove_segments(slot + 1);
        exit(EXIT_FAILURE);
    }
    return addr;
}

/*
 * Shared memory test driver.
 *
 * For every writer it creates three shared memory segments (a read counter,
 * an array with one checksum slot per reader, and the data buffer), starts
 * one writer thread plus num_readers reader threads, waits for all of them
 * and compares the writer's checksum with the value stored by the readers.
 *
 * Returns 0 on success; exits with a failure status when a resource cannot
 * be set up.
 */
int main(int argc, char **argv)
{
    pthread_attr_t newattr;
    int i, j, k, rc;
    int total_segments;
    unsigned long *ulptr;

    parse_args(argc, argv);
    printf("%s: Shared Memory Test Program\n\n", *argv);
    printf("\tNumber of writers thread = %d\n", num_writers);
    printf("\tNumber of readers thread = %d\n", num_readers);
    printf("\tShared Memory Size in Bytes = %d\n", buffer_size);
    printf("\n");

    for (i = 0; i < num_writers; i++) {
        j = i * 3;

        read_count[i] = main_new_segment(sizeof(int), j,
                                         "Shmget Error", "Shmat Error");
        *(read_count[i]) = 0;

        checksum[i] = main_new_segment(sizeof(unsigned long) * num_readers,
                                       j + 1, "Checksum Shmget Failed",
                                       "Shmat Failed : Shared Memory Attach Error ");
        for (k = 0; k < num_readers; k++)
            checksum[i][k] = 0;

        shmptr[i] = main_new_segment((size_t) buffer_size, j + 2,
                                     "Shmptr Shmget Failed",
                                     "Shmat Failed Here *** : Shared Memory Attach Error ");
    }
    total_segments = num_writers * 3;

    /* Nothing is allocated (or used) when there are no writers. */
    writer_th = NULL;
    reader_th = NULL;
    if (num_writers > 0) {
        writer_th = malloc((size_t) num_writers * sizeof *writer_th);
        reader_th = malloc((size_t) num_writers * (size_t) num_readers
                           * sizeof *reader_th);
        if (writer_th == NULL || reader_th == NULL) {
            perror("malloc failed");
            main_remove_segments(total_segments);
            exit(EXIT_FAILURE);
        }
    }

    for (i = 0; i < num_writers; i++) {
        rc = pthread_mutex_init(&mutex_r[i], NULL);
        if (rc != 0)
            main_pthread_fatal("Can't Initialise Mutex", rc, total_segments);
        rc = pthread_mutex_init(&cond_mutex[i], NULL);
        if (rc != 0)
            main_pthread_fatal("Can't Initialize Cond_Mutex", rc, total_segments);
        rc = pthread_cond_init(&cond_var[i], NULL);
        if (rc != 0)
            main_pthread_fatal("Cond_Var Failed", rc, total_segments);
        thread_hold[i] = 1;
    }

    rc = pthread_attr_init(&newattr);
    if (rc != 0)
        main_pthread_fatal("Attr_Init Failed", rc, total_segments);
    /* PTHREAD_CREATE_JOINABLE is the POSIX name for the joinable state. */
    rc = pthread_attr_setdetachstate(&newattr, PTHREAD_CREATE_JOINABLE);
    if (rc != 0)
        main_pthread_fatal("Attr_Setdetachstate Failed", rc, total_segments);

    /* A thread that could not be created must not be joined later, and its
     * peers would wait for it forever, so creation failures are fatal. */
    for (i = 0; i < num_writers; i++) {
        rc = pthread_create(&writer_th[i], &newattr, writer, (void *) (long) i);
        if (rc != 0)
            main_pthread_fatal("Writer Failed", rc, total_segments);

        k = i * num_readers;
        for (j = k; j < (k + num_readers); j++) {
            rc = pthread_create(&reader_th[j], &newattr, reader,
                                (void *) (long) j);
            if (rc != 0)
                main_pthread_fatal("Reader Failed", rc, total_segments);
        }
    }
    pthread_attr_destroy(&newattr);

    for (i = 0; i < num_writers; i++) {
        rc = pthread_join(writer_th[i], NULL);
        if (rc != 0) {
            printf("writer_th: pthread_join return: %d\n", i);
            main_pthread_report("Pthread_join Bad Status", rc);
        }

        k = i * num_readers;
        for (j = k; j < (k + num_readers); j++) {
            rc = pthread_join(reader_th[j], NULL);
            if (rc != 0)
                main_pthread_report("Pthread_join Bad Status", rc);
        }
    }

    for (i = 0; i < num_writers; i++) {
        /* ulptr is not advanced, so the first checksum slot of this writer
         * is what gets compared on every pass. */
        ulptr = checksum[i];
        for (j = 0; j < num_readers; j++) {
            if (cksum[i] != *ulptr)
                error("checksums do not match", __LINE__);
        }
    }

    printf("\n\tMain: Readers calculated segment successfully\n");
    release();
    free(writer_th);
    free(reader_th);
    printf("\nsuccessful!\n");
    return 0;
}
/****************************************************/
/******************** reader.c **********************/
/****************************************************/
#include "header.h"
/*
 * Report a failed pthread call.  pthread functions return the error number
 * and do not set errno, so perror() would print an unrelated message.
 */
static void reader_report(const char *msg, int rc)
{
    if (rc != 0)
        fprintf(stderr, "%s: %s\n", msg, strerror(rc));
}

/*
 * Reader thread entry point.
 *
 * parm carries the global reader index (writer * num_readers + reader).
 * The thread waits until its writer has cleared thread_hold[], sums every
 * byte of the writer's shared buffer and stores the result in its own slot
 * of the writer's checksum array.
 *
 * Always returns NULL.
 */
void *reader(void *parm)
{
    int num_p = (int) (long) parm;
    int num_r = num_p % num_readers;    /* index of this reader within its writer */
    int num_w = num_p / num_readers;    /* writer this reader belongs to */
    unsigned long cksum_r = 0;
    const char *ptr = shmptr[num_w];
    unsigned long *ulptr_r = checksum[num_w];
    int i;

    /* Block until the writer signals that the buffer is completely filled. */
    reader_report("Can't Take Cond Lock",
                  pthread_mutex_lock(&cond_mutex[num_w]));
    while (thread_hold[num_w]) {
        reader_report("cond_wait failed",
                      pthread_cond_wait(&cond_var[num_w], &cond_mutex[num_w]));
    }
    reader_report("Release cond Lock Failed",
                  pthread_mutex_unlock(&cond_mutex[num_w]));

    reader_report("Can't take read Lock",
                  pthread_mutex_lock(&mutex_r[num_w]));
    (*(read_count[num_w]))++;
    reader_report("Can't Release Read Lock",
                  pthread_mutex_unlock(&mutex_r[num_w]));

    /* Same byte-wise sum the writer computes over the buffer. */
    for (i = 0; i < buffer_size; i++)
        cksum_r += *ptr++;

    reader_report("Can't Take Read Lock",
                  pthread_mutex_lock(&mutex_r[num_w]));
    (*(read_count[num_w]))--;
    reader_report("Can't Release 1 Read Lock",
                  pthread_mutex_unlock(&mutex_r[num_w]));

    /* The checksum array is sized for num_readers entries; each reader
     * writes only its own slot, so the readers of one writer never store to
     * the same location concurrently. */
    ulptr_r[num_r] = cksum_r;

    printf("\tReader (%d) of Writer (%d): checksum %04lu\n",
           num_r, num_w, cksum_r);
    return NULL;
}
/****************************************************/
/******************** writer.c **********************/
/****************************************************/
#include "header.h"
/*
 * Report a failed pthread call.  pthread functions return the error number
 * and do not set errno, so perror() would print an unrelated message.
 */
static void writer_report(const char *msg, int rc)
{
    if (rc != 0)
        fprintf(stderr, "%s: %s\n", msg, strerror(rc));
}

/*
 * Writer thread entry point.
 *
 * parm carries the writer index.  The thread fills its shared buffer with
 * an incrementing byte pattern that starts at the writer index, sums the
 * bytes as it goes, releases the waiting readers by clearing thread_hold[]
 * and broadcasting the condition variable, and finally publishes the sum
 * in cksum[].
 *
 * Always returns NULL.
 */
void *writer(void *parm)
{
    int num_w = (int) (long) parm;
    unsigned long cksum_w = 0;
    char data = (char) num_w;
    char *ptr;
    char *end = shmptr[num_w] + buffer_size;

    for (ptr = shmptr[num_w]; ptr < end; ptr++) {
        *ptr = data++;
        cksum_w += *ptr;
    }

    /* Open the gate for the readers of this writer. */
    writer_report("Mutex_lock Failed",
                  pthread_mutex_lock(&cond_mutex[num_w]));
    thread_hold[num_w] = 0;
    writer_report("cond_signal Failed",
                  pthread_cond_broadcast(&cond_var[num_w]));
    writer_report("Mutex_unlock Failed",
                  pthread_mutex_unlock(&cond_mutex[num_w]));

    cksum[num_w] = cksum_w;
    printf("\n\tWriter (%d): shared memory checksum %04lu\n",
           num_w, cksum_w);
    return NULL;
}
/****************************************************/
/******************** parseargs.c *******************/
/****************************************************/
#include "header.h"
int parse_args(int argc, char **argv)
{
int i;
int errflag = 0;
char *program_name = *argv;
extern char *optarg;
while((i = getopt(argc, argv, "r:s:w:?")) != EOF) {
switch (i) {
case 'r':
num_readers = atoi(optarg);
break;
case 's':
buffer_size = atoi(optarg);
break;
case 'w':
num_writers = atoi(optarg);
break;
case '?':
errflag++;
break;
}
}
if(num_writers >= MAX_WRITER_NUMBER) {
errflag++;
fprintf(stderr, "ERROR: num_writers must be less\
than %d\n", MAX_WRITER_NUMBER);
}
if(num_readers >= MAX_READER_NUMBER) {
errflag++;
fprintf(stderr, "ERROR: num_readers must be less\
than %d\n", MAX_READER_NUMBER);
}
i = num_readers * num_writers;
if(i >= MAX_THREAD_NUMBER) {
errflag++;
fprintf(stderr, "ERROR: maximun threads number\
must be less than %d\n", MAX_THREAD_NUMBER);
}
if(errflag) {
fprintf(stderr, USAGE, program_name);
exit (2);
}
}
int release()
{
int i, j;
for(i = 0; i < num_writers; i++) {
if(pthread_mutex_destroy(&cond_mutex[i]) != 0)
perror("Can't destroy cond_mutex");
if(pthread_mutex_destroy(&mutex_r[i]) != 0)
perror("Can't destroy mutex_r");
}
for(i = 0; i < num_writers; i++) {
j = i * 3;
if(shmctl(shmid[j], IPC_RMID, 0) < 0)
perror("Read_count shmctl Failed");
j++;
if(shmctl(shmid[j], IPC_RMID, 0) < 0)
perror("checksum shmctl failed");
j++;
if(shmctl(shmid[j], IPC_RMID, 0) < 0)
perror("shmptr shmctl failed");
}
}
/*
 * Report a failed system call: appends the text for the current errno to
 * msg and hands the result to error(), which exits the process.
 *
 * msg   caller-supplied description of what failed
 * line  source line number to show in the report
 */
int sys_error(const char *msg, int line)
{
    char syserr_msg[256];
    int saved_errno = errno;   /* capture before any library call changes it */

    /* Bounded formatting: an over-long msg is truncated, not overflowed. */
    snprintf(syserr_msg, sizeof syserr_msg, "%s: %s\n",
             msg, strerror(saved_errno));
    error(syserr_msg, line);
    return -1;                 /* not reached: error() exits */
}
/*
 * Print an error report to stderr and terminate the process with exit(-1).
 *
 * msg   text to print
 * line  line number shown in the report; for values of 260 and above
 *       release() is called before exiting
 *       (NOTE(review): the reason for the 260 threshold is not visible in
 *       this file -- confirm the intent)
 */
int error(const char *msg, int line)
{
    const int needs_release = (line >= 260);

    fprintf(stderr, "ERROR [line: %d] %s\n", line, msg);

    if (needs_release) {
        release();
    }

    exit(-1);
}