第七周学习笔记

发布时间 2023-10-27 16:51:38作者: 最好的护盾电池使用者

并发编程

并行计算导论

顺序算法与并行算法

顺序算法

begin
  step_1
  step_2
  ……
  step_n
end
// next step

并行算法

cobegin
  task_1
  task_2
  ……
  task_n
coend
// next step

并行性与并发性

在单CPU系统中,一次只能执行一个任务。不同的任务只能并发执行,即在逻辑上并行执行。并发性是通过多任务处理来实现的。

线程

线程的原理

线程是操作系统能够进行运算调度的最小单位,它被包含在进程中,是进程中的实际运作单位。线程是比进程更小的能独立运行的基本单位,线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器、一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。同一个进程中的多个线程之间可以并发执行,共享进程的资源,但是每个线程有自己的执行路径。线程可以看作是轻量级的进程,它是进程中的一个实体,是被系统独立调度和分派的基本单位。

线程的优点

  1. 线程创建和切换速度更快

  2. 线程的响应速度更快

  3. 线程更适合并行计算

线程的缺点

  1. 由于地址空间共享,线程需要来自用户的明确同步

  2. 许多库函数可能对线程不安全

  3. 在单CPU系统上,使用线程解决问题实际上要比使用顺序程序慢,这是由在运行时创建线程和切换上下文系统开销造成的

线程操作

线程的执行轨迹与进程类似。线程可在内核模式或用户模式下执行。在用户模式下,线程在进程的相同地址空间中执行,但每个线程都有自己的执行堆栈。线程是独立的执行单元,可根据操作系统内核的调度策略,对内核进行系统调用,变为挂起、激活以继续执行等。为了利用线程的共享地址空间,操作系统内核的调度策略可能会优先选择同一进程中的线程,而不是不同进程中的线程。

线程管理函数

Pthread 库提供了用于线程管理的以下 API。

pthread_create(thread, attr, function, arg): create thread
pthread_exit(status)                       : terminate thread
pthread_cancel(thread)                     : cancel thread
pthread_attr_init(attr)                    : initialize thread attributes
pthread_attr_destroy(arr)                  : destroy thread attributes

创建线程

使用 pthread_create() 函数创建线程。

int pthread_create(pthread_t *pthread_id, pthread_attr_t *attr, void *(*func)(void *),void *arg);

如果成功则返回0,如果失败则返回错误代码。

pthread_create()函数的参数为

  • pthread_id是指向pthread_t类型变量的指针。它会被操作系统内核分配的唯一线程ID填充。

  • attr是指向另一种不透明数据类型的指针,它指定线程属性。

  • func是要执行的新线程函数的入口地址。

  • arg是指向线程函数参数的指针,可写为:void *func(void *arg)

attr参数的使用步骤:

  1. 定义一个pthread属性变量 pthread_attr_t attr。

  2. 用pthread_attr_init(&attr)初始化属性变量。

  3. 设置属性变量并在pthread_create()调用中使用。

  4. 必要时,通过 pthread_attr_destroy (&attr)释放attr资源。

如何创建一个分离线程:

pthread attr_t attr; // define an attr variable
pthread_attr_init(&attr); // initialize attr
pthread attx _setdetachstate(&attr,PTHREAD_CREATE_DETACHED); // set attr
pthread_create(&thread_id, &attr, Eunc,NULL); // create thread with attr
pthread_attr destroy(&attr); // optional:destroy attr

每个线程创建使用默认大小堆栈创建。线程可以使用函数size_t pthread_attr_getstacksize()来找到堆栈大小。

创建具有特定堆栈大小的线程:

 pthread _attr_t attr; // attr variab1e
 size_t stacksize; // stack size
 pthread_attr_init(&attr); // initialize attr
 stacksize =0x10000; // stacksize=16KB
 pthread_attx_setstacksize (&attr,stacksize); // set stack size in attr
 pthread_create(&threads [t],&attr, func,NULL); // create thread with stack
size

如果attr参数为NULL,将使用默认属性创建线程。实际上,这是创建线程的建议方法,除非有必要更改线程属性,否则应该遵循这种方法。接下来,我们将attr设置为NULL,就可始终使用默认属性。

线程ID

线程ID是一种不透明的数据类型,取决于实现情况。因此,不应该直接比较线程ID。如果需要,可以使用pthread_equal()函数对它们进行比较。

int pthread_equal (pthread_t t1, pthread_t t2);

如果是不同的线程,则返回0,否则返回非0。

