有限缓冲区的消费者与生产者问题编码实践

发布时间 2023-08-09 08:23:21作者: 汀洲杜若

使用windows API实现4个线程,其中一个线程向有限容量的缓冲区中生产固定总数的“产品”,其余线程从此缓冲区拿取“产品”,最后显示这些“消费者”线程各自拿取了多少。

下面是程序将要用到的结构和类型。注意到缓冲区为空并不代表生产者造完了所有“产品”,所以用NoMoreData来指示是否所有“产品”都已生产完毕。

#define BUFFER   512//缓冲区大小
#define MAX_ITEM 1280//生产者一共要造1024个产品
#define NTHREAD  4

typedef struct{
    DWORD tid;//线程ID,指示谁生产的产品
    int   dat;//数据内容,产品编号
} ITEM, *LPITEM;

DWORD _stdcall ProducerRoutine(LPVOID);
DWORD _stdcall ConsumerRoutine(LPVOID);

ITEM   buffer[BUFFER];//一个环形缓冲
int    front;
int    rear;

HANDLE mutBuffer;//保证缓冲区的互斥访问
HANDLE semFull;//可以消费的产品资源数
HANDLE semEmpty;//可以存放新产品的空位数
BOOL   NoMoreData;//生产者造完所有产品后其值为“真”

 主函数逻辑不复杂,新建4个线程,为其指定任务和参数后立即参与执行。打开缓冲区的互斥锁,随后等待四个线程正常结束。最后把这些消费者线程各自消费的产品数量相加并显示出来。

int main() {
    LPVOID lpParam;
    HANDLE hThreads[NTHREAD];
    DWORD  dwThreadIds[NTHREAD];
    DWORD  dwResults[NTHREAD];
    LPTHREAD_START_ROUTINE lpThreadRoutine;
    int i, nProducts = MAX_ITEM;

    mutBuffer  = CreateMutex(NULL, TRUE, NULL);
    semFull    = CreateSemaphore(NULL, 0, BUFFER, NULL);
    semEmpty   = CreateSemaphore(NULL, BUFFER, BUFFER, NULL);
    NoMoreData = FALSE;

    for (i = 0; i < NTHREAD; i++)
    {
        if (i == NTHREAD - 1)
        {
            lpThreadRoutine = ProducerRoutine;
            lpParam = &nProducts;//最后一个当生产者
        }
        else
        {
            lpThreadRoutine = ConsumerRoutine;
            lpParam = NULL;//其余当消费者
        }

        hThreads[i] = CreateThread(NULL, 0, lpThreadRoutine, &nProducts, 0, &dwThreadIds[i]);
        if (hThreads[i] == INVALID_HANDLE_VALUE)
        {
            printf("can't create new thread[%d]\n", i);
            ExitProcess(1);
        }
    }

    ReleaseMutex(mutBuffer);//解除互斥量以使缓冲区可用
    WaitForMultipleObjects(NTHREAD, hThreads, TRUE, INFINITE);

    for (i = 0; i < NTHREAD; i++)
    {
        GetExitCodeThread(hThreads[i], &dwResults[i]);
        CloseHandle(hThreads[i]);
    }
    CloseHandle(semFull);
    CloseHandle(semEmpty);
    CloseHandle(mutBuffer);

    for (i = 0; i < NTHREAD - 1; i++)
        dwResults[NTHREAD - 1] += dwResults[i];
    printf("%ld items were processed\n", dwResults[NTHREAD - 1]);
    return 0;
}

生产者与消费者各自的任务:

DWORD _stdcall ProducerRoutine(LPVOID lpArg)
{
    int   nProduct;
    ITEM  item;
    DWORD ThisTid;

    nProduct = *(int*)lpArg;
    ThisTid  = GetCurrentThreadId();

    while (nProduct > 0)
    {
        item.tid = ThisTid;//生产一个“产品”
        item.dat = nProduct;

        WaitForSingleObject(semEmpty, INFINITE);//要一个空位
        WaitForSingleObject(mutBuffer, INFINITE);//要独占缓冲区

        buffer[rear] = item;
        rear = (rear + 1) % BUFFER;//放进缓冲区
        printf("Produce No.%d\r\n", item.dat);

        if (nProduct == 1)
            NoMoreData = TRUE;//所有产品都已生产

        ReleaseMutex(mutBuffer);//让出缓冲区给别的线程
        ReleaseSemaphore(semFull, 1, NULL);//多了一个“产品”资源

        nProduct--;//计数
    }
    ExitThread(0);
}
DWORD _stdcall ConsumerRoutine(LPVOID lpArg)
{
    int   count;
    ITEM  item;
    DWORD ThisTid;
    
    count   = 0;
    ThisTid = GetCurrentThreadId();

    while (1)
    {
        WaitForSingleObject(semFull, INFINITE);
        WaitForSingleObject(mutBuffer, INFINITE);

        if (front == rear)
        {
            //①被告知没数据了,退出
            ReleaseMutex(mutBuffer);
            break;
        }

        //取一个产品
        item = buffer[front];
        front = (front + 1) % BUFFER;
        printf("Th%ld: Consume No.%d\r\n", ThisTid, item.dat);

        if (NoMoreData && front == rear)
        {
            //没产品可用了,让其它线程醒来到上面①处
//这里信号量将增加 NTHREAD-1,这让所有消费者都能被唤醒并正确退出
ReleaseSemaphore(semFull, NTHREAD-1, NULL); } ReleaseMutex(mutBuffer); ReleaseSemaphore(semEmpty, 1, NULL); count++;//计数 } ExitThread(count); }

最终效果是这样的:

从VS的Output窗口可以看到三个消费者线程各自消费的产品数量(这俩图不是同一次执行),加起来刚好是1280,说明没啥问题。当然有没有问题还得靠严格的论证,毕竟正确的多线程程序的代码逻辑不是那么好设计的。