0%

AQS-组件

AQS组件

CountDownLatch

CountDownLatch中count down是倒数的意思,latch则是门闩的含义。整体含义可以理解为倒数的门栓,似乎有一点【三二一,芝麻开门 】的感觉。CountDownLatch的作用也是如此,在构造CountDownLatch的时候需要传入一个整数n,在这个整数“倒数”到0之前,主线程需要等待在门口,而这个“倒数”过程则是由各个执行线程驱动的,每个线程执行完一个任务“倒数”一次。总结来说,CountDownLatch的作用就是等待其他的线程都执行完任务,必要时可以对各个任务的执行结果进行汇总,然后主线程才继续往下执行。

image-20210504092638245

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final CountDownLatch countDownLatch = new CountDownLatch(n);

for (int i = 0; i < n; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum); //需要被等待的线程执行的方法
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();

}
});
}
countDownLatch.await();
//countDownLatch.await(10, TimeUnit.MILLISECONDS);
//await还可以设置等待的时间(参数1:等待的时间长度; 参数2:等待的时间单位)
log.info("analyze here")

Semaphore

信号量在操作系统中是很重要的概念,Java并发库里的Semaphore就可以很轻松的完成类似操作系统信号量的控制。它可以很容易控制系统中某个资源被同时访问的线程个数。Semaphore常用来做流量分流,特别是对公共资源有限的场景,比如数据库连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 1、普通调用
*/
try {
semaphore.acquire(); // 获取一个许可
test();//需要并发控制的内容
semaphore.release(); // 释放一个许可
} catch (Exception e) {
log.error("exception", e);
}

/**
* 2、同时获取多个许可,同时释放多个许可
*/
try {
semaphore.acquire(2);
test();
semaphore.release(2);
} catch (Exception e) {
log.error("exception", e);
}

/*
* 3、尝试获取许可,获取不到不执行
*/
try {
if (semaphore.tryAcquire()) {
test(threadNum);
semaphore.release();
}
} catch (Exception e) {
log.error("exception", e);
}

/*
* 4、尝试获取许可一段时间,获取不到不执行
* 参数1:等待时间长度 参数2:等待时间单位
*/
try {
if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) {
test(threadNum);
semaphore.release();
}
} catch (Exception e) {
log.error("exception", e);
}

CyclicBarrier

CyclicBarrier也叫同步屏障,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,所以被阻塞的线程才能继续执行。CyclicBarrier好比一扇门,默认情况下关闭状态,堵住了线程执行的道路,直到所有线程都就位,门才打开,让所有线程一起通过。

image-20210504092709136

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//想象一个场景,运动会男子100米决赛,5名选手。每个运动员都就位后才开始。
//公共线程循环调用方法
private static CyclicBarrier barrier = new CyclicBarrier(5);

public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 5; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}

//使用方法1:每个线程都持续等待
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}

//使用方法2:每个线程只等待一段时间
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
log.warn("BarrierException", e);
}
}

//使用方法3:在初始化的时候设置runnable,当线程达到屏障时优先执行runnable
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});

与CountDownLatch区别

  • 计数器可重复用
  • 描述一个或多个线程等待其他线程的关系/多个线程相互等待

FutureTask

通常创建一个线程有2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Callable与Runnable

先说一下java.lang.Runnable吧,它是一个接口,在它里面只声明了一个run()方法:

1
2
3
public interface Runnable {
public abstract void run();
}

由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果。

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call():

1
2
3
4
5
6
7
8
9
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}

可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

那么怎么使用Callable呢?一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:

1
2
3
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

Future

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future类位于java.util.concurrent包下,它是一个接口:

1
2
3
4
5
6
7
8
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

也就是说Future提供了三种功能:

  • 判断任务是否完成;
  • 能够中断任务;
  • 能够获取任务执行结果。

因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

FutureTask

FutureTask类实现了RunnableFuture接口,RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class FutureTaskExample {
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
});

new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(1000);
String result = futureTask.get();
log.info("result:{}", result);
}
}

ForkJoin

ForkJoin是Java7提供的一个并行执行任务的框架,是把大任务分割成若干个小任务,待小任务完成后将结果汇总成大任务结果的框架。主要采用的是工作窃取算法,工作窃取算法是指某个线程从其他队列里窃取任务来执行。

image-20210504092914046

在窃取过程中两个线程会访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常我们会使用双端队列来实现工作窃取算法。被窃取任务的线程永远从队列的头部拿取任务,窃取任务的线程从队列尾部拿取任务。

