openMP与pthread有限元 并行计算速度谁速度更高一些

1 #include&omp.h& 2 #include&stdio.h& 3
4 &#define ARRAY_SIZE 1000 5 &#define CHUNK_SIZE 100 6
7 &int main() 8 { 9
int array[ARRAY_SIZE];10
int thread_num = ARRAY_SIZE/CHUNK_SIZE+1;11
omp_set_num_threads(thread_num);12 13
//init array14 &
for(i=0;i&ARRAY_SIZE;i++)16
array[i]=i;18
}19 20 &#pragma omp parallel for schedule(guided,CHUNK_SIZE) private(i)21
for(i=0;i&ARRAY_SIZE;i++)22
int n = array[i];24
int num_of_one=0;25
if(n!=0)26
num_of_one++;28
while((n=n&(n-1))!=0)29
num_of_one++;31
array[i]=num_of_34
for(i=0;i&ARRAY_SIZE;i++)37
printf("%d ",array[i]);39
printf("\n");41
return 0;42 43 }44 45 &
  上面一段代码是通过加了一条函数调用(11行)和一条编译器指令(20行),从而将原来的循环分给多个线程来做。(本程序是计算0~ArraySize-1的每个数中二进制包含1个数)。
  而对于一开始就打算用并行方法来实现的程序,用pthread应该是更方便和更清晰。
下面是分别用pthread和openMP实现的worker_and_consumer:
pthread版:
1 #include&unistd.h& 2 #include&pthread.h& 3 #include&stdio.h& 4 #include&stdlib.h& 5
6 &#define SIZE 100 7 &#define THREAD_NUM_WORKER 15 8 #define THREAD_NUM_CONSUMER 10 9 #define SLEEP_WORKERS 210 #define SLEEP_CONSUMERS 111 12 int warehouse[SIZE];13 int at =-1;14 int is_end =0;15 pthread_mutex_t space = PTHREAD_MUTEX_INITIALIZER;16 pthread_mutex_t end
= PTHREAD_MUTEX_INITIALIZER;17 18 void* consumer_func(void*);19 void* worker_func(void*);20 21 int main()22 {23
pthread_t workers[THREAD_NUM_WORKER];24
pthread_t consumers[THREAD_NUM_CONSUMER];25
int i,j;26
for(i=0;i&THREAD_NUM_WORKER;i++)28
pthread_create(&workers[i],NULL,worker_func,NULL);29
for(j=0;j&THREAD_NUM_CONSUMER;j++)30
pthread_create(&consumers[j],NULL,consumer_func,NULL);31
while(is_end==0)32
scanf("%d",&n);34
if(n==0)35
pthread_mutex_lock(&end);37
is_end=1;38
pthread_mutex_unlock(&end);39
for(i=0;i&THREAD_NUM_WORKER;i++)42
pthread_join(workers[i],NULL);43
for(j=0;j&THREAD_NUM_CONSUMER;j++)44
pthread_join(consumers[j],NULL);45
return 0;46 }47 48 void* worker_func(void* var)49 {50
while(1)51
if(is_end)53
//保护at变量55
pthread_mutex_lock(&space);56
if(SIZE-at-1&0)57
printf("Make %d by worker %lld ",warehouse[++at]=rand(),pthread_self());59
printf("and at is %d\n",at);60
pthread_mutex_unlock(&space);62
sleep(SLEEP_WORKERS);63
return NULL;65 }66 67 68 void* consumer_func(void* var)69 {70
while(1)71
if(is_end)73
pthread_mutex_lock(&space);75
if(at&=0)76
printf("Got %d by consumer %lld\n",warehouse[at--],pthread_self());78
printf("and at is %d\n",at);79
pthread_mutex_unlock(&space);81
sleep(SLEEP_CONSUMERS);82
return NULL;84 }85 86 87 88
openMP版:
1 #include&unistd.h& 2 #include&stdio.h& 3 #include&stdlib.h& 4 #include&omp.h& 5
7 #define SIZE 100 8 #define THREAD_NUM_WORKER 15 9 #define THREAD_NUM_CONSUMER 1010 #define SLEEP_WORKERS 211 #define SLEEP_CONSUMERS 112 13 int warehouse[SIZE];14 int at =-1;15 int is_end =0;16 17 void start_workers()18 {19
omp_set_num_threads(THREAD_NUM_WORKER);20 #pragma omp parallel default(shared)21
if(omp_get_thread_num()==0)23
printf("worker num is %d\n",omp_get_num_threads());24
while(1)25
if(is_end)27
break;28 //保护at变量29 #pragma omp critical(space)30
if(SIZE-at-1&0)32
printf("Make %d by worker %d ",warehouse[++at]=rand(),omp_get_thread_num());34
printf("and at is %d\n",at);35
sleep(SLEEP_WORKERS);38
}40 }41 42 43 void start_consumers(void)44 {45
omp_set_num_threads(THREAD_NUM_CONSUMER);46 #pragma omp parallel default(shared)47
if(omp_get_thread_num()==0)49
printf("consumer num is %d\n",omp_get_num_threads());50
while(1)51
if(is_end)53
break;54 #pragma omp critical(space)55
if(at&=0)57
printf("Got %d by consumer %d\n",warehouse[at--],omp_get_thread_num());59
printf("and at is %d\n",at);60
sleep(SLEEP_CONSUMERS);63
}65 }66 67 int main()68 {69
omp_set_dynamic(0);70
omp_set_nested(1);//这个不设置的话,就不能嵌套fork子线程咯71
//先设置3个线程,每个线程完成一个section72
omp_set_num_threads(3);73 #pragma omp parallel sections74
{75 #pragma omp section76
start_workers();78
}79 #pragma omp section80
start_consumers();82
}83 #pragma omp section84
scanf("%d",&in);87
{89 //保护is_end90 #pragma omg critical(end)91
is_end =1;92
return 0;96 }97 98 99
  最后说一下,用openMP,编译时要加上选项-fopenmp,编译pthread时加上链接-lpthread。另外openMP有个缺点是若是代码中编译指令出错时,找错还是挺麻烦的,就像昨晚我把#pragma omp parallel写成了#pragma omg parallel,结果编译链接通过后却始终只有一个线程(主线程),找了好久...囧!从并行计算的角度,MPI 与 OpenMP 的对比? - 知乎379被浏览47375分享邀请回答3612 条评论分享收藏感谢收起432 条评论分享收藏感谢收起查看更多回答并行计算简介(1) - 简书
