ncpu=$(grep -c ^processor /proc/cpuinfo)
ncpu = len([1 for l in open("/proc/cpuinfo", "r") if l.startswith("processor")])
#define _GNU_SOURCE   /* needed for sched_getaffinity() and the CPU_* macros */
#include <sched.h>

/* count the CPUs this process is allowed to run on */
int count_cpu(void) {
    cpu_set_t set;
    sched_getaffinity(0, sizeof(cpu_set_t), &set);
    int i, count = 0;
    for (i = 0; i < CPU_SETSIZE; i++)
        if (CPU_ISSET(i, &set))
            count++;
    return count;
}
Typically, for CPU-intensive operations, one runs as many tasks
(threads or processes) as there are CPUs: fewer would leave available
resources unused, and more would cause unnecessary context switches.
trap 'kill -HUP 0' 0   # handle exit gracefully
for ((cpu=0; cpu<ncpu; cpu++)); do
    for ((i=cpu; i<n; i+=ncpu)); do
        operation $i
    done &
done
wait
prefix=/tmp/task_lock.$$   # some non-existing name
rm -f $prefix*
touch $prefix.0
trap 'kill -HUP 0' 0       # handle exit gracefully
for ((cpu=0; cpu<ncpu; cpu++)); do
    for ((i=0; i<n; i++)); do
        # renaming the lock file atomically claims element i
        if mv $prefix.$i $prefix.$((i+1)) 2> /dev/null; then
            operation $i
        fi
    done &
done
wait
This assumes that mv is an atomic operation. It is therefore advisable
to keep the lock file on a local disk (/tmp), because files on NFS may
get de-synchronized.
The same trick distributes the work over several machines reachable through ssh:

prefix=/tmp/task_lock.$$   # some non-existing name
rm -f $prefix*
touch $prefix.0
trap 'kill -HUP 0' 0       # handle exit gracefully
for machine in edmund cepheus anonio; do
    for ((i=0; i<n; i++)); do
        if mv $prefix.$i $prefix.$((i+1)) 2> /dev/null; then
            ssh $machine "operation.sh $i"
        fi
    done &
done
wait
On a cluster managed by the OAR batch scheduler, reserve the nodes with
oarsub -lnodes=10 -I (replace -I with a script name when it works):
prefix=/tmp/task_lock.$$   # some non-existing name
rm -f $prefix*
touch $prefix.0
trap 'kill -HUP 0' 0       # handle exit gracefully
for machine in $(cat $OAR_NODEFILE); do
    for ((i=0; i<n; i++)); do
        if mv $prefix.$i $prefix.$((i+1)) 2> /dev/null; then
            oarsh $machine "operation.sh $i"
        fi
    done &
done
wait
In Python, a thread pool can map the operation over all the elements:

import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool

def operation(i):
    ...   # complicated processing on element i

pool = ThreadPool(multiprocessing.cpu_count())
pool.map(operation, range(n))
If the processing happens in pure Python (and is therefore serialized by
the GIL), use multiprocessing.Pool instead of threads.
In C, OpenMP can spread the iterations of a loop over ncpu threads with:
#include <omp.h>

#pragma omp parallel for num_threads(ncpu) schedule(dynamic)
for (i = 0; i < n; i++) {
    /* complicated processing on element i */
}
If all iterations take roughly the same time, schedule(dynamic) can be
left out. Be careful to declare local variables used in the "complicated
processing" part inside the braces, otherwise they are shared between
threads (fireworks!). Compile with the -fopenmp flag.
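For instance (a minimal sketch with made-up names, not taken from the text above), a temporary declared inside the braces is private to each thread, whereas the same variable declared before the pragma would be shared by all threads and clobbered concurrently:

#include <omp.h>
#include <math.h>

void process_all(double *result, int n, int ncpu) {
    int i;   /* the index of a "parallel for" loop is automatically private */
    #pragma omp parallel for num_threads(ncpu) schedule(dynamic)
    for (i = 0; i < n; i++) {
        double tmp = sqrt((double)i);   /* inside the braces: one copy per thread */
        result[i] = tmp * tmp;          /* declared before the pragma, tmp would be shared: races */
    }
}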
Without OpenMP, the same distribution can be done with a helper that applies a callback to every element:

void operation(void *arg, int tid, int i) {
    /* complicated processing on element i */
    /* tid is the index in 0..ncpu-1 of the thread */
    /* arg is an arbitrary argument */
}

compute_tasks(n, ncpu, &operation, arbitrary_argument);
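compute_tasks() itself is not shown here; purely as an assumption about what such a helper might look like (the names and the mutex-protected counter are illustrative, not the original implementation), it could hand out indices to ncpu POSIX threads from a shared counter, which balances the load dynamically much like schedule(dynamic):

#include <pthread.h>

struct task_pool {
    int n, next;                              /* element count, next unclaimed index */
    void (*op)(void *arg, int tid, int i);    /* user callback */
    void *arg;
    pthread_mutex_t lock;
};

struct worker_arg { struct task_pool *pool; int tid; };

static void *worker(void *p) {
    struct worker_arg *w = p;
    for (;;) {
        pthread_mutex_lock(&w->pool->lock);
        int i = w->pool->next++;              /* claim the next element */
        pthread_mutex_unlock(&w->pool->lock);
        if (i >= w->pool->n)
            break;
        w->pool->op(w->pool->arg, w->tid, i);
    }
    return NULL;
}

void compute_tasks(int n, int ncpu, void (*op)(void *, int, int), void *arg) {
    struct task_pool pool = { n, 0, op, arg };
    pthread_t threads[ncpu];
    struct worker_arg wa[ncpu];
    int t;
    pthread_mutex_init(&pool.lock, NULL);
    for (t = 0; t < ncpu; t++) {
        wa[t].pool = &pool;
        wa[t].tid = t;
        pthread_create(&threads[t], NULL, worker, &wa[t]);
    }
    for (t = 0; t < ncpu; t++)
        pthread_join(threads[t], NULL);
    pthread_mutex_destroy(&pool.lock);
}

A thread that finishes its element early simply claims the next one, so uneven processing times do not leave CPUs idle.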
It is tempting to implement this by checking ("polling") every x seconds whether an event has occurred or a state has changed. This is usually not advisable: if x is small or 0, the checking consumes resources without doing anything useful most of the time ("busy wait"); if x is large, event processing will not be reactive.
To avoid busy waits, event processing code should wait for events like
the end of a subprocess (wait in shell) or thread (pthread_join in C),
data on a file descriptor (read or select in C), or synchronization
events on threaded code (pthread_mutex_lock, pthread_cond_wait in C).
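As a minimal illustration (the flag and function names are made up, not taken from the text), a thread can sleep inside pthread_cond_wait() until another thread signals the event, instead of polling a shared flag every x seconds:

#include <pthread.h>

/* hypothetical shared state, protected by the mutex */
static int data_ready = 0;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

/* consumer: sleeps inside pthread_cond_wait() instead of busy-waiting */
void wait_for_data(void) {
    pthread_mutex_lock(&lock);
    while (!data_ready)                  /* loop guards against spurious wakeups */
        pthread_cond_wait(&cond, &lock);
    data_ready = 0;
    pthread_mutex_unlock(&lock);
}

/* producer: publishes the event and wakes the consumer */
void signal_data(void) {
    pthread_mutex_lock(&lock);
    data_ready = 1;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&lock);
}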