Event driven programming / epoll / typedef union / session data array


 
Thread Tools Search this Thread
Top Forums Programming Event driven programming / epoll / typedef union / session data array
# 1  
Old 09-17-2014
Event driven programming / epoll / typedef union / session data array

Sorry for the “word salad” subject, but I wanted to cast a wide net for help.

I've created an IP (Internet Protocol) server which serves HTTP, SMTP, and FTP requests.

As you probably know, they all require creating a socket, listening on it, accepting connections, and then having a short “conversation” as specified by the appropriate RFC (Request For Comment).

I've also created an automated HTTP client which reads web pages.

During testing of the HTTP client I think I encountered some performance issues possibly those in the C10K (Customer 10,000) problem - creating new threads for each connection became very slow.

So I decided to try event driven programming which purportedly solves the C10K problem.

I'm working with C on Linux. Epoll was added to Linux over 10 years ago to solve the C10K problem. It reduces the problem order from N squared (where N is the number of connections) to constant time for each event or something like that.

So I've struggled and gotten more and more confused but then found some most excellent code here...
epoll_wait: Linux sockets example

I think Lrrr (blogspot poster and probable code author) has provided a nearly perfect bit of self documented code! Lrrr has used print statements in nearly every block which make clear what is going on. Even the code formatting is the same as mine so I found the example clear and useful.

With virtually no modification I was able to incorporate Lrrr's code into my program in a test mode. When it runs, I'm able to open multiple telnet sessions and see the code in action in both the console and in the telnet sessions.

So far so good.

I've even found the two points in the code where I'd need to somehow modify it so it follows the RFC for a web-server for example.

Code:
rc = snprintf(buffer, sizeof(buffer), "Hello socket %d from server socket %d!\n", fd, sd);

and
Code:
printf("Received '");
sprint_buffer(buffer, rc);
printf("' from socket with sd %d\n", fd);

So I know where I need to integrate my old code and I know what structure I have to use.

epoll_create, epoll_ctl, and epoll_wait all use the epoll_event structure which in turn uses the “typedef union epoll_data”

Code:
typedef union epoll_data
{
  void        *ptr;
  int          fd;
  __uint32_t   u32;
  __uint64_t   u64;
} epoll_data_t;

struct epoll_event
{
  __uint32_t   events; /* Epoll events */
  epoll_data_t data;   /* User data variable */
};

I've never used a union before and this is where extra confusion starts. My vague understanding is that a union can hold any of the defined members - but only one-at-a-time.

The void pointer looks like it might be what I want to use, but the fd member is in-use so if I assigned to ptr, fd would be corrupted.

What I'd like is to somehow associate epoll_event.data with one member of a session data array. I envision this array to be comprised of structures which for pilot purposes could be simply 4 integers, say 1,2,3,4. Where data is written to the client, the first integer is printed and others shifted around so the second time data is written to the client the second integer is printed and so on.

I'll gladly entertain any thoughts or suggestions at this time.
# 2  
Old 09-21-2014
Resolved!

First let me credit this post by sachin...
c++ - How to use epoll_event data.ptr - Stack Overflow

which pointed out..
"The void *ptr and int fd both are inside a union inside the struct epoll_event. You should use either of them not both. Hence in your struct, add the field for fd as well and only link pointer to the struct in the ptr field of the epoll_event. This way when you get back your pointer then get the fd from it for further use."

In retrospect, it's all perfectly obvious, isn't it?

Here's what I added and edited...


I created a SessionData structure
Code:
struct SessionData
{
    int FileDescriptor;
    int Data1;
    int Data2;
    int Data3;
};

I ended up incrementing Data1 rather than rotating them to confirm operation as I desired. It's all good as they say.



