《Linux – Linux高级编程 – 第二部分 进程与线程》第2章 线程(二)

2 线程同步与互斥

先来了解同步和互斥的基本概念:

临界资源:某些资源来说,其在同一时间只能被一段机器指令序列所占用。这些一次只能被一段指令序列所占用的资源就是所谓的临界资源。

临界区:对于临界资源的访问,必须是互斥进行。也就是当临界资源被一个指令序列占用时,另一个需要访问相同临界资源的指令序列就不能被执行。指令序列不能执行的实际意思就是其所在的进程/线程会被阻塞。所以我们定义程序内访问临界资源的代码序列被称为临界区。

互斥:是指同事只允许一个访问者对临界资源进行访问,具有唯一性和排它性。但互斥无法限制访问这个对资源的访问顺序,即访问时无序的。

同步:是指在互斥的基础上,通过其他机制实现访问者对资源的有序访问。

2.2多线程的同步与互斥

2.2.1为什么要用多线程技术

1.避免阻塞,大家知道,单个进程只有一个主线程,当主线程阻塞的时候,整个进程也就阻塞了,无法再去做其它的一些功能了。

2.避免CPU空转,应用程序经常会涉及到RPC,数据库访问,磁盘IO等操作,这些操作的速度比CPU慢很多,而在等待这些响应时,CPU却不能去处理新的请求,导致这种○3提升效率,一个进程要独立拥有4GB的虚拟地址空间,而多个线程可以共享同一地址空间,线程的切换比进程的切换要快得多。

2.2.2如何使用多线程技术进行编程

下面给出个多线程程序,一个最简单的模拟售票系统,代码如下:

/**Includes*********************************************************************/
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <pthread.h>  
#include <unistd.h> 

/**【宏定义】*******************************************************************/

/**【全局变量声明】*************************************************************/
int tickets = 20; 

/**【函数声明】*    ************************************************************/
void *ticketsell1(void *);  
void *ticketsell2(void *);   

