JDK的线程池
 线程池状态,RUNNING,SHUTDOWN(不会再接受新任务了),STOP(立刻停止),TIDYING(任务执行完毕,即将TERMINATED),TERMINATED
构造函数
| 1
 | public ThreadPollExecutor(int corePoolsize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
 | 
- 核心线程数量
- 最大线程数量
- 就急线程生存时间
- 时间单位
- 阻塞队列
- 线程工厂: 给线程起个名字
- 拒绝策略
拒绝策略
- AbortPolicy 让调用者抛出异常
- CallerRunsPolicy 让调用者运行任务
- DiscardPolicy 放弃本次任务
- DIcardOldestPolicy 放弃队列中最先进入的任务
- Dubbo 抛出异常并记录线程栈信息
- Netty 创建新的线程来执行
- ActiveMQ 等待超时
- PinPoint 拒绝策略链, 比如先用方法A,如果失败就要方法B,…
newFixedThreadPool
 固定大小的线程池
 阻塞队列无界,没有就急线程,nThreads个核心线程, 是非守护线程
 当然我们也可以自己创建线程工厂,自己给线程取名字
| 12
 3
 4
 5
 
 | public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>());
 }
 
 | 
newCachedThraedPool
 不固定大小的线程池
 阻塞队列无界,没有核心线程,全是救急线程,但不是无限个,活60秒
| 12
 3
 4
 5
 
 | public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
 60L, TimeUnit.SECONDS,
 new SynchronousQueue<Runnable>());
 }
 
 | 