There are six places where I modified Lrrr's original code.
Code:
    if (listen(sd, SOMAXCONN))
    {
       printf("Could not start listening on server socket %d: %m\n", sd);
       goto cleanup;
    }
    else
    {
       printf("Server socket %d started listening to address 'INADDR_ANY' and port %u\n", sd, port);
    }

    ev.events = EPOLLIN;
    //ev.data.u64 = 0LL; // 0 Long Long
    //ev.data.fd = sd;

    SessionData = malloc(sizeof(SessionData));
    SessionData->FileDescriptor = sd;
    //SessionData->Data1 = 1;
    //SessionData->Data2 = 2;
    //SessionData->Data3 = 3;
    ev.data.ptr = SessionData;

I commented out the original code to set the data value to the file descriptor since I wanted to use the pointer value. I don't need the data 1, 2, and 3 values here because this is the SERVER socket. Indeed there is NO session data for the server socket, but since the event loop treats all sockets the same, I had to use a session data record.


Code:
        for (i = 0; i < rval; i++)
        {
            events = epoll_events[i].events;
            //fd = epoll_events[i].data.fd;
            fd = ((struct SessionData *) epoll_events[i].data.ptr)->FileDescriptor;

Here is the start of the event handling loop. I now get the fd value from the session data structure.



Code:
                        printf("Sent '");
                        sprint_buffer(buffer, rc);
                        printf("' to socket with sd %d\n", fd);

                        ev.events = EPOLLIN;
                        //ev.data.u64 = 0LL;
                        //ev.data.fd = fd;
                        //(struct SessionData *) ev.data.ptr->FileDescriptor = fd;
                        ((struct SessionData *) ev.data.ptr)->FileDescriptor = fd;

Here, for some not-yet-clear reason, Lrrr moves fd BACK to where it had recently been taken FROM. I have dutifully modified it to use the session data structure.


Code:
if (inet_ntop(AF_INET, &peeraddr.sin_addr.s_addr, buffer, sizeof(buffer)) != NULL)
                    {
                        printf("Accepted connection from %s:%u, assigned new sd %d\n", buffer, ntohs(peeraddr.sin_port), clientsd);
                    } else
                    {
                        printf("Failed to convert address from binary to text form: %m\n");
                    }

                    ev.events = EPOLLIN;
                    //ev.data.u64 = 0LL;
                    //ev.data.fd = clientsd;
                    //((struct SessionData *) ev.data.ptr)->FileDescriptor = clientsd;
                    SessionData = malloc(sizeof(SessionData));
                    SessionData->FileDescriptor = clientsd;
    SessionData->Data1 = 1;
    SessionData->Data2 = 2;
    SessionData->Data3 = 3;
                    ev.data.ptr = SessionData;

Here is where a new client has connected. This is where the SessionData is needed. A new structure is malloc'd. I have yet to free this so I currently have a memory leak and will fix that next.


Code:
                        printf("Received '");
                        sprint_buffer(buffer, rc);
                        printf("' from socket with sd %d\n", fd);

                        ev.events = EPOLLIN | EPOLLOUT;
                        //ev.data.u64 = 0LL;
                        //ev.data.fd = fd;
                        ((struct SessionData *) ev.data.ptr)->FileDescriptor = fd;

Here again, fd is being copied back to where it came from a few moments earlier. I still do not see the need for this.



Code:
            if (events & EPOLLOUT)
            {
                if (fd != sd)
                {
                    rc = snprintf(buffer, sizeof(buffer), "Hello socket %d from server socket %d! Session data: %d\n", fd, sd, ((struct SessionData *) epoll_events[i].data.ptr)->Data1 ++);

Here I appended the output to expose the changing session data. Opening two telnet sessions showed the Data1 value incrementing and doing so independently for either session.



I also implemented a 10,000 msec (10 sec) timeout.
Code:
        while ((rval = epoll_wait(efd, epoll_events, EPOLL_ARRAY_SIZE, 10000)) < 0) // 10 second timeout
        {
            if ((rval < 0) && (errno != EINTR))
            {
                printf("EPoll on %d fds failed: %m\n", pollsize);
                goto cleanup;
            }
        }

        //if we include a timeout
        if (rval == 0)
        {
            // handle timeout and return to wait
            printf("EPoll timeout\n");
            //goto ReWait;
        }

I guess I am missing something to rationalize the use of the union type versus a structure so if anyone has an explanation regarding that, I'd be keen to hear it.

Cheers!

Last edited by John S.; 09-21-2014 at 02:34 PM.. Reason: Pasted wrong code.
# 3  
Old 09-21-2014
Yet again, you can't expect us to see what's going on when you only post tiny out-of-context snippets of your code showing none of your data structures.
# 4  
Old 09-21-2014
Big picture of code - sorry for not posting before...

My apologies to Corona688 and others following this.

Please remember, over 90% of this is Lrrr's code as found here.
epoll_wait: Linux sockets example

I've commented out moving fd BACK to where it had recently been taken FROM and it seems to work great.

I've also found one spot to free the SessionData and plug the memory leak.


Code:
// 10,000 to solve C10K problem, yes? YES. see http://lse.sourceforge.net/epoll/
#define EPOLL_ARRAY_SIZE   64


void sprint_buffer(const char *buffer, int size)
{
   int i;
   for (i = 0; i < size; i++)
   {
      if (isprint(buffer[i]))
         printf("%c", buffer[i]);
      else
         printf("\\0x%02X", buffer[i]);
   }
}






struct SessionData
{
    int FileDescriptor;
    int Data1;
    int Data2;
    int Data3;
};

void Tests()
{

    //http://epollwait.blogspot.com/2010/11/linux-sockets-example.html
    //
    //int main(int argc, char *argv[])
    //{
    int sd, efd, clientsd, fd; // socket descriptor, epoll file descriptor, client socket descriptor, file descriptor
    struct sockaddr_in bindaddr, peeraddr;
    socklen_t salen = sizeof(peeraddr);
    int pollsize = 1;
    struct epoll_event ev;
    struct epoll_event epoll_events[EPOLL_ARRAY_SIZE];
    uint32_t events;
    //unsigned short port = 20000;
    unsigned short port = 3333;
    int i, rval, on = 1;
    ssize_t rc;
    char buffer[1024];

    struct SessionData * SessionData;

    efd = epoll_create(pollsize);

    if (efd < 0)
    {
       printf("Could not create the epoll fd: %m");
       //return 1;
       return;
    }

    sd = socket(AF_INET, SOCK_STREAM, 0);
    if (sd < 0)
    {
        printf("Could not create new socket: %m\n");
        //return 2;
        return ;
    }

    printf("New socket created with sd %d\n", sd);

    if (fcntl(sd, F_SETFL, O_NONBLOCK))
    {
       printf("Could not make the socket non-blocking: %m\n");
       close(sd);
       //return 3;
       return ;
    }

    if (setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)))
    {
       printf("Could not set socket %d option for reusability: %m\n", sd);
       close(sd);
       //return 4;
       return ;
    }

    bindaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    bindaddr.sin_family= AF_INET;
    bindaddr.sin_port = htons(port);

    if (bind(sd, (struct sockaddr *) &bindaddr, sizeof(struct sockaddr_in)) < 0)
    {
       printf("Could not bind socket %d to address 'INADDR_ANY' and port %u: %m", sd, port);
       close(sd);
       //return 5;
       return ;
    }
    else
    {
       printf("Bound socket %d to address 'INADDR_ANY' and port %u\n", sd, port);
    }

    if (listen(sd, SOMAXCONN))
    {
       printf("Could not start listening on server socket %d: %m\n", sd);
       goto cleanup;
    }
    else
    {
       printf("Server socket %d started listening to address 'INADDR_ANY' and port %u\n", sd, port);
    }

    ev.events = EPOLLIN;
    //ev.data.u64 = 0LL; // 0 Long Long
    //ev.data.fd = sd;

    SessionData = malloc(sizeof(SessionData));
    SessionData->FileDescriptor = sd;
    //SessionData->Data1 = 1;
    //SessionData->Data2 = 2;
    //SessionData->Data3 = 3;
    ev.data.ptr = SessionData;

    //ev.data.ptr = 0;






    if (epoll_ctl(efd, EPOLL_CTL_ADD, sd, &ev) < 0)
    {
        printf("Couldn't add server socket %d to epoll set: %m\n", sd);
        goto cleanup;
    }

    for (;;)
    {
        //ReWait:

        printf("Starting epoll_wait on %d fds\n", pollsize);

        //while ((rval = epoll_wait(efd, epoll_events, EPOLL_ARRAY_SIZE, -1)) < 0)
        while ((rval = epoll_wait(efd, epoll_events, EPOLL_ARRAY_SIZE, 10000)) < 0) // 10 second timeout
        {
            if ((rval < 0) && (errno != EINTR))
            {
                printf("EPoll on %d fds failed: %m\n", pollsize);
                goto cleanup;
            }
        }

        //if we include a timeout
        if (rval == 0)
        {
            // handle timeout and return to wait
            printf("EPoll timeout\n");
            //goto ReWait;
        }



        for (i = 0; i < rval; i++)
        {
            events = epoll_events[i].events;
            //fd = epoll_events[i].data.fd;
            fd = ((struct SessionData *) epoll_events[i].data.ptr)->FileDescriptor;

            if (events & EPOLLERR)
            {
                if (fd == sd)
                {
                    printf("EPoll on %d fds failed: %m\n", pollsize);
                    // could free SessionData here but why bother?
                    goto cleanup;
                } else
                {
                    printf("Closing socket with sd %d\n", fd);
                    shutdown(fd, SHUT_RDWR);
                    close(fd);
                    free(((struct SessionData *) epoll_events[i].data.ptr)); // is free needed anywhere else?
                    continue;
                }
            }

            if (events & EPOLLWAKEUP) // MY ADDITION
            {
                printf("EPoll timeout\n");
            }

            if (events & EPOLLHUP)
            {
                if (fd == sd)
                {
                    printf("EPoll on %d fds failed: %m\n", pollsize);
                    goto cleanup;
                } else
                {
                    printf("Closing socket with sd %d\n", fd);
                    shutdown(fd, SHUT_RDWR);
                    close(fd);
                    continue;
                }
            }

            if (events & EPOLLRDHUP)
            {
                if (fd == sd)
                {
                    printf("EPoll on %d fds failed: %m\n", pollsize);
                    goto cleanup;
                } else
                {
                    printf("Closing socket with sd %d\n", fd);
                    shutdown(fd, SHUT_RDWR);
                    close(fd);
                    continue;
                }
            }

            if (events & EPOLLOUT)
            {
                if (fd != sd)
                {
                    //rc = snprintf(buffer, sizeof(buffer), "Hello socket %d from server socket %d!\n", fd, sd);
                    //rc = snprintf(buffer, sizeof(buffer), "Hello socket %d from server socket %d!\n", fd, sd);
                    //rc = snprintf(buffer, sizeof(buffer), "Hello socket %d from server socket %d!\n", fd, sd);
                    //rc = snprintf(buffer, sizeof(buffer), "Hello socket %d from server socket %d!\n", fd, sd);
                    rc = snprintf(buffer, sizeof(buffer), "Hello socket %d from server socket %d! Session data: %d\n", fd, sd, ((struct SessionData *) epoll_events[i].data.ptr)->Data1 ++);
                    while ((rc = send(fd, buffer, rc, 0)) < 0)
                    {
                        if ((fd < 0) && (errno != EINTR))
                        {
                            printf("Send to socket %d failed: %m\n", fd);
                            pollsize--;
                            shutdown(fd, SHUT_RDWR);
                            close(fd);
                            continue;
                        }
                    }

                    if (rc == 0)
                    {
                        printf("Closing socket with sd %d\n", fd);
                        pollsize--;
                        shutdown(fd, SHUT_RDWR);
                        close(fd);
                        continue;
                    } else if (rc > 0)
                    {
                        printf("Sent '");
                        sprint_buffer(buffer, rc);
                        printf("' to socket with sd %d\n", fd);

                        ev.events = EPOLLIN;
                        //ev.data.u64 = 0LL;
                        //ev.data.fd = fd;
                        //(struct SessionData *) ev.data.ptr->FileDescriptor = fd;
         //               ((struct SessionData *) ev.data.ptr)->FileDescriptor = fd;

                        if (epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev) < 0)
                        {
                            printf("Couldn't modify client socket %d in epoll set: %m\n", fd);
                            goto cleanup;
                        }
                    }
                }
            }

            if (events & EPOLLIN)
            {
                if (fd == sd)
                {
                    while ((clientsd = accept(sd, (struct sockaddr *) &peeraddr, &salen)) < 0)
                    {
                        if ((clientsd < 0) && (errno != EINTR))
                        {
                            printf("Accept on socket %d failed: %m\n", sd);
                            goto cleanup;
                        }
                    }

                    if (inet_ntop(AF_INET, &peeraddr.sin_addr.s_addr, buffer, sizeof(buffer)) != NULL)
                    {
                        printf("Accepted connection from %s:%u, assigned new sd %d\n", buffer, ntohs(peeraddr.sin_port), clientsd);
                    } else
                    {
                        printf("Failed to convert address from binary to text form: %m\n");
                    }

                    ev.events = EPOLLIN;
                    //ev.data.u64 = 0LL;
                    //ev.data.fd = clientsd;
                    //((struct SessionData *) ev.data.ptr)->FileDescriptor = clientsd;
                    SessionData = malloc(sizeof(SessionData));
                    SessionData->FileDescriptor = clientsd;
    SessionData->Data1 = 1;
    SessionData->Data2 = 2;
    SessionData->Data3 = 3;
                    ev.data.ptr = SessionData;

                    if (epoll_ctl(efd, EPOLL_CTL_ADD, clientsd, &ev) < 0)
                    {
                        printf("Couldn't add client socket %d to epoll set: %m\n", clientsd);
                        goto cleanup;
                    }

                    pollsize++;
                    } else
                {
                    while ((rc = recv(fd, buffer, sizeof(buffer), 0)) < 0)
                    {
                        if ((fd < 0) && (errno != EINTR))
                        {
                            printf("Receive from socket %d failed: %m\n", fd);
                            pollsize--;
                            shutdown(fd, SHUT_RDWR);
                            close(fd);
                            continue;
                        }
                    }

                    if (rc == 0)
                    {
                        printf("Closing socket with sd %d\n", fd);
                        pollsize--;
                        shutdown(fd, SHUT_RDWR);
                        close(fd);
                        continue;
                    } else if (rc > 0)
                    {
                        //printf("Received '");
                        //printf("Received '");
                        //printf("Received '");
                        //printf("Received '");
                        //printf("Received '");
                        printf("Received '");
                        sprint_buffer(buffer, rc);
                        printf("' from socket with sd %d\n", fd);

                        ev.events = EPOLLIN | EPOLLOUT;
                        //ev.data.u64 = 0LL;
                        //ev.data.fd = fd;
            //            ((struct SessionData *) ev.data.ptr)->FileDescriptor = fd;

                        if (epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev) < 0)
                        {
                            printf("Couldn't modify client socket %d in epoll set: %m\n", fd);
                            goto cleanup;
                        }
                    }
                }
            }
        }
    }

cleanup:
    shutdown(sd, SHUT_RDWR);
    close(sd);

    return ;

Login or Register to Ask a Question

Previous Thread | Next Thread

1 More Discussions You Might Find Interesting

1. Web Development

Intersection and union of array by hash

Hi, A piece of script from Perl-cookbook I do not understand, and post here for explanation. The purpose is to find the element in either array (union), and in both array (intersection). Thank you in advance. @a=qw(1 3 5 6 7 8); @b=qw(2 3 5 7 9); foreach $e (@a, @b) {$union{$e}++ &&... (3 Replies)
Discussion started by: yifangt
3 Replies
Login or Register to Ask a Question