int main(int argc, char *argv[])
{ 
    pthread_t id1,id2;  
    int error;  

    error = pthread_create(&id1, NULL, ticketsell1, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    error = pthread_create(&id2, NULL, ticketsell2, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    pthread_join(id1,NULL);  
    pthread_join(id2,NULL);  

    return 0;  
}  

void *ticketsell1(void *arg)  
{  
    while(1)  
    {  
        if(tickets > 0)  
        {  
//          usleep(1000);  
            printf("ticketse1 sells ticket:%d\n",tickets--);  
        }  
        else  
        {  
            break;  
        }  
    }  
    return (void *)0;  
}  

void *ticketsell2(void *arg)  
{  
    while(1)  
    {  
        if(tickets > 0)  
        {  
//          usleep(1000);  
            printf("ticketse2 sells ticket:%d\n",tickets--);  
        }  
        else  
        {  
            break;  
        }  
    }  

    return (void *)0;  
}  

编译运行:

cuuvhF.png

看到结果,我们发现能正常卖票的,一部分连续是ticketsel2,另一部分是ticketsel1;此时,其实存在一个隐含的问题,就是线程间的切换,在单CPU系统中,CPU是有时间片时间,时间片到了,就要执行其它的线程,假设thread1执行到if里面,但在printf执行前发生了线程切换,那么会发生什么呢?我们在这里用usleep函数(放开程序中的usleep注释行)进行强制模拟切换。
我们看看结果:

cuKi0x.png

运行程序发现竟然有0号票被卖出了,这显然是错误的!当thread1的if里面发生线程切换时,thread2得到运行,把最后一张票卖了,此时thread1恢复运行,结果卖出了0号票,这里我们需要的是火车票的票数数据对于所有线程而言是同步的,所以就要用到线程同步技术了。

2.3多线程的同步

多线程的同步方式有很多种,例如互斥锁,条件变量,信号量,读写锁。

2.3.1互斥量

当我们在进行多线程编程的时候,如果有多个线程共享相同的内存时,我们需要确保每个线程看到一致的数据视图。如果每个线程使用的变量都是其他线程不会读取和修改的,或对每个线程是只读的,那么久不存在一致性的问题。但是,当一个线程可以修改变量,其他线程同样也能读取或修改变量的时候,我们就需要对这些线程进行同步。

在Linux上进行多线程编程时,我们常用到互斥量(Mutex)。

互斥量(Mutex)从本质上说是一把锁,在访问共享资源前对互斥量进行加锁,在访问完成后释放锁。当我们对互斥量进行加锁之后,任何其他试图再次对互斥量加锁的线程都会被阻塞直到当前线程释放该互斥锁。如果释放互斥量时有一个以上的线程阻塞,那么所有之前尝试对互斥量加锁的线程都会变成可运行状态,当第一个变为可运行的线程对互斥量加锁后,其他线程只能再次阻塞。在这种方式下,每次只有一个线程可以向前执行。

引入互斥(mutual exlusion)锁的目的是用来保证共享数据的完整性。
互斥锁主要用来保护临界资源。每个临界资源都有一个互斥锁来保护,任何时刻最多只能有一个线程能访问该资源;线程必须先获得互斥锁才能访问临界资源,访问完资源后释放该锁。如果无法获得锁,线程会阻塞直到获得锁为止。

为了让线程访问数据不产生冲突,这就需要对变量加锁,使得同一时刻只有一个线程可以访问变量。互斥量的本质就是锁,访问共享资源前对互斥量加锁,访问完成后解锁。

当互斥量加锁加锁以后,其他需要访问该互斥量量的线程将被阻塞。

当互斥量解锁之后,所有的因为这个互斥量阻塞的线程将变为就绪态,第一个获得CPU的线程会获得互斥量,变为运行态,而其他线程会变为阻塞,在这种方式下访问互斥量每个只有一个线程向前执行。

通常,我们在临界区前上锁,临界区后解锁;互斥量是用pthread_mutex_t数据类型表示的。在使用互斥变量以前,必须首先对它进行初始化,可以把它设置为常量PTHREAD_MUTEX_INITIALIZER(只适应于静态分配的互斥量),也可以通过调用pthread_mutex_init函数进行初始化。如果动态分配互斥量(例如,通过调用malloc函数),在释放内存前调用pthread_mutex_destory。

互斥变量时用pthread_mutex_t数据类型表示的。在使用互斥变量之前,必须对它进行初始化。我们可以通过调用pthread_mutex_init函数进行初始化,也可以把它设置为常量PTHREAD_MUTEX_INITIALIZER(只适用静态分配的互斥量)。

要用默认的属性初始化互斥量,只需把attr设为NULL。

pthread_mutex_init () 函数描述如下:

头文件 #include
函数声明 int pthread_mutex_init(pthread_mutex_t mutex, const pthread_mutex_attr attr);
参数 mutex:互斥锁。
attr:互斥锁的属性。
返回值 成功返回0,失败返回错误编码。

要用默认的属性初始化互斥量,只需要把attr设置为NULL。

pthread_mutex_destory () 函数描述如下:

头文件 #include
函数声明 int pthread_mutex_destory (pthread_mutex_t *mutex);
参数 *mutex:互斥锁。
返回值 成功返回0,失败返回错误编码。

对互斥量进行加锁,需要用pthread_mutex_lock。如果互斥量已经上锁,调用线程将阻塞直到互斥量被解锁。对互斥量解锁,需要调用pthread_mutex_unlock。

pthread_mutex_lock () 函数描述如下:

头文件 #include
函数声明 int pthread_mutex_lock(pthread_mutex_t *mutex);
参数 *mutex:互斥锁。
返回值 成功返回0,失败返回错误编码。

pthread_mutex_unlock () 函数描述如下:

头文件 #include
函数声明 int pthread_mutex_unlock(pthread_mutex_t *mutex);
参数 *mutex:互斥锁。
返回值 成功返回0,失败返回错误编码。

如果线程不希望被阻塞,它可以使用pthread_thread_trylock尝试对互斥量进行加锁。如果调用pthread_mutex_trylock时互斥量处于未锁住状态,那么pthread_mutex_trylock将锁住互斥量,不会出现阻塞直到返回0,否则pthread_mutex_trylock就会失败,不能锁住互斥量,返回EBUSY。
pthread_mutex_trylock () 函数描述如下:

头文件 #include
函数声明 int pthread_mutex_trylock(pthread_mutex_t *mutex);
参数 *mutex:互斥锁。
返回值 成功返回0,失败返回错误编码。

下面我们通过一个实例来说明互斥量的使用。

/**Includes*********************************************************************/
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <pthread.h>  
#include <unistd.h> 

/**【宏定义】*******************************************************************/
//#define _LOCK_  

/**【全局变量声明】*************************************************************/
unsigned int value1,value2,count; 
pthread_mutex_t mutex;  

/**【函数声明】*    ************************************************************/
void *thread_function(void *arg);  

int main(int argc, char *argv[])
{ 
    pthread_t thread_id;  

    if(pthread_mutex_init(&mutex,NULL) < 0)  
    {  
        perror("fail to mutex_init");  
        exit(-1);  
    }  
    if(pthread_create(&thread_id,NULL,thread_function,NULL) != 0)  
    {  
        perror("fail to pthread_create");  
        exit(-1);  
    }  
    while(1)  
    {  
        count++;  
#ifdef _LOCK_  
        pthread_mutex_lock(&mutex);  
#endif  
        value1 = count;  
        value2 = count;  
#ifdef _LOCK_  
        pthread_mutex_unlock(&mutex);  
#endif  
    }  
    return 0;  
}

void *thread_function(void *arg)  
{  
    while(1)  
    {  
#ifdef _LOCK_  
        pthread_mutex_lock(&mutex);  
#endif  
        if(value1 != value2)  
        {  
            printf("count = %d,value1 = %d,value2 = %d\n",count,value1,value2);  
            usleep(100000);  
        }  
#ifdef _LOCK_  
        pthread_mutex_unlock(&mutex);  
#endif  
    }  
    return NULL;  
}  

编译运行:

cuMyZt.png

我们可以看到,数据是不断被打印的,说明线程是可以访问临界资源的。

我们把#define _LOCK_前面的注释去掉,这时就加上了互斥锁,执行程序,此时,并没有数据被打印,说明此时a线程中 value1 与 value 2 一直是相等的,说明主线程一直在执行,线程并无法访问临界资源的。

cuMRJS.png

我们来看看互斥锁如何解决多线程之间的同步问题。程序用互斥锁后如下:

/**Includes*********************************************************************/
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <pthread.h>  
#include <unistd.h> 

/**【宏定义】*******************************************************************/

/**【全局变量声明】*************************************************************/
int tickets = 20;  
pthread_mutex_t mutex;  

/**【函数声明】*    ************************************************************/
void *ticketsell1(void *);  
void *ticketsell2(void *);  

/**
  * @brief     主函数
  * @param     argc
               argv
  * @retval    None
  */
int main(int argc, char *argv[])
{  
    pthread_t id1,id2;  
    pthread_mutex_init(&mutex, NULL); 
    int error;  

    error = pthread_create(&id1, NULL, ticketsell1, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    error = pthread_create(&id2, NULL, ticketsell2, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    pthread_join(id1,NULL);  
    pthread_join(id2,NULL);  

    return 0;  
}  

void *ticketsell1(void *arg)  
{  
    while(1)  
    {  
        pthread_mutex_lock(&mutex);//给互斥量上锁  
        if(tickets > 0)  
        {  
            usleep(1000);  
            printf("ticketse1 sells ticket:%d\n",tickets--);  
            pthread_mutex_unlock(&mutex);//给互斥量解锁  

        }  
        else  
        {  
            pthread_mutex_unlock(&mutex);//给互斥量解锁  
            break;  
        }  
        pthread_yield();//线程调度函数,使每个线程都有执行机会  
    }  
    return (void *)0;  
}  

void *ticketsell2(void *arg)  
{  
    while(1)  
    {  
        pthread_mutex_lock(&mutex);//给互斥量上锁  
        if(tickets > 0)  
        {  
            usleep(1000);  
            printf("ticketse2 sells ticket:%d\n",tickets--);  
            pthread_mutex_unlock(&mutex);//给互斥量解锁  
        }  
        else  
        {  
            pthread_mutex_unlock(&mutex);//给互斥量解锁  
            break;  
        }  
        pthread_yield();//线程调度函数,是两个线程都有执行机会  
    }  

    return (void *)0;  
}   

编译运行:

cuMHoV.png

可以通过互斥量来访问数据,确保同一时间只有一个线程访问数据。互斥量从本质上一把锁,在访问资源前后对互斥量进行设置(加锁),在访问完全后释放(解锁)互斥量。对互斥量进行加锁以后,任何其他试图再次对互斥量加锁的线程都会被阻塞直到当前线程释放该互斥量。如果释放互斥量时有一个以上的线程阻塞,那么所有该锁上的阻塞线程都编程可运行状态,第一个变为运行状态就可以对互斥量加锁,其他线程看到互斥量依然是锁着的,只有再次等待它重新变为可用状态。在这种方式下,每次只有一个线程可以向前执行。

下面通过一个例子来说明互斥量的作用。

/**Includes*********************************************************************/
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <pthread.h>  
#include <unistd.h> 

/**【宏定义】*******************************************************************/

/**【全局变量声明】*************************************************************/
pthread_mutex_t g_mutex_lock;
int g_count = 0;

/**【函数声明】*    ************************************************************/
void *thread_fun_1(void *data);
void *thread_fun_2(void *data);
void *thread_fun_3(void *data);

int main(int argc, char const *argv[])
{
    int ret;
    pthread_t pid[3];

    ret = pthread_mutex_init(&g_mutex_lock, NULL);
    if (ret != 0) {
        printf("mutex init failed\n");
        return -1;
    }

    pthread_create(&pid[0], NULL, thread_fun_1, NULL);
    pthread_create(&pid[1], NULL, thread_fun_2, NULL);
    pthread_create(&pid[2], NULL, thread_fun_3, NULL);

    pthread_join(pid[0], NULL);
    pthread_join(pid[1], NULL);
    pthread_join(pid[2], NULL);

    pthread_mutex_destroy(&g_mutex_lock);

    return 0;
}

void *thread_fun_1(void *data)
{
    pthread_mutex_lock(&g_mutex_lock);

    g_count++;
    printf("%s g_count: %d\n", __func__, g_count);

    pthread_mutex_unlock(&g_mutex_lock);
}

void *thread_fun_2(void *data)
{
    pthread_mutex_lock(&g_mutex_lock);

    g_count++;
    printf("%s g_count: %d\n", __func__, g_count);

    pthread_mutex_unlock(&g_mutex_lock);
}

void *thread_fun_3(void *data)
{
    pthread_mutex_lock(&g_mutex_lock);

    g_count++;
    printf("%s g_count: %d\n", __func__, g_count);

    pthread_mutex_unlock(&g_mutex_lock);
}

编译运行:

cuMXz4.png

对数据g_count的操作进行加锁之后,同一个时间只有一个线程能获取到锁,也就是只有一个线程能对g_count进行操作,保证了g_count的数据的准确性。

有些情况下,2个不同的操作是不能同时进行的,例如fingerprint中的enroll和verify同一时间只能有一个操作进行。保持操作的互斥性本质上其实还是在保护共有的数据。看下下面的例子,打印hello的时候,world是无法打印的,如果希望打印world只能等待打印hello的线程退出之后再打印。

/**Includes*********************************************************************/
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <pthread.h>  
#include <unistd.h> 

/**【宏定义】*******************************************************************/

/**【全局变量声明】*************************************************************/
pthread_mutex_t g_mutex_lock;

/**【函数声明】*    ************************************************************/
void *thread_fun_1(void *data);
void *thread_fun_2(void *data);
void do_print_hello();
void do_print_world();

int main(int argc, char const *argv[])
{
    int ret;
    int cid;

    ret = pthread_mutex_init(&g_mutex_lock, NULL);
    if (ret != 0) 
    {
        printf("mutex init failed\n");
        return -1;
    }

    while (1) 
    {
        scanf("%d", &cid);
        getchar();

        switch (cid) 
        {
            case 0:
                do_print_hello();
            break;

            case 1:
                do_print_world();
            break;

            default:
            break;
        }
    }

    pthread_mutex_destroy(&g_mutex_lock);

    return 0;
}

void *thread_fun_1(void *data)
{
    pthread_mutex_lock(&g_mutex_lock);
    int i = 0;

    while (i < 3) 
    {
        printf("hello ,id = 0x%x\n",(unsigned int)pthread_self());
        i++;
        sleep(1);
    }

    pthread_mutex_unlock(&g_mutex_lock);
    printf("hello exit\n");
    pthread_exit(NULL);

}

void *thread_fun_2(void *data)
{
    pthread_mutex_lock(&g_mutex_lock);
    int i = 0;

    while (i < 3) 
    {
        printf("world  ,id = 0x%x\n",(unsigned int)pthread_self());
        i++;
        sleep(1);
    }

    pthread_mutex_unlock(&g_mutex_lock);
    printf("world exit\n");
    pthread_exit(NULL);

}

void do_print_hello()
{
    pthread_t pth_id;
    int result = pthread_create(&pth_id, NULL, thread_fun_1, NULL);
}

void do_print_world()
{
    pthread_t pth_id;
    int result = pthread_create(&pth_id, NULL, thread_fun_2, NULL);
}

编译运行:

cuQ9dx.png

先输入0在输入1,当打印完“hello”后,会打印“world”。上面的互斥锁是阻塞式的锁,也可以通过非阻塞式的锁进行,看下面的例子,pthread_mutex_trylock()函数如果获取到互斥锁了,会返回0,如果没有获取的互斥锁,会立即返回一个非0值,例子中如果当前正在打印“hello”,发出打印“world”命令之后,通过pthread_mutex_trylock()就能知道当前有没有打印线程正在运行。

/**Includes*********************************************************************/
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <pthread.h>  
#include <unistd.h> 

/**【宏定义】*******************************************************************/

/**【全局变量声明】*************************************************************/
pthread_mutex_t g_mutex_lock;

/**【函数声明】*    ************************************************************/
void *thread_fun_1(void *data);
void *thread_fun_2(void *data);
void do_print_hello();
void do_print_world();

int main(int argc, char const *argv[])
{
    int ret;
    int cid;

    ret = pthread_mutex_init(&g_mutex_lock, NULL);
    if (ret != 0) 
    {
        printf("mutex init failed\n");
        return -1;
    }

    while (1) 
    {
        scanf("%d", &cid);
        getchar();

        switch (cid) 
        {
            case 0:
                do_print_hello();
            break;

            case 1:
                do_print_world();
            break;

            default:
            break;
        }
    }

    pthread_mutex_destroy(&g_mutex_lock);

    return 0;
}

void *thread_fun_1(void *data)
{
    if (pthread_mutex_trylock(&g_mutex_lock) != 0) 
    {
        printf("hello:%s\n",__func__);
        sleep(1);
        printf("hello return\n");
        return 0;
    }

    int i = 0;
    while (i < 5) 
    {
        printf("hello ,id = 0x%x\n",(unsigned int)pthread_self());
        i++;
        sleep(1);
    }

    pthread_mutex_unlock(&g_mutex_lock);
    printf("hello exit\n");
    pthread_exit(NULL);

}

void *thread_fun_2(void *data)
{
    if (pthread_mutex_trylock(&g_mutex_lock) != 0) 
    {
        printf("world:%s\n",__func__);
        sleep(1);
        printf("world return\n");
        return 0;
    }

    int i = 0;

    while (i < 5) 
    {
        printf("world ,id = 0x%x\n",(unsigned int)pthread_self());
        i++;
        sleep(1);
    }

    pthread_mutex_unlock(&g_mutex_lock);
    printf("world exit\n");
    pthread_exit(NULL);

}

void do_print_hello()
{
    pthread_t pth_id;
    int result = pthread_create(&pth_id, NULL, thread_fun_1, NULL);
}

void do_print_world()
{
    pthread_t pth_id;
    int result = pthread_create(&pth_id, NULL, thread_fun_2, NULL);
}

编译运行:

cuQeOA.png

先输入0在输入1,第一个线程在打印完成后正常退出,第二个线程锁失败,退出线程。

上述,使用默认的属性其实已经能满足一些多线程同步的实现。但有时候我们可能需要设置互斥量的属性。下面,主要来谈谈互斥量的属性。

就像线程具有属性一样,线程的同步对象也有属性。值得注意的3个属性是:进程共享属性、健壮属性及类型属性。我们只讨论类型属性,至于进程共享属性和健壮属性有兴趣的朋友可以进行资料查找。

我们可以使用pthread_mutexattr_init初始化pthread_mutexattr_t结构,用pthread_mutexattr_destroy来反初始化。

头文件 #include
函数声明 int pthread_mutexattr_init(pthread_mutexattr_t *attr);
参数 *attr:互斥锁的属性。
返回值 成功返回0,失败返回错误编码。
头文件 #include
函数声明 int pthread_mutexattr_destroy (pthread_mutexattr_t *attr);
参数 *attr:互斥锁的属性。
返回值 成功返回0,失败返回错误编码。

pthread_mutexattr_init函数将默认的互斥量属性初始化pthread_mutexattr_t结构。

类型互斥量属性控制着互斥量的锁定特性。POSIX.1定义了4种类型。

PTHREAD_MUTEX_NORMAL 一种标准互斥量类型,不做任何特殊的错误检查或死锁检测。
PTHREAD_MUTEX_ERRORCHECK 此互斥量类型提供错误检查。
PTHREAD_MUTEX_RECURSIVE 此互斥量类型允许同一线程在互斥量解锁之前对该互斥量进行;多次加锁。递归互斥量维护锁的计数,在解锁次数和加锁次数,不相同的情况下,不会释放锁。
PTHREAD_MUTEX_DEFAULT 此互斥量类型可以提供默认特性和行为。

我们可以使用pthread_mutexattr_gettype函数得到互斥量类型属性,用pthread_mutexattr_settype函数修改互斥量类型属性。

头文件 #include
函数声明 int pthread_mutexattr_gettype(const pthread_mutexattr_t attr, int type);
参数 attr:互斥锁的属性。
type:属性类型。
返回值 成功返回0,失败返回错误编码。
头文件 #include
函数声明 int pthread_mutexattr_settype(const pthread_mutexattr_t *attr, int type);
参数 attr:互斥锁的属性。
type:属性类型。
返回值 成功返回0,失败返回错误编码。

2.3.2避免死锁

如果线程试图对同一个互斥量加锁两次,那么它自身就会陷入死锁状态,但是使用互斥量时,还有其他不太明显的方式也能产生死锁。例如,程序中使用一个以上的互斥量时,如果允许一个线程一直占有第一个互斥量,并且在试图锁住第二个互斥量时处于阻塞状态,但是拥有第二个互斥量的线程也在试图锁住第一个互斥量,因为两个线程都在相互请求另外一个线程拥有的资源,所以这两个线程都无法前进运行,于是就产生死锁。

/**Includes*********************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <pthread.h>

/**【自定义结构】***************************************************************/
struct foo
{
    int                 f_count;
    int                 f_addtimes;
    pthread_mutex_t     f_mutex;
};

/**【宏定义】*******************************************************************/

/**【全局变量声明】*************************************************************/

/**【函数声明】*    ************************************************************/
struct foo * foo_alloc();
void foo_addtimes(struct foo *fp);
void foo_add(struct foo *fp);  //调用foo_addtimes对f_mutex加锁两次
void * thread_func1(void *arg);
void foo_rele(struct foo *fp);

int main(int argc, char const *argv[])
{
    pthread_t pid1;
    int err;
    void *pret;
    struct foo *fobj;
    fobj = foo_alloc();
    pthread_create(&pid1,NULL,thread_func1,(void*)fobj);
    pthread_join(pid1,&pret);
    foo_rele(fobj);

    printf("thread 1 exit code is: %ld\n",(long)pret);
    exit(0);
}

struct foo * foo_alloc()
{
    struct foo* fp;
    fp = (struct foo*)malloc(sizeof(struct foo));
    if(fp != NULL)
    {
        fp->f_count = 0;
        fp->f_addtimes = 0;
        pthread_mutex_init(&fp->f_mutex,NULL);
    }
    return fp;
}

void foo_addtimes(struct foo *fp)
{
    pthread_mutex_lock(&fp->f_mutex);
    fp->f_addtimes++;
    pthread_mutex_unlock(&fp->f_mutex);
}

void foo_add(struct foo *fp)  //调用foo_addtimes对f_mutex加锁两次
{
    pthread_mutex_lock(&fp->f_mutex);
    fp->f_count++;
    foo_addtimes(fp);  
    pthread_mutex_unlock(&fp->f_mutex);
}

void foo_rele(struct foo *fp) /* release a reference to the object */
{
    pthread_mutex_destroy(&fp->f_mutex);
    free(fp);
}

void * thread_func1(void *arg)
{
    struct foo *fp = (struct foo*)arg;
    printf("thread 1 start.\n");
    foo_add(fp);  //调用函数执行,造成死锁
    printf("in thread 1 count = %d\n",fp->f_count);
    printf("thread 1 exit.\n");
    pthread_exit((void*)1);
}

编译运行:

cuQwkV.png

从结果可以看出程序执行到foo_add()时候陷入了死锁。因为foo_add函数调用foo_addtimes函数,使得加锁两次,导致死锁。解决这个问题可以将foo_addtimes函数中的锁去掉,或将foo_addtimes函数合并到foo_add中。除了这些办法,还可以设置互斥量属性,设置为递归锁。

/**Includes*********************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <pthread.h>

/**【自定义结构】***************************************************************/
struct foo
{
    int                 f_count;
    int                 f_addtimes;
    pthread_mutex_t     f_mutex;
};

/**【宏定义】*******************************************************************/

/**【全局变量声明】*************************************************************/

/**【函数声明】*    ************************************************************/
struct foo * foo_alloc();
void foo_addtimes(struct foo *fp);
void foo_add(struct foo *fp);  
void foo_rele(struct foo *fp);
void * thread_func1(void *arg);
void * thread_func2(void *arg);

int main(int argc, char const *argv[])
{
    pthread_t pid1,pid2;
    int err;
    void *pret;
    struct foo *fobj;
    fobj = foo_alloc();

    pthread_create(&pid1,NULL,thread_func1,(void*)fobj);
    pthread_create(&pid2,NULL,thread_func2,(void*)fobj);

    pthread_join(pid1,&pret);
    printf("thread 1 exit code is: %ld\n",(long)pret);

    pthread_join(pid2,&pret);

    foo_rele(fobj);
    printf("thread 2 exit code is: %ld\n",(long)pret);

    exit(0);
}

struct foo * foo_alloc()
{
    struct foo* fp;
    fp = (struct foo*)malloc(sizeof(struct foo));
    if(fp != NULL)
    {
        fp->f_count = 0;
        fp->f_addtimes = 0;
        //设置互斥量类型为递归锁,可以对已加锁再次加锁
        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE);
        pthread_mutex_init(&fp->f_mutex,&attr);
    }
    return fp;
}

void foo_addtimes(struct foo *fp)
{
    pthread_mutex_lock(&fp->f_mutex);
    fp->f_addtimes++;
    pthread_mutex_unlock(&fp->f_mutex);
}

void foo_add(struct foo *fp)  //调用foo_addtimes对f_mutex加锁两次
{
    pthread_mutex_lock(&fp->f_mutex);
    fp->f_count++;
    foo_addtimes(fp);  
    pthread_mutex_unlock(&fp->f_mutex);
}

void foo_rele(struct foo *fp) /* release a reference to the object */
{
    pthread_mutex_destroy(&fp->f_mutex);
    free(fp);
}

void * thread_func1(void *arg)
{
    struct foo *fp = (struct foo*)arg;
    printf("thread 1 start.\n");
    foo_add(fp); 
    printf("in thread 1 count = %d\n",fp->f_count);
    printf("thread 1 exit.\n");
    pthread_exit((void*)1);
}

void * thread_func2(void *arg)
{
    struct foo *fp = (struct foo*)arg;
    printf("thread 2 start.\n");
    foo_add(fp);
    printf("in thread 2 count = %d\n",fp->f_count);
    printf("thread 2 exit.\n");
    pthread_exit((void*)2);
}

编译运行:

cuQrpF.png

可以通过控制互斥量加锁的顺序来避免死锁的发生。例如,假设需要对两个互斥量A和B同时加锁。如果所有线程总是在对互斥量B加锁之前锁住互斥量A,那么使用者两个虎刺量就不会产生死锁(当然在其他的资源上也有可能出现死锁)。类似地,如果所有的线程总是锁住互斥量A之前锁住互斥量B,那么也不会发生死锁。可能出现的死锁只会发生在一个线程试图锁在另一个线程相反的顺序锁住的互斥量。

有时候,应用程序的结构使得对互斥量进行排序是很困难的。如果涉及了太多的锁和数据结构。可用的函数并不能把它转换成简单的层次。这种情况可以使用phread_mutex_trylock接口避免死锁。如果已经占有某些锁而且pthread_mutex_trylock接口返回成功,那么就可以前进。但是,如果不能获取锁,可以先释放已经占有的锁,做好清理工作,然后过一段时间再试试。

下面我们通过两个互斥量的使用方法。在同时需要两个互斥量时,总是让他们以相同的顺序加锁,这样可以避免死锁。第二个互斥量维护者一个用于跟踪foo数据结构的散列列表。这样hashlock互斥量既可以保护foo数据结构中的散列表fh,又可以保护散列链字段f_next。foo结构中的f_lock互斥量保护对foo结构中其他字段的访问。

/**Includes*********************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <pthread.h>

/**【自定义结构】***************************************************************/
struct foo
{
    int                 f_count;
    pthread_mutex_t     f_lock;
    int                 f_id;
    struct foo         *f_next;/*protected by hashlock*/
    /*...more stuff here ...*/
};

/**【宏定义】*******************************************************************/
#define NHASH 29
#define HASH(id)  (((unsigned long)id)%NHASH) 

/**【全局变量声明】*************************************************************/
struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

/**【函数声明】*    ************************************************************/
struct foo *foo_alloc(int id);
void foo_hold(struct foo *fp);
struct foo * foo_find(int id);
void foo_rele(struct foo *fp);

void * thread_func1(void *arg);
void * thread_func2(void *arg);

int main(int argc, char const *argv[])
{
    pthread_t pid1,pid2;
    int err;
    void *pret;
    struct foo *fobj;
    fobj = foo_alloc(45);

    pthread_create(&pid1,NULL,thread_func1,(void*)fobj);
    pthread_create(&pid2,NULL,thread_func2,(void*)fobj);

    pthread_join(pid1,&pret);
    printf("thread 1 exit code is: %ld\n",(long)pret);

    pthread_join(pid2,&pret);
    printf("thread 2 exit code is: %ld\n",(long)pret);
    foo_rele(fobj);

    exit(0);
}

struct foo *foo_alloc(int id) /* allocate the object */
{
    struct foo  *fp;
    int         idx;

    if ((fp = malloc(sizeof(struct foo))) != NULL) 
    {
        fp->f_count = -1;
        fp->f_id = id;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0) 
        {
            free(fp);
            return(NULL);
        }
        idx = HASH(id);
        pthread_mutex_lock(&hashlock);        
        fp->f_next = fh[idx];
        fh[idx] = fp;
        pthread_mutex_lock(&fp->f_lock);           
        pthread_mutex_unlock(&hashlock);
        /* ... continue initialization ... */
        pthread_mutex_unlock(&fp->f_lock);
    }
    return(fp);
}

void foo_hold(struct foo *fp)/*add a reference to object*/
{
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}

struct foo * foo_find(int id)  /*find an existing object*/
{
    struct foo *fp;

    pthread_mutex_lock(&hashlock);
    for(fp = fh[HASH(id)];fp!=NULL;fp = fp->f_next)
    {
        if(fp->f_id == id)
        {
            foo_hold(fp);
            break;
        }
    }       
    pthread_mutex_unlock(&hashlock);
    return (fp);
}

void foo_rele(struct foo *fp)/*release a reference to the object*/
{
    struct foo *tfp;
    int  idx;

    pthread_mutex_lock(&fp->f_lock);
    if(fp->f_count == 1)/*last reference*/
    {
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_lock(&hashlock);
        pthread_mutex_lock(&fp->f_lock);
        /*need to recheck the condition*/
        if(fp->f_count != 1)
        {
            fp->f_count--;
            pthread_mutex_unlock(&fp->f_lock);
            pthread_mutex_unlock(&hashlock);
            return ;
        }
        /*remove from list*/
        idx = HASH(fp->f_id);
        tfp = fh[idx];
        if(tfp == fp)
        {
            fh[idx] = fp->f_next;
        }
        else
        {
            while(tfp->f_next != fp)
            {
                tfp = tfp->f_next;
            }
            tfp->f_next = fp->f_next;
        }
        pthread_mutex_unlock(&hashlock);
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    }
    else
    {
        fp->f_count --;
        pthread_mutex_unlock(&fp->f_lock);
    }
}

void * thread_func1(void *arg)
{
    struct foo *fp = (struct foo*)arg;
    printf("thread 1 start.\n");
    foo_hold(fp);  
    printf("in thread 1 count = %d\n",fp->f_count);
    printf("thread 1 exit.\n");
    pthread_exit((void*)1);
}

void * thread_func2(void *arg)
{
    struct foo *fp = (struct foo*)arg;
    printf("thread 2 start.\n");
    foo_hold(fp);
    printf("in thread 2 count = %d\n",fp->f_count);
    printf("thread 2 exit.\n");
    pthread_exit((void*)2);
}

编译运行:

cuQ201.png

从上述代码可以看出,分配函数锁住了散列列表锁,把新的结果添加到了散列桶中,而且在对散列列表的锁解锁之前,先锁定了新结构中的互斥量。因为新的结构是放在全局列表中的,其他线程可以找到它,所以在初始化完成之前,先要求阻塞其他线程试图访问新的结构。

foo_find函数锁住散列列表锁,然后搜索被请求的结构。如果找到了,就增加其引用计数,再在foo_hold函数中锁定foo结果中的f_lock互斥量。

现在有了两个锁以后,foo_rele函数就变得更加复杂了。如果这是最后一个引用,就需要对这个结构互斥量进行解锁,因为我们需要从散列列表中删除这个结构,这样才可以获取散列列表锁,然后重新获取结果互斥量,从上一次获得结构互斥量以来我们可能被阻塞着,所以需要重新检查条件,判断是否还需要释放这个结构。如果另一个线程在我们为满足锁顺序而进行阻塞时发现这个结果并对其引用计数加1,那么只需要简单地对整个引用计数减1,对所有的东西解锁,然后返回。

foo_rele画一个图详细的说一下。

cuQ4fO.png

如果在第一次判断引用计数为1的时候,不对节点进行解锁,直接从哈希表中摘除并进行释放,这里会出问题。因为你需要摘除节点,就必须对哈希表进行加锁,如果没有加锁,则可以有一个线程调用了foo_find拿到了这个节点。又因为释放节点必须先对其解锁,那么就有可能你这里刚解锁,另一个线程通过foo_hold对其进行使用了。然后节点被销毁,可是还有一个线程正在引用它。

线程A: 节点加锁—> —>节点解锁 —->节点销毁
线程B: foo_find拿到节点 –>foo_hold使用节点 –>引用节点出错

这种解锁方法复杂,所以我们需要重新审视原来的设计。我们也可以使用散列列表锁来裱糊整个结构引用计数,使事情大大简化。结果互斥量可以用于保护foo结果中的任何其他东西。

/**Includes*********************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <sys/types.h>
#include <pthread.h>

/**【自定义结构】***************************************************************/
struct foo
{
    int                 f_count;
    pthread_mutex_t     f_lock;
    int                 f_id;
    struct foo         *f_next;/*protected by hashlock*/
    /*...more stuff here ...*/
};

/**【宏定义】*******************************************************************/
#define NHASH 29
#define HASH(id)  (((unsigned long)id)%NHASH) 

/**【全局变量声明】*************************************************************/
struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

/**【函数声明】*    ************************************************************/
struct foo *foo_alloc(int id);
void foo_hold(struct foo *fp);
struct foo * foo_find(int id);
void foo_rele(struct foo *fp);

void * thread_func1(void *arg);
void * thread_func2(void *arg);

int main(int argc, char const *argv[])
{
    pthread_t pid1,pid2;
    int err;
    void *pret;
    struct foo *fobj;
    fobj = foo_alloc(45);

    pthread_create(&pid1,NULL,thread_func1,(void*)fobj);
    pthread_create(&pid2,NULL,thread_func2,(void*)fobj);

    pthread_join(pid1,&pret);
    printf("thread 1 exit code is: %ld\n",(long)pret);

    pthread_join(pid2,&pret);
    printf("thread 2 exit code is: %ld\n",(long)pret);
    foo_rele(fobj);

    exit(0);
}

struct foo *foo_alloc(int id) /* allocate the object */
{
    struct foo  *fp;
    int         idx;

    if ((fp = malloc(sizeof(struct foo))) != NULL) 
    {
        fp->f_count = 1;
        fp->f_id = id;
        if (pthread_mutex_init(&fp->f_lock, NULL) != 0) 
        {
            free(fp);
            return(NULL);
        }
        idx = HASH(id);
        pthread_mutex_lock(&hashlock);
        fp->f_next = fh[idx];
        fh[idx] = fp;
        pthread_mutex_lock(&fp->f_lock);
        pthread_mutex_unlock(&hashlock);
        /* ... continue initialization ... */
        pthread_mutex_unlock(&fp->f_lock);
    }
    return(fp);
}

void foo_hold(struct foo *fp) /* add a reference to the object */
{
    pthread_mutex_lock(&hashlock);
    fp->f_count++;
    pthread_mutex_unlock(&hashlock);
}

struct foo *foo_find(int id) /* find an existing object */
{
    struct foo  *fp;

    pthread_mutex_lock(&hashlock);
    for (fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next) {
        if (fp->f_id == id) 
        {
            fp->f_count++;
            break;
        }
    }
    pthread_mutex_unlock(&hashlock);
    return(fp);
}

void foo_rele(struct foo *fp) /* release a reference to the object */
{
    struct foo  *tfp;
    int         idx;

    pthread_mutex_lock(&hashlock);
    if (--fp->f_count == 0) 
    {
        /* last reference, remove from list */
        idx = HASH(fp->f_id);
        tfp = fh[idx];
        if (tfp == fp)
        {
            fh[idx] = fp->f_next;
        } 
        else 
        {
            while (tfp->f_next != fp)
                tfp = tfp->f_next;
            tfp->f_next = fp->f_next;
        }
        pthread_mutex_unlock(&hashlock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    } 
    else 
    {
        pthread_mutex_unlock(&hashlock);
    }
}

void * thread_func1(void *arg)
{
    struct foo *fp = (struct foo*)arg;
    printf("thread 1 start.\n");
    foo_hold(fp);  
    printf("in thread 1 count = %d\n",fp->f_count);
    printf("thread 1 exit.\n");
    pthread_exit((void*)1);
}

void * thread_func2(void *arg)
{
    struct foo *fp = (struct foo*)arg;
    printf("thread 2 start.\n");
    foo_hold(fp);
    printf("in thread 2 count = %d\n",fp->f_count);
    printf("thread 2 exit.\n");
    pthread_exit((void*)2);
}

编译运行:

cuQbnA.png

上面的程序比thread_mutex9中的方式简单多了。两种用途使用简单的锁时,围绕散列列表和引用计数的锁的排列问题就不存在了。多线程的软件设计设计这两者之间的折中。如果锁粒度太粗,就会出现很多很多线程阻塞等待相同的锁,这可能并不能改善并发性。如果锁的粒度太细,那么过多的锁开销会使得性能受到影响,而且代码也会变得复杂。作为一个程序员,需要在满足锁要求的情况下,在代码复杂性和性能之间找到正确的平衡。

2.3.2信号量

Linux提供两种信号量:内核信号量和用户态信号量。用户态信号量又分为POSIX信号量和SYSTEM V信号量。而针对线程常用POSIX信号量。POSIX信号量又分为有名信号量和无名信号量。有名信号量,其值保存在文件中, 所以它可以用于线程也可以用于进程间的同步。无名信号量,其值保存在内存中。本文主要讲解POSIX信号量的无名信号量。POSIX信号量的引用头文件是
信号量代表某一类资源,其值表示系统中该资源当前可用的数量。信号量是一个受保护的变量,只能
通过三种操作来访问:
1)初始化
2)P操作(申请资源)
3)V操作(释放资源)