SynchronousQueue
 如果没有人取出东西,放入操作会被阻塞, 如果没有人放入东西,同理拿出会被阻塞,如果有多个同时拿,这时候就像栈一样,后来的人,会先拿到东西
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 
 | void test10_f1(SynchronousQueue<Integer> integers, String string) throws InterruptedException {Thread.sleep(200);
 new Thread(() -> {
 try {
 logger.debug("begin");
 integers.put(1);
 logger.debug("end");
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }, string).start();
 }
 
 void test10_f2(SynchronousQueue<Integer> integers, String string) throws InterruptedException {
 Thread.sleep(200);
 new Thread(() -> {
 try {
 logger.debug("begin");
 integers.take();
 logger.debug("end");
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }, string).start();
 }
 
 @Test
 public void test10() throws InterruptedException {
 SynchronousQueue<Integer> integers = new SynchronousQueue<>();
 test10_f1(integers, "1");
 test10_f1(integers, "2");
 test10_f1(integers, "3");
 test10_f1(integers, "4");
 test10_f2(integers, "5");
 test10_f2(integers, "6");
 test10_f2(integers, "7");
 test10_f2(integers, "8");
 test10_f2(integers, "a");
 test10_f2(integers, "b");
 test10_f2(integers, "c");
 test10_f2(integers, "d");
 test10_f1(integers, "e");
 test10_f1(integers, "f");
 test10_f1(integers, "g");
 test10_f1(integers, "h");
 }
 
 | 
 下面是输出, 可以看到,1234按顺序进入,4321按顺序出来
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 
 | 16:33:54.391 [1] DEBUG com.wsx.test.ThreadTest - begin16:33:54.591 [2] DEBUG com.wsx.test.ThreadTest - begin
 16:33:54.792 [3] DEBUG com.wsx.test.ThreadTest - begin
 16:33:54.996 [4] DEBUG com.wsx.test.ThreadTest - begin
 16:33:55.202 [5] DEBUG com.wsx.test.ThreadTest - begin
 16:33:55.202 [5] DEBUG com.wsx.test.ThreadTest - end
 16:33:55.202 [4] DEBUG com.wsx.test.ThreadTest - end
 16:33:55.407 [6] DEBUG com.wsx.test.ThreadTest - begin
 16:33:55.409 [6] DEBUG com.wsx.test.ThreadTest - end
 16:33:55.409 [3] DEBUG com.wsx.test.ThreadTest - end
 16:33:55.609 [7] DEBUG com.wsx.test.ThreadTest - begin
 16:33:55.609 [2] DEBUG com.wsx.test.ThreadTest - end
 16:33:55.609 [7] DEBUG com.wsx.test.ThreadTest - end
 16:33:55.813 [8] DEBUG com.wsx.test.ThreadTest - begin
 16:33:55.814 [8] DEBUG com.wsx.test.ThreadTest - end
 16:33:55.814 [1] DEBUG com.wsx.test.ThreadTest - end
 16:33:56.017 [a] DEBUG com.wsx.test.ThreadTest - begin
 16:33:56.221 [b] DEBUG com.wsx.test.ThreadTest - begin
 16:33:56.425 [c] DEBUG com.wsx.test.ThreadTest - begin
 16:33:56.630 [d] DEBUG com.wsx.test.ThreadTest - begin
 16:33:56.835 [e] DEBUG com.wsx.test.ThreadTest - begin
 16:33:56.836 [e] DEBUG com.wsx.test.ThreadTest - end
 16:33:56.836 [d] DEBUG com.wsx.test.ThreadTest - end
 16:33:57.038 [f] DEBUG com.wsx.test.ThreadTest - begin
 16:33:57.039 [f] DEBUG com.wsx.test.ThreadTest - end
 16:33:57.039 [c] DEBUG com.wsx.test.ThreadTest - end
 16:33:57.244 [g] DEBUG com.wsx.test.ThreadTest - begin
 16:33:57.244 [g] DEBUG com.wsx.test.ThreadTest - end
 16:33:57.244 [b] DEBUG com.wsx.test.ThreadTest - end
 16:33:57.448 [h] DEBUG com.wsx.test.ThreadTest - begin
 16:33:57.449 [h] DEBUG com.wsx.test.ThreadTest - end
 16:33:57.449 [a] DEBUG com.wsx.test.ThreadTest - end
 
 | 
newSingleThreadExecutor
 1个核心线程,0个救急线程,使用无界队列,于是任务可以无数个
| 12
 3
 4
 5
 6
 
 | public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService
 (new ThreadPoolExecutor(1, 1,
 0L, TimeUnit.MILLISECONDS,
 new LinkedBlockingQueue<Runnable>()));
 }
 
 | 
 1个线程的线程池能叫池吗?我干嘛不自己用? 实际上我们自己创建的话如果碰到一些错误的任务,可能线程就退出了,这里不好处理,但是线程池在该线程退出以后会帮我们重新创建线程
 FinalizableDelegatedExecutorService 是一个装饰者模式,只暴露部分接口,避免后期被修改容量
TimerTask
 这个不重要,他很差,他是串行执行的,如果前面的太耗时会导致后面的被推迟,如果前面发生异常,后面的不会执行
ScheduledExecutorService
 用法和TimerTask很像,但是他不会出现上面的异常影响后续任务的情况
ScheduledExecutorService.scheduleAtFixedTate()
 在初始延迟以后,能够在单位时间内被反复执行
ScheduledExecutorService.scheduleWithFixedDelay()
 在初始延迟以后,反复执行的两个任务之间隔固定时间
函数
submit
 用future来返回,future.get();
invokeAll(tasks)
 提交tasks中的所有任务
invokeAll(tasks,timeout,unit)
 带一个超时时间
invokeAny
 谁最先执行完就返回谁,其他的就不管了
shutdown
 无阻塞,不会接受新提交的任务,但已提交的任务后执行完。
shutdownNow
 打断所有的线程,并返回队列中的任务,
isShutdown
 只要不是running, 就返回true
isTerminated
 只有TREMINATED返回真
awaitTermination
 就一直等,等到超时或者线程结束
正确处理异常
 如果执行过程中没有异常,future.get()正常返回,如果出现异常,future.get()会抛出异常
Fork/Join
 fork能创建新的线程来执行,join会阻塞,这就实现了并行,下面是100的阶乘模998244353
| 12
 3
 4
 5
 6
 7
 8
 9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 
 | Logger logger = LoggerFactory.getLogger(RecursiveTaskTest.class);
 @Test
 public void test() {
 class Task extends RecursiveTask<Integer> {
 private int begin;
 private int end;
 private int mod;
 
 Task(int begin, int end, int mod) {
 this.begin = begin;
 this.end = end;
 this.mod = mod;
 }
 
 @Override
 protected Integer compute() {
 if (begin == end) return begin;
 int mid = begin + end >> 1;
 Task task1 = new Task(begin, mid, mod);
 Task task2 = new Task(mid + 1, end, mod);
 task1.fork();
 task2.fork();
 return Math.toIntExact(1L * task1.join() * task2.join() % mod);
 }
 }
 
 ForkJoinPool forkJoinPool = new ForkJoinPool(3);
 logger.debug("{}", forkJoinPool.invoke(new Task(1, 100, 998244353)));
 
 }
 
 |