线程终止

线程函数结束后,线程即终止。或者,线程可以调用函数

int pthread_exit (void *status);

进行显式终止,其中状态是线程的退出状态。通常,0退出值表示正常终止,非0值表示异常终止。

线程连接

一个线程可以等待另一个线程的终止,通过:

int pthread join (Dthreadt thread,voia w*gtatus_ptr);

终止线程的退出状态以 status_ptr返回。

线程示例

用线程计算矩阵的和

#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>
#define N 4
int A[N][N],sum[N];

void *func(void *arg)
{
  int j,row;
  pthread_t tid=pthread_self();
  row=(int)arg;
  printf("Thread %d [%lu] computes sum of row %d\n",row,tid,row);
  for(j=0;j<N;j++)
    sum[row]+=A[row][j];
  printf("Thread %d [%lu] done: sum[%d] = %d\n",row,tid,row,sum[row]);
  pthread_exit((void*)0);
}

int main(int argc,char *argv[])
{
  pthread_t thread[N];
  int i,j,r,total=0;
  void *status;
  printf("Main: initialize A matrix\n");
  for(i=0;i<N;i++)
    {
      sum[i]=0;
      for(j=0;j<N;j++)
    {
      A[i][j]=i*N+j+1;
      printf("%4d ",A[i][j]);
    }
      printf("\n");
    }
  printf("Main: create %d threads\n",N);
  for(i=0;i<N;i++)
    {
      pthread_create(&thread[i],NULL,func,(void *)i);
    }
  printf("Main: try to join with thread\n");
  for(i=0;i<N;i++)
    {
      pthread_join(thread[i],&status);
      printf("Main: joined with %d [%lu]: status=%d\n",i,thread[i],(int)status);
    }
  printf("Main: compute and print total sum: ");
  for(i=0;i<N;i++)
    total+=sum[i];
  printf("total = %d\n",total);
  pthread_exit(NULL);
}

用并发线程快速排序

#include<stdio.h>
#include<stdlib.h>
#include<pthread.h>

typedef struct{
  int upperbound;
  int lowerbound;
}PARM;

#define N 10
int a[N]={5,1,6,4,7,2,9,8,0,3};

int print()
{
  int i;
  printf("[ ");
  for(i=0;i<N;i++)
    printf("%d ",a[i]);
  printf("]\n");
}

void *qsort1(void *aptr)
{
  PARM *ap,aleft,aright;
  int pivot,pivotIndex,left,right,temp;
  int upperbound,lowerbound;

  pthread_t me,leftThread,rightThread;
  me=pthread_self();
  ap=(PARM *)aptr;
  upperbound=ap->upperbound;
  lowerbound=ap->lowerbound;
  pivot=a[upperbound];
  left=lowerbound-1;
  right=upperbound;
  if(lowerbound>=upperbound)
    pthread_exit(NULL);

  while(left<right)
  {
    do{left++;}while(a[left]<pivot);
    do{right--;}while(a[right]>pivot);
    if(left<right){
      temp=a[left];
      a[left]=a[right];
      a[right]=temp;
    }
  }
  print();
  pivotIndex=left;
  temp=a[pivotIndex];
  a[pivotIndex]=pivot;
  a[upperbound]=temp;
  aleft.upperbound=pivotIndex-1;
  aleft.lowerbound=lowerbound;
  aright.upperbound=upperbound;
  aright.lowerbound=pivotIndex+1;
  printf("%lu: create left and right threads\n",me);
  pthread_create(&leftThread,NULL,qsort1,(void *)&aleft);
  pthread_create(&rightThread,NULL,qsort1,(void *)&aright);
  pthread_join(leftThread,NULL);
  pthread_join(rightThread,NULL);
  printf("%lu: joined with left & right threads\n",me);
}

int main(int argc,char *argv[])
{
  PARM arg;
  int i,*array;
  pthread_t me,thread;
  me=pthread_self();
  printf("main %lu: unsorted array = ",me);
  print();
  arg.upperbound=N-1;
  arg.lowerbound=0;
  printf("main %lu create a thread to do QS\n",me);
  pthread_create(&thread,NULL,qsort1,(void *)&arg);
  pthread_join(thread,NULL);
  printf("main %lu sorted array = ",me);
  print();
}

线程同步

互斥量