P(S)含义如下:

if (信号量的值大于0)  
{  
    请资源的任务继续运行;  
    信号量的值 减一;  
}  
else  
{  
    请资源的任务阻塞;  
} 

V(S)含义如下:

if (没有任务在等待该资源)  
{  
    信号量的值 加一;  
}  
else  
{  
    唤醒第一个等待的任务,让其继续运行;  
}  

1)信号量初始化函数:

所需头文件 #include
函数原型 int sem_init(sem_t *sem, int pshared, unsigned int value);
函数参数 sem:初始化信号量
pshared:信号量共享的范围(0:线程间使用 ,非0:进程间使用)
value:信号量初始化
函数返回值 成功:0
失败:-1

1) P操作

所需头文件 #include
函数原型 int sem_wait(sem_t *sem);
函数参数 sem:信号量
函数返回值 成功:0
失败:-1

3)V操作

所需头文件 #include
函数原型 int sem_post(sem_t *sem);
函数参数 sem:信号量
函数返回值 成功:0
失败:-1

下面是个实例:

/**Includes*********************************************************************/
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <pthread.h>  
#include <semaphore.h>  

/**【自定义结构】***************************************************************/

/**【全局变量声明】*************************************************************/
char buf[60];  
sem_t sem;  

/**【全局变量声明】*************************************************************/
void *function(void *arg);  

