Code:
#include<iostream>
#include<vector>
#include<cstdlib>
#include<ctime>
#include<list>
#include<pthread.h>
#include<cstring>
#include<stdarg.h>
#include<unistd.h>
using namespace std;
// File-local counter: funcThd() increments it once per adjacency-matrix column it examines.
// NOTE(review): the name can clash with ::round() when <math.h> is pulled in transitively -- confirm on the target toolchain.
static int round;
/*
 * checkResults(string, val): abort the program if a pthread-style call failed.
 *
 *   string - text naming the call that was made (printed in the diagnostic)
 *   val    - the call's return code; any non-zero value is treated as failure
 *
 * On failure the code and the call name are reported through print() and the
 * process exits with status 1. The body is wrapped in do { } while (0) so the
 * macro behaves as a single statement (safe in an unbraced if/else), and val
 * is evaluated exactly once. The final line deliberately has no trailing
 * backslash, so the definitions that follow are not spliced into the macro.
 */
#define checkResults(string, val) \
    do { \
        const int checkResults_rc_ = (val); \
        if (checkResults_rc_) { \
            print("failed with %d at %s", checkResults_rc_, string); \
            exit(1); \
        } \
    } while (0)

#define MSG_SIZE 7 /* nominal per-message character count */
#define _NODES 8   /* number of graph nodes; dimension of nodes[], callThd[] and path[][] */

/* printf-style diagnostic output helper, defined at the end of the file. */
int print(const char *, ...);
// Argument bundle handed to a node thread through pthread_create().
struct thread_param_t
{
    int id;          // index of the node the thread acts for
    int rcvd_from;   // index of the node that forwarded the message (main() uses -1 for the source)
    std::string msg; // message text being propagated
};
// State kept for one graph node during the flooding run.
struct node
{
    pthread_mutex_t f_lock;  // per-node mutex; funcThd() holds it while working on this node
    thread_param_t packet;   // copy of the delivered message (id and msg are filled in by funcThd())
    std::vector<int> edges;  // neighbour indices, appended by funcThd() as it scans path[][]
    bool msg_received;       // true once the message has been delivered to this node
};

// One entry per node, indexed by node id.
struct node nodes[_NODES];
// Thread handle of the thread started for each node, indexed by node id.
pthread_t callThd[_NODES];
// Guards the nodes[] array; funcThd() holds it for its whole critical section and it is
// the mutex paired with next_round_ready.
pthread_mutex_t list_lock;
// Intended to guard path[][]; initialised in main(), but no visible code locks it because
// path[][] is only ever read.
pthread_mutex_t path_mutex;
// Statically initialised condition variable. funcThd() waits on it (with list_lock) until
// msg_sent_to_all() reports that every neighbour of the sender has the message, so a node
// does not start forwarding before its sender's round is complete.
pthread_cond_t next_round_ready = PTHREAD_COND_INITIALIZER;
// Returns true when every neighbour of the given sender has received the message.
bool msg_sent_to_all(int , int );
//---------------
// Adjacency matrix of the undirected graph: path[a][b] != 0 means nodes a and b are
// neighbours. The table is symmetric and its diagonal is zero (no self-loops).
int path[_NODES][_NODES] = {
{ 0, 1, 1, 1, 0, 0, 0, 0 },
{ 1, 0, 0, 1, 1, 0, 0, 0 },
{ 1, 0, 0, 1, 0, 1, 0, 0 },
{ 1, 1, 1, 0, 1, 0, 0, 0 },
{ 0, 1, 0, 1, 0, 1, 0, 1 },
{ 0, 0, 1, 0, 1, 0, 1, 1 },
{ 0, 0, 0, 0, 0, 1, 0, 1 },
{ 0, 0, 0, 0, 1, 1, 1, 0 }
}; // only read, never modified, by the code in this file
//---------------
void *funcThd(void *arg) //Thread function, the argument is the structure defined above (thread_param..)
{
print("thread started..");
pthread_attr_t attr;
struct timespec t_req, t_rem;
string message;
int index, rc, received_from, neighbours_size;
t_req.tv_sec = 1;
t_req.tv_nsec = 500;
rc = pthread_attr_init(&attr);
checkResults("pthread_attr_init()\n", rc);
rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); // setting detached as our thread attributes, this way none of the calling threads
//expect the caller thread to wait for them-- so, no need for pthread_join(..)
checkResults("pthread_attr_setdetachstate()\n", rc);
thread_param_t *p = (thread_param_t *) arg;
index = p -> id;
message = p -> msg;
received_from = p -> rcvd_from;
print("Thread %d Activated by %d\n", index, received_from);
//pthread_mutex_lock(&path_mutex); we don need to lock the path as we always read it
pthread_mutex_lock(&list_lock); //lock the list of nodes
pthread_mutex_lock(&nodes[index].f_lock); //lock the particular node in which the message has been sent to, this lock exists in every node structure
nodes[index].msg_received = true; //node got the message
nodes[index].packet.id = index; //index of the node
nodes[index].packet.msg = message; //message content
neighbours_size = nodes[received_from].edges.size(); //number of neighbours staying adjacent to the node
print("Message Delivered to %d from %d : ", index, received_from);
//for optimality, for each round the calling threads are not gonna start finding their neighbouring nodes BEFORE the message is sent to all
//the neighbours of the calling threads, for example is message is sent from 0 to 1, 1 is not gonna send the message to its neighbours 1, 3, 4
// before 0 sends the message to all its neighbours first 1, 3, 5 -- so, we wouldn't have redundant threads
while((msg_sent_to_all(received_from, neighbours_size) == false))
pthread_cond_wait(&next_round_ready, &list_lock);
//the condition variable is locked by the node list mutex, it will then automaticly get unlocked whenever the condition satisfies
// that is when the message is sent to all the neighbours
for(int i = 0; i < _NODES; i++)
{
print("loop %d\n", i);
if(path[index][i])
{
nodes[index].edges.push_back(i); //finding neighbours from path matrix and push them back to the nodes edge property
if(nodes[i].msg_received == false) // if the msg is not sent to the node, then try sending it
{
p -> id = i;
p -> msg = message;
p -> rcvd_from = index;
//print("Sending Message to %d from %d \n", i, index);
pthread_create(&callThd[i], &attr, funcThd, (void *) p); // calling the thread
nanosleep(&t_req, &t_rem);
}
}
round++;
}//end of for
pthread_mutex_unlock(&nodes[index].f_lock); //unlocking, node structure lock
pthread_mutex_unlock(&list_lock); //unlocking node list lock
//pthread_mutex_unlock(&path_mutex);
pthread_exit(NULL);
}
//-----------------
bool msg_sent_to_all(int index, int neighbours_size)
{
int temp;
//pthread_mutex_lock(&list_lock);
//pthread_mutex_lock(&nodes[index].f_lock);
for(int i = 0; i < neighbours_size; i++)
{
temp = nodes[index].edges[i];
if(nodes[temp].msg_received == false)
{
print("returning false..\n");
return false;
}
}
//pthread_mutex_unlock(&list_lock);
//pthread_mutex_unlock(&nodes[index].f_lock);
print("returning true..");
return true;
}
//-----------------
/*
 * Program entry point: initialises the mutexes, starts the flooding at node 0
 * with the message "Purple", gives the detached node threads ten seconds to
 * run, then prints one line per node (received flag, stored id, stored
 * message, neighbour list) and exits.
 *
 * Command-line arguments are not used.
 */
