parallelstream的sum 线程安全吗

还没有账号
用户名或密码错误
目录一、并行流介绍二、常用的并行流集合三、应用示例内容一、并行流介绍&&&&&&&&并行流(ParallelStream)是JDK8引入并行处理机制,被执行者无需显示声明为线程,也不需要显示配置线程池,其本质是内部隐含使用了ForkJoinPool线程池,默认线程数与CPU核数相同,但最大并发数可以通过系统变量设置。二、常用的并行流集合& & & & JDK8 引入了lambda语法,习惯这个语法之后能够减少很多代码量;并行流本身只是流java.util.stream.Stream的一部分,它也支持顺序流处理。JDK8中所有java.util.Collection的子类都可以获取到流对象,无论是顺序还是并行,流处理方式都给我们提供了很多便利的接口,比如:&&&&&&&&1、filter接口,对整个集合进行过滤Stream&filter(Predicate&predicate);&&&&&&&&2、map接口,将整个集合转换成新对象,对应原始类型有mapToInt、mapToLong、mapToDouble&Stream&map(Function&mapper);&&&&&&&&3、distinct接口,与sql里的distinct基本一样,去重Stream&distinct();&&&&&&&&4、sorted,快速的对集合进行排序Stream&sorted();&&&&& & 5、forEach接口,对整个集合进行遍历void&forEach(Consumer&action);&&&&&&&&等等,还有一些其他的函数,如limit、skip、peek等。三、应用示例&&&&&&&&定义任务package&com.zhihuiku.threadpool.
public&class&Person&{
public&Person(String&name,&int&age)&{
this.name&=&
this.age&=&
private&String&
private&int&
public&String&getName()&{
public&void&setName(String&name)&{
this.name&=&
public&int&getAge()&{
public&void&setAge(int&age)&{
this.age&=&
}& & & & 并行流示例package&com.zhihuiku.
import&java.util.ArrayL
import&java.util.L
import&java.util.R
import&com.zhihuiku.threadpool.forkjoin.P
public&class&ParallelStreamTest&{
public&static&void&main(String[]&args)&{
ParallelStreamTest&ps&=&new&ParallelStreamTest();
List&persons&=&new&ArrayList();
int&c&=&100000;
for&(int&i&=&0;&i&&&c;&i++)&{
Person&person&=&new&Person(&姓名&&+&i,&new&Random().nextInt(100));
persons.add(person);
ps.start(4,&persons);
public&void&start(int&threadNum,&List&persons)&{
long&s&=&System.currentTimeMillis();
System.setProperty(&java.util.mon.parallelism&,&&&&+&threadNum);
long&count&=&persons.parallelStream().filter(t&-&&{
return&t.getAge()&&&18;
}).count();
long&e&=&System.currentTimeMillis();
System.out.println(&统计结果:&&+&count);
System.out.println(&parallel耗时:&&+&(e&-&s));
}&&&&&&&&执行效果统计结果:81061
parallel耗时:52如需转载,请注明出处:http://www.zhihuiku.net/view/137,作者:萨穆罗
全部评论: 0 条
24小时热门
Powered By11012人阅读
Core Java(27)
Stream是Java 8中的一个大的改进。Stream的功能是,支持集合的各种操作,比如filter, sum, max, min, average, map, reduce等等。所以我个人认为Stream的出现是基于以下原因:
增强集合操作
拥抱函数式编程
充分利用Lambda
执行效率的提高 - 透明支持多线程集合操作
笔者尝试测试一下Stream并发处理的威力,发现面对特别简单的任务,Stream并发处理相较于传统的for each循环,执行效率没有优势。看起来Stream不是免费的午餐,创建Stream还是要一些开销的。所以这促使笔者思考该在什么场景下才使用Stream。
笔者测试两个例子,一个任务非常简单,另外一个稍微复杂一点。从结果看起来,并行Stream总是比串行快,任务简单的情况,For Loop更快,任务复杂一点,并行Stream后来居上,并行带来的改进足以cover创建Stream的开销。
测试的小工具类
public class TimeRecorder {
private long startT
private long endT
public void start() {
startTime = System.currentTimeMillis();
public long end() {
endTime = System.currentTimeMillis();
return endTime - startT
public long getDuration() {
return endTime - startT
任务非常简单的例子
只调用intValue这么一个小方法。
public class StreamDemoSimple {
public static void main(String[] args) {
List&Integer& intList = new LinkedList&Integer&();
for (int i = 1; i &= 1000000; i++) {
intList.add(i);
TimeRecorder recorder = new TimeRecorder();
recorder.start();
intList.stream().forEach(i -& {
i.intValue();
recorder.end();
System.out.print("Stream iterator:");
System.out.println(recorder.getDuration());
recorder.start();
intList.parallelStream().forEach(i -& {
i.intValue();
recorder.end();
System.out.print("Parallel Stream iterator:");
System.out.println(recorder.getDuration());
recorder.start();
for (Integer i : intList) {
i.intValue();
recorder.end();
System.out.print("Normal iterator:");
System.out.println(recorder.getDuration());
Stream iterator:447
Parallel Stream iterator:142
Normal iterator:70
任务稍微复杂的例子
多执行了几步而已。
public class StreamDemo {
public static void main(String[] args) {
List&Integer& intList = new LinkedList&Integer&();
for (int i = 1; i &= 1000000; i++) {
intList.add(i);
TimeRecorder recorder = new TimeRecorder();
recorder.start();
intList.stream().forEach(i -& {
i.intValue();
i.intValue();
i.toString();
i.intValue();
i.intValue();
i.toString();
recorder.end();
System.out.print("Stream iterator:");
System.out.println(recorder.getDuration());
recorder.start();
intList.parallelStream().forEach(i -& {
i.intValue();
i.intValue();
i.toString();
i.intValue();
i.intValue();
i.toString();
recorder.end();
System.out.print("Parallel Stream iterator:");
System.out.println(recorder.getDuration());
recorder.start();
for (Integer i : intList) {
i.intValue();
i.intValue();
i.toString();
i.intValue();
i.intValue();
i.toString();
recorder.end();
System.out.print("Normal iterator:");
System.out.println(recorder.getDuration());
Stream iterator:808
Parallel Stream iterator:313
Normal iterator:377
Stream的适合场景
集合操作超过两个步骤
比如先filter再for each
这时Stream显得优雅简洁,效率也高
任务较重,注重效能,希望并发执行
很容易的就隐式利用了多线程技术。非常好的使用时机。
函数式编程的编码风格里
Stream的设计初衷之一
Lambda,Stream等等新特性使得Java函数式编程更为自然。合适的环境下非常值得合理使用。但是请记住Stream的创建以及传输也有损耗,特别简单的场景可能传统的For Loop更为适合。
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:630286次
积分:5881
积分:5881
排名:第4499名
原创:73篇
评论:156条
阅读:26405
阅读:153306
(1)(2)(2)(2)(1)(3)(1)(4)(1)(4)(2)(2)(1)(4)(1)(2)(2)(2)(3)(7)(1)(12)(3)(2)(1)(1)(1)(11)
(window.slotbydup = window.slotbydup || []).push({
id: '4740881',
container: s,
size: '200,200',
display: 'inlay-fix'将一个顺序执行的流转变成一个并发的流只要调用&parallel()方法
public&static&long&parallelSum(long&n){
& & return&Stream.iterate(1L,&i&-&&i&+1).limit(n).parallel().reduce(0L,Long::sum);
并行流就是一个把内容分成多个数据块,并用不不同的线程分别处理每个数据块的流。最后合并每个数据块的计算结果。
将一个并发流转成顺序的流只要调用sequential()方法
stream.parallel() .filter(...)
.sequential() .map(...) .parallel() .reduce();
这两个方法可以多次调用, 只有最后一个调用决定这个流是顺序的还是并发的。
并发流使用的默认线程数等于你机器的处理器核心数。
通过这个方法可以修改这个值,这是全局属性。
System.setProperty(&java.util.mon.parallelism&, &12&);
并非使用多线程并行流处理数据的性能一定高于单线程顺序流的性能,因为性能受到多种因素的影响。
如何高效使用并发流的一些建议:
1. 如果不确定, 就自己测试。
2. 尽量使用基本类型的流 &IntStream,
LongStream, and
DoubleStream
3. 有些操作使用并发流的性能会比顺序流的性能更差,比如limit,findFirst , 依赖元素顺序的操作在并发流中是极其消耗性能的&。findAny的性能就会好很多,应为不依赖顺序。
4. 考虑流中计算的性能(Q)和操作的性能(N)的对比, Q表示单个处理所需的时间, N表示需要处理的数量,如果Q的值越大, 使用并发流的性能就会越高。
5. 数据量不大时使用并发流,性能得不到提升。
6.考虑数据结构:并发流需要对数据进行分解,不同的数据结构被分解的性能时不一样的。
流的数据源和可分解性
LinkedList
IntStream.range
Stream.iterate
7. 流的特性以及中间操作对流的修改都会对数据对分解性能造成影响。 比如固定大小的流在任务分解的时候就可以平均分配,但是如果有filter操作,那么流就不能预先知道在这个操作后还会剩余多少元素。
8. 考虑最终操作的性能:如果最终操作在合并并发流的计算结果时的性能消耗太大,那么使用并发流提升的性能就会得不偿失。
9.需要理解并发流实现机制:
fork/join 框架
fork/join框架是jdk1.7引入的,java8的stream多线程并非流的正是以这个框架为基础的,所以想要深入理解并发流就要学习fork/join框架。
fork/join框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配线程池(ForkJoinPool)中的工作线程。要把任务提交到这个线程池,必须创建RecursiveTask&R&的一个子类,如果任务不返回结果则是RecursiveAction的子类。
fork/join框架流程示意图:
废话不多说,上代码:
import&java.util.concurrent.ForkJoinP
import&java.util.concurrent.ForkJoinT
import&java.util.concurrent.RecursiveT
import&java.util.stream.LongS
&*&Created&by&sunjin&on&.
&*&继承RecursiveTask来创建可以用于分支/合并的框架任务
public&class&ForkJoinSumCalculator&extends&RecursiveTask&Long&&{
& & //要求和的数组
private&final&long[]&numbers;
& & //子任务处理的数组开始和终止的位置
private&final&int&start;
& & private&final&int&end;
& & //不在将任务分解成子任务的阀值大小
public&static&final&int&THRESHOLD&=&10000;
& & //用于创建组任务的构造函数
public&ForkJoinSumCalculator(long[]&numbers){
& & & & this(numbers,&0,&numbers.length);
& & //用于递归创建子任务的构造函数
public&ForkJoinSumCalculator(long[]&numbers,int&start,int&end){
& & & & this.numbers&=&
& & & & this.start&=&
& & & & this.end&=&
& & //重写接口的方法
& & protected&Long&compute()&{
& & & & //当前任务负责求和的部分的大小
int&length&=&end&-&start;
& & & & //如果小于等于阀值就顺序执行计算结果
if(length&&=&THRESHOLD){
& & & & & & return&computeSequentially();
& & & & //创建子任务来为数组的前一半求和
& & & & ForkJoinSumCalculator&leftTask&=&new&ForkJoinSumCalculator(numbers,&start,&start&+&length/2);
& & & & //将子任务拆分出去,丢到ForkJoinPool线程池异步执行。
& & & & leftTask.fork();
& & & & //创建子任务来为数组的后一半求和
& & & & ForkJoinSumCalculator&rightTask&=&new&ForkJoinSumCalculator(numbers,&start&+&length/2,&end);
& & & & //第二个任务直接使用当前线程计算而不再开启新的线程。
long&rightResult&=&pute();
& & & & //读取第一个子任务的结果,如果没有完成则等待。
long&leftResult&=&leftTask.join();
& & & & //合并两个子任务的计算结果
return&rightResult&+&leftR
& & //顺序执行计算的简单算法
private&long&computeSequentially(){
& & & & long&sum&=&0;
& & & & for(int&i&=start;&i&&end;&i++){
& & & & & & sum&+=&numbers[i];
& & & & return&
& & //提供给外部使用的入口方法
public&static&long&forkJoinSum(long&n)&{
& & & & long[]&numbers&=&LongStream.rangeClosed(1,&n).toArray();
& & & & ForkJoinTask&Long&&task&=&new&ForkJoinSumCalculator(numbers);
& & & & return&new&ForkJoinPool().invoke(task);
注意事项:
1. 调用join 方法要等到调用这个方法的线程的自己的任务完成之后。
2. 不要直接去调用ForkJoinPool的invoke方法 ,只需要调用RecursiveTask的fork或者compute。
3. 拆解任务时只需要调用一次fork执行其中一个子任务, 另一个子任务直接利用当前线程计算。应为fork方法只是在ForkJoinPool中计划一个任务。
4.任务拆分的粒度不宜太细,不否得不偿失。
由于各种因素,即便任务拆分是平均的,也不能保证所有子任务能同时执行结束, 大部分情况是某些子任务已经结束, 其他子任务还有很多, 在这个时候就会有很多资源空闲, 所以fork/join框架通过工作盗取机制来保证资源利用最大化, 让空闲的线程去偷取正在忙碌的线程的任务。
在没有任务线程中的任务存在一个队列当中, 线程每次会从头部获取一个任务执行,执行完了再从queue的头部获取一个任务,直到队列中的所有任务执行完,这个线程偷取别的线程队列中的任务时会从队列到尾部获取任务,并且执行,直到所有任务执行结束。
从这个角度分析,任务的粒度越小, 资源利用越充分。
工作盗取示意图
可拆分迭代器Spliterator
它和Iterator一样也是用于遍历数据源中的元素,但是他是为并行执行而设计的。 java8 所有数据结构都实现了 这个接口, 一般情况不需要自己写实现代码。但是了解它的实现方式会让你对并行流的工作原理有更深的了解。(未完待续)
本文已收录于以下专栏:
相关文章推荐
转自:https://my.oschina.net/wangzhenchao/blog/754726
摘要: java8的流式处理极大了简化我们对于集合、数组等结构的操作,让我们可以以函...
从java8开始,并行编程变得很容易,通过并行流(parallelStream),可以很轻松的实现多线程并行处理。但是,这里面有个性能“陷阱”,如果不注意,使用并行流的效果反而更差,这个陷阱是什么呢?...
人生苦短,都说必须python,那么我分享下我是如何从小白成为Python资深开发者的吧。2014年我大学刚毕业..
Java 8 引入了流式操作(Stream),通过该操作可以实现对集合(Collection)的并行处理和函数式操作。根据操作返回的结果不同,流式操作分为中间操作和最终操作两种。最终操作返回一特定类型...
流就是让程序员可以用声明式编程来处理集合数据的java接口,简单来说你可以把它看成是遍历数据集的高级迭代器,可以透明的并行处理,所以不需要程序员再写任何多线程代码了。
java7 之前(指令式编程...
1、在Java8之前如果需要从一个苹果对象集合中选出绿色的苹果,通常是这么做的。
public static List filterGreenApples(List list){
    ...
首先因为现在的应用越来越复杂,越来越多模块多系统之间相互依赖,一个操作可能需要使用多个模块或者多个系统提供的多个服务来完成一个功能,如果每个服务顺序的执行,可能需要消耗很多时间,或者前端用户需要得到及...
收集器Collector
collect方法接受的参数 函数称为 收集器,也就是实现数据收集的策略。
一般来说,收集器collector会对元素应用一个转换函数,并将结果累积在一个数据结构中,从而...
一、什么是Lambda表达式
    上一篇中已经看到了Lambda表达式的使用场景,这一篇深入学习一下Java8的这个新特性。
     首先Lambda说一下表达式的定义:它是一个简洁的可以用于传...
过滤出素食
List vegetarian = menu.stream().filter(Dish::isVegetarian).collect(Collectors.toList());
用java编写两个n阶的方阵A和B的相乘程序,结果存放在方阵C中,其中使用Runnable接口实现矩阵的乘法。
方阵A和B的初始值如下:(同时开两个线程)
他的最新文章
讲师:董西成
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)8484人阅读
java(24)
about Stream
什么是流?
Stream是java8中新增加的一个特性,被java猿统称为流.
Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的 Iterator。原始版本的 Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的 Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。
Stream 就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。
而和迭代器又不同的是,Stream 可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个 item 读完后再读下一个 item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream 的并行操作依赖于 Java7 中引入的 Fork/Join 框架(JSR166y)来拆分任务和加速处理过程。Java 的并行 API 演变历程基本如下:
1.0-1.4 中的 java.lang.Thread
5.0 中的 java.util.concurrent
6.0 中的 Phasers 等
7.0 中的 Fork/Join 框架
8.0 中的 Lambda
Stream 的另外一大特点是,数据源本身可以是无限的。
parallelStream是什么
parallelStream其实就是一个并行执行的流.它通过默认的ForkJoinPool,可能提高你的多线程任务的速度.
parallelStream的作用
Stream具有平行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作,因此像以下的程式片段:
List&Integer& numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEach(out::println);
你得到的展示顺序不一定会是1、2、3、4、5、6、7、8、9,而可能是任意的顺序,就forEach()这个操作來讲,如果平行处理时,希望最后顺序是按照原来Stream的数据顺序,那可以调用forEachOrdered()。例如:
List&Integer& numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
.forEachOrdered(out::println);
注意:如果forEachOrdered()中间有其他如filter()的中介操作,会试着平行化处理,然后最终forEachOrdered()会以原数据顺序处理,因此,使用forEachOrdered()这类的有序处理,可能会(或完全失去)失去平行化的一些优势,实际上中介操作亦有可能如此,例如sorted()方法。
parallelStream背后的男人:ForkJoinPool
要想深入的研究parallelStream之前,那么我们必须先了解ForkJoin框架和ForkJoinPool.本文旨在parallelStream,但因为两种关系甚密,故在此简单介绍一下ForkJoinPool,如有兴趣可以更深入的去了解下ForkJoin***(当然,如果你想真正的搞透parallelStream,那么你依然需要先搞透ForkJoinPool).*
ForkJoin框架是从jdk7中新特性,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的CPU数量会被设置为线程数量作为默认值。
ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行。
所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。
那么使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?
首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。
工作窃取算法
forkjoin最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个forkjion框架的核心理念,工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
那么为什么需要使用工作窃取算法呢?
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
用看forkjion的眼光来看ParallelStreams
上文中已经提到了在Java 8引入了自动并行化的概念。它能够让一部分Java代码自动地以并行的方式执行,也就是我们使用了ForkJoinPool的ParallelStream。
Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。
比如下面的代码用来遍历列表中的元素并执行需要的操作:
List&UserInfo& userInfoList =
DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel())
userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo)
对于列表中的元素的操作都会以并行的方式执行。forEach方法会为每个元素的计算操作创建一个任务,该任务会被前文中提到的ForkJoinPool中的通用线程池处理。以上的并行计算逻辑当然也可以使用ThreadPoolExecutor完成,但是就代码的可读性和代码量而言,使用ForkJoinPool明显更胜一筹。
对于ForkJoinPool通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。我这里提供了一个示例的代码让你了解jvm所使用的ForkJoinPool的线程数量, 你可以可以通过设置系统属性:-Djava.util.mon.parallelism=N
(N为线程数量),来调整ForkJoinPool的线程数量,可以尝试调整成不同的参数来观察每次的输出结果:
import java.util.ArrayL
import java.util.L
import java.util.S
import java.util.concurrent.CopyOnWriteArrayS
import java.util.concurrent.CountDownL
* 这是一个用来让你更加熟悉parallelStream的原理的实力
* 日18:26:55
* wangguangdong
public class App {
public static void main(String[] args) throws Exception {
System.out.println("Hello World!");
List&Integer& list = new ArrayList&&();
for (int i = 0; i & 10000; i++) {
list.add(i);
Set&Thread& threadSet = new CopyOnWriteArraySet&&();
list.parallelStream().forEach(integer -& {
Thread thread = Thread.currentThread();
threadSet.add(thread);
System.out.println("threadSet一共有" + threadSet.size() + "个线程");
System.out.println("系统一个有"+Runtime.getRuntime().availableProcessors()+"个cpu");
List&Integer& list1 = new ArrayList&&();
List&Integer& list2 = new ArrayList&&();
for (int i = 0; i & 100000; i++) {
list1.add(i);
list2.add(i);
Set&Thread& threadSetTwo = new CopyOnWriteArraySet&&();
CountDownLatch countDownLatch = new CountDownLatch(2);
Thread threadA = new Thread(() -& {
list1.parallelStream().forEach(integer -& {
Thread thread = Thread.currentThread();
threadSetTwo.add(thread);
countDownLatch.countDown();
Thread threadB = new Thread(() -& {
list2.parallelStream().forEach(integer -& {
Thread thread = Thread.currentThread();
threadSetTwo.add(thread);
countDownLatch.countDown();
threadA.start();
threadB.start();
countDownLatch.await();
System.out.print("threadSetTwo一共有" + threadSetTwo.size() + "个线程");
System.out.println("---------------------------");
System.out.println(threadSet);
System.out.println(threadSetTwo);
System.out.println("---------------------------");
threadSetTwo.addAll(threadSet);
System.out.println(threadSetTwo);
System.out.println("threadSetTwo一共有" + threadSetTwo.size() + "个线程");
System.out.println("系统一个有"+Runtime.getRuntime().availableProcessors()+"个cpu");
出现这种现象的原因是,forEach方法用了一些小把戏。它会将执行forEach本身的线程也作为线程池中的一个工作线程。因此,即使将ForkJoinPool的通用线程池的线程数量设置为1,实际上也会有2个工作线程。因此在使用forEach的时候,线程数为1的ForkJoinPool通用线程池和线程数为2的ThreadPoolExecutor是等价的。
所以当ForkJoinPool通用线程池实际需要4个工作线程时,可以将它设置成3,那么在运行时可用的工作线程就是4了。
1. 当需要处理递归分治算法时,考虑使用ForkJoinPool。
2. 仔细设置不再进行任务划分的阈值,这个阈值对性能有影响。
3. Java 8中的一些特性会使用到ForkJoinPool中的通用线程池。在某些场合下,需要调整该线程池的默认的线程数量。
ParallelStreams 的陷阱
上文中我们已经看到了ParallelStream他强大无比的特性,但这里我们就讲告诉你ParallelStreams不是万金油,而是一把双刃剑,如果错误的使用反倒可能伤人伤己.
以下是一个我们项目里使用 parallel streams 的很常见的情况。在这个例子中,我们想同时调用不同地址的api中并且获得第一个返回的结果。
public static String query(String q, List&String& engines) {
Optional&String& result = engines.stream().parallel().map((base) -& {
String url = base +
return WS.url(url).get();
}).findAny();
return result.get();
可能有很多朋友在jdk7用future配合countDownLatch自己实现的这个功能,但是jdk8的朋友基本都会用上面的实现方式,那么自信深究一下究竟自己用future实现的这个功能和利用jdk8的parallelStream来实现这个功能有什么不同点呢?坑又在哪里呢?
让我们细思思考一下整个功能究竟是如何运转的。首先我们的集合元素engines 由ParallelStreams并行的去进行map操作(ParallelStreams使用JVM默认的forkJoin框架的线程池由当前线程去执行并行操作).
然而,这里需要注意的一地方是我们在调用第三方的api请求是一个响应略慢而且会阻塞操作的一个过程。所以在某时刻所有线程都会调用 get() 方法并且在那里等待结果返回.
再回过头仔细思考一下这个功能的实现过程是我们一开始想要的吗?我们是在同一时间等待所有的结果,而不是遍历这个列表按顺序等待每个回答.然而,由于ForkJoinPool workders的存在,这样平行的等待相对于使用主线程的等待会产生的一种副作用.
现在ForkJoin pool (关于forkjion的更多实现你可以去搜索引擎中去看一下他的具体实现方式)
的实现是: 它并不会因为产生了新的workers而抵消掉阻塞的workers。那么在某个时间所有 mon() 的线程都会被用光.也就是说,下一次你调用这个查询方法,就可能会在一个时间与其他的parallel stream同时运行,而导致第二个任务的性能大大受损。或者说,例如你在这个功能里是用来快速返回调用的第三方api的,而在其他的功能里是用于一些简单的数据并行计算的,但是假如你先调用了这个功能,同一时间之后调用计算的函数,那么这里forkjionPool的实现会让你计算的函数大打折扣.
不过也不要急着去吐槽ForkJoinPool的实现,在不同的情况下你可以给它一个ManagedBlocker实例并且确保它知道在一个阻塞调用中应该什么时候去抵消掉卡住的workers.现在有意思的一点是,在一个parallel stream处理中并不一定是阻塞调用会拖延程序的性能。任何被用于映射在一个集合上的长时间运行的函数都会产生同样的问题.
正如我们上面那个列子的情况分析得知,lambda的执行并不是瞬间完成的,所有使用parallel streams的程序都有可能成为阻塞程序的源头,并且在执行过程中程序中的其他部分将无法访问这些workers,这意味着任何依赖parallel streams的程序在什么别的东西占用着common ForkJoinPool时将会变得不可预知并且暗藏危机.
怎么正确使用parallelStream
如果你正在写一个其他地方都是单线程的程序并且准确地知道什么时候你应该要使用parallel streams,这样的话你可能会觉得这个问题有一点肤浅。然而,我们很多人是在处理web应用、各种不同的框架以及重量级应用服务。一个服务器是怎样被设计成一个可以支持多种独立应用的主机的?谁知道呢,给你一个可以并行的却不能控制输入的parallel stream.
很抱歉,请原谅我用的标注[怎么正确使用parallelStream],因为目前为止我也没有发现一个好的方式来让我真正的正确使用parallelStream.下面的网上写的两种方式:
一种方式是限制ForkJoinPool提供的并行数。可以通过使用-Djava.util.mon.parallelism=1 来限制线程池的大小为1。不再从并行化中得到好处可以杜绝错误的使用它(其实这个方式还是有点搞笑的,既然这样搞那我还不如不去使用并行流)。
另一种方式就是,一个被称为工作区的可以让ForkJoinPool平行放置的 parallelStream() 实现。不幸的是现在的JDK还没有实现。
Parallel streams 是无法预测的,而且想要正确地使用它有些棘手。几乎任何parallel streams的使用都会影响程序中无关部分的性能,而且是一种无法预测的方式。。但是在调用stream.parallel() 或者parallelStream()时候在我的代码里之前我仍然会重新审视一遍他给我的程序究竟会带来什么问题,他能有多大的提升,是否有使用他的意义.
stream or parallelStream?
上面我们也看到了parallelStream所带来的隐患和好处,那么,在从stream和parallelStream方法中进行选择时,我们可以考虑以下几个问题:
1. 是否需要并行?
2. 任务之间是否是独立的?是否会引起任何竞态条件?
3. 结果是否取决于任务的调用顺序?
对于问题1,在回答这个问题之前,你需要弄清楚你要解决的问题是什么,数据量有多大,计算的特点是什么?并不是所有的问题都适合使用并发程序来求解,比如当数据量不大时,顺序执行往往比并行执行更快。毕竟,准备线程池和其它相关资源也是需要时间的。但是,当任务涉及到I/O操作并且任务之间不互相依赖时,那么并行化就是一个不错的选择。通常而言,将这类程序并行化之后,执行速度会提升好几个等级。
对于问题2,如果任务之间是独立的,并且代码中不涉及到对同一个对象的某个状态或者某个变量的更新操作,那么就表明代码是可以被并行化的。
对于问题3,由于在并行环境中任务的执行顺序是不确定的,因此对于依赖于顺序的任务而言,并行化也许不能给出正确的结果。
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:105839次
积分:1677
积分:1677
排名:千里之外
原创:60篇
转载:16篇
评论:24条
(1)(1)(1)(2)(2)(2)(8)(15)(1)(4)(1)(11)(1)(2)(25)
(window.slotbydup = window.slotbydup || []).push({
id: '4740881',
container: s,
size: '200,200',
display: 'inlay-fix'}

我要回帖

更多关于 jdk8 parallelstream 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信