int main(int argc, char *argv[])  
{  
    pthread_t a_thread;  
    void *thread_result;  
    if(sem_init(&sem,0,0) != 0)  
    {  
        perror("fail to sem_init");  
        exit(-1);  
    }  
    if(pthread_create(&a_thread,NULL,function,NULL) != 0)  
    {  
        perror("fail to pthread_create");  
        exit(-1);  
    }  
    printf("input 'quit' to exit\n");  
    do    
    {  
        fgets(buf,60,stdin);  
        sem_post(&sem);  
    }  
    while(strncmp(buf,"quit",4) != 0);  
    return 0;  
}

void *function(void *arg)  
{  
    while(1)  
    {  
        sem_wait(&sem);  
        printf("you enter %ld characters\n",strlen(buf) - 1);  
    }  
}

编译运行:

culpcQ.png

我们可以看到两个线程是同步的。

再看看用信号量来解决多线程的同步问题,程序代码如下:

/**Includes*********************************************************************/
#include <stdio.h>  
#include <stdlib.h>  
#include <string.h>  
#include <pthread.h>  
#include <semaphore.h>  

/**【自定义结构】***************************************************************/

/**【全局变量声明】*************************************************************/ 
int tickets = 20;  
sem_t mutex,full;  

/**【函数声明】*****************************************************************/
void *ticketsell1(void *);  
void *ticketsell2(void *);  