int main(int argc, char **argv)
{
    (void) argc;
    (void) argv;

    thread_param_t param;
    pthread_attr_t attr;
    int rc = 0;

    // Initialise the mutexes; a failure here is fatal.
    rc = pthread_mutex_init(&list_lock, NULL);
    checkResults("pthread_mutex_init(list_lock)\n", rc);
    print("mutex_nodes initialized..");
    rc = pthread_mutex_init(&path_mutex, NULL);
    checkResults("pthread_mutex_init(path_mutex)\n", rc);
    print("path_mutex initialized..");
    for (int index = 0; index < _NODES; index++)
    {
        rc = pthread_mutex_init(&nodes[index].f_lock, NULL);
        checkResults("pthread_mutex_init(f_lock)\n", rc);
        print("nodes[%d].f_lock initialized..", index);
    }

    // The first thread is created detached, like every thread funcThd() starts.
    rc = pthread_attr_init(&attr);
    checkResults("pthread_attr_init()\n", rc);
    rc = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    checkResults("pthread_attr_setdetachstate()\n", rc);

    // Source node: node 0 already "has" the message and has no sender.
    param.id = 0;
    param.rcvd_from = -1;
    param.msg = "Purple";
    nodes[0].msg_received = true;

    rc = pthread_create(&callThd[0], &attr, funcThd, &param); // starting the first round
    checkResults("pthread_create()\n", rc);
    pthread_attr_destroy(&attr);

    // The node threads are detached and cannot be joined; give them time to finish.
    sleep(10);

    // Read nodes[] under list_lock so the dump cannot race with a thread
    // that is still updating it.
    pthread_mutex_lock(&list_lock);
    for (int i = 0; i < _NODES; i++)
    {
        print("%d %d ", nodes[i].msg_received, nodes[i].packet.id);
        // Print the stored text as a string: indexing a fixed number of
        // characters would read past the end of an empty or short message.
        print("%s", nodes[i].packet.msg.c_str());
        const int cap = static_cast<int>(nodes[i].edges.size());
        for (int j = 0; j < cap; j++)
        {
            print(" %d", nodes[i].edges[j]);
        }
        print("\n");
    }
    pthread_mutex_unlock(&list_lock);

    print("Exiting Main()..\n");

    // The mutexes and the condition variable are deliberately not destroyed:
    // detached threads may still be blocked on them, and destroying a
    // synchronisation object that is in use is undefined. exit() is used
    // rather than returning so that param, which a running thread may still
    // reference, is not destroyed first.
    exit(EXIT_SUCCESS);
}
//-------------------
/*
 * printf-style helper that writes a formatted message to stderr.
 *
 *   ft  - printf format string
 *   ... - arguments consumed by the conversions in ft
 *
 * Returns what vfprintf() returns: the number of characters written, or a
 * negative value if an output error occurred. No newline is appended.
 */
int print(const char *ft, ...)
{
    va_list args;
    va_start(args, ft);  // collect every argument that follows ft
    const int written = vfprintf(stderr, ft, args);
    va_end(args);
    return written;
}
//-------------------