使用windows API实现4个线程,其中一个线程向有限容量的缓冲区中生产固定总数的“产品”,其余线程从此缓冲区拿取“产品”,最后显示这些“消费者”线程各自拿取了多少。
下面是程序将要用到的结构和类型。注意到缓冲区为空并不代表生产者造完了所有“产品”,所以用NoMoreData来指示是否所有“产品”都已生产完毕。
#define BUFFER 512//缓冲区大小 #define MAX_ITEM 1280//生产者一共要造1024个产品 #define NTHREAD 4 typedef struct{ DWORD tid;//线程ID,指示谁生产的产品 int dat;//数据内容,产品编号 } ITEM, *LPITEM; DWORD _stdcall ProducerRoutine(LPVOID); DWORD _stdcall ConsumerRoutine(LPVOID); ITEM buffer[BUFFER];//一个环形缓冲 int front; int rear; HANDLE mutBuffer;//保证缓冲区的互斥访问 HANDLE semFull;//可以消费的产品资源数 HANDLE semEmpty;//可以存放新产品的空位数 BOOL NoMoreData;//生产者造完所有产品后其值为“真”
主函数逻辑不复杂,新建4个线程,为其指定任务和参数后立即参与执行。打开缓冲区的互斥锁,随后等待四个线程正常结束。最后把这些消费者线程各自消费的产品数量相加并显示出来。
int main() { LPVOID lpParam; HANDLE hThreads[NTHREAD]; DWORD dwThreadIds[NTHREAD]; DWORD dwResults[NTHREAD]; LPTHREAD_START_ROUTINE lpThreadRoutine; int i, nProducts = MAX_ITEM; mutBuffer = CreateMutex(NULL, TRUE, NULL); semFull = CreateSemaphore(NULL, 0, BUFFER, NULL); semEmpty = CreateSemaphore(NULL, BUFFER, BUFFER, NULL); NoMoreData = FALSE; for (i = 0; i < NTHREAD; i++) { if (i == NTHREAD - 1) { lpThreadRoutine = ProducerRoutine; lpParam = &nProducts;//最后一个当生产者 } else { lpThreadRoutine = ConsumerRoutine; lpParam = NULL;//其余当消费者 } hThreads[i] = CreateThread(NULL, 0, lpThreadRoutine, &nProducts, 0, &dwThreadIds[i]); if (hThreads[i] == INVALID_HANDLE_VALUE) { printf("can't create new thread[%d]\n", i); ExitProcess(1); } } ReleaseMutex(mutBuffer);//解除互斥量以使缓冲区可用 WaitForMultipleObjects(NTHREAD, hThreads, TRUE, INFINITE); for (i = 0; i < NTHREAD; i++) { GetExitCodeThread(hThreads[i], &dwResults[i]); CloseHandle(hThreads[i]); } CloseHandle(semFull); CloseHandle(semEmpty); CloseHandle(mutBuffer); for (i = 0; i < NTHREAD - 1; i++) dwResults[NTHREAD - 1] += dwResults[i]; printf("%ld items were processed\n", dwResults[NTHREAD - 1]); return 0; }
生产者与消费者各自的任务:
DWORD _stdcall ProducerRoutine(LPVOID lpArg) { int nProduct; ITEM item; DWORD ThisTid; nProduct = *(int*)lpArg; ThisTid = GetCurrentThreadId(); while (nProduct > 0) { item.tid = ThisTid;//生产一个“产品” item.dat = nProduct; WaitForSingleObject(semEmpty, INFINITE);//要一个空位 WaitForSingleObject(mutBuffer, INFINITE);//要独占缓冲区 buffer[rear] = item; rear = (rear + 1) % BUFFER;//放进缓冲区 printf("Produce No.%d\r\n", item.dat); if (nProduct == 1) NoMoreData = TRUE;//所有产品都已生产 ReleaseMutex(mutBuffer);//让出缓冲区给别的线程 ReleaseSemaphore(semFull, 1, NULL);//多了一个“产品”资源 nProduct--;//计数 } ExitThread(0); } DWORD _stdcall ConsumerRoutine(LPVOID lpArg) { int count; ITEM item; DWORD ThisTid; count = 0; ThisTid = GetCurrentThreadId(); while (1) { WaitForSingleObject(semFull, INFINITE); WaitForSingleObject(mutBuffer, INFINITE); if (front == rear) { //①被告知没数据了,退出 ReleaseMutex(mutBuffer); break; } //取一个产品 item = buffer[front]; front = (front + 1) % BUFFER; printf("Th%ld: Consume No.%d\r\n", ThisTid, item.dat); if (NoMoreData && front == rear) { //没产品可用了,让其它线程醒来到上面①处
//这里信号量将增加 NTHREAD-1,这让所有消费者都能被唤醒并正确退出 ReleaseSemaphore(semFull, NTHREAD-1, NULL); } ReleaseMutex(mutBuffer); ReleaseSemaphore(semEmpty, 1, NULL); count++;//计数 } ExitThread(count); }
最终效果是这样的:
从VS的Output窗口可以看到三个消费者线程各自消费的产品数量(这俩图不是同一次执行),加起来刚好是1280,说明没啥问题。当然有没有问题还得靠严格的论证,毕竟正确的多线程程序的代码逻辑不是那么好设计的。