美文网首页程序员Java
Java 虚拟线程简介

Java 虚拟线程简介

作者: AlienPaul | 来源:发表于2024-09-18 15:01 被阅读0次

前言

Java传统的线程和操作系统中的线程是一一对应关系,意味着创建一个Java线程的同时会创建出一个操作系统线程。这样会带来如下问题:

  • 操作系统线程的创建代价很高,需要分配堆栈和消耗大量时间。
  • 操作系统线程的上下文切换需要大量的CPU操作,代价高昂。
  • 创建大量的系统线程对操作系统的压力极大,会影响系统的响应速度和稳定性。
  • 在传统Java线程中大量执行阻塞操作会严重浪费系统线程。CPU多核心性能无法充分利用。不得不使用异步非阻塞编程。异步编程写法远比同步复杂。
  • 为了避免创建和销毁大量线程,必须使用线程池化技术。

从Java 19开始引入,在Java 21 GA的虚拟线程解决了传统线程代价过大的问题,能够创建出很多的虚拟线程而无需担心资源占用。用户可以像使用传统线程一样去使用虚拟线程(用户无感知)。

虚拟线程和传统线程最大的区别是:操作系统线程和Java传统线程是一一对应的关系,但操作系统线程和虚拟线程是一对多的关系。

虚拟线程介绍

虚拟线程并非真正的线程。虚拟线程也是基于Thread实现,使用的方式和行为于传统Java线程完全一样。但是虚拟线程在执行的时候离不开系统线程。执行虚拟线程的系统线程称之为Carrier载体线程。

虚拟线程有如下概念:

  • Carrier(载体)线程:在platform(平台/系统)运行的线程,JVM把n个虚拟线程映射为m个carrier线程。
  • Mount(挂载)和Unmount(卸载):把虚拟线程切换到carrier线程运行称为mount。虚拟线程停止执行称之为unmount。

虚拟线程的挂载和卸载操作由JVM内部的调度器实现,用户无需直接干涉。

虚拟线程在开始运行的时候,会被临时挂载到载体线程上。如果遇到下面的情况之一,会被卸载并让出载体线程:

  • 文件/网络IO阻塞
  • 使用Concurrent库引起等待
  • Thread.sleep()

卸载之后载体线程可以用来执行其他的虚拟线程。

载体线程使用池化方式管理,线程池为ForkJoinPool。源代码参见VirtualThread::createDefaultScheduler

ForkJoinPool的配置有如下3个系统参数:

  • jdk.virtualThreadScheduler.parallelism:并行度,决定WorkQueue大小。默认是Runtime.availableProcessors(CPU数量)。
  • jdk.virtualThreadScheduler.maxPoolSize:线程池最大线程数。
  • jdk.virtualThreadScheduler.minRunnable:至少可运行的线程数。

使用方式

  1. 使用类似于传统线程new Thread()的方式,使用Thread类的ofVirtual()方法,创建出一个虚拟线程。
private static Thread startVirtualThread(String name, Runnable runnable) {
    return Thread.ofVirtual().name(name).start(runnable);
}
  1. 使用类似于线程池的方式。通过Executors来创建虚拟线程。每提交一个任务就创建出一个虚拟线程。
try (var executorService = Executors.newVirtualThreadPerTaskExecutor()) {
    executorService.submit(...);
}
  1. 使用指定的线程工厂类创建虚拟线程。
final ThreadFactory factory = Thread.ofVirtual().name("virtualThread-", 0).factory();
try (var executor = Executors.newThreadPerTaskExecutor(factory)) {
    // ...
}

打印Virtual Thread及其挂载的载体线程名称,可以使用System.out.println(Thread.currentThread())方式。

通过Thread.currentThread().isVirtual()来判断当前代码是否在虚拟线程中运行。

使用建议

  • 虚拟线程适用于大量阻塞IO的场景。不适用于CPU密集型负载。虚拟线程并非执行更快的线程。
  • 在虚拟线程中编写同步阻塞请求代码。使用每次请求创建一个虚拟线程的方式。
  • 为每个并发任务创建一个虚拟线程,不要在虚拟线程上使用线程池。
  • 使用Semaphore限制并发度。
  • 不要在虚拟线程中缓存大量可重用对象。
  • 避免虚拟线程频繁和长时间挂起(pinned)的情况。

官网对挂起的情况有说明:

