阻塞队列
首先它是一个队列,通过一个共享的队列,可以是的数据由队列的一端输入,从另一端输出
阻塞队列在队列为空时,获取操作会阻塞等待,直到队列中有新的元素加入;在队列已满时,插入操作会阻塞等待,直到队列中有元素被取出。
在 Java 中,阻塞队列是通过 java.util.concurrent 包中的接口和类来实现的。常见的阻塞队列有:
- ArrayBlockingQueue:基于数组实现的有界阻塞队列,当队列满时,插入操作会阻塞等待;当队列为空时,获取操作会阻塞等待。
- LinkedBlockingQueue:基于链表实现的可选有界阻塞队列,如果指定了容量,则是有界的;否则是无界的。当队列满时,插入操作会阻塞等待;当队列为空时,获取操作会阻塞等待。
- SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待一个相应的删除操作,反之亦然。因此,SynchronousQueue 在多线程并发访问时可以实现线程之间的一对一交互。
线程池
线程池(Thread Pool)做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务。如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,在从队列中取出任务来执行。
- Executors.newFixedThreadPool(int) 一池N线程
- Executors.newSingleThreadExecutor() 一池一线程
- Executors.newCachedThreadPool() 可扩容线程
package com.bbedu.juc.pool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @projectName: jvm-study
* @package: com.bbedu.juc.pool
* @className: ThreadPoolDemo1
* @author: BBChen
* @description: 演示线程池的使用
* @date: 2023/5/28 11:08
* @version: 1.0
*/
public class ThreadPoolDemo1 {
public static void main(String[] args) {
// 一池五线程
ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
// 一池一线程
ExecutorService threadPool2 = Executors.newSingleThreadExecutor();
// 一池可扩容线程
ExecutorService threadPool3 = Executors.newCachedThreadPool();
// 十个客户请求
try {
for (int i = 1; i <= 10; i++) {
// 执行
threadPool3.execute(() -> {
System.out.println(Thread.currentThread().getName() +
" 办理业务");
});
TimeUnit.MILLISECONDS.sleep(5);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool3.shutdown();
}
}
}
ThreadPoolExecutor参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.*defaultThreadFactory*(), *defaultHandler*);
自定义线程池
package com.bbedu.juc.pool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @projectName: jvm-study
* @package: com.bbedu.juc.pool
* @className: ThreadPoolDemo2
* @author: BBChen
* @description: 演示自定义线程池
* @date: 2023/5/28 13:31
* @version: 1.0
*/
public class ThreadPoolDemo2 {
public static void main(String[] args) {
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
// 十个客户请求
try {
for (int i = 1; i <= 10; i++) {
// 执行
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() +
" 办理业务");
});
TimeUnit.MILLISECONDS.sleep(5);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
分支合并框架
Fork/Join可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情:
- Fork: 把一个复杂任务进行拆分,大事化小
- Join: 把分拆任务的结果进行合并
package com.bbedu.juc.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* @projectName: jvm-study
* @package: com.bbedu.juc.forkjoin
* @className: ForkJoinDemo
* @author: BBChen
* @description: 演示分支合并框架
* @date: 2023/5/28 13:49
* @version: 1.0
*/
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask = new MyTask(0, 100);
// 创建分支合并池对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);
Integer result = forkJoinTask.get();
System.out.println("结果为: " + result);
forkJoinPool.shutdown();
}
}
class MyTask extends RecursiveTask<Integer> {
// 拆分差值不超过10
private static final Integer VALUE = 10;
// 开始值
private int begin;
// 结束值
private int end;
// 返回值
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
// 拆分合并
@Override
protected Integer compute() {
if ((end - begin) <= VALUE) {
for (int i = begin; i <= end; i++) {
result += i;
}
return result;
} else {
// 拆分
// 获取中间值
int mid = begin + (end - begin) / 2;
MyTask left = new MyTask(begin, mid);
MyTask right = new MyTask(mid + 1, end);
left.fork();
right.fork();
// 合并
result = left.join() + right.join();
}
return result;
}
}
异步回调
package com.bbedu.juc.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @projectName: jvm-study
* @package: com.bbedu.juc.completable
* @className: CompletableFutureDemo
* @author: BBChen
* @description: 演示异步调用
* @date: 2023/5/28 14:03
* @version: 1.0
*/
public class CompletableFutureDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 异步调用 有返回值
CompletableFuture<Void> completableFuture1 =
CompletableFuture.runAsync(() -> {
System.out.println(Thread.currentThread().getName() +
" completableFuture1");
});
completableFuture1.get();
// 异步调用 没有返回值
CompletableFuture<Integer> completableFuture2 =
CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() +
" completableFuture2");
int i = 1 / 0;
return 1024;
});
completableFuture2.whenComplete((t, u) -> {
System.out.println("----t= " + t);
System.out.println("----u= " + u);
});
}
}
评论