int main(int argc, char *argv[])  
{  
    pthread_t id1,id2;  
    int error;  
    int ret;  

    ret = sem_init(&mutex, 0 ,1);//初始化mutex信号量为1  
    ret += sem_init(&full, 0 ,0);//初始化full信号量为0  

    if(ret != 0)  
    {  
        printf("sem_init fails!\n");  
    }  

    error = pthread_create(&id1, NULL, ticketsell1, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    error = pthread_create(&id2, NULL, ticketsell2, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    pthread_join(id1,NULL);  
    pthread_join(id2,NULL);  

    return 0;  
}  

void *ticketsell1(void *arg)  
{  
    while(1)  
    {  
        sem_wait(&mutex);//mutex信号量进行P操作  
        if(tickets > 0)  
        {  
            usleep(1000);  
            printf("ticketse1 sells ticket:%d\n",tickets--);  
            sem_post(&full);//full信号量进行V操作  
        }  
        else  
        {  
            sem_post(&full);//full信号量进行V操作  
            break;  
        }  
    }  
    return (void *)0;  
}  

void *ticketsell2(void *arg)  
{  
    while(1)  
    {  
        sem_wait(&full);//full信号量进行P操作  
        if(tickets > 0)  
        {  
            usleep(1000);  
            printf("ticketse2 sells ticket:%d\n",tickets--);  
            sem_post(&mutex);//mutex信号量进行V操作  
        }  
        else  
        {  
            sem_post(&mutex);//mutex信号量进行V操作  
            break;  
        }  
    }  

    return (void *)0;  
}  

编译运行:

culPns.png

上面的sem_init函数用来初始化两个信号量的初始化值,这里一个设为1,一个设为0,sem_wait类似于P操作,让信号量减1,如果结果小于0,线程阻塞,否则线程继续执行,sem_post类似于V操作,提升信号量的值,加1,通过这两个信号量之间的互相“救对方”,就可以实现这两个线程的同步执行。

我们编译运行以上程序,发现两个售票点交替卖票,两个线程依次得到机会执行,并且不会有0号票卖出,实现了同步。上面的例子也常用用于控于两个线程执行顺序。

2.3.3条件变量

在服务器编程中常用的线程池,多个线程会操作同一个任务队列,一旦发现任务队列中有新的任务,子线程将取出任务;这里因为是多线程操作,必然会涉及到用互斥锁保护任务队列的情况(否则其中一个线程操作了任务队列,取出线程到一半时,线程切换又取出相同任务)。但是互斥锁一个明显的缺点是它只有两种状态:锁定和非锁定。设想,每个线程为了获取新的任务不断得进行这样的操作:锁定任务队列,检查任务队列是否有新的任务,取得新的任务(有新的任务)或不做任何操作(无新的任务),释放锁,这将是很消耗资源的。

而条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足,它常和互斥锁一起配合使用。使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开相应的互斥锁并等待条件发生变化。一旦其他的某个线程改变了条件变量,他将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程。这些线程将重新锁定互斥锁并重新测试条件是否满足。一般说来,条件变量被用来进行线程间的同步。

对应于线程池的场景,我们可以让线程处于等待状态,当主线程将新的任务放入工作队列时,发出通知(其中一个或多个),得到通知的线程重新获得锁,取得任务,执行相关操作。

 条件变量初始化
条件变量和互斥锁一样,都有静态动态两种创建方式,静态方式使用PTHREAD_COND_INITIALIZER常量,如下:

pthread_cond_t cond = PTHREAD_COND_INITIALIZER

动态方式调用函数int pthread_cond_init,API定义如下:

int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);

条件变量的属性由参数attr指定,如果参数attr为NULL,那么就使用默认的属性设置。尽管POSIX标准中为条件变量定义了属性,但在LinuxThreads中没有实现,因此cond_attr值通常为NULL,且被忽略。多线程不能同时初始化一个条件变量,因为这是原子操作。
如果函数调用成功,则返回0,并将新创建的条件变量的ID放在参数cond中。

 解除条件变量

int pthread_cond_destroy(pthread_cond_t *cond);

调用destroy函数解除条件变量并不会释放存储条件变量的内存空间。

 条件变量阻塞(等待)

int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abtime);

