AQS组件 CountDownLatch CountDownLatch中count down是倒数的意思,latch则是门闩的含义。整体含义可以理解为倒数的门栓 ,似乎有一点【三二一,芝麻开门 】的感觉。CountDownLatch的作用也是如此,在构造CountDownLatch的时候需要传入一个整数n,在这个整数“倒数”到0之前,主线程需要等待在门口,而这个“倒数”过程则是由各个执行线程驱动的,每个线程执行完一个任务“倒数”一次。总结来说,CountDownLatch的作用就是等待其他的线程都执行完任务,必要时可以对各个任务的执行结果进行汇总,然后主线程才继续往下执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 final CountDownLatch countDownLatch = new CountDownLatch (n);for (int i = 0 ; i < n; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception" , e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("analyze here" )
Semaphore 信号量在操作系统中是很重要的概念,Java并发库里的Semaphore就可以很轻松的完成类似操作系统信号量的控制。它可以很容易控制系统中某个资源被同时访问的线程个数 。Semaphore常用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 try { semaphore.acquire(); test(); semaphore.release(); } catch (Exception e) { log.error("exception" , e); } try { semaphore.acquire(2 ); test(); semaphore.release(2 ); } catch (Exception e) { log.error("exception" , e); } try { if (semaphore.tryAcquire()) { test(threadNum); semaphore.release(); } } catch (Exception e) { log.error("exception" , e); } try { if (semaphore.tryAcquire(5000 , TimeUnit.MILLISECONDS)) { test(threadNum); semaphore.release(); } } catch (Exception e) { log.error("exception" , e); }
CyclicBarrier CyclicBarrier也叫同步屏障,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,所以被阻塞的线程才能继续执行 。CyclicBarrier好比一扇门,默认情况下关闭状态,堵住了线程执行的道路,直到所有线程都就位,门才打开,让所有线程一起通过。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 private static CyclicBarrier barrier = new CyclicBarrier (5 );public static void main (String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0 ; i < 5 ; i++) { final int threadNum = i; Thread.sleep(1000 ); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception" , e); } }); } executor.shutdown(); } private static void race (int threadNum) throws Exception { Thread.sleep(1000 ); log.info("{} is ready" , threadNum); barrier.await(); log.info("{} continue" , threadNum); } private static void race (int threadNum) throws Exception { Thread.sleep(1000 ); try { barrier.await(2000 , TimeUnit.MILLISECONDS); } catch (InterruptedException | BrokenBarrierException | TimeoutException e) { log.warn("BarrierException" , e); } } private static CyclicBarrier barrier = new CyclicBarrier (5 , () -> { log.info("callback is running" ); });
与CountDownLatch区别
计数器可重复用
描述一个或多个线程等待其他线程的关系/多个线程相互等待
FutureTask 通常创建一个线程有2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果 。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。
Callable与Runnable
先说一下java.lang.Runnable
吧,它是一个接口,在它里面只声明了一个run()方法:
1 2 3 public interface Runnable { public abstract void run () ; }
由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。
Callable位于java.util.concurrent
包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call():
1 2 3 4 5 6 7 8 9 public interface Callable <V> { V call () throws Exception; }
可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。
那么怎么使用Callable呢?一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:
1 2 3 <T> Future<T> submit (Callable<T> task) ; <T> Future<T> submit (Runnable task, T result) ; Future<?> submit(Runnable task);
Future
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
Future类位于java.util.concurrent
包下,它是一个接口:
1 2 3 4 5 6 7 8 public interface Future <V> { boolean cancel (boolean mayInterruptIfRunning) ; boolean isCancelled () ; boolean isDone () ; V get () throws InterruptedException, ExecutionException; V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
也就是说Future提供了三种功能:
判断任务是否完成;
能够中断任务;
能够获取任务执行结果。
因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。
FutureTask
FutureTask类实现了RunnableFuture接口,RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class FutureTaskExample { public static void main (String[] args) throws Exception { FutureTask<String> futureTask = new FutureTask <String>(new Callable <String>() { @Override public String call () throws Exception { log.info("do something in callable" ); Thread.sleep(5000 ); return "Done" ; } }); new Thread (futureTask).start(); log.info("do something in main" ); Thread.sleep(1000 ); String result = futureTask.get(); log.info("result:{}" , result); } }
ForkJoin ForkJoin是Java7提供的一个并行执行任务的框架,是把大任务分割成若干个小任务,待小任务完成后将结果汇总成大任务结果的框架。主要采用的是工作窃取算法 ,工作窃取算法是指某个线程从其他队列里窃取任务来执行。
在窃取过程中两个线程会访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常我们会使用双端队列 来实现工作窃取算法。被窃取任务的线程永远从队列的头部拿取任务,窃取任务的线程从队列尾部拿取任务。
局限性
任务只能使用fork和join作为同步机制,如果使用了其他同步机制,当他们在同步操作时,工作线程就不能执行其他任务了。比如在fork框架使任务进入了睡眠,那么在睡眠期间内在执行这个任务的线程将不会执行其他任务了
所拆分的任务不应该去执行IO操作,如读和写数据文件。
任务不能抛出检查异常。必须通过必要的代码来处理他们。
使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 @Slf4j public class ForkJoinTaskExample extends RecursiveTask <Integer> { public static final int threshold = 2 ; private int start; private int end; public ForkJoinTaskExample (int start, int end) { this .start = start; this .end = end; } @Override protected Integer compute () { int sum = 0 ; boolean canCompute = (end - start) <= threshold; if (canCompute) { for (int i = start; i <= end; i++) { sum += i; } } else { int middle = (start + end) / 2 ; ForkJoinTaskExample leftTask = new ForkJoinTaskExample (start, middle); ForkJoinTaskExample rightTask = new ForkJoinTaskExample (middle + 1 , end); leftTask.fork(); rightTask.fork(); int leftResult = leftTask.join(); int rightResult = rightTask.join(); sum = leftResult + rightResult; } return sum; } public static void main (String[] args) { ForkJoinPool forkjoinPool = new ForkJoinPool (); ForkJoinTaskExample task = new ForkJoinTaskExample (1 , 100 ); Future<Integer> result = forkjoinPool.submit(task); try { log.info("result:{}" , result.get()); } catch (Exception e) { log.error("exception" , e); } } }
BlockingQueue BlockingQueue接口表示一个线程安全的队列,可以放入并获取实例。BlockingQueue通常用于使线程产生对象,而另一线程则使用该对象。主要应用场景就是:生产者消费者模型。
生产线程将持续生产新对象并将它们插入队列,直到队列达到它可以包含的上限。换句话说,这是极限。如果阻塞队列达到其上限,则会在尝试插入新对象时阻塞生产线程。在消耗线程将对象带出队列之前,它一直处于阻塞状态。消费线程不断将对象从阻塞队列中取出,并对其进行处理。如果消费线程试图将对象从空队列中取出,则消费线程将被阻塞,直到生成的线程将对象放入队列。
方法
BlockingQueue有4种不同的方法来插入、删除和检查队列中的元素。每一组方法的行为都是不同的,以防被请求的操作不能立即执行。下面是这些方法的一个表:
Throws Exception
Special Value
Blocks
Times out
Insert
add(o)
offer(o)
put(o)
offer(o, timeout, timeunit)
Remove
remove(o)
poll()
take()
poll(timeout, timeunit)
Examine
element()
peek()
Throws Exception: 如果尝试的操作不可能立即发生,则抛出一个异常。
Special Value :如果尝试的操作不能立即执行,则会返回一个特殊值(通常为true / false)。
Blocks:如果尝试的操作不可能立即执行,那么该方法将阻塞。
Times out:如果尝试的操作不可能立即执行,则该方法调用将阻塞,但不会超过给定的超时。 返回一个特殊值,告诉操作是否成功(通常为true / false)。
实现
由于BlockingQueue是一个接口,因此您需要使用它的一个实现来使用它。java.util.concurrent包具有以下BlockingQueue接口(在Java 6中)的实现:
ArrayBlockingQueue
DelayQueue
LinkedBlockingQueue]
PriorityBlockingQueue
SynchronousQueue
使用
该示例使用BlockingQueue接口的ArrayBlockingQueue实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 public class BlockingQueueExample { public static void main (String[] args) throws Exception { BlockingQueue queue = new ArrayBlockingQueue (1024 ); Producer producer = new Producer (queue); Consumer consumer = new Consumer (queue); new Thread (producer).start(); new Thread (consumer).start(); Thread.sleep(4000 ); } } @Slf4j class Producer implements Runnable { protected BlockingQueue queue = null ; public Producer (BlockingQueue queue) { this .queue = queue; } public void run () { try { queue.put("1" ); log.info("put 1" ); Thread.sleep(1000 ); queue.put("2" ); log.info("put 2" ); Thread.sleep(1000 ); queue.put("3" ); log.info("put 3" ); } catch (InterruptedException e) { e.printStackTrace(); } } } @Slf4j class Consumer implements Runnable { protected BlockingQueue queue = null ; public Consumer (BlockingQueue queue) { this .queue = queue; } public void run () { try { log.info("take {}" ,queue.take()); log.info("take {}" ,queue.take()); log.info("take {}" ,queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }
1 2 3 4 5 6 19 :15 :27.756 [Thread-0 ] INFO com.mmall.concurrency.example.aqs.Producer - put 1 19 :15 :27.756 [Thread-1 ] INFO com.mmall.concurrency.example.aqs.Consumer - take 1 19 :15 :28.776 [Thread-0 ] INFO com.mmall.concurrency.example.aqs.Producer - put 2 19 :15 :28.776 [Thread-1 ] INFO com.mmall.concurrency.example.aqs.Consumer - take 2 19 :15 :29.790 [Thread-0 ] INFO com.mmall.concurrency.example.aqs.Producer - put 3 19 :15 :29.790 [Thread-1 ] INFO com.mmall.concurrency.example.aqs.Consumer - take 3