multi-threaded server, pthreads, sleep


 
Thread Tools Search this Thread
Top Forums Programming multi-threaded server, pthreads, sleep
# 1  
Old 03-16-2005
multi-threaded server, pthreads, sleep

I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle
requests of multiple clients.

But here I have a problem. I find a solution but it is not how
it must be... i think. When threads working without sleep(1)
I can't get response from server but when I put sleep(1) in
thread function as you will see in code, everything works fine
and server can make echo nearly for 40000 clients between
1 or 2 seconds. What am I doing wrong here?

thanks,
parahat


------------ server.c -------------
For Linux:
$ cc server.c -o server -lpthread

For FreeBSD:
$ cc server.c -o server -pthread
-----------------------------------
Code:
#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>

#define MAX_CLIENT_PER_THREAD 300
#define MAX_THREAD 200
#define PORT 3355
#define MAX_CLIENT_BUFFER 256
/*#define DEBUG*/

int listenfd;

typedef struct {
	pthread_t tid;
	int client_count;
	int clients[MAX_CLIENT_PER_THREAD];
} Thread;

pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;


Thread threads[MAX_THREAD];

void nonblock(int sockfd)
{
	int opts;
	opts = fcntl(sockfd, F_GETFL);
	if(opts < 0)
	{
		perror("fcntl(F_GETFL)\n");
		exit(1);
	}
	opts = (opts | O_NONBLOCK);
	if(fcntl(sockfd, F_SETFL, opts) < 0) 
	{
		perror("fcntl(F_SETFL)\n");
		exit(1);
	}
}

void *thread_init_func(void *arg)
{
	int tid = (int) arg;
	
	int readsocks;
	int i;
	char buffer[MAX_CLIENT_BUFFER];
	char c;
	int n;
#ifdef DEBUG
	printf("thread %d created\n", tid);
	printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
	memset((int *) &threads[tid].clients, 0, sizeof(threads[tid].clients));
	memset((char *) &buffer, 0, sizeof(buffer));	
	while(1)
	{
#ifdef DEBUG
		printf("thread %d running, client count: %d\n", tid, threads[tid].client_count);
		sleep(3);
#endif
		sleep(1); /* <-- it works ??? :-| */

		for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
		{
			if(threads[tid].clients[i] != 0)
			{
				n = recv(threads[tid].clients[i], buffer, MAX_CLIENT_BUFFER, 0);
				if(n == 0)
				{
#ifdef DEBUG
					printf("client %d closed connection 0\n", threads[tid].clients[i]);
#endif
					threads[tid].clients[i] = 0;
					threads[tid].client_count--;
					memset((char *) &buffer, 0, strlen(buffer));
				}
				else if(n < 0)
				{
					if(errno == EAGAIN)
					{
#ifdef DEBUG
						printf("errno: EAGAIN\n");
#endif
					}
					else {
#ifdef DEBUG
						printf("errno: %d\n", errno);
#endif
						threads[tid].clients[i] = 0;
						threads[tid].client_count--;
						memset( (char *) &buffer, 0, strlen(buffer));
#ifdef DEBUG
						printf("client %d closed connection -1\n", threads[tid].clients[i]);
#endif
					}
				}
				else {
#ifdef DEBUG
					printf("%d bytes received from %d - %s\n", n, threads[tid].clients[i], buffer);
#endif
					
					send(threads[tid].clients[i], buffer, strlen(buffer), 0);
					memset((char *) &buffer, 0, strlen(buffer));
				}
			}
		}
	}
}

int choose_thread()
{
	int i=MAX_THREAD-1;
	int min = 0;
	while(i > -1)
	{
		if(threads[i].client_count < threads[i-1].client_count)
		{
			min = i;
			break;
		}
		i--;
	}		
	return min;
}