等待有两种方式:条件等待pthread_cond_wait()和计时等待pthread_cond_timedwait(),其中计时等待方式如果在给定时刻前条件没有满足,则返回ETIMEDOUT,结束等待,其中abstime以与系统调用time相同意义的绝对时间形式出现,0表示格林尼治时间1970年1月1日0时0分0秒。

无论哪种等待方式,都必须和一个互斥锁配合,以防止多个线程同时请求pthread_cond_wait()或pthread_cond_timedwait()(下同)的竞争条件(Race Condition)。mutex互斥锁必须是普通锁(PTHREAD_MUTEX_TIMED_NP)或者自适应锁(PTHREAD_MUTEX_ADAPTIVE_NP),且在调用pthread_cond_wait()前必须由本线程加锁(pthread_mutex_lock()),而在更新条件等待队列以前,mutex保持锁定状态,并在线程挂起进入等待前解锁。在条件满足从而离开pthread_cond_wait()之前,mutex将被重新加锁,以与进入pthread_cond_wait()前的加锁动作对应。阻塞时处于解锁状态。

 激活

int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

pthread_cond_signal函数的作用是发送一个信号给另外一个正在处于阻塞等待状态的线程,使其脱离阻塞状态,继续执行,如果没有线程处在阻塞等待状态,pthread_cond_signal也会成功返回。
共享变量的状态改变必须遵守lock/unlock的规则:需要在同一互斥锁的保护下使用pthread_cond_signal(即pthread_cond_wait必须放在pthread_mutex_lock和pthread_mutex_unlock之间)否则条件变量可以在对关联条件变量的测试和pthread_cond_wait带来的阻塞之间获得信号,这将导致无限期的等待(死锁)。因为他要根据共享变量的状态来决定是否要等待,所以为了避免死锁,必须要在lock/unlock队中。

