侧边栏壁纸
  • 累计撰写 53 篇文章
  • 累计收到 5 条评论

JUC_基础篇_下

bbchen
2023-05-28 / 0 评论 / 86 阅读 / 正在检测是否收录...

img

阻塞队列

首先它是一个队列,通过一个共享的队列,可以是的数据由队列的一端输入,从另一端输出

image-20230527224530573

阻塞队列在队列为空时,获取操作会阻塞等待,直到队列中有新的元素加入;在队列已满时,插入操作会阻塞等待,直到队列中有元素被取出。

在 Java 中,阻塞队列是通过 java.util.concurrent 包中的接口和类来实现的。常见的阻塞队列有:

  1. ArrayBlockingQueue:基于数组实现的有界阻塞队列,当队列满时,插入操作会阻塞等待;当队列为空时,获取操作会阻塞等待。
  2. LinkedBlockingQueue:基于链表实现的可选有界阻塞队列,如果指定了容量,则是有界的;否则是无界的。当队列满时,插入操作会阻塞等待;当队列为空时,获取操作会阻塞等待。
  3. SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等待一个相应的删除操作,反之亦然。因此,SynchronousQueue 在多线程并发访问时可以实现线程之间的一对一交互。

image-20230527234246204

线程池

线程池(Thread Pool)做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务。如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,在从队列中取出任务来执行。

  • Executors.newFixedThreadPool(int) 一池N线程
  • Executors.newSingleThreadExecutor() 一池一线程
  • Executors.newCachedThreadPool() 可扩容线程
package com.bbedu.juc.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @projectName: jvm-study
 * @package: com.bbedu.juc.pool
 * @className: ThreadPoolDemo1
 * @author: BBChen
 * @description: 演示线程池的使用
 * @date: 2023/5/28 11:08
 * @version: 1.0
 */
public class ThreadPoolDemo1 {
    public static void main(String[] args) {
        // 一池五线程
        ExecutorService threadPool1 = Executors.newFixedThreadPool(5);

        // 一池一线程
        ExecutorService threadPool2 = Executors.newSingleThreadExecutor();

        // 一池可扩容线程
        ExecutorService threadPool3 = Executors.newCachedThreadPool();

        // 十个客户请求
        try {
            for (int i = 1; i <= 10; i++) {
                // 执行
                threadPool3.execute(() -> {
                    System.out.println(Thread.currentThread().getName() +
                            " 办理业务");
                });
                TimeUnit.MILLISECONDS.sleep(5);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool3.shutdown();
        }
    }
}

ThreadPoolExecutor参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.*defaultThreadFactory*(), *defaultHandler*);

image-20230528112213937

image-20230528113057632

image-20230528113146513

自定义线程池

image-20230528133138961

package com.bbedu.juc.pool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @projectName: jvm-study
 * @package: com.bbedu.juc.pool
 * @className: ThreadPoolDemo2
 * @author: BBChen
 * @description: 演示自定义线程池
 * @date: 2023/5/28 13:31
 * @version: 1.0
 */
public class ThreadPoolDemo2 {

    public static void main(String[] args) {
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2,
                5,
                2L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());

        // 十个客户请求
        try {
            for (int i = 1; i <= 10; i++) {
                // 执行
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() +
                            " 办理业务");
                });
                TimeUnit.MILLISECONDS.sleep(5);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPool.shutdown();
        }

    }
}

分支合并框架

Fork/Join可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。Fork/Join框架要完成两件事情:

  • Fork: 把一个复杂任务进行拆分,大事化小
  • Join: 把分拆任务的结果进行合并
package com.bbedu.juc.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

/**
 * @projectName: jvm-study
 * @package: com.bbedu.juc.forkjoin
 * @className: ForkJoinDemo
 * @author: BBChen
 * @description: 演示分支合并框架
 * @date: 2023/5/28 13:49
 * @version: 1.0
 */
public class ForkJoinDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyTask myTask = new MyTask(0, 100);
        // 创建分支合并池对象
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(myTask);

        Integer result = forkJoinTask.get();
        System.out.println("结果为: " + result);
        
        forkJoinPool.shutdown();
    }

}

class MyTask extends RecursiveTask<Integer> {

    // 拆分差值不超过10
    private static final Integer VALUE = 10;
    // 开始值
    private int begin;
    // 结束值
    private int end;
    // 返回值
    private int result;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    // 拆分合并
    @Override
    protected Integer compute() {
        if ((end - begin) <= VALUE) {
            for (int i = begin; i <= end; i++) {
                result += i;
            }
            return result;
        } else {
            // 拆分
            // 获取中间值
            int mid = begin + (end - begin) / 2;
            MyTask left = new MyTask(begin, mid);
            MyTask right = new MyTask(mid + 1, end);
            left.fork();
            right.fork();

            // 合并
            result = left.join() + right.join();
        }
        return result;
    }
}

异步回调

package com.bbedu.juc.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @projectName: jvm-study
 * @package: com.bbedu.juc.completable
 * @className: CompletableFutureDemo
 * @author: BBChen
 * @description: 演示异步调用
 * @date: 2023/5/28 14:03
 * @version: 1.0
 */
public class CompletableFutureDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 异步调用 有返回值
        CompletableFuture<Void> completableFuture1 =
                CompletableFuture.runAsync(() -> {
                    System.out.println(Thread.currentThread().getName() +
                            " completableFuture1");
                });
        completableFuture1.get();

        // 异步调用 没有返回值
        CompletableFuture<Integer> completableFuture2 =
                CompletableFuture.supplyAsync(() -> {
                    System.out.println(Thread.currentThread().getName() +
                            " completableFuture2");
                    int i = 1 / 0;
                    return 1024;
                });
        completableFuture2.whenComplete((t, u) -> {
            System.out.println("----t= " + t);
            System.out.println("----u= " + u);
        });
    }
}
0

评论

博主关闭了所有页面的评论