并行计算简介(1)
研一上学期上了多核软件设计,以及算法设计与分析的并行算法部分,其中算法的课程大作业是要使用MPI,openmp以及pthread实现大型稀疏矩阵的求解算法。下学期上了分布式与并行计算,主要了解了分布式计算的内容,学了MapReduce,Hadoop,Spark以及了解了虚拟化等相关技术,动手搭建了Hadoop集群,OpenStack云平台,算是对云计算有了初步的了解。
当前利用并行技术开发软件是一个趋势,无论是在移动的APP,桌面应用,还是在云服务应用领域,并行计算越来越得到开发者的关注。下面总结一下并行计算,也算是对这一段时间学习的总结,以下内容是作者学习中的总结,如果有错误请在评论区指出。
什么是并行计算
并行计算是相对于串行计算而言,比如一个矩阵相乘的例子,下面给出串行程序的代码
void matrixMultiplication(int a[][SIZE], int b[][SIZE])
int i,j,k;
for(i = 0; i & c_ i++)
for(j = 0; j & c_ j++)
for(k = 0; k & a_ k++)
c[i][j] += a[i][k] * b[k][j];
在上面的程序中,程序编译运行之后以一个进程(注意区分进程和线程这两个概念)的方式是按照for循环迭代顺序执行。那怎么并行矩阵相乘的代码呢?这里需要使用高级语言级别的并行库,常见的并行库有opemp,pthread,MPI,CUDA等,这些并行库一般都支持C/C++,程序员可以直接调用并行库的函数而不需要实现底层的CPU多核调用。下面给出opemmp版本的矩阵相乘程序。
void matrixMultiplication(int a[][SIZE], int b[][SIZE])
int i,j,k;
#pragma omp parallel shared(a,b,c) private(i,j,k)
#pragma omp for schedule(dynamic)
for(i = 0; i & c_ i++)
for(j = 0; j & c_ j++)
for(k = 0; k & a_ k++)
c[i][j] += a[i][k] * b[k][j];
在没有改动原本代码的基础上实现了矩阵相乘的并行化,实现的办法仅仅是添加了两条openmp编译制导语句,在程序运行到并行代码时,程序的主线程会启动多线程把任务分成n份(n=CPU核心数),然后在多核心上同时计算,最后主线程得到结果。当然除了多线程,程序也可以启动多进程进行并行计算,关于多进程,Linux下的fork()想必很多人都有了解,而mpich是目前很流行的多进程消息传递库。并行化看起来很简单不是么,但是,要设计高效能解决复杂问题的并行程序可不那么容易。
上面提到了多核,这里聊聊多核吧。
多核,就是多核处理器,我们知道,目前的计算机的CPU一般是双核的,有的甚至更多,下图是我的电脑的处理器
我的电脑是8个核心的,其实并行计算就是利用CPU的多核处理器的特点,把之前在单核心上串行的代码并行化,从而缩短得到计算结果的时间。比如上文中提及的矩阵相乘的并行程序,就是把计算任务平分到每个处理器,然后把结果返回,这样相比串行执行时间上要高效。那怎么判断并行程序的效率呢?那就是加速比了,下文会介绍加速比。
关于多核,需要介绍摩尔定律(Moore’s Law)
摩尔定律由英特尔联合创始人之一的Gordon Moore于1965提出,摩尔定律的内容为:
The number of transistors per square inch on integrated circuits had doubledevery year since the integrated circuit was invented.
其中文意思为
当价格不变时,集成电路上每平方英寸可容纳的晶体管的数目,约每隔一年便会增加一倍,性能也将提升一倍。
这意味着没有摩尔定律,就没有如今廉价的处理器。而随着集成电路上的晶体管数据量越来越多,功耗的增加以及过热问题,使得在集成电路上增加更多的晶体管变得更加困难,摩尔定律所预言的指数增长必定放缓。因此,摩尔定律失效。
当前和未来五年,微处理器技术朝着多核方向发展,充分利用摩尔定律带来的芯片面积,放置多个微处理器内核,以及采用更加先进的技术降低功耗。
当然,多核并行计算不仅仅可以使用CPU,而且还可以使用GPU(图形处理器),一个GPU有多大上千个核心,可以同时运行上千个线程。那怎么利用GPU做并行计算呢?可以使用英伟达的CUDA库,不过前提是计算机安装了英伟达的显卡。
并行计算的分类
虽然并行计算可分为时间上的并行和空间上的并行, 时间上的并行是指流水线技术,而空间上的并行则是指多核心并发的执行计算。但是目前并行计算主要研究空间上的并行问题。
按照Flynn分类(1966年)的标准,根据指令流和数据流的多倍性(机器瓶颈部件所能支持最多指令条数或数据个数)可以分成以下几类
单指令流单数据流机SISD,即传统的单处理机
单指令流多数据流机SIMD
多指令流单数据流机MISD
多指令流多数据流机MIMD
下图是并行计算四种分类的计算机体系结构
并行计算的分类
既然有了并行计算,那少不了并行计算机,大家比较熟悉的并行计算机应该是天河一号,天河二号以及最近大家熟知的神威·太湖之光超级计算机。超级计算机涉及CPU,内存,存储,缓存,通信,I/O,网络,制冷等多个领域的知识,由于本文主要介绍并行计算,因此超级计算机的内容不在本文中详述。
并行程序的评价指标
评价并行计算程序效率,不单单从并行程序的执行时间来考虑,而是从与串行程序的对比来去评价。最常用的指标是加速比(speedup)
加速比=串行执行时间/并行执行时间
效率=加速比/处理器个数
成本=并行执行时间×所用处理器的数目
举个例子:用N个处理器计算N个数的和(N为2的整数次幂)
串行计算需要O(N)的时间
并行方法:每个处理器获得一个数,两个处理器之一将其叠加,递归上述步骤。需要O(logN)的时间
加速比S = O(N/logN)
成本C = O(N*logN)
并行计算模型
并行计算模型是并行算法设计与分析的模型,不涉及并行算法实现的程序设计模型和并行算法执行的程序执行模型,如PRAM模型(SIMD模型)。
并行程序设计模型侧重于并行算法如何使用某种程序设计语言正确地编程实现,如MPI,OpenMP,pthread。
并行程序执行模型侧重于并行算法如何在具体的并行机上运行并优化性能,如指令级并行程序执行模型ILPPEM
并行计算模型是算法设计者与体系结构之间的桥梁,并行计算模型屏蔽不同并行机的具体差异,只抽取若干能反映计算特性模型参数,按照模型所定义的计算行为构造成本函数,以此分析时空复杂度。机器参数(如CPU性能参数、存储器参数、通信网络参数)、计算行为(同步或异步)和成本函数(机器参数构成自变量)构成并行计算模型的三要素
写着写着,好像越写越底层了,下面说说并行计算与大数据吧。
并行计算与大数据
大数据不仅给我们带来了大量的数据,而且还带来了计算和查询的麻烦。在数据挖掘领域,对大数据集进行logistic回归,如果不使用并行计算,可能需要好几个月,如果使用分布式的计算平台,计算结果可能只需要几个小时。而在深度学习,人工智能,科学计算领域同样也需要使用并行计算,并行计算缩短了得到结果的时间。
在大数据环境下的并行计算,Hadoop MapReduce和Spark大家一定不会陌生,Hadoop MapReduce把任务分成Map和Reduce两个部分,把大型任务切分成子任务,而Spark提供一种新的存储方式——resilient distributed datasets (RDDs),弹性分布式数据集。RDD是一个容错的、并行的数据结构。RDD只读,可分区,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。所谓弹性,是指在内存不够时可以与磁盘进行交互。这涉及到RDD的另一个特性:内存计算,就是将数据集存到内存中。同时为了解决内存容量限制的问题,Spark为我们提供了最大的自由度,将所有数据均可以由用户进行cache的设置。RDD还提供了一组丰富的操作来操作这些数据,如map、flatMap、filter、reduce、reduceBy、groupBy等。
下面以一个统计网页中词频的方式来介绍Hadoop MapReduce和Spark,数据存储在MongoDB中,下面是数据的样例。
{ "doc" : "good good day", "url" : "url_1" }
{ "doc" : "hello world good world", "url" : "url_2" }
Hadoop MapReduce
下面给出Map和Reduce的代码
public void map(Object key, BSONObject value, Context context)
throws IOException, InterruptedException {
ArrayList&String& tags = (ArrayList&String&) value.get("tag");
for (int i = 0; i & tags.size(); i++) {
tagSet.add(tags.get(i));
String url = value.get("url").toString();
String doc = value.get("summary").toString().replaceAll("\\p{Punct}|\\d", "")
.replaceAll("\r\n", " ").replace("\r", " ").replace("\n", " ").toLowerCase();
StringTokenizer itr = new StringTokenizer(doc);
HashMap&String, Integer& word_count = new HashMap&String, Integer&();
while (itr.hasMoreTokens()) {
String kk = itr.nextToken();
if (tagSet.contains(kk)) {
if (word_count.containsKey(kk)) {
word_count.put(kk, word_count.get(kk)+1);
word_count.put(kk, 1);
// get word and counts (url, counts)
for (Map.Entry&String, Integer& entry : word_count.entrySet())
BasicBSONObject counts = new BasicBSONObject();
counts.put(url, entry.getValue());
Text myword = new Text(entry.getKey());
context.write(myword, new BSONWritable(counts));
public void reduce(Text key, Iterable&BSONWritable& values, Context context)
throws IOException, InterruptedException {
HashMap&String, Integer& mymap = new HashMap&String, Integer&();
BasicBSONObject result = new BasicBSONObject();
for (BSONWritable val : values) {
@SuppressWarnings("unchecked")
HashMap&String, Integer& temp = (HashMap&String, Integer&) val.getDoc().toMap();
for (Map.Entry&String, Integer& entry : temp.entrySet()) {
mymap.put(entry.getKey(), entry.getValue());
result.putAll(mymap);
context.write(new Text(key.toString()), new BSONWritable(result));
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
System.out.println(" \n\n ERROR HERE BulkWriteException \n\n");
写Hadoop MapReduce程序必须重新写map和reduce类,在程序员自定义的map和reduce类添加计算逻辑。但是Spark却没有这种限制。
Spark代码主要是对RDD进行操作,下面给出代码
# 定义统计单词和词频的函数
def f(record):
table = string.maketrans("", "")
raw_summary = record[1]['summary']
temp = raw_summary.encode("utf-8").translate(table, string.punctuation)
temp_str = re.sub("[+——!,。?、~@#¥%……&*()]+".decode("utf8"),
"".decode("utf8"), temp).replace("\r\n", " ").replace("\n", " ")
summary = re.sub(r'([\d]+)', ' ', temp_str).lower()
url = record[1]['url']
_temp = dict(collections.Counter(summary.split()))
result = [(key,{url:value}) for key,value in _temp.items()]
return result
# reduce操作函数
def reduce_values(record):
result = {}
for element in record[1]:
key, value = element.items()[0]
result[key] = result.get(key, 0) + value
return [(record[0], result)]
# 从MongoDB中读取数据
rdd = sc.mongoPairRDD(
"mongodb://localhost/testmr.test_in")
newrdd = rdd.flatMap(f) # map
sortrdd = newrdd.sortByKey() # sort
resultrdd = sortrdd.groupByKey()
.flatMap(reduce_values) # reduce
resultrdd.saveToMongoDB('
mongodb://localhost:27017/testmr.test_out')
在Spark中,虽然有map和reduce这两个RDD的操作接口,但却不是和Hadoop的MapReduce一一对应。在Spark中,可以通过两个RDD的flatMap操作来实现Hadoop MapReduce。
Spark的速度比Hadoop更快。最后Hadoop Reduce和Spark的结果(以下是样例的结果)为
& db.stackout.find().limit(4)
{ "_id" : "world", "data" : [
"url_2" : 2 },
"url_3" : 1 },
"url_4" : 1 } ] }
{ "_id" : "good",
"data" : [
"url_1" : 2 },
"url_2" : 1 },
"url_3" : 2 } ] }
{ "_id" : "day",
"data" : [
"url_1" : 1 } ] }
{ "_id" : "hello",
"data" : [
"url_2" : 1 },
"url_3" : 2 },
"url_4" : 2 } ] }
延伸阅读和推荐书籍
要进一步了解并行计算及并行程序的开发,大家除了需要具备基本的编程能力之外,还需要了解计算机体系结构及操作系统,特别是CPU,cache,内存,多进程,多线程,堆,栈等概念。就比如cache优化的问题吧,如果在实现矩阵相乘并行化程序时考虑cache优化的问题,那么运行的时间又会缩短,加速比又会提高。另外还需要使用一些常用的并行计算的库,不过这些库几乎都比较容易上手使用。不过我觉得在开发并行程序时最重要的是并行算法的设计——把串行问题并行化,虽然矩阵相乘的问题很容易并行化,但是现实世界中的很多计算问题都很复杂,如何将串行问题并行化这也考验着一个开发者的算法基础和数学能力。
下面给出课堂上老师推荐的书,我目前在看《Hadoop 权威指南》。其实还有一本叫做《深入理解计算机系统》的书没有列出,推荐给有经验的程序员阅读。
Distributed and Cloud Computing
计算机体系结构
virtual machines
并行程序设计导论
Hadoop权威指南
并行计算其实是一个很大的概念,今天只是蜻蜓点水,接下来还要好好写文章总结并行计算领域的技术知识。
&No one can make you feel inferior without your consent.&-- Eleanor Roosevelt(黑马程序员)
(给发好人卡来了)
第三方登录:帐号:密码:下次自动登录{url:/nForum/slist.json?uid=guest&root=list-section}{url:/nForum/nlist.json?uid=guest&root=list-section}
贴数:5&分页:砂锅洋芋蛋发信人: Dingocn (砂锅洋芋蛋), 信区: CPlusPlus
标&&题: 多线程编程,用Pthread还是OpenMP?
发信站: 水木社区 (Mon Apr&&8 20:55:00 2013), 站内 && 不太清楚两者的异同和优劣,知道的说一下呗
顺便问一下哪里有些扫盲性质的东西? && 多谢了~
-- && ※ 来源:·水木社区 ·[FROM: 203.116.198.*]
blitz发信人: blitz (blitz), 信区: CPlusPlus
标&&题: Re: 多线程编程,用Pthread还是OpenMP?
发信站: 水木社区 (Mon Apr&&8 20:56:53 2013), 站内 && 多数情况下openmp能很容易直接从串行程序改过来,并且在没有多线程能力的情况下退回单线程执行
pthread得自己搞,而且如果没有多线程执行能力也会强制分时多线程,引入额外负担,好处是比openmp灵活
【 在 Dingocn (砂锅洋芋蛋) 的大作中提到: 】
: 不太清楚两者的异同和优劣,知道的说一下呗
: 顺便问一下哪里有些扫盲性质的东西?
: 多谢了~
: ...................
&& -- && ※ 来源:·水木社区 newsmth.net·[FROM: 159.226.169.*]
Nothing发信人: sarevokcc (Nothing), 信区: CPlusPlus
标&&题: Re: 多线程编程,用Pthread还是OpenMP?
发信站: 水木社区 (Mon Apr&&8 21:03:34 2013), 站内 && 嗯,尤其是计算类的循环代码,omp很容易见效
【 在 blitz 的大作中提到: 】
: 多数情况下openmp能很容易直接从串行程序改过来,并且在没有多线程能力的情况下退回单线程执行
: pthread得自己搞,而且如果没有多线程执行能力也会强制分时多线程,引入额外负担,好处是比openmp灵活
&& -- && ※ 来源:·水木社区 ·[FROM: 131.155.41.*]
P股决定NoZ发信人: poggy (P股决定NoZ), 信区: CPlusPlus
标&&题: Re: 多线程编程,用Pthread还是OpenMP?
发信站: 水木社区 (Mon Apr&&8 23:20:55 2013), 站内 &&&& 【 在 Dingocn (砂锅洋芋蛋) 的大作中提到: 】
: 不太清楚两者的异同和优劣,知道的说一下呗
: 顺便问一下哪里有些扫盲性质的东西?
: 多谢了~
&& 这是两个不同的东西
pthread 多线程, 强调的是多条执行路径,&& 而每个执行路径还是串行的, 比如, 你可以
让每个 pthread 干不同的事情, 个不相干都可以
一个解压缩, 一个运算, 一个内存拷贝, 一个浮点运算 && 而OpenMP是用来把串行任务, 分解为并行执行的库,
强调的是, 串行任务的并行化 && 前者更强调分时, 后者强调分担
-- && ※ 来源:·水木社区 ·[FROM: 123.118.222.*]
肥了,又肥了 &&&_&&&发信人: milksea (肥了,又肥了 &&&_&&&), 信区: CPlusPlus
标&&题: Re: 多线程编程,用Pthread还是OpenMP?
发信站: 水木社区 (Tue Apr&&9 09:16:46 2013), 站内 && pthread 可以相同执行路径也可以不同路径。
要把处理器榨干还是要用 pthread、MPI 这样的东西。
【 在 poggy (P股决定NoZ) 的大作中提到: 】
: 这是两个不同的东西
: pthread 多线程, 强调的是多条执行路径,&&
: 而每个执行路径还是串行的, 比如, 你可以
: ...................
世界上有 10 种人:懂 10 进制的和不懂 10 进制的。 &&&& ※ 来源:·水木社区 newsmth.net·[FROM: 211.99.222.*]
文章数:5&分页:}

我要回帖

更多关于 pthread join 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信