共享变量的状态改变必须遵守lock/unlock的规则:pthread_cond_signal即可以放在pthread_mutex_lock和pthread_mutex_unlock之间,也可以放在pthread_mutex_lock和pthread_mutex_unlock之后,但是各有优缺点。

若为前者,在某些线程的实现中,会造成等待线程从内核中唤醒(由于cond_signal)然后又回到内核空间(因为cond_wait返回后会有原子加锁的行为),所以一来一回会有性能的问题(上下文切换)。详细来说就是,当一个等待线程被唤醒的时候,它必须首先加锁互斥量(参见pthread_cond_wait()执行步骤)。如果线程被唤醒而此时通知线程任然锁住互斥量,则被唤醒线程会立刻阻塞在互斥量上,等待通知线程解锁该互斥量,引起线程的上下文切换。当通知线程解锁后,被唤醒线程继续获得锁,再一次的引起上下文切换。这样导致被唤醒线程不能顺利加锁,延长了加锁时间,加重了系统不必要的负担。但是在LinuxThreads或者NPTL里面,就不会有这个问题,因为在Linux 线程中,有两个队列,分别是cond_wait队列和mutex_lock队列, cond_signal只是让线程从cond_wait队列移到mutex_lock队列,而不用返回到用户空间,不会有性能的损耗,因此Linux推荐这种形式。

而后者不会出现之前说的那个潜在的性能损耗,因为在signal之前就已经释放锁了。但如果unlock和signal之前,有个低优先级的线程正在mutex上等待的话,那么这个低优先级的线程就会抢占高优先级的线程(cond_wait的线程)。而且,假设而这在上面的放中间的模式下是不会出现的。
而对于pthread_cond_broadcast函数,它使所有由参数cond指向的条件变量阻塞的线程退出阻塞状态,如果没有阻塞线程,则函数无效。

我们再用条件变量来解决同步问题,一般条件变量需要结合互斥量一起使用,代码如下

#include <stdio.h>  
#include <pthread.h>  
#include <semaphore.h>  

void *ticketsell1(void *);  
void *ticketsell2(void *);  
int tickets = 20;  
pthread_mutex_t mutex;  
pthread_cond_t  qready = PTHREAD_COND_INITIALIZER;//静态初始化条件变量;  

