linux同步机制-completion

发布时间 2023-11-05 14:00:56作者: 大奥特曼打小怪兽

一、completion

1.1 什么是completion

linux内核中,完成量completion是一种代码同步机制。如果有一个或多个线程必须等待某个内核活动操作达到某个点或某个特定状态,那么completion完成量可以提供一个无竞争的解决方案。

1.2 completion的使用

1.2.1 定义并初始化完成量
// 方式一
struct completion mycompletion;
init_completion(&mycompletion)
// 方式二
DECLARE_COMPLETION(mycompletion);

以下两种方式是等效的,都实现了定义和初始化完成量mycompletion的功能。

1.2.2 等待完成量
wait_for_completion(&mycompletion);
wait_for_completion_interruptible(&mycompletion);
wait_for_completion_timeout(&mycompletion,timeout);
1.2.3 唤醒完成量
complete(&mycompletion);

二、completion实现源码

2.1 struct completion结构体

struct completion结构体定义位于include/linux/completion.h文件中:

/*
 * struct completion - structure used to maintain state for a "completion"
 *
 * This is the opaque structure used to maintain the state for a "completion".
 * Completions currently use a FIFO to queue threads that have to wait for
 * the "completion" event.
 *
 * See also:  complete(), wait_for_completion() (and friends _timeout,
 * _interruptible, _interruptible_timeout, and _killable), init_completion(),
 * reinit_completion(), and macros DECLARE_COMPLETION(),
 * DECLARE_COMPLETION_ONSTACK().
 */
struct completion {
        unsigned int done;
        wait_queue_head_t wait;
};

wait等待队列头,用来放置需要等到的任务,done用来指示任务是否完成。

可以看到其内部是通过等待队列来实现的。

2.2 init_completion

init_completion用于初始化完成量:

#define init_completion(x) __init_completion(x)

/**
 * init_completion - Initialize a dynamically allocated completion
 * @x:  pointer to completion structure that is to be initialized
 *
 * This inline function will initialize a dynamically created completion
 * structure.
 */
static inline void __init_completion(struct completion *x)
{
        x->done = 0;
    	// 初始化等待队列头
        init_waitqueue_head(&x->wait);  
}

2.3 等待完成量

2.3.1wait_for_completion

wait_for_completion用于等待完成量的释放,定义在kernel/sched/completion.c;

do_wait_for_common(struct completion *x,
                   long (*action)(long), long timeout, int state)
{
        if (!x->done) {
                DECLARE_WAITQUEUE(wait, current);

                __add_wait_queue_entry_tail_exclusive(&x->wait, &wait);
                do {
                        if (signal_pending_state(state, current)) {
                                timeout = -ERESTARTSYS;
                                break;
                        }
                        __set_current_state(state);
                        spin_unlock_irq(&x->wait.lock);
                        timeout = action(timeout);
                        spin_lock_irq(&x->wait.lock);
                } while (!x->done && timeout);
                __remove_wait_queue(&x->wait, &wait);
                if (!x->done)
                        return timeout;
        }
        if (x->done != UINT_MAX)
                x->done--;
        return timeout ?: 1;
}


/**
 * wait_for_completion: - waits for completion of a task
 * @x:  holds the state of this particular completion
 *
 * This waits to be signaled for completion of a specific task. It is NOT
 * interruptible and there is no timeout.
 *
 * See also similar routines (i.e. wait_for_completion_timeout()) with timeout
 * and interrupt capability. Also see complete().
 */
void __sched wait_for_completion(struct completion *x)
{
        wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_UNINTERRUPTIBLE);
}

wait_for_completion函数的主要作用是等待某个特定任务的完成。当调用该函数时,如果任务的完成状态没有被设置为完成,当前线程将被阻塞,直到任务的完成状态被设置为完成。

需要注意的是,wait_for_completion函数是不可中断的(non-interruptible),这意味着无论是否收到中断信号,当前线程都会一直等待任务的完成。此外,该函数也没有超时功能,即使等待时间很长,也不会自动超时返回。

2.3.2 wait_for_completion_interruptible

wait_for_completion_interruptible同样用于等待完成量的释放,只不过该函数是可以中断的,定义在kernel/sched/completion.c;

/**
 * wait_for_completion_interruptible: - waits for completion of a task (w/intr)
 * @x:  holds the state of this particular completion
 *
 * This waits for completion of a specific task to be signaled. It is
 * interruptible.
 *
 * Return: -ERESTARTSYS if interrupted, 0 if completed.
 */
int __sched wait_for_completion_interruptible(struct completion *x)
{
        long t = wait_for_common(x, MAX_SCHEDULE_TIMEOUT, TASK_INTERRUPTIBLE);
        if (t == -ERESTARTSYS)
                return t;
        return 0;
}