最简单的同步工具是锁,它允许执行实体仅在有锁的情况下才能执行。在 Pthread 中,锁被称为互斥量,意思是相互排斥。互斥变量是用 pthread_mutex_t 类型声明,在使用之前必须对它们进行初始化。有两种方法可以初始化互斥量。

  1. 一种是静态方案,如:pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER 定义互斥量 m,并使用默认属性对其进行初始化。

  2. 另一种是动态方案,使用pthread_mutex_init()函数,可通过 attr 参数设置互斥属性,如:pthread_mutex_init(pthread_mutex_t *m, pthread_mutexattr_t, *attr); 通常,attr 参数可以设置为 NULL,作为默认属性。

初始化完成后,线程可通过以下函数使用互斥量。

int pthread_mutex_lock (pthread_mutex_t *m); // lock mutex
int pthread_mutex_unlock (pthread_mutex_t *m); // unlock mutex
int pthread_mutex_trylock (pthread_mutex_t *m); // try to lock mutex
int pthread_mutex_destroy (pthread_mutex_t *m); // destroy mutex

线程使用互斥量来保护共享数据对象。互斥量的典型用法如下。线程先创建一个互斥量并对它进行一次初始化。新创建的互斥量处于解锁状态,没有所有者。每个线程都试图访问一个共享数据对象:

pthread_mutex_lock(&m); // lock mutex
access shared data object; // access shared data in a critical region
pthread_mutex_unlock(&m); // unlock mutex

当线程执行pthread_mutex_lock(&m)时,如果互斥量被解锁,它将封锁互斥量,成为互斥量的所有者并继续执行。否则,它将被阻塞并在互斥量等待队列中等待。只有获取了互斥量的线程才能访问共享数据对象。

死锁预防

互斥量使用封锁协议。如果某线程不能获取互斥量,就会被阻塞,等待互斥量解锁后再继续。在任何封锁协议中,误用加锁可能会产生一些问题。最常见和突出的问题是死锁。死锁是一种状态,在这种状态下,许多执行实体相互等待,因此都无法继续下去。

与竞态条件类似,死锁决不能存在于并发程序中。有许多方法可以解决可能的死锁问题,其中包括死锁预防、死锁规避、死锁检测和恢复等。在实际系统中,唯一可行的方法是死锁预防,试图在设计并行算法时防止死锁的发生。一种简单的死锁预防方法是对互斥量进行排序,并确保每个线程只在一个方向请求互斥量,这样请求序列中就不会有循环。

但是,仅使用单向加锁请求来设计每个并行算法是不可能的。在这种情况下,可以使用条件加锁函数 pthread_mutex_trylock() 来预防死锁。如果互斥量已被加锁,则 trylock()函数会立即返回一个错误。在这种情况下,调用线程可能会释放它已经获取的一些互斥量以便进行退避,从而让其他线程继续执行。在上面的交叉加锁示例,我们可以重新设计一个线程,例如 T1,利用条件加锁和退避来预防死锁。

/*Thread T1*/
while(1){
    lock(m1);
    if(!trylock(m2))
        unlock(m1);
    else
        break;
}

条件变量

作为锁,互斥量仅用于确保线程只能互斥地访问临界区中的共享数据对象。条件变量提供了一种线程协作 的方法。条件变量总是与互斥变量一起使用。

条件变量通过两种方法进行初始化:

  1. 一种是静态方法,在声明时,如:pthreadd_cond_con = PTHREAD_COND_]INITIALIZER;定义一个条件变量con,并使用默认属性对其进行初始化。

  2. 另一种是动态方法,使用pthread_cond_init()函数,可通过attr参数设置条件变量。为简便起见,我们总是使用NULL attr参数作为默认属性。

以下代码展示了如何使用条件变量:

pthread_mutex_t con_mutex; // mutex for a condition variable
pthread_cond_tcon; //a condition variable that relies on con_mutex
pthread_mutex_init(&con_mutex, NULL); // initialize mutex
pthread_cond_init(&con,NULL); // initialize con

当使用条件变量时,线程必须先获取相关的互斥量。然后,它在互斥量的临界区内执行操作,然后释放互斥量,如下所示:

pthread_mutex_lock(&con_mutex);
  modify or test shared data objects
  use condition variable con to wait or signal conditions
pthread_mutex_unlock(&con_mutex);