int main()  
{  
    pthread_t id1,id2;  
    pthread_mutex_init(&mutex, NULL);  
    int error;  

    error = pthread_create(&id1, NULL, ticketsell1, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    error = pthread_create(&id2, NULL, ticketsell2, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    pthread_join(id1,NULL);  
    pthread_join(id2,NULL);  

    return 0;  
}  

void *ticketsell1(void *arg)  
{  
    pthread_mutex_lock(&mutex);  
    while(tickets > 0)  
    {  
        if(tickets%2 == 1)  
        {  
            usleep(1000);  
            printf("ticketse1 sells ticket:%d\n",tickets--);  
            pthread_cond_signal(&qready);//条件改变,发送信号,通知ticketse2  
        }  
        else  
        {  
            pthread_cond_wait(&qready,&mutex);//解开mutex,并等待qready改变  
        }  
    }  
    pthread_mutex_unlock(&mutex);//给互斥量解锁 
    return (void *)0;  
}  

void *ticketsell2(void *arg)  
{  
    pthread_mutex_lock(&mutex);  
    while(tickets > 0)  
    {  
        if(tickets%2 == 0)  
        {  
            usleep(1000);  
            printf("ticketse2 sells ticket:%d\n",tickets--);  
            pthread_cond_signal(&qready);//条件改变,发送信号,通知ticketse1  
        }  
        else  
        {  
            pthread_cond_wait(&qready,&mutex);//解开mutex,并等待qready改变  
        }    
    }  
    pthread_mutex_unlock(&mutex);//给互斥量解锁  
    return (void *)0;  
}  

执行结果如下:

culnc4.png

条件变量通过允许线程阻塞和等待另一个线程发送信号的方法弥补了互斥锁的不足,它常和互斥锁一起使用。使用时,条件变量被用来阻塞一个线程,当条件不满足时,线程往往解开相应的互斥锁并等待条件变量发生变化。一旦其它的某个线程改变了条件变量,它将通知相应的条件变量唤醒一个或多个正被此条件变量阻塞的线程。这些线程将重新锁定互斥锁并重新测试条件是否满足。一般说来,条件变量被用来进行线程间的同步。

函数pthread_cond_wait使线程阻塞在一个条件变量上,而函数pthread_cond_signal是用来释放被阻塞在条件变量上的一个线程。但是要注意的是,条件变量只是起到阻塞和唤醒线程的作用,具体的判断条件还需用户给出,我这里给出的是tickets是否是偶数这个条件。

接下来我们在看看看一个例子。

#include <stdio.h>  
#include <pthread.h>  
#include <semaphore.h>  

#define BUFFER_SIZE  5   //产品库存数量
#define PRODUCT_CNT 30   //产品生产总数

struct product_cons
{
    int buffer[BUFFER_SIZE]; //产生产品值
    pthread_mutex_t lock;    //互斥锁
    int readpos,writepos;    //读写位置
    pthread_cond_t notempty; //条件变量非空
    pthread_cond_t notfull; //非满
}buffer;

void init(struct product_cons *p)
{
    pthread_mutex_init(&p->lock,NULL);//互斥锁
    pthread_cond_init(&p->notempty,NULL);//条件变量
    pthread_cond_init(&p->notfull,NULL);//条件变量
    p->readpos = 0;//读写位置
    p->writepos = 0;
}

void finish(struct product_cons *p)
{
    pthread_mutex_destroy(&p->lock);//互斥锁
    pthread_cond_destroy(&p->notempty);//条件变量
    pthread_cond_destroy(&p->notfull);//条件变量
    p->readpos = 0;//读写位置
    p->writepos = 0;
}

//存储一个数据
void put(struct product_cons *p,int data)
{
    pthread_mutex_lock(&p->lock);
    if((p->writepos+1)%BUFFER_SIZE == p->readpos)
    {
        printf("producer wait for not full\n");
        pthread_cond_wait(&p->notfull,&p->lock);
    }
    p->buffer[p->writepos] = data;
    p->writepos ++;

    if(p->writepos >= BUFFER_SIZE)
    {
        p->writepos = 0;
    }
    pthread_cond_signal(&p->notempty);
    pthread_mutex_unlock(&p->lock);
}      

//移除一个数据
int get(struct product_cons *p)
{
    int data;
    pthread_mutex_lock(&p->lock);

    if(p->readpos == p->writepos)
    {
        printf("consumer wait for not empty\n");
        pthread_cond_wait(&p->notempty,&p->lock);
    }
    data = p->buffer[p->readpos];
    p->readpos++;

    if(p->readpos>= BUFFER_SIZE)
    {
        p->readpos = 0;
    }
    pthread_cond_signal(&p->notfull);
    pthread_mutex_unlock(&p->lock);

    return data;
}  

void *producer(void *data)//子线程,生产
{
    int n;
    for(n=1; n<=50; n++)
    {
        sleep(1);
        printf("put the %d product ...\n",n);
        put(&buffer,n);
        printf("put the %d product success\n",n);
    }

    return NULL;
}

void *consumer(void *data)//子线程,消费
{
    static int cnt = 0;
    int num;
    while(1)
    {
        sleep(2);
        printf("get product ...\n");
        num = get(&buffer);
        printf("get the %d product success\n",num);
        if(++cnt == PRODUCT_CNT)
        {
            break;
        }
    }
    printf("consmer stopped\n");

    return NULL;
}

int main()  
{  
    pthread_t id1,id2;   
    int error;  

    init(&buffer);

    error = pthread_create(&id1, NULL, producer, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    error = pthread_create(&id2, NULL, consumer, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    pthread_join(id1,NULL);  
    pthread_join(id2,NULL);  

    finish(&buffer);
    return 0;  
}  

结果如下所示:

culQBR.png

2.3.4读写锁

读写锁与互斥量类似,不过读写锁有更高的并行性。互斥量要么加锁,要么不加锁,而且同一时候只允许对一个线程加锁。对于一个变量的读取,完全可以让多个线程同时进行,那么可以使用读写锁来操作。

读写锁具备三种状态:

1.读模式下加锁状态 (读锁)
2.写模式下加锁状态 (写锁)
3.不加锁状态

读写锁特性:

1.读写锁是"写模式加锁"时, 解锁前,所有对该锁加锁的线程都会被阻塞。
2.读写锁是"读模式加锁"时, 如果线程以读模式对其加锁会成功;如果线程以写模式加锁会阻塞。
3.读写锁是"读模式加锁"时, 既有试图以写模式加锁的线程,也有试图以读模式加锁的线程。那么读写锁会阻塞随后的读模式锁请求。优先满足写模式锁。读锁、写锁并行阻塞,写锁优先级高

读写锁也叫共享-独占锁。当读写锁以读模式锁住时,它是以共享模式锁住的;当它以写模式锁住时,它是以独占模式锁住的。写独占、读共享。

读写锁非常适合于对数据结构读的次数远大于写的情况。

2.3.4.1读写锁的初始化和销毁

读写锁在使用之前必须初始化。

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);//attr表读写锁属性,通常使用默认属性,传NULL即可。

使用完成后要销毁读写锁。

int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

2.3.4.2读写锁的读锁和写锁

 读模式加锁

int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); //以读方式请求读写锁。(常简称为:请求读锁)
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);// 非阻塞以读方式请求读写锁(非阻塞请求读锁)

 写模式加锁

int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); //以写方式请求读写锁。(常简称为:请求写锁)
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); //非阻塞以写方式请求读写锁(非阻塞请求写锁)

 解锁

int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); //解锁

2.3.4.3读写锁实例

我们再用读写锁来解决同步问题,代码如下:


#include <stdio.h>  
#include <pthread.h>  

void *ticketsell1(void *);  
void *ticketsell2(void *);  
int tickets = 20;  
pthread_rwlock_t  rwlock;  

int main()  
{  
    pthread_t id1,id2;  
    pthread_rwlock_init(&rwlock, NULL);//  
    int error;  

    error = pthread_create(&id1, NULL, ticketsell1, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    error = pthread_create(&id2, NULL, ticketsell2, NULL);  
    if(error != 0)  
    {  
        printf("pthread is not created!\n");  
        return -1;  
    }  

    pthread_join(id1,NULL);  
    pthread_join(id2,NULL);  

    return 0;  
}  

void *ticketsell1(void *arg)  
{  
    while(1)  
    {  
        pthread_rwlock_wrlock(&rwlock);//给读写锁上锁  
        if(tickets > 0)  
        {  
            usleep(1000);  
            printf("ticketse1 sells ticket:%d\n",tickets--);  
            pthread_rwlock_unlock(&rwlock);//给读写锁解锁   
        }  
        else  
        {  
            pthread_rwlock_unlock(&rwlock);//给读写锁解锁  
            break;  
        }  
        pthread_yield();//线程调度函数,使每个线程都有执行机会  
    }  
    return (void *)0;  
}  

void *ticketsell2(void *arg)  
{  
    while(1)  
    {  
         pthread_rwlock_wrlock(&rwlock);//给读写锁上锁  
        if(tickets > 0)  
        {  
            usleep(1000);  
            printf("ticketse2 sells ticket:%d\n",tickets--);  
            pthread_rwlock_unlock(&rwlock);//给读写锁解锁  
        }  
        else  
        {  
            pthread_rwlock_unlock(&rwlock);//给读写锁解锁  
            break;  
        }  
        pthread_yield();//线程调度函数,是两个线程都有执行机会  
    }  

    return (void *)0;  
}  

结果如下所示:

culwDA.png

Related posts

Leave a Comment