Android RTMP协议直播之C/C++下POSIX多线程编程

之前的文章对H264标准以及Android音视频采集做了相关的讲解,随着本文开始将会进入Native层进行编码,试想一下一个视频有很多帧如果这些帧都要以高效率的方式进行有序编码然后推流。这个过程显然是不能在主线程中进行的,所以就引入了POSIX多线程编程。如果你对C/C++中的多线程编程不太熟悉,那么本文将带你进入C/C++的多线程编程。通过本文将会学习到在C/C++中如何实现消费者生产者模型。

线程(Thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。使用多线程编程能够充分利用CPU完成需要完成的任务。

不同平台的线程

  • Unix线程
  • POSIX线程:
    POSIX线程(POSIX threads),简称Pthreads,是线程的POSIX标准。该标准定义了创建和操纵线程的一整套API。在类Unix操作系统(Unix、Linux、Mac OS X等)中,都使用Pthreads作为操作系统的线程。
  • Win32线程
  • JAVA线程

POSIX线程

线程通过硬件和操作系统支持提供,但由于硬件和软件之间存在很大差异,因此需要统一设置。1995年,POSIX成为UNIX中许多系统调用的标准接口,包括线程enviroemnt。所谓的POSIX线程,或者pthreads是使用C语言构建的几乎所有语言和设置中的线程编程的关键模型,例如Java和python以及其他高级语言。

线程的生命周期与流程非常相似,始于创建。但是,线程不是从父级派生来创建子级的,而是简单地创建一个以起始函数作为入口点。线程不会像进程一样终止,而是与主线程连接,或者它们被分离以自行运行直到完成。

使用POSIX需要引入头文件<pthread.h>,在pthread.h头文件中定义了线程相关的函数原型

1
2
3
4
5
6
7
8
//创建一个线程
int pthread_create(pthread_t * thread, const pthread_attr_t * attr, void *(*start_routine)(void *), void *arg);
//退出线程
void pthread_exit(void *retval);
//取消线程
int pthread_cancel(pthread_t thread);
//等待一个线程
int pthread_join(pthread_t thread, void ** retval);

pthread_t

使用pthread_create创建一个线程

1
2
3
4
5
6
7
8
9
10
//当thread1进入runing状态就会调用这个函数
void* thread_func(void *arg){
printf("Hello World!\n");
return NULL;
}

//定义pthread_t
pthread_t thread1;
//创建一个线程,这句代码执行后thread1就进入了runing状态,其中“thread_func”为函数指针,如Java中的run方法。
pthread_create(&thread1,NULL,thread_func,NULL);

创建一个线程并携带参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <pthread.h>


void * hello_arg(void * args){

char * str = (char *) args;

printf("%s", str);

return NULL;
}

int main(int argc, char * argv[]){

char hello[] = "Hello World!\n";

pthread_t thread;

pthread_create(&thread, NULL, hello_arg, hello);
pthread_join(thread, NULL);

return 0;
}

使用pthread_create函数的时候我们将字符集hello作为参数传递到回调函数hello_arg中进行打印。

使用thread_join
和Java中的Join累类似,thread_join用于等到一个函数的执行完成。

1
pthread_join(thread1,NULL);

pthread_mutex_t & pthread_cond_t

pthread_mutex_t是POSIX中的互斥锁,pthread_cond_t可以理解为条件变量,设想一下如果没有使用生产者消费者模型来处理会是怎样的情况,消费者线程需要消费产品如果没有产品可以被消费那么一直处于死循环中,这样对性能的开销非常大。但是如何确保消费者在有产品可被消费的时候再去处理呢?这就引入了生产者消费者模型,在JAVA中实现生产者消费者模型可以使用Condition或者阻塞队列来实现,我们需要通过一个变量或者说一个互斥锁来确保产品的可用性。这个变量就是pthread_mutex_t,当我们生产产品的时候对互斥锁lock生产完成后unlock并通知消费者去消费。同样消费者在消费的时候对互斥锁lock消费完成unlock并通知生产者生产产品,生产者与消费者是如何通知的呢?没错,就是pthread_cond_t。接下来我们编写一个生产者消费者模型来演示pthread_mutex_t以及pthread_cond_t的用法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>

#define MAX_PRODUCER 1
#define MAX_CONSUMER 3

pthread_mutex_t mutex;
//互斥锁条件变量
pthread_cond_t pthread_cond_product;

void* thread_func(void *arg){
pthread_mutex_lock(&mutex);
char* arg1 = (char*)arg;
for (int i=0; i<10; i++) {
printf("%s,index->%d\n",arg1,i);
sleep(1);
}
pthread_mutex_unlock(&mutex);
return 0;
}

//模拟产品
int product = 0;

//生产者
//当生产者生产产品时消费者不可消费产品 -> 生产者和消费者线程使用同一把锁
void* producer_func(void *arg){
char* threadName=(char*)arg;
for(;;){
//每次生产产品先lock
pthread_mutex_lock(&mutex);
//开始生产产品
printf("%s -> 生产产品\n",threadName);
product++;
printf("%s -> 通知消费者消费\n",threadName);
//通知消费者消费,这个函数调用后如果消费者处于wait状态,则消费者就会消费产品。
pthread_cond_signal(&pthread_cond_product);
//释放锁
pthread_mutex_unlock(&mutex);
sleep(1);
}

return 0;
}

//消费者
void* consumer_func(void *arg){
char* threadName=(char*)arg;
for(;;){
//每次消费的时候先lock住mutex,这样生产者就拿不到锁会进入等待状态。
pthread_mutex_lock(&mutex);
while (product==0) {
//没有产品 等待生产
printf("%s -> 等待产品生产\n",threadName);
//没有产品,通过pthread_cond_wait函数会将当前线程挂起等待状态,直到使用pthread_cond_signal通知。
pthread_cond_wait(&pthread_cond_product, &mutex);
}
//开始消费产品
printf("%s -> 消费产品\n",threadName);
product--;
//每次消费完都需要unlock,否则生产者拿不到锁会一直处于等待状态。
pthread_mutex_unlock(&mutex);
sleep(1);
}
return 0;
}

char* joinThreadName(char* tag,int index){
char *head = (char*)malloc(sizeof(tag)+sizeof(index));

sprintf(head, "%s-%d", tag,index);

return head;

}
int main(int argc, const char * argv[]) {

//多个生产者
pthread_t producer_pthreads[MAX_PRODUCER];
//多个消费者
pthread_t consumer_pthreads[MAX_CONSUMER];
//初始化互斥锁,必须先初始化互斥锁在初始化pthread_cond_T
pthread_mutex_init(&mutex, NULL);
//初始化条件变量
pthread_cond_init(&pthread_cond_product, &mutex);

//生产者线程
for (int i=0; i<MAX_PRODUCER; i++) {
pthread_create(&producer_pthreads[i], NULL, producer_func, joinThreadName("producer", i));

}

//消费者线程
for (int i=0; i<MAX_CONSUMER; i++) {
pthread_create(&consumer_pthreads[i], NULL, consumer_func, joinThreadName("consumer", i));
}

//等待线程执行完成,否则程序运行就被关闭了,因为进程已经结束了。
for (int i=0; i<MAX_PRODUCER; i++) {
pthread_join(producer_pthreads[i], NULL);
}
for (int i=0; i<MAX_CONSUMER; i++) {
pthread_join(consumer_pthreads[i], NULL);
}

//销毁互斥锁
pthread_mutex_destroy(&mutex);
//销毁条件变量
pthread_cond_destroy(&pthread_cond_product);

return 0;
}

总结

上面我们通过对POSIX多线程编程的了解已经知道如何在C/C++中实现多线程,如何利用多线程实现生产者消费者模型。在后续直播项目的讲解中将会利用POSIX多线程编程实现编码推流过程,其实就是生产者消费者模型,以视频编码为例x264编码就是生产者,通过把编码后的数据加入队列生产者(rtmp)不停的从队列中取出数据进行发送。详细的实现过程我们下次再继续讨论。好了本文对POSIX的介绍到此就要结束了,如果对部分概念模糊的话快动手试试吧!

随意分享,您的支持将鼓励我继续创作!