在《POSIX多线程程序设计》中,作者David R. Butenhof给我们展示了诸多实用pthread_mutex_t 和 pthread_cond_t构建的线程同步工具,我最喜欢的两个是barrier和rwlock。所以用C实现并在虚拟机上爽了一把。先贴出代码以及注释,以供大家查阅,共同进步。
下载源代码
barrier用于停止线程,直到所有在barrier的线程都到达当前状态,才返回。barrier经常用于确保某些并行算法中所有合作的线程到达同一点。比如启动了N个线程对某大数组进行分段处理,在所有线程处理完之后,再对结果进行合并或展示,这个时候就可以使用barrier。其作用这时就相当于Win 下的WaitForMutipleObjects。
要使所有线程达到同一状态,然后阻塞,最简单的想法就是保存当前已到达指定状态的线程数,如果线程数量与指定的最大线程数不同,线程进入等待状态,相同,达到预设条件,唤醒其他线程。
要使线程等待,在POSIX中,就理所当然应该使用pthread_cond_t作为等待的对象。同时要修改当前线程的计数,需要一个互斥变量,阻止线程同时读写这个计数。barrier的声明如下:
/* * Structure describing a barrier. */ typedef struct barrier_tag { pthread_mutex_t mutex; /* Control access to barrier */ pthread_cond_t cv; /* Wait for barrier */ int valid; /* Set when valid */ int threshold; /* number of threads required */ int counter; /* current number of threads */ unsigned long cycle; /* count cycles */ } barrier_t;要提供的操作barrier的api也只需要三个:init, wait, destroy。
/* * Define barrier operations. */ extern int barrier_init(barrier_t * barrier, int count); /* dynamic initialization of barriers */ extern int barrier_destroy(barrier_t * barrier); /* destroy the barrier */ extern int barrier_wait(barrier_t * barrier); /* wait until the barrier is actived */
此函数用于初始化barrier_t,分别初始化mutex, cond变量,设置等待线程和需要等待线程总数,设置谓语动词cycle,设置barrier有效标志
/* * Initialize a barrier for use. */ int barrier_init(barrier_t * barrier, int count) { int status; barrier->threshold = barrier->counter = count; barrier->cycle = 0; status = pthread_mutex_init(&barrier->mutex, NULL); if (status != 0) { return status; } status = pthread_cond_init(&barrier->cv, NULL); if (status != 0) { pthread_mutex_destroy(&barrier->mutex); return status; } barrier->valid = BARRIER_VALID; return 0; }
销毁barrier, 为了防止重复销毁barrier, 先对valid标志进行检查,然后置空valid标志,这样就可以阻止其他线程再试图等待该barrier。最后一一销毁mutex和cond。
/* * Destroy a barrier when done use it. */ int barrier_destroy(barrier_t * barrier) { int status, status2; if (barrier->valid != BARRIER_VALID) { return EINVAL; } /* Set barrier invalid. */ status = pthread_mutex_lock(&barrier->mutex); if (status != 0) { return status; } if (barrier->counter != barrier->threshold) { pthread_mutex_unlock(&barrier->mutex); return EBUSY; } barrier->valid = 0; status = pthread_mutex_unlock(&barrier->mutex); if (status != 0) { return status; } status = pthread_mutex_destroy(&barrier->mutex); status2 = pthread_cond_destroy(&barrier->cv); return (status != 0 ? status : status2); }
等待所有线程到达同一状态点。试图wait时,先检查该barrier是否有效。然后互斥修改barrier->count。如果count为0,唤醒所有其他线程。否则等待cv。
/* * Wait all threads reached. */ int barrier_wait(barrier_t * barrier) { int status, cycle, cancel, tmp; if (barrier->valid != BARRIER_VALID) { return EINVAL; } status = pthread_mutex_lock(&barrier->mutex); if (status != 0) { return status; } cycle = barrier->cycle; /* If the last thread arrived, wake others */ if (--barrier->counter == 0) { barrier->counter = barrier->threshold; barrier->cycle ++; status = pthread_cond_broadcast(&barrier->cv); /* The last thread return -1 rather than 0, so that * it can be used to do some special serial code following * the barrier. */ if (status == 0) { status = -1; } } else { /* Wait with cancellation disabled, because barrier_wait * should not be a cancellation point. */ pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &cancel); while (cycle == barrier->cycle) { status = pthread_cond_wait(&barrier->cv, &barrier->mutex); if (status != 0) { break; } } pthread_setcancelstate(cancel, &tmp); } pthread_mutex_unlock(&barrier->mutex); return status; }
完整代码以及测试用例,在这里。
读写锁用于线程之间写异步,读同步操作(当然您也可以反过来,如果确实需要的话:P)。这里给出一个读优先的例子。
实现读写锁,只需要,在写的时候,查看当前是否有读或写线程,如果没有,那么设置当前有一个写线程,执行写操作;如果有等待被唤醒。而在读的时候只需要查看有没有写线程,如果没有,直接去读,不管是否有无其他读线程;如果有,则等待被唤醒。
要设计一个读写锁,需要一个mutex用于保护读写变量的安全,一个可读条件,一个可写条件。另外还需要当前读变量数,当前写变量数,当前等待的读变量数,当前等待的写变量数。
读写锁的结构如下:
typedef struct rwlock_tag{ pthread_mutex_t mutex; /* Access locker */ pthread_cond_t read; /* Wait for read */ pthread_cond_t write; /* Wait for write */ int r_wait; /* Waiting readers */ int w_wait; /* Waiting writers */ int r_active; /* Activing readers */ int w_active; /* Activing writers */ int valid; /* Set when valid */ }rwlock_t;
extern int rwlock_init(rwlock_t * rwlock); extern int rwlock_destroy(rwlock_t * rwlock); extern int rwlock_readlock(rwlock_t * rwlock); extern int rwlock_tryreadlock(rwlock_t * rwlock); extern int rwlock_writelock(rwlock_t * rwlock); extern int rwlock_trywritelock(rwlock_t * rwlock); extern int rwlock_readunlock(rwlock_t * rwlock); extern int rwlock_writeunlock(rwlock_t * rwlock);
实现读的原理前面已经叙述过:检查当前有木有正在读的线程,如果木有,增加正在读线程的计数,返回成功。由于读线程可以被取消,为了保证其他的读写线程可以正确工作,不能破坏rwlock的内部变量,当线程被取消时需要进行清理工作。明白原理,直接看代码吧。
static void rwlock_readcleanup(void * arg) { rwlock_t * rwlock = (rwlock_t *)arg; --rwlock->r_wait; pthread_mutex_unlock(&rwlock->mutex); } int rwlock_readlock(rwlock_t * rwlock) { int status; if (rwlock->valid != RWLOCK_VALID) { return EINVAL; } status = pthread_mutex_lock(&rwlock->mutex); if (status != 0) { return status; } if (rwlock->w_active > 0) { rwlock->r_wait++; /* As read lock allow thread be canceled, * set cleanup to release resource. */ pthread_cleanup_push(rwlock_readcleanup, (void *)rwlock); while (rwlock->w_active > 0) { status = pthread_cond_wait(&rwlock->read, &rwlock->mutex); if (status != 0) { break; } } pthread_cleanup_pop(0); rwlock->r_wait--; } if (status == 0) { rwlock->r_active ++; } pthread_mutex_unlock(&rwlock->mutex); return status; }
读解锁时,只需要减少当前活动读线程计数,如果有待写线程,叫醒写线程。
int rwlock_readunlock(rwlock_t * rwlock) { int status, status2; if (rwlock->valid != RWLOCK_VALID) { return EINVAL; } status = pthread_mutex_unlock(&rwlock->mutex); if (status != 0) { return status; } if (--rwlock->r_active == 0) { if (rwlock->w_wait > 0){ status = pthread_cond_signal(&rwlock->write); } } status2 = pthread_mutex_unlock(&rwlock->mutex); return status != 0 ? status : status2; }
最近有些迷茫,原因是年龄的增长和自我感觉个人能力的不足以及目标的遥不可及。特别是每当看到大牛写的代码时,各种羡慕嫉妒恨,时常感到一阵压抑,脑中会充满各种问题,自己什么时候才能变成这样?有什么捷径?将来怎么办?
冰冻三尺,非一日之寒;为山九仞,岂一日之功。又:合抱之木,生于毫末;九层之台,起于累土;千里之行,始于足下。不积跬步,无以至千里。所以用心做好自己,从平时的点滴做起,每天进步一点点。关键时刻,然后抓住机遇,实现自己的目标。
大家共勉之!