在互斥量的临界区中,线程可通过以下函数使用条件变量来相互协作。

  • pthread_cond wait(condition, mutex):该函数会阻塞调用线程,直到发出指定条件的信号。当互斥量被加锁时,应调用该例程。它会在线程等待时自动释放互斥量。互斥量将在接收到信号并唤醒阻塞的线程后自动锁定。

  • pthread_cond_signal(condition):该函数用来发出信号,即唤醒正在等待条件变量的线程或解除阻塞。它应在互斥量被加锁后调用,而且必须解锁互斥量才能完成pthread_cond_wait()

  • pthread_cond_broadcast(condition):该函数会解除被阻塞在条件变量上的所有线程阻塞。所有未阻塞的线程将争用同一个互斥量来访问条件变量。它们的执行顺序取决于线程调度。

生产者-消费者问题

生产者消费者问题是一个经典的并发编程问题,用于描述多个生产者和消费者共享一个有限缓冲区的情况。在这个问题中,生产者负责生产数据并将其放入缓冲区,而消费者负责从缓冲区中取出数据并进行消费。生产者和消费者之间需要进行同步,以确保缓冲区的正确使用,避免数据竞争和死锁等问题。常见的解决方案包括使用互斥锁、条件变量和信号量等同步机制。

解决步骤:

  1. 使用一个共享的缓冲区来存储生产者生产的数据,缓冲区可以是一个队列或者一个固定大小的数组。

  2. 使用两个信号量来控制生产者和消费者的访问,一个是空槽信号量,表示缓冲区中还有多少空槽可以供生产者使用;另一个是满槽信号量,表示缓冲区中有多少槽已经被生产者使用。

  3. 当生产者要生产一个数据时,首先需要获取空槽信号量,如果没有空槽可用,则生产者需要等待。一旦获取到空槽,生产者将数据放入缓冲区,并增加满槽信号量的计数。

  4. 当消费者要消费一个数据时,首先需要获取满槽信号量,如果没有满槽可用,则消费者需要等待。一旦获取到满槽,消费者从缓冲区中取出数据,并减少满槽信号量的计数。

  5. 生产者和消费者的访问需要互斥,可以使用互斥锁来实现。在生产者生产数据和消费者消费数据的过程中,需要对缓冲区进行互斥访问,确保同一时间只有一个线程可以访问缓冲区。

信号量

信号量是进程同步的一般机制。(计数)信号量是一种结构。

struct sem{
    int value; //semaphore (counter) value
    struct process *queue //a queue of blocked processes
}s

使用信号量之前,必须使用一个初始值和一个空等待队列进行初始化。信号量的低级实现保证了每次只能由一个执行实体草做每个信号量,从执行实体的角度来看,对信号量的操作都是原子操作或基本操作。最有名的信号量操作是PV,定义如下:

P(struct sempahore *s)
{
    s->value--;
    if (s->value < 0)
      BLOCK(s);
}
V(struct sempahore *s)
{
    s->value++;
    if (s->value <= 0)
      SIGNAL(s);
}

当BLOCK阻塞信号量等待队列中的调用进程时,SIGNAL会从信号量等待队列中释放一个进程。

屏障

线程连接操作允许某线程(通常是主线程)等待其他线程终止。在等待的所有线程都终止后,主线程可创建新线程来继续执行并行程序的其余部分。创建新线程需要系统开销。在某些情况下,保持线程活动会更好,但应要求它们在所有线程都达到指定同步点之前不能继续活动。在Pthreads 中,可以采用的机制是屏障以及一系列屏障函数。首先,主线程创建一个屏障对象

pthread barrier t barrier;

并且调用

pthread_barrier_init(&barrier NULL,nthreads);

用屏障中同步的线程数字对它进行初始化。然后,主线程创建工作线程来执行任务。工作线程使用

pthread_barrier_wait( &barrier)

在屏障中等待指定数量的线程到达屏障。当最后一个线程到达屏障时,所有线程重新开始执行。在这种情况下,屏障是线程的集合点,而不是它们的坟墓。

Linux中的线程

Linux不区分线程与进程。对于Linux内核,线程只是与其他进程共享某些资源的进程。Linux中进程和线程都是由clone ()系统调用创建的,原型:

int clone(int(*fn)(void *),void *child_stack,int flags,void *arg)

clone()更像一个线程创建函数,它创建一个子进程来执行带有child_stack的函数fn(arg)。flags字段详细说明父进程与子进程共享的资源:

  • CLONE_VM:父进程和子进程共享地址空间

  • CLONE_FS:父进程和子进程共享文件系统信息,例如根节点、CWD

  • CLONE_FILES:父进程和子进程共享打开的文件

  • CLONE_SIGHAND:父进程和子进程共享信号处理函数和已屏蔽信号