Code:
/*
 * Wrapper around close(2) that reports failures to the log.
 *
 * fd       - descriptor to close.
 * filename - source file of the caller, used only in the log message.
 * line     - source line of the caller, used only in the log message.
 *
 * Returns whatever close() returned (-1 on failure). When LOGGING_ENABLE
 * is not defined, a failure is returned without being logged.
 */
int Close ( int fd, const char* filename, int line ) {
    const int rc = close( fd );

    /* Success: nothing to report. */
    if ( rc != -1 ) {
        return rc;
    }

#ifdef LOGGING_ENABLE
    sprintf( log, "%s(%d) -> close() failed: %s", filename, line, strerror( errno ) );
    Log( log );
#endif
    return rc;
}
/*
 * Close the socket descriptor sockfd through the Close() wrapper, passing
 * this file and line as the call site. The result of Close() is
 * intentionally discarded.
 */
void Client_CloseConnection ( int sockfd ) {
    ( void )Close( sockfd, __FILE__, __LINE__ );
}
/*
 * Worker thread for one peer.
 *
 * Repeatedly waits until the peer's send queue is non-empty, pops one
 * message, writes it to the socket with Writen(), reads one reply with
 * Readline() and hands the reply to the handler stored in the message.
 * The descriptor is closed after every message.
 *
 * v points to an int holding the peer's index; it is read once at entry.
 * Always returns NULL. The loop ends after the first iteration in which
 * Client_TestConnection() returns 0 or Readline() returns <= 0.
 *
 * NOTE(review): part of the body is elided in this view (the "..." line);
 * node, tmp, data, n, recvline, IPStr, handler and log are presumably
 * declared and initialised there -- confirm against the full file.
 */
static void* CLIENT_THREAD ( void* v ) {
int sockfd; /* Socket descriptor. */
int index = *( int* )v; /*free( v );*/ /* Position in the @PeerList. */
...
/* Assume the connection is OK by default. */
Pthread_rwlock_wrlock( &rwlock, __FILE__, __LINE__ );
node->state = CONNECTED;
Pthread_rwlock_unlock( &rwlock, __FILE__, __LINE__ );
while ( 1 ) {
/* Wait until messages come. node->mutex is held from here until the
   message has been popped; rwlock is taken only around each queue access. */
Pthread_mutex_lock( &node->mutex, __FILE__, __LINE__ );
Pthread_rwlock_rdlock( &rwlock, __FILE__, __LINE__ );
tmp = Queue4SendData_GetCount( & ( node->Q ) );
Pthread_rwlock_unlock( &rwlock, __FILE__, __LINE__ );
while ( tmp == 0 ) {
/* Sleep on isEmpty, then re-read the count after every wake-up. */
Pthread_cond_wait( &node->isEmpty, &node->mutex, __FILE__, __LINE__ );
Pthread_rwlock_rdlock( &rwlock, __FILE__, __LINE__ );
tmp = Queue4SendData_GetCount( & ( node->Q ) );
Pthread_rwlock_unlock( &rwlock, __FILE__, __LINE__ );
/* The continue/break pair below only repeats the loop condition. */
if ( tmp == 0 )
continue;
break;
}
/* Pop the message that is going to be sent. */
Pthread_rwlock_wrlock( &rwlock, __FILE__, __LINE__ );
data = Queue4SendData_Pop( & ( node->Q ) );
Pthread_rwlock_unlock( &rwlock, __FILE__, __LINE__ );
/* Let Producer continue insert items. */
Pthread_cond_signal( &node->isFull, __FILE__, __LINE__ );
Pthread_mutex_unlock( &node->mutex, __FILE__, __LINE__ );
tmp = 1; /* Status flag (0 = error, 1 = OK). tmp is reused; it held the queue count above. */
/* NOTE(review): Client_TestConnection() presumably (re)connects to IPStr and
   stores the descriptor in sockfd -- confirm. */
if ( Client_TestConnection( &sockfd, IPStr ) ) {
/* The return value of Writen() is not inspected here. */
Writen( sockfd, data.send_buf, strlen( data.send_buf ), __FILE__, __LINE__ );
if ( ( n = Readline( index, sockfd, recvline, TCP_MAXLINE ) ) <= 0 ) {
#ifdef LOGGING_ENABLE
/* n == 0 is reported as the server closing early, n < 0 as a read error. */
if ( n == 0 )
sprintf( log, "%s(%d) -> Server %s terminated prematurely", __FILE__, __LINE__, IPStr );
else
sprintf( log, "%s(%d) -> Readline() on %s failed: %s", __FILE__, __LINE__, IPStr, strerror( errno ) );
Log( log );
#endif
tmp = 0;
} else {
handler = data.handler;
handler( recvline, IPStr ); /* Call packet handler. */
}
} else {
tmp = 0;
}
/* Close the descriptor after every message, on the success path and on
   both failure paths.
   NOTE(review): when Client_TestConnection() returned 0 it is not visible
   here whether sockfd was ever assigned a valid descriptor; if it was not,
   this close() operates on an indeterminate value -- confirm. */
Client_CloseConnection( sockfd ); <- close is called here!!
/* Leave the loop (and end the thread) after the first error.
   NOTE(review): the visible code does not change node->state on exit. */
if ( tmp == 0 )
break;
}
return NULL;
}
This function is used to create a connection to a server; in my project, a client needs to connect to several servers at the same time.
/*
 * Start a detached CLIENT_THREAD worker for the server at IPAddr if that
 * peer is currently DISCONNECTED; otherwise do nothing.
 *
 * IPAddr - NUL-terminated address string; it is looked up with
 *          Client_GetPosition() and copied into the peer's IPStr field.
 *
 * The state test and the transition to CONNECTING happen under a single
 * write lock, so two concurrent callers cannot both observe DISCONNECTED
 * and both start a worker for the same peer.
 *
 * NOTE(review): the copy into node->IPStr is unbounded; the capacity of
 * IPStr is not visible here -- confirm that callers cannot exceed it.
 */