2.4 complete

complete函数用于唤醒等待此特定complete事件的单个线程,定义在kernel/sched/completion.c;

/**
 * complete: - signals a single thread waiting on this completion
 * @x:  holds the state of this particular completion
 *
 * This will wake up a single thread waiting on this completion. Threads will be
 * awakened in the same order in which they were queued.
 *
 * See also complete_all(), wait_for_completion() and related routines.
 *
 * If this function wakes up a task, it executes a full memory barrier before
 * accessing the task state.
 */
void complete(struct completion *x)
{
        unsigned long flags;

        spin_lock_irqsave(&x->wait.lock, flags);

        if (x->done != UINT_MAX)
                x->done++;
        __wake_up_locked(&x->wait, TASK_NORMAL, 1);
        spin_unlock_irqrestore(&x->wait.lock, flags);
}

当调用该函数时,如果有线程正在等待该完成状态,它将被唤醒并继续执行。

需要注意的是,complete函数会按照线程入队的顺序依次唤醒等待的线程。

2.5 completion_done

completion_done函数用于检测某个complete是否有正在等待的线程,定义在kernel/sched/completion.c;

/**
 *      completion_done - Test to see if a completion has any waiters
 *      @x:     completion structure
 *
 *      Return: 0 if there are waiters (wait_for_completion() in progress)
 *               1 if there are no waiters.
 *
 *      Note, this will always return true if complete_all() was called on @X.
 */
bool completion_done(struct completion *x)
{
        unsigned long flags;

        if (!READ_ONCE(x->done))
                return false;

        /*
         * If ->done, we need to wait for complete() to release ->wait.lock
         * otherwise we can end up freeing the completion before complete()
         * is done referencing it.
         */
        spin_lock_irqsave(&x->wait.lock, flags);
        spin_unlock_irqrestore(&x->wait.lock, flags);
        return true;
}

当调用该函数时,如果有线程正在等待该完成状态,则返回0;如果没有线程在等待,则返回1。

需要注意的是,如果之前调用了complete_all函数来完成所有等待的线程,则无论如何都会返回1,表示没有等待的线程。

三、completion示例程序

3.1 驱动程序

completion_dev.c文件定义:

#include <linux/uaccess.h>
#include <linux/fs.h>
#include <linux/stat.h>
#include <linux/cdev.h>
#include <linux/io.h>
#include <linux/gpio.h>
#include <linux/slab.h>
#include <linux/irq.h>
#include <linux/mutex.h>
#include <linux/interrupt.h>
#include <linux/bug.h>			/* For BUG_ON.  */
#include <linux/cpu.h>
#include <linux/init.h> /* Needed for the macros */
#include <linux/kernel.h> /* Needed for pr_info() */
#include <linux/module.h> /* Needed by all modules */
#include <linux/delay.h>
#include <linux/smp.h>
#include <linux/kernel_stat.h>
#include <linux/sched.h>
#include <linux/percpu-defs.h>
#include <linux/wait.h>
#include <linux/gpio/driver.h>
#include <linux/atomic.h>
#include <linux/platform_device.h>
#include <linux/poll.h>
#include <linux/kfifo.h>
#include <linux/completion.h>

#define DELAY_CMD_WRITE   _IOW('A',0,unsigned int)
#define DELAY_CMD_READ   _IOW('A',1,unsigned int)
#define ERROR  (-1)

DEFINE_KFIFO(myfifo, char, 1024);

dev_t devid;                      // 起始设备编号
struct cdev hello_cdev;          // 保存操作结构体的字符设备 
struct class *hello_cls;

DECLARE_WAIT_QUEUE_HEAD(wq);

static int hello_open(struct inode *inode, struct file *file) 
{
	pr_info("hello_open \n");
	return 0;
}

static ssize_t hello_write(struct file *filp, const char __user *user_buffer,
		size_t size, loff_t *offset) 
{
	int ret;
	unsigned int len = 0;
	pr_info("write");
	ret = kfifo_from_user(&myfifo, user_buffer, size, &len);
	if (ret != 0) {
		pr_err("kfifo_from_user error");
		return 0;
	}
	if (len <= 0)
		return 0;
	*offset += len;
	wake_up(&wq);
	return len;
}

static ssize_t hello_read(struct file *filp, char __user *user_buffer,
		size_t count, loff_t *offset) 
{
	int ret;
	unsigned int len = 0;
	pr_info("read");
	ret = kfifo_to_user(&myfifo, user_buffer, count, &len);
	if (len <= 0)
		return 0;
	*offset += len;
	return len;
}

unsigned int hello_poll(struct file *flip, struct poll_table_struct *table) 
{
	int mask = 0;
	pr_info("hello_poll \n");
	poll_wait(flip, &wq, table);
	if (kfifo_is_empty(&myfifo)) {

	} else {
		mask |= POLLIN | POLLRDNORM;
	}
	return mask;
}