int main()
{
	char c;
	struct sockaddr_in srv, cli;
	int clifd;
	int tid;
	int i;
	int choosen;

	signal(SIGPIPE, SIG_IGN);
	
	if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
	{
		perror("sockfd\n");
		exit(1);
	}
	bzero(&srv, sizeof(srv));
	srv.sin_family = AF_INET;
	srv.sin_addr.s_addr = INADDR_ANY;
	srv.sin_port = htons(PORT);

	if( bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
	{
		perror("bind\n");
		exit(1);
	}
	
	listen(listenfd, 1024);

	
	/* create threads  */
	for(i = 0; i < MAX_THREAD; i++)
	{
		pthread_create(&threads[i].tid, NULL, &thread_init_func, (void *) i);
		threads[i].client_count = 0;
	}
	

	for( ; ; )
	{
		clifd = accept(listenfd, NULL, NULL);
		nonblock(clifd);

		pthread_mutex_lock(&new_connection_mutex);
		
		choosen = choose_thread();

		for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
		{
			if(threads[choosen].clients[i] == 0)
			{
#ifdef DEBUG
				printf("before threads clifd\n");
#endif
				threads[choosen].clients[i] = clifd;
#ifdef DEBUG
				printf("after threads clifd\n");
#endif
				threads[choosen].client_count++;
				break;
			}
		}

#ifdef DEBUG
		printf("choosen: %d\n", choosen);

		for(i = 0; i < MAX_THREAD; i++)
		{
			printf("threads[%d].client_count:%d\n", i, threads[i].client_count);
		}
#endif

		pthread_mutex_unlock(&new_connection_mutex);
	}

	if(errno)
	{
		printf("errno: %d", errno);
	}
	
	return 0;
}


===========================================================

--------------- test.c --------------------
$ cc test.c -o test
-------------------------------------------
Code:
#include <stdio.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>


#define MAX_CLIENT 1000 
/*#define DEBUG*/

int PORT = 3355;
char *IP = "192.168.222.22";

enum {
	U_INT_SIZE = sizeof(unsigned int)
};

int list[MAX_CLIENT];
fd_set fdset;
int highfd = MAX_CLIENT;

void nonblock(int sockfd)
{
	int opts;
	opts = fcntl(sockfd, F_GETFL);
	if(opts < 0)
	{
		perror("fcntl(F_GETFL)\n");
		exit(1);
	}
	opts = (opts | O_NONBLOCK);
	if(fcntl(sockfd, F_SETFL, opts) < 0) 
	{
		perror("fcntl(F_SETFL)\n");
		exit(1);
	}
}


void cli_con()
{	
	struct sockaddr_in srv;
	struct hostent *h_name;
	int clifd;
	int n;

	bzero(&srv, sizeof(srv));
	srv.sin_family = AF_INET;
	srv.sin_port = htons(PORT);
	inet_pton(AF_INET, IP, &srv.sin_addr);	
	
	if( (clifd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
	{
		printf("ERROR clifd\n");
		exit(1);
	}

	if(connect(clifd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
	{
		printf("ERROR connect\n");
		exit(1);
	}

	nonblock(clifd);
	for(n = 0; (n < MAX_CLIENT) && (clifd != -1); n++)
	{
		if(list[n] == 0) {
#ifdef DEBUG
			printf("connected %d\n", clifd);
#endif
			list[n] = clifd;
			clifd = -1;
		}
	}
	
	if(clifd != -1) {
		printf("list FULL");
	}
}

void set_list() {
	int n;
	FD_ZERO(&fdset);

	for(n = 0; n < MAX_CLIENT; n++)
	{
		if(list[n] != 0) {
			FD_SET(list[n], &fdset);
			if(list[n] > highfd)
				highfd = list[n];
		}
	}
}

void recv_data(int num)
{
	int n;
	char buffer[1024];
	n = recv(list[num], buffer, 1023, 0);
	if(n > 0)
	{
#ifdef DEBUG
		printf("%d bytes from %d\n", n, list[num]);
		printf("%s", buffer);
#endif
	}
	else if(n < 0)
	{
		if(errno != EAGAIN)
			printf("client %d disconnected\n", list[num]);
	}
	else if(n == 0)
	{
		printf("client %d disconnected\n", list[num]);
	}
}


void send_data(int num)
{
	int n;
	char buffer[1024];
	sprintf(buffer, "TEST list[%d]=%d \r\n", num, list[num]);
	if((n = send(list[num], buffer, strlen(buffer), 0)) > 0)
	{
#ifdef DEBUG
		printf("%d bytes sent from %d\n", n, list[num]);
#endif
	}
}

void scan_clients()
{
#ifdef DEBUG
	printf("scan_clients\n");
#endif
	int num;
	
	for(num = 0; num < MAX_CLIENT; num++) {
		if(FD_ISSET(list[num], &fdset))
			recv_data(num);
	}

	for(num = 0; num < MAX_CLIENT; num++)
		send_data(num);
}

int main()
{
	int readsocks;
	int i=0;
	struct timeval tv;
	tv.tv_sec = 0;
	tv.tv_usec = 10;
	
	while(i < MAX_CLIENT)
	{
		cli_con();
		i++;	
	}
	i=0;
	while(i < MAX_CLIENT)
	{
		printf("list[%d] = %d\n",i,list[i]);
		i++;
	}
	i = 0;
	while(1)
	{
		set_list();
		readsocks = select(highfd+1, &fdset, NULL, NULL, &tv);
		if(readsocks < 0) {
			perror("select\n");
			exit(1);
		}
		 
		/*printf("else scan_clients\n");*/
		scan_clients();
#ifdef DEBUG
		sleep(4);
#endif
	}
	return 0;
}


Last edited by Parahat Melayev; 03-16-2005 at 02:04 PM..
Login or Register to Ask a Question

Previous Thread | Next Thread

10 More Discussions You Might Find Interesting

1. Shell Programming and Scripting

Multi server access through remote server using ssh

Team, Presently I have 5 ip address kept in ip_abc1 file, for each of the ip address listed, i need to login on each ipaddress one at a time and login as below for that specific ip address ssh -p 8101 karaf@<ip.address_for the specific ip address as logged in> password features:list... (4 Replies)
Discussion started by: whizkidash
4 Replies

2. Shell Programming and Scripting

script for multi-threaded bash processes

hey everyone, I'm having some trouble breaking down some code. It's simple a control script that takes machines meant to be backed up from a list. Then according to that will run multi-threaded processes up until the specified thread limit. for example if there are 4 machines to be backed up,... (2 Replies)
Discussion started by: terrell
2 Replies

3. Programming

Deallocating memory in multi-threaded environment.

I'm having a hard time figuring out how to manage deallocation of memory in multithreaded environments. Specifically what I'm having a hard time with is using a lock to protect a structure, but when it's time to free the structure, you have to unlock the lock to destroy the lock itself. Which will... (5 Replies)
Discussion started by: gngrwzrd
5 Replies

4. Programming

multi-threaded memory leak

Hello All : I write a .c program to test the exactually resource the memory leak as follows: 1 #include <stdio.h> 2 #define NUM 100000 3 void *Thread_Run(void * arg){ 4 //TODO 5 //pthread_datch(pthread_self()); 6 int socket= (int)arg; 7 ... (1 Reply)
Discussion started by: aobai
1 Replies

5. Linux

Multi-threaded encryption @ Fedora 11

Hello, are any of the encryption programs capable of true multi-threading ? Friend of mine tells me that he's been running some testing on Fedora 11 and that the kernel doesn't support multi-threading at that level. I've been looking into TrueCrypt, encfs and both calm to support... (1 Reply)
Discussion started by: TehOne
1 Replies

6. UNIX for Advanced & Expert Users

Multi-threaded encryption @ Fedora 11

Hello, are any of the encryption programs capable of true multi-threading ? Friend of mine tells me that he's been running some testing on Fedora 11 and that the kernel doesn't support multi-threading at that level. I've been looking into TrueCrypt, encfs and both calm to support... (0 Replies)
Discussion started by: TehOne
0 Replies

7. Shell Programming and Scripting

In need of multi threaded perl assistance

I need to write a perl script to execute external programs and grab the output and return code. Each program should be killed if it has not completed within X seconds. Imagine that the script goes something like this : @commands = &get_commands(); foreach $cmd (@commands) { $pid =... (4 Replies)
Discussion started by: SandmanCL
4 Replies

8. Programming

tcp server using pthreads is not working...ne help plz?

Hi, I am new to using threads in C++ though I have been wkring on C++ for past 1.5 years...I want to write a TCP server that serves multiple client connections...to start off..i have been working on a simple tcp echo server trying to understand how threads work.... this is my server code: ... (5 Replies)
Discussion started by: deepti_v25
5 Replies

9. AIX

multi threaded program is hanging

I have a Multithreaded program which is hanging on AIX. OS Version: AIX 5.2 and thread library version : 5.2.0.75 We Initiate the process with 50 threads..when we are disconnecting from the process it hangs.There is lots of other stuff involved here.I am just sending the piece of the problem with... (0 Replies)
Discussion started by: hikrishn
0 Replies

10. Programming

HOWTO: Calculate the balance of work in multi-threaded app.

I was wondering if anyone could give me a good idea how to calculate how balanced the threading is on a multi-threaded application. I want a percentage, such as "threads are 80% balanced." This is the way I am currently going about it, maybe it is good, maybe not. First, whenever a thread... (2 Replies)
Discussion started by: DreamWarrior
2 Replies
Login or Register to Ask a Question