void Client_Constructor ( const char* IPAddr ) {
    pthread_t tid;      /* Thread ID. */
    pthread_attr_t rx;  /* Thread attributes (detached). */
    int start = 0;      /* Set when this call claimed the DISCONNECTED peer. */
    int pos = Client_GetPosition( IPAddr );
    struct PeerInfo* node = &PeerList[ pos ];

    Pthread_rwlock_wrlock( &rwlock, __FILE__, __LINE__ );
    if ( node->state == DISCONNECTED ) { /* Server is offline, then connect it! */
        /* Callers may pass node->IPStr itself; strcpy() on overlapping
           objects is undefined, so skip the copy in that case. */
        if ( node->IPStr != IPAddr ) {
            strcpy( node->IPStr, IPAddr );
        }
        Queue4SendData_Init( &( node->Q ), SEND_QUEUE );
        node->state = CONNECTING;
        start = 1;
    }
    Pthread_rwlock_unlock( &rwlock, __FILE__, __LINE__ );

    if ( !start ) {
        return;
    }

    Pthread_attr_init( &rx, __FILE__, __LINE__ );
    Pthread_attr_setdetachstate( &rx, PTHREAD_CREATE_DETACHED, __FILE__, __LINE__ );

    /* The worker receives a pointer to a slot in threadBuf holding the peer
       index; slots are handed out round-robin over 1024 entries. */
    Pthread_rwlock_wrlock( &tbRwlock, __FILE__, __LINE__ );
    if ( threadBufPos >= 1024 ) {
        threadBufPos = 0;
    }
    threadBuf[ threadBufPos ] = pos;
    Pthread_create( &tid, &rx, CLIENT_THREAD, ( void* )&( threadBuf[ threadBufPos ] ), __FILE__, __LINE__ );
    threadBufPos++;
    Pthread_rwlock_unlock( &tbRwlock, __FILE__, __LINE__ );

    /* The attribute object is no longer needed once the thread exists. */
    ( void )pthread_attr_destroy( &rx );
}
Sends a message to a server; IPAddr is the IP address of the server.
/*
 * Queue one message for the server at IPAddr.
 *
 * IPAddr - NUL-terminated address string used to look up the peer.
 * data   - message to send; its handler and send_buf are copied, so the
 *          caller keeps ownership of *data.
 *
 * Behaviour by peer state (sampled once, under the read lock):
 *   CONNECTED    - block while the queue holds SEND_QUEUE or more items,
 *                  then insert a copy of the message and signal the worker.
 *   DISCONNECTED - start a connection with Client_Constructor(); the
 *                  message itself is not queued.
 *   otherwise    - nothing is done.
 *
 * If the insert fails although the queue was just seen to have room, the
 * process is terminated with exit( -1 ).
 */
void Client_Pend ( const char* IPAddr, struct SendDataStruct *data ) {
    struct SendDataStruct local;
    struct PeerInfo* node = &PeerList[ Client_GetPosition( IPAddr ) ];
    enum TCPState TCPStatus;
    int qSize;

    Pthread_rwlock_rdlock( &rwlock, __FILE__, __LINE__ );
    TCPStatus = node->state; /* Recent TCP status of the connection. */
    Pthread_rwlock_unlock( &rwlock, __FILE__, __LINE__ );

    if ( TCPStatus == CONNECTED ) {
        /* Blocking if the queue is full. */
        Pthread_mutex_lock( &node->mutex, __FILE__, __LINE__ );
        Pthread_rwlock_rdlock( &rwlock, __FILE__, __LINE__ );
        qSize = Queue4SendData_GetCount( & ( node->Q ) );
        Pthread_rwlock_unlock( &rwlock, __FILE__, __LINE__ );
        while ( SEND_QUEUE <= qSize ) {
            /* Sleep on isFull and re-read the count after every wake-up. */
            Pthread_cond_wait( &node->isFull, &node->mutex, __FILE__, __LINE__ );
            Pthread_rwlock_rdlock( &rwlock, __FILE__, __LINE__ );
            qSize = Queue4SendData_GetCount( & ( node->Q ) );
            Pthread_rwlock_unlock( &rwlock, __FILE__, __LINE__ );
        }

        /* Clone the caller's message; this touches no shared state, so it
           is done before taking the write lock. */
        local.handler = data->handler;
        strcpy( local.send_buf, data->send_buf );

        Pthread_rwlock_wrlock( &rwlock, __FILE__, __LINE__ );
        if ( Queue4SendData_Insert( & ( node->Q ), local ) == -1 ) {
#ifdef LOGGING_ENABLE
            sprintf( log, "%s(%d) -> Queue is full on client %s", __FILE__, __LINE__, node->IPStr );
            Log( log );
#endif
            exit( -1 );
        }
        Pthread_rwlock_unlock( &rwlock, __FILE__, __LINE__ );

        /* Let the consumer read the inserted item. */
        Pthread_cond_signal( &node->isEmpty, __FILE__, __LINE__ );
        Pthread_mutex_unlock( &node->mutex, __FILE__, __LINE__ );
    } else if ( TCPStatus == DISCONNECTED ) {
        /* Pass the caller's address rather than node->IPStr: in the visible
           code IPStr is only written inside Client_Constructor(), so it may
           not be set yet, and passing it would make the constructor copy
           the string onto itself. */
        Client_Constructor( IPAddr );
    }
}