A virtual thread cannot be unmounted during blocking operations when it is pinned to its carrier. A virtual thread is pinned in the following situations:

  • The virtual thread runs code inside a synchronized block or method
  • The virtual thread runs a native method or a foreign function (see Foreign Function and Memory API)

可以使用-Djdk.tracePinnedThreads=full/short跟踪pinned的虚拟线程。

接下来举个例子。该例子中的SynchronizedWorkloadwork方法使用synchronized关键字。会造成虚拟线程长时间pin在载体线程上。ReentrantLockWorkload使用ReentrantLock不会有这个问题。

package org.example;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.locks.ReentrantLock;

public class VirtualThreadDemo {
    public static void main(String[] args) {
//      这里将载体线程数限制为1,方便演示
        System.setProperty("jdk.virtualThreadScheduler.parallelism", "1");
        System.setProperty("jdk.virtualThreadScheduler.maxPoolSize", "1");
        System.setProperty("jdk.virtualThreadScheduler.minRunnable", "1");
        System.setProperty("jdk.tracePinnedThreads", "full");
        var lockWorkload = new ReentrantLockWorkload();
//        var lockWorkload = new SynchronizedWorkload();
        var workload = new Workload();
        ThreadFactory threadFactory = Thread.ofVirtual().name("workload", 0).factory();
        try (var executorService = Executors.newThreadPerTaskExecutor(threadFactory)) {
            executorService.submit(lockWorkload::work);
            executorService.submit(workload::work);
        }
    }

    static class Workload {
        public void work() {
            try {
                System.out.println("Workload Started");
                System.out.println(Thread.currentThread());
                Thread.sleep(1000);
                System.out.println("Workload Finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class SynchronizedWorkload {
        public synchronized void work() {
            try {
                System.out.println("SynchronizedWorkload Started");
                System.out.println(Thread.currentThread());
                Thread.sleep(1000);
                System.out.println("SynchronizedWorkload Finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class ReentrantLockWorkload {
        private static final ReentrantLock lock = new ReentrantLock();

        public void work() {
            try {
                lock.lock();
                System.out.println("ReentrantLockWorkload Started");
                System.out.println(Thread.currentThread());
                Thread.sleep(1000);
                System.out.println("ReentrantLockWorkload Finished");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    }
}

例子中lockWorkloadnew SynchronizedWorkload()的时候,执行输出如下:

SynchronizedWorkload Started
VirtualThread[#22,workload0]/runnable@ForkJoinPool-1-worker-1
Thread[#23,ForkJoinPool-1-worker-1,5,CarrierThreads]
    java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:183)
    java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
    java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621)
    java.base/java.lang.VirtualThread.sleepNanos(VirtualThread.java:793)
    java.base/java.lang.Thread.sleep(Thread.java:507)
    org.example.VirtualThreadDemo$SynchronizedWorkload.work(VirtualThreadDemo2.java:42) <== monitors:1
    java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
    java.base/java.lang.VirtualThread.run(VirtualThread.java:309)
SynchronizedWorkload Finished
Workload Started
VirtualThread[#24,workload1]/runnable@ForkJoinPool-1-worker-1
Workload Finished

我们发现SynchronizedWorkloadWorkload实际上时串行执行。当虚拟线程执行进入synchronized代码块的时候会pin到载体线程上,无法卸载。即便是代码块中有sleep或者阻塞IO也不会卸载。在上面的执行结果中还能够看到JVM跟踪到了pinned的虚拟线程日志记录。

修改一下代码,例子中lockWorkloadnew ReentrantLockWorkload()的时候,执行输出如下:

ReentrantLockWorkload Started
VirtualThread[#22,workload0]/runnable@ForkJoinPool-1-worker-1
Workload Started
VirtualThread[#24,workload1]/runnable@ForkJoinPool-1-worker-1
ReentrantLockWorkload Finished
Workload Finished

这是我们期待的结果,ReentrantLockWorkloadWorkload可以并行执行。当ReentrantLockWorkload加锁,进入sleep状态的时候仍然可以卸载,让出载体线程,从而Workload才有能够有机会执行。

参考文献

https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html

https://blog.moyucoding.com/jvm/2023/09/23/ultimate-guide-to-java-virtual-thread

相关文章

网友评论

    本文标题:Java 虚拟线程简介

    本文链接:https://www.haomeiwen.com/subject/iqvnljtx.html