Slow Producer - Fast consumer


 
# 1  
Old 11-05-2010
Slow Producer - Fast consumer

I would like to loop through a set of directories, performing operation(s) on each one. The basic script** is
Code:
dirs=`find . -name .svn -print`

for f in $dirs; do 
   echo "Processing $f directory .." 
done

Fine and dandy, but here is the problem: the find expression must complete before the do ... done begins. The find process is relatively slow. I would like to have the do .. done work on entries as they are appended to the list. So in essence there are two threads, the find works in the background, and the do consumes $dirs until empty.

How would the do .. done loop "know" that the $dirs is still being populated? Maybe this needs to be changed completely using another method.

Thanks in advance.



** Some of you will recognize the ".svn", as Subversion stores additional copies of data on the local machine in ".svn" directories.

Last edited by vbe; 11-05-2010 at 03:29 PM.. Reason: Code tags please
# 2  
Old 11-05-2010
maybe something like

Code:
do_something(){
echo "Processing $1 directory"
# any command could be added here...
}
find . -name .svn -print | xargs -n1 do_something

# 3  
Old 11-05-2010
Put your processing into a separate script and call that script from your find command:

Code:
find . -name .svn -exec /path/to/your/script/process.sh {} \& \;

I don't know offhand how to background your script directly, but if you can't background it directly from the find command, just create a "startprocess.sh" script to background your actual script:

Code:
#!/bin/sh
# Pass every argument through intact, then background the real script.
/path/to/your/script/process.sh "$@" &

The find command would then look something like this:

Code:
find . -name .svn -exec /path/to/your/script/startprocess.sh {} \;
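
For what it's worth, backgrounding straight from find does seem possible if the command is wrapped in `sh -c`. The following is only a sketch under assumptions: the temporary tree and the `processed.log` file are made up for the demo, and the `echo` stands in for process.sh. Note that nothing here limits how many background jobs run at once.

```shell
#!/bin/sh
# Hypothetical demo tree; the echo below stands in for process.sh.
demo=$(mktemp -d)
mkdir -p "$demo/a/.svn" "$demo/b/.svn"
log="$demo/processed.log"

# sh -c runs a tiny inline script: $1 is the path find substitutes for {},
# $2 is the log file. The trailing & backgrounds the work, so find moves
# on to the next match immediately.
find "$demo" -name .svn -exec sh -c '
    echo "done: $1" >> "$2" &
' sh {} "$log" \;

# find has returned, but the background jobs may still be finishing.
sleep 1
count=$(wc -l < "$log")
echo "background jobs finished: $((count))"
rm -rf "$demo"
```

The second `sh` only fills `$0` of the inline script; the real arguments start at `$1`.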

# 4  
Old 11-05-2010
This has the least latency:
Code:
find . . . -exec 'cmd'

but lacks economy of scale and pipeline-parallel processing compared to:
Code:
find . . . | xargs -n999 'cmd'

This shell solution is somewhere in the middle, some latency from the pipe and no economy of scale but pipeline-parallel:
Code:
find . . . | while IFS= read -r l
do
 whatever "$l"
done

None of them chokes, however many files there are, since none holds the whole list in memory.
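
To make the three shapes concrete, here is a small sketch that runs each of them against a throwaway tree and counts the output lines. The tree and the "Processing" text are assumptions for the demo only, and `-exec ... {} +` is used as the batching form in place of xargs.

```shell
#!/bin/sh
# Hypothetical demo tree with three .svn directories.
demo=$(mktemp -d)
mkdir -p "$demo/a/.svn" "$demo/b/.svn" "$demo/c/.svn"

# 1. One command per match: work starts as soon as find reports a match.
per_match=$(find "$demo" -name .svn -exec echo "Processing" {} \; | wc -l)

# 2. Batched: {} + packs many matches onto one command line, much as
#    xargs does, so far fewer processes are started.
batched=$(find "$demo" -name .svn -exec echo "Processing" {} + | wc -l)

# 3. Pipe into a loop: the loop body runs while find is still searching.
looped=$(find "$demo" -name .svn | while IFS= read -r d
do
    echo "Processing $d"
done | wc -l)

echo "per-match: $((per_match)) batched: $((batched)) loop: $((looped))"
rm -rf "$demo"
```

With only three short paths the batched form should need a single command, which is why it prints one line where the other two print three.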

If you have highly variable waits and load, I wrote this simple but fast xargs with pipeline-parallelism in C:
Code:
$ cat mysrc/fxargs2.c
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <string.h>
#include <poll.h>
static  char usage[] =
"\n"
"Usage: fxargs2 [ -n <args_per_exec> ] [ -v ] [ -p ] <cmd> [ <cmd_arg> ... ]\n"
"\n"
"Reads arguments as lines from standard input and executes:\n"
" <cmd> [ <cmd_args> ... ] <args_from_stdin>\n"
"Each line becomes one argument.  The number of <args_from_stdin> is limited\n"
"by <args_per_exec> (default 1024).  The command is executed when either:\n"
" - the total number of args from standard input is <args_per_exec>, or\n"
" - the buffer has ( 80 * <args_per_exec> ) unexecuted bytes of input, or\n"
" - EOF is detected with any args from standard input.\n"
"The <cmd> [ <cmd_args> ... ] is never executed alone.\n"
"While a command is executing, reading resumes, but before another command\n"
"is executed, the prior command must return a status.\n"
"With -v, any abnormal child state returned is reported.\n"
"With -p, any child terminating on SIGPIPE causes a normal exit.\n"
"\n" ;
static size_t p_read( int fd, char *buf, size_t len )
{
        static int ret = 0 ;
        static int eof_retry = 0 ;
        int fsf ;
        do
        {
                switch ( ret = read( fd, buf, len ) )
                {
                case -1:
                        switch( errno )
                        {
                        case EAGAIN:
                                poll( 0, 0, 1 );
                        case EINTR:
                                continue ;
                        default:
                                perror( "fxargs2: stdin" );
                                exit( 1 );
                        }
                case 0:
                        if ( ++eof_retry > 50 )
                        {
                                return 0 ;
                        }
                        poll( 0, 0, 1 );
                        ret = -1 ;
                        continue ;
                default:
                        return ret ;
                }
        } while ( ret < 0 );
        return ret ;
}
static void p_wait( int v, int p )
{
        int     cpid ;
        int     cstat ;
        cstat = 0 ;
        if ( 0 > ( cpid = wait( &cstat ) ) )
        {
                if ( errno == ECHILD )
                        return ;
                perror( "fxargs2: wait()" );
                exit( 1 );
        }
        if ( p
          && ( cstat & 0xffff ) == SIGPIPE )
        {
                exit( 0 );
        }
        if ( !v )
                return ;
        switch ( cstat & 0xff )
        {
        case 0:
                if ( cstat )
                        fprintf( stderr, "\nfxargs2: Process %d exit %d\n",
                                                cpid, cstat>>8 );
                break ;
        case WSTOPFLG :
                fprintf( stderr, "\nfxargs2: Process %d stopped on signal %d\n",
                                        cpid, cstat>>8 );
                break ;
        default:
                if ( !( cstat & 0xff00 ) )
                        fprintf( stderr, "\nfxargs2: Process %d term by sig %d\n",
                                                cpid, cstat & 0xff );
                else
                        fprintf( stderr, "\nfxargs2: Process %d wait(%d-%d)\n",
                                                cpid, cstat>>8, cstat & 0xff );
                break ;
        }
        return ;
}
int main( int argc, char **argv ){
        int     c = 0 ;
        int     execs = 0 ;
        int     i ;
        int     n = 1024 ;
        int     rret ;
        int     p = 0 ;
        int     v = 0 ;
        size_t  b ;
        size_t  cib = 0 ;
        char    **argv2 = NULL ;
        char    *buf = NULL ;
        char    *cp, *cp2 ;
        for ( i = 1 ; i < argc ; i++ )
        {
                if ( !strcmp( argv[i], "-v" ) )
                {
                        v = 1 ;
                        continue ;
                }
                if ( !strcmp( argv[i], "-p" ) )
                {
                        p = 1 ;
                        continue ;
                }
                if ( !memcmp( argv[i], "-n", 2 ) )
                {
                        if ( argv[i][2] )
                        {
                                n = atoi( argv[i] + 2 ) ;
                        }
                        else if ( ++i < argc )
                        {
                                n = atoi( argv[i] );
                        }
                        else
                        {
                                n = 0 ;
                        }
                        if ( 1 > n )
                        {
                                fputs( usage, stderr );
                                exit( 1 );
                        }
                        continue ;
                }
                if ( !c )
                {
                        if ( '-' == argv[i][0] )
                        {
                                fputs( usage, stderr );
                                exit( 1 );
                        }
                        n += ( argc - i + 1 );
                        b = ( 80 * n ) + 1 ;
                        if ( !( argv2
                                = (char**)malloc( n-- * sizeof( char* ) ) )
                          || !( buf = (char*)malloc( b-- ) ) )
                        {
                                perror( "fxargs2: malloc()" );
                                exit( 1 );
                        }
                }
                argv2[c++] = argv[i] ;
        }
        if ( !c )
        {
                fputs( usage, stderr );
                exit( 1 );
        }
        while ( 0 < ( rret = p_read( 0, buf + cib, b - cib ) ) )
        {
                i = c ;
                cp = buf ;
                cib += rret ;
                buf[cib] = '\0' ;
                while ( i < n )
                {
                        if ( !( cp2 = strchr( cp, '\n' ) ) )
                        {
                                break ;
                        }
                        argv2[i++] = cp ;
                        *cp2 = '\0' ;
                        cp = cp2 + 1 ;
                }
                if ( i == c )
                {
                        if ( cib < b )
                                continue ;
                        fputs( "\nfxargs2: Fatal: line too long!\n", stderr );
                        exit( 1 );
                }
                argv2[i] = NULL ;
                if ( execs++ )
                {
                        p_wait( v, p );
                }
                switch( vfork() )
                {
                case -1:
                        perror( "fxargs2: vfork()" );
                        exit( 1 );
                case 0: /* child */
                        freopen( "/dev/null", "r", stdin );
                        poll(0,0,1);
                        execvp( *argv2, argv2 );
                        perror( "fxargs2: execvp(cmd)" );
                        _exit( 1 ); /* not exit(): a vfork() child must not flush the parent's stdio */
                default: /* parent */
                        break ;
                }
                cib -= ( cp - buf );
                memmove( buf, cp, cib );
        }
        if ( execs )
        {
                p_wait( v, p );
        }
        exit( 0 );
}


Last edited by DGPickett; 11-05-2010 at 05:50 PM..
# 5  
Old 11-05-2010

Thanks all. Great place for those tricky questions.

Since I am using a GNU port on Windows, ctsgnb's do_something solution
would not work:

Code:
C:\BashFun>bash ds.sh
xargs: do_something: No such file or directory

but DGPickett's works great:

Code:
find . -name .svn -print | while read d
do 
    echo "Processing $d directory"
done
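
One thing to watch with this loop is directory names containing spaces or other odd characters. Below is a sketch of a variant that separates names with NUL bytes instead of newlines. It assumes bash plus a find that supports `-print0`, and the demo tree is made up.

```shell
#!/bin/bash
# Hypothetical demo tree, one path containing a space.
demo=$(mktemp -d)
mkdir -p "$demo/my project/.svn" "$demo/plain/.svn"

count=0
# -print0 and read -d '' use NUL as the separator, so any legal path
# survives intact. Process substitution keeps the loop in the current
# shell, which is why count is still set after the loop ends.
while IFS= read -r -d '' d
do
    echo "Processing $d directory"
    count=$((count + 1))
done < <(find "$demo" -name .svn -print0)

echo "processed $count directories"
rm -rf "$demo"
```

The loop still starts on the first entry while find keeps searching, so the slow producer does not hold up the consumer.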

Kudos to achenle for pointing out I could have done this with a background task. That would be more fun, if I had the time.

Cheers!

Last edited by Neo; 11-05-2010 at 09:35 PM..
# 6  
Old 11-05-2010
Maybe the function do_something needs to be exorted in the environment before being called.
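
A rough sketch of what that could look like, assuming bash is the shell on both sides: xargs runs programs rather than shell functions, so even an exported function has to be called through a `bash -c` go-between. The demo tree is made up, and whether this behaves the same on a Windows port is untested.

```shell
#!/bin/bash
do_something() {
    echo "Processing $1 directory"
}
# export -f is a bash feature: it passes the function to child bash processes.
export -f do_something

# Hypothetical demo tree.
demo=$(mktemp -d)
mkdir -p "$demo/a/.svn" "$demo/b/.svn"

# xargs starts bash, and that bash sees the exported function.
# The "_" fills $0; each path arrives as $1.
out=$(find "$demo" -name .svn -print |
      xargs -n1 bash -c 'do_something "$1"' _)
echo "$out"
rm -rf "$demo"
```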
# 7  
Old 11-07-2010
Hey DGPickett, you remind me of ... well, me. I will have to try fxargs2.c at my next downtime. And ctsgnb, I try never to exort anything, but the export might work. I have another thing I am working on where this may help!
 