int hello_close(struct inode *inode, struct file *flip) 
{
	pr_info("hello_close \n");
	return 0;
}

// 定义并初始化完成量
DECLARE_COMPLETION(mycompletion);

static long hello_ioctl(struct file *file, unsigned int cmd, unsigned long arg) 
{
	pr_info("cmd %d\n", cmd);
	switch (_IOC_NR(cmd)) {
		case 0: {
			pr_info("i am waiting \n");
        	// 等待完成量
			wait_for_completion(&mycompletion);
			pr_info("haha,i get the message\n");
			break;
		}
		case 1: {
			pr_info("complete \n");
            // 唤醒完成量
			complete(&mycompletion);
			break;
		}
		default: {
			pr_info("default: \n");
		}
	}
	return 0;
}

static const struct file_operations hello_fops = { 
    .owner 			= THIS_MODULE, 
    .open  			= hello_open, 
    .read  			= hello_read, 
    .write 			= hello_write, 
    .poll  			= hello_poll, 
    .release 		= hello_close, 
    .unlocked_ioctl = hello_ioctl,
};

static __init int hello_init(void) 
{
	int err;
	pr_info("hello driver init\n");
    
    /* 动态分配字符设备: (major,0) */
    err = alloc_chrdev_region(&devid, 0, 1, "hello");
	if (err != 0) {
		return err;
	}

    cdev_init(&hello_cdev, &hello_fops);
    cdev_add(&hello_cdev, devid, 1);
    
    /* 创建类,它会在sys目录下创建/sys/class/hello这个类  */
     hello_cls = class_create(THIS_MODULE, "hello");
     if(IS_ERR(hello_cls)){
         printk("can't create class\n");
         return ERROR;
     }
    /* 在/sys/class/hello下创建hello设备,然后mdev通过这个自动创建/dev/hello这个设备节点 */
     device_create(hello_cls, NULL, devid, NULL, "hello"); 
	
	return 0;
}

static void __exit hello_exit(void) 
{
    pr_info("hello driver exit\n");
    /* 注销类、以及类设备 /sys/class/hello会被移除*/
    device_destroy(hello_cls, devid);
    class_destroy(hello_cls);
    
	cdev_del(&hello_cdev);
    unregister_chrdev_region(devid, 1);
}

module_init(hello_init);
module_exit(hello_exit);

MODULE_LICENSE("GPL");

Makefile文件:

# SPDX-License-Identifier: Apache-2.0

obj-m := completion_dev.o
KDIR := /lib/modules/$(shell uname -r)/build
PWD := $(shell pwd)

default:
    $(MAKE) -C $(KDIR) M=$(PWD) SUBDIRS=$(PWD) modules

clean:
    $(MAKE) -C $(KDIR) M=$(PWD) SUBDIRS=$(PWD) clean

指向编译安装命令:

make
insmod completion_dev.ko

3.2 应用程序

main.c文件定义:

#include <stdint.h>
#include <sys/types.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <string.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
#include <poll.h>

#define DELAY_CMD_0_WRITE   _IOW('A',0,unsigned int)
#define DELAY_CMD_1_READ   _IOW('A',1,unsigned int)

static void* write01(void *ptr) 
{
	int fd = open("/dev/hello", O_RDWR, O_NONBLOCK);
	if (fd < 0) {
		perror("open");
		printf("error open\n");
		return NULL;
	}
	printf("write start \n");
	ioctl(fd, DELAY_CMD_0_WRITE, 100);
	printf("write end \n");
	close(fd);
	return NULL;
}

static void* read01(void *ptr) 
{
	int fd = open("/dev/hello", O_RDWR, O_NONBLOCK);
	if (fd < 0) {
		perror("open");
		printf("error open\n");
		return NULL;
	}
	printf("read start \n");
	ioctl(fd, DELAY_CMD_1_READ, 100);
	printf("read end \n");
	close(fd);
	return NULL;
}

/**
 * first read then  write
 */
int main() 
{
	pthread_t thread, thread2;
	pthread_create(&thread, NULL, write01, NULL);
	sleep(2);
	pthread_create(&thread2, NULL, read01, NULL);
	pthread_join(thread, NULL);
	pthread_join(thread2, NULL);
	printf("main exit\n");
	return EXIT_SUCCESS;
}

编译运行:

gcc -o main main.c -lpthread
./main

应用先发0,再发1 ,驱动中的0会等待1的完成量,可以看到打印信息:

write start
read start
write end
read end
main exit

参考文章

[1] linux驱动移植-SPI控制器驱动

[2] 深入理解Linux内核 -- completion机制

[3] linux内核完成量completion使用说明