局限性

  • 任务只能使用fork和join作为同步机制,如果使用了其他同步机制,当他们在同步操作时,工作线程就不能执行其他任务了。比如在fork框架使任务进入了睡眠,那么在睡眠期间内在执行这个任务的线程将不会执行其他任务了
  • 所拆分的任务不应该去执行IO操作,如读和写数据文件。
  • 任务不能抛出检查异常。必须通过必要的代码来处理他们。

使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//模拟加和运算
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {

public static final int threshold = 2;//设定不大于两个数相加就直接for循环,不适用框架
private int start;
private int end;

public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算(分裂算法,可依情况调优)
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

// 执行子任务
leftTask.fork();
rightTask.fork();

// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();

// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}

public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();

//生成一个计算任务,计算1+2+3+4...100
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);

try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}

BlockingQueue

BlockingQueue接口表示一个线程安全的队列,可以放入并获取实例。BlockingQueue通常用于使线程产生对象,而另一线程则使用该对象。主要应用场景就是:生产者消费者模型。

img

生产线程将持续生产新对象并将它们插入队列,直到队列达到它可以包含的上限。换句话说,这是极限。如果阻塞队列达到其上限,则会在尝试插入新对象时阻塞生产线程。在消耗线程将对象带出队列之前,它一直处于阻塞状态。消费线程不断将对象从阻塞队列中取出,并对其进行处理。如果消费线程试图将对象从空队列中取出,则消费线程将被阻塞,直到生成的线程将对象放入队列。

方法

BlockingQueue有4种不同的方法来插入、删除和检查队列中的元素。每一组方法的行为都是不同的,以防被请求的操作不能立即执行。下面是这些方法的一个表:

Throws Exception Special Value Blocks Times out
Insert add(o) offer(o) put(o) offer(o, timeout, timeunit)
Remove remove(o) poll() take() poll(timeout, timeunit)
Examine element() peek()
  • Throws Exception: 如果尝试的操作不可能立即发生,则抛出一个异常。
  • Special Value :如果尝试的操作不能立即执行,则会返回一个特殊值(通常为true / false)。
  • Blocks:如果尝试的操作不可能立即执行,那么该方法将阻塞。
  • Times out:如果尝试的操作不可能立即执行,则该方法调用将阻塞,但不会超过给定的超时。
    返回一个特殊值,告诉操作是否成功(通常为true / false)。

实现

由于BlockingQueue是一个接口,因此您需要使用它的一个实现来使用它。java.util.concurrent包具有以下BlockingQueue接口(在Java 6中)的实现:

  • ArrayBlockingQueue
  • DelayQueue
  • LinkedBlockingQueue]
  • PriorityBlockingQueue
  • SynchronousQueue

使用

该示例使用BlockingQueue接口的ArrayBlockingQueue实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
//BlockingQueueExample类在不同的线程中启动生产者和消费者。生产者将字符串插入共享BlockingQueue中,消费者将它们取出。
public class BlockingQueueExample {
public static void main(String[] args) throws Exception {

BlockingQueue queue = new ArrayBlockingQueue(1024);

Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);

new Thread(producer).start();
new Thread(consumer).start();

Thread.sleep(4000);
}


}
//这是生产者类。注意它在每个put()调用之间的使用sleep。这将导致消费者在等待队列中的对象时阻塞。
@Slf4j
class Producer implements Runnable{

protected BlockingQueue queue = null;

public Producer(BlockingQueue queue) {
this.queue = queue;
}

public void run() {
try {
queue.put("1");
log.info("put 1");
Thread.sleep(1000);
queue.put("2");
log.info("put 2");
Thread.sleep(1000);
queue.put("3");
log.info("put 3");

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//这是消费者类。它只是从队列中取出对象,并将它们打印到System.out。
@Slf4j
class Consumer implements Runnable{

protected BlockingQueue queue = null;

public Consumer(BlockingQueue queue) {
this.queue = queue;
}

public void run() {
try {
log.info("take {}",queue.take());
log.info("take {}",queue.take());
log.info("take {}",queue.take());


} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

1
2
3
4
5
6
19:15:27.756 [Thread-0] INFO com.mmall.concurrency.example.aqs.Producer - put 1
19:15:27.756 [Thread-1] INFO com.mmall.concurrency.example.aqs.Consumer - take 1
19:15:28.776 [Thread-0] INFO com.mmall.concurrency.example.aqs.Producer - put 2
19:15:28.776 [Thread-1] INFO com.mmall.concurrency.example.aqs.Consumer - take 2
19:15:29.790 [Thread-0] INFO com.mmall.concurrency.example.aqs.Producer - put 3
19:15:29.790 [Thread-1] INFO com.mmall.concurrency.example.aqs.Consumer - take 3