并发编程
并行计算导论
顺序算法与并行算法
顺序算法:
begin
step_1
step_2
……
step_n
end
// next step
并行算法:
cobegin
task_1
task_2
……
task_n
coend
// next step
并行性与并发性
在单CPU系统中,一次只能执行一个任务。不同的任务只能并发执行,即在逻辑上并行执行。并发性是通过多任务处理来实现的。
线程
线程的原理
线程是操作系统能够进行运算调度的最小单位,它被包含在进程中,是进程中的实际运作单位。线程是比进程更小的能独立运行的基本单位,线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器、一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。同一个进程中的多个线程之间可以并发执行,共享进程的资源,但是每个线程有自己的执行路径。线程可以看作是轻量级的进程,它是进程中的一个实体,是被系统独立调度和分派的基本单位。
线程的优点
-
线程创建和切换速度更快
-
线程的响应速度更快
-
线程更适合并行计算
线程的缺点
-
由于地址空间共享,线程需要来自用户的明确同步
-
许多库函数可能对线程不安全
-
在单CPU系统上,使用线程解决问题实际上要比使用顺序程序慢,这是由在运行时创建线程和切换上下文系统开销造成的
线程操作
线程的执行轨迹与进程类似。线程可在内核模式或用户模式下执行。在用户模式下,线程在进程的相同地址空间中执行,但每个线程都有自己的执行堆栈。线程是独立的执行单元,可根据操作系统内核的调度策略,对内核进行系统调用,变为挂起、激活以继续执行等。为了利用线程的共享地址空间,操作系统内核的调度策略可能会优先选择同一进程中的线程,而不是不同进程中的线程。
线程管理函数
Pthread 库提供了用于线程管理的以下 API。
pthread_create(thread, attr, function, arg): create thread
pthread_exit(status) : terminate thread
pthread_cancel(thread) : cancel thread
pthread_attr_init(attr) : initialize thread attributes
pthread_attr_destroy(arr) : destroy thread attributes
创建线程
使用 pthread_create()
函数创建线程。
int pthread_create(pthread_t *pthread_id, pthread_attr_t *attr, void *(*func)(void *),void *arg);
如果成功则返回0,如果失败则返回错误代码。
pthread_create()
函数的参数为
-
pthread_id是指向pthread_t类型变量的指针。它会被操作系统内核分配的唯一线程ID填充。
-
attr是指向另一种不透明数据类型的指针,它指定线程属性。
-
func是要执行的新线程函数的入口地址。
-
arg是指向线程函数参数的指针,可写为:void *func(void *arg)
attr参数的使用步骤:
-
定义一个pthread属性变量 pthread_attr_t attr。
-
用pthread_attr_init(&attr)初始化属性变量。
-
设置属性变量并在pthread_create()调用中使用。
-
必要时,通过 pthread_attr_destroy (&attr)释放attr资源。
如何创建一个分离线程:
pthread attr_t attr; // define an attr variable
pthread_attr_init(&attr); // initialize attr
pthread attx _setdetachstate(&attr,PTHREAD_CREATE_DETACHED); // set attr
pthread_create(&thread_id, &attr, Eunc,NULL); // create thread with attr
pthread_attr destroy(&attr); // optional:destroy attr
每个线程创建使用默认大小堆栈创建。线程可以使用函数size_t pthread_attr_getstacksize()
来找到堆栈大小。
创建具有特定堆栈大小的线程:
pthread _attr_t attr; // attr variab1e
size_t stacksize; // stack size
pthread_attr_init(&attr); // initialize attr
stacksize =0x10000; // stacksize=16KB
pthread_attx_setstacksize (&attr,stacksize); // set stack size in attr
pthread_create(&threads [t],&attr, func,NULL); // create thread with stack
size
如果attr参数为NULL,将使用默认属性创建线程。实际上,这是创建线程的建议方法,除非有必要更改线程属性,否则应该遵循这种方法。接下来,我们将attr设置为NULL,就可始终使用默认属性。
线程ID
线程ID是一种不透明的数据类型,取决于实现情况。因此,不应该直接比较线程ID。如果需要,可以使用pthread_equal()
函数对它们进行比较。
int pthread_equal (pthread_t t1, pthread_t t2);
如果是不同的线程,则返回0,否则返回非0。
线程终止
线程函数结束后,线程即终止。或者,线程可以调用函数
int pthread_exit (void *status);
进行显式终止,其中状态是线程的退出状态。通常,0退出值表示正常终止,非0值表示异常终止。
线程连接
一个线程可以等待另一个线程的终止,通过:
int pthread join (Dthreadt thread,voia w*gtatus_ptr);
终止线程的退出状态以 status_ptr返回。
线程示例
用线程计算矩阵的和
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#define N 4
int A[N][N],sum[N];
void *func(void *arg)
{
int j,row;
pthread_t tid=pthread_self();
row=(int)arg;
printf("Thread %d [%lu] computes sum of row %d\n",row,tid,row);
for(j=0;j<N;j++)
sum[row]+=A[row][j];
printf("Thread %d [%lu] done: sum[%d] = %d\n",row,tid,row,sum[row]);
pthread_exit((void*)0);
}
int main(int argc,char *argv[])
{
pthread_t thread[N];
int i,j,r,total=0;
void *status;
printf("Main: initialize A matrix\n");
for(i=0;i<N;i++)
{
sum[i]=0;
for(j=0;j<N;j++)
{
A[i][j]=i*N+j+1;
printf("%4d ",A[i][j]);
}
printf("\n");
}
printf("Main: create %d threads\n",N);
for(i=0;i<N;i++)
{
pthread_create(&thread[i],NULL,func,(void *)i);
}
printf("Main: try to join with thread\n");
for(i=0;i<N;i++)
{
pthread_join(thread[i],&status);
printf("Main: joined with %d [%lu]: status=%d\n",i,thread[i],(int)status);
}
printf("Main: compute and print total sum: ");
for(i=0;i<N;i++)
total+=sum[i];
printf("total = %d\n",total);
pthread_exit(NULL);
}
用并发线程快速排序
#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
typedef struct{
int upperbound;
int lowerbound;
}PARM;
#define N 10
int a[N]={5,1,6,4,7,2,9,8,0,3};
int print()
{
int i;
printf("[ ");
for(i=0;i<N;i++)
printf("%d ",a[i]);
printf("]\n");
}
void *qsort1(void *aptr)
{
PARM *ap,aleft,aright;
int pivot,pivotIndex,left,right,temp;
int upperbound,lowerbound;
pthread_t me,leftThread,rightThread;
me=pthread_self();
ap=(PARM *)aptr;
upperbound=ap->upperbound;
lowerbound=ap->lowerbound;
pivot=a[upperbound];
left=lowerbound-1;
right=upperbound;
if(lowerbound>=upperbound)
pthread_exit(NULL);
while(left<right)
{
do{left++;}while(a[left]<pivot);
do{right--;}while(a[right]>pivot);
if(left<right){
temp=a[left];
a[left]=a[right];
a[right]=temp;
}
}
print();
pivotIndex=left;
temp=a[pivotIndex];
a[pivotIndex]=pivot;
a[upperbound]=temp;
aleft.upperbound=pivotIndex-1;
aleft.lowerbound=lowerbound;
aright.upperbound=upperbound;
aright.lowerbound=pivotIndex+1;
printf("%lu: create left and right threads\n",me);
pthread_create(&leftThread,NULL,qsort1,(void *)&aleft);
pthread_create(&rightThread,NULL,qsort1,(void *)&aright);
pthread_join(leftThread,NULL);
pthread_join(rightThread,NULL);
printf("%lu: joined with left & right threads\n",me);
}
int main(int argc,char *argv[])
{
PARM arg;
int i,*array;
pthread_t me,thread;
me=pthread_self();
printf("main %lu: unsorted array = ",me);
print();
arg.upperbound=N-1;
arg.lowerbound=0;
printf("main %lu create a thread to do QS\n",me);
pthread_create(&thread,NULL,qsort1,(void *)&arg);
pthread_join(thread,NULL);
printf("main %lu sorted array = ",me);
print();
}
线程同步
互斥量
最简单的同步工具是锁,它允许执行实体仅在有锁的情况下才能执行。在 Pthread 中,锁被称为互斥量,意思是相互排斥。互斥变量是用 pthread_mutex_t 类型声明,在使用之前必须对它们进行初始化。有两种方法可以初始化互斥量。
-
一种是静态方案,如:
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER
定义互斥量 m,并使用默认属性对其进行初始化。 -
另一种是动态方案,使用
pthread_mutex_init()
函数,可通过 attr 参数设置互斥属性,如:pthread_mutex_init(pthread_mutex_t *m, pthread_mutexattr_t, *attr);
通常,attr 参数可以设置为 NULL,作为默认属性。
初始化完成后,线程可通过以下函数使用互斥量。
int pthread_mutex_lock (pthread_mutex_t *m); // lock mutex
int pthread_mutex_unlock (pthread_mutex_t *m); // unlock mutex
int pthread_mutex_trylock (pthread_mutex_t *m); // try to lock mutex
int pthread_mutex_destroy (pthread_mutex_t *m); // destroy mutex
线程使用互斥量来保护共享数据对象。互斥量的典型用法如下。线程先创建一个互斥量并对它进行一次初始化。新创建的互斥量处于解锁状态,没有所有者。每个线程都试图访问一个共享数据对象:
pthread_mutex_lock(&m); // lock mutex
access shared data object; // access shared data in a critical region
pthread_mutex_unlock(&m); // unlock mutex
当线程执行pthread_mutex_lock(&m)时,如果互斥量被解锁,它将封锁互斥量,成为互斥量的所有者并继续执行。否则,它将被阻塞并在互斥量等待队列中等待。只有获取了互斥量的线程才能访问共享数据对象。
死锁预防
互斥量使用封锁协议。如果某线程不能获取互斥量,就会被阻塞,等待互斥量解锁后再继续。在任何封锁协议中,误用加锁可能会产生一些问题。最常见和突出的问题是死锁。死锁是一种状态,在这种状态下,许多执行实体相互等待,因此都无法继续下去。
与竞态条件类似,死锁决不能存在于并发程序中。有许多方法可以解决可能的死锁问题,其中包括死锁预防、死锁规避、死锁检测和恢复等。在实际系统中,唯一可行的方法是死锁预防,试图在设计并行算法时防止死锁的发生。一种简单的死锁预防方法是对互斥量进行排序,并确保每个线程只在一个方向请求互斥量,这样请求序列中就不会有循环。
但是,仅使用单向加锁请求来设计每个并行算法是不可能的。在这种情况下,可以使用条件加锁函数 pthread_mutex_trylock()
来预防死锁。如果互斥量已被加锁,则 trylock()
函数会立即返回一个错误。在这种情况下,调用线程可能会释放它已经获取的一些互斥量以便进行退避,从而让其他线程继续执行。在上面的交叉加锁示例,我们可以重新设计一个线程,例如 T1,利用条件加锁和退避来预防死锁。
/*Thread T1*/
while(1){
lock(m1);
if(!trylock(m2))
unlock(m1);
else
break;
}
条件变量
作为锁,互斥量仅用于确保线程只能互斥地访问临界区中的共享数据对象。条件变量提供了一种线程协作 的方法。条件变量总是与互斥变量一起使用。
条件变量通过两种方法进行初始化:
-
一种是静态方法,在声明时,如:
pthreadd_cond_con = PTHREAD_COND_]INITIALIZER;
定义一个条件变量con,并使用默认属性对其进行初始化。 -
另一种是动态方法,使用
pthread_cond_init()
函数,可通过attr
参数设置条件变量。为简便起见,我们总是使用NULL attr
参数作为默认属性。
以下代码展示了如何使用条件变量:
pthread_mutex_t con_mutex; // mutex for a condition variable
pthread_cond_tcon; //a condition variable that relies on con_mutex
pthread_mutex_init(&con_mutex, NULL); // initialize mutex
pthread_cond_init(&con,NULL); // initialize con
当使用条件变量时,线程必须先获取相关的互斥量。然后,它在互斥量的临界区内执行操作,然后释放互斥量,如下所示:
pthread_mutex_lock(&con_mutex);
modify or test shared data objects
use condition variable con to wait or signal conditions
pthread_mutex_unlock(&con_mutex);
在互斥量的临界区中,线程可通过以下函数使用条件变量来相互协作。
-
pthread_cond wait(condition, mutex):该函数会阻塞调用线程,直到发出指定条件的信号。当互斥量被加锁时,应调用该例程。它会在线程等待时自动释放互斥量。互斥量将在接收到信号并唤醒阻塞的线程后自动锁定。
-
pthread_cond_signal(condition):该函数用来发出信号,即唤醒正在等待条件变量的线程或解除阻塞。它应在互斥量被加锁后调用,而且必须解锁互斥量才能完成
pthread_cond_wait()
。 -
pthread_cond_broadcast(condition):该函数会解除被阻塞在条件变量上的所有线程阻塞。所有未阻塞的线程将争用同一个互斥量来访问条件变量。它们的执行顺序取决于线程调度。
生产者-消费者问题
生产者消费者问题是一个经典的并发编程问题,用于描述多个生产者和消费者共享一个有限缓冲区的情况。在这个问题中,生产者负责生产数据并将其放入缓冲区,而消费者负责从缓冲区中取出数据并进行消费。生产者和消费者之间需要进行同步,以确保缓冲区的正确使用,避免数据竞争和死锁等问题。常见的解决方案包括使用互斥锁、条件变量和信号量等同步机制。
解决步骤:
-
使用一个共享的缓冲区来存储生产者生产的数据,缓冲区可以是一个队列或者一个固定大小的数组。
-
使用两个信号量来控制生产者和消费者的访问,一个是空槽信号量,表示缓冲区中还有多少空槽可以供生产者使用;另一个是满槽信号量,表示缓冲区中有多少槽已经被生产者使用。
-
当生产者要生产一个数据时,首先需要获取空槽信号量,如果没有空槽可用,则生产者需要等待。一旦获取到空槽,生产者将数据放入缓冲区,并增加满槽信号量的计数。
-
当消费者要消费一个数据时,首先需要获取满槽信号量,如果没有满槽可用,则消费者需要等待。一旦获取到满槽,消费者从缓冲区中取出数据,并减少满槽信号量的计数。
-
生产者和消费者的访问需要互斥,可以使用互斥锁来实现。在生产者生产数据和消费者消费数据的过程中,需要对缓冲区进行互斥访问,确保同一时间只有一个线程可以访问缓冲区。
信号量
信号量是进程同步的一般机制。(计数)信号量是一种结构。
struct sem{
int value; //semaphore (counter) value
struct process *queue //a queue of blocked processes
}s
使用信号量之前,必须使用一个初始值和一个空等待队列进行初始化。信号量的低级实现保证了每次只能由一个执行实体草做每个信号量,从执行实体的角度来看,对信号量的操作都是原子操作或基本操作。最有名的信号量操作是P和V,定义如下:
P(struct sempahore *s)
{
s->value--;
if (s->value < 0)
BLOCK(s);
}
V(struct sempahore *s)
{
s->value++;
if (s->value <= 0)
SIGNAL(s);
}
当BLOCK阻塞信号量等待队列中的调用进程时,SIGNAL会从信号量等待队列中释放一个进程。
屏障
线程连接操作允许某线程(通常是主线程)等待其他线程终止。在等待的所有线程都终止后,主线程可创建新线程来继续执行并行程序的其余部分。创建新线程需要系统开销。在某些情况下,保持线程活动会更好,但应要求它们在所有线程都达到指定同步点之前不能继续活动。在Pthreads 中,可以采用的机制是屏障以及一系列屏障函数。首先,主线程创建一个屏障对象
pthread barrier t barrier;
并且调用
pthread_barrier_init(&barrier NULL,nthreads);
用屏障中同步的线程数字对它进行初始化。然后,主线程创建工作线程来执行任务。工作线程使用
pthread_barrier_wait( &barrier)
在屏障中等待指定数量的线程到达屏障。当最后一个线程到达屏障时,所有线程重新开始执行。在这种情况下,屏障是线程的集合点,而不是它们的坟墓。
Linux中的线程
Linux不区分线程与进程。对于Linux内核,线程只是与其他进程共享某些资源的进程。Linux中进程和线程都是由clone ()系统调用创建的,原型:
int clone(int(*fn)(void *),void *child_stack,int flags,void *arg)
clone()更像一个线程创建函数,它创建一个子进程来执行带有child_stack的函数fn(arg)。flags字段详细说明父进程与子进程共享的资源:
-
CLONE_VM:父进程和子进程共享地址空间
-
CLONE_FS:父进程和子进程共享文件系统信息,例如根节点、CWD
-
CLONE_FILES:父进程和子进程共享打开的文件
-
CLONE_SIGHAND:父进程和子进程共享信号处理函数和已屏蔽信号