美文网首页读书
面试必须要掌握的内容:多线程与Spring容器事务机制

面试必须要掌握的内容:多线程与Spring容器事务机制

作者: 南门屋 | 来源:发表于2022-06-23 20:56 被阅读0次
image.png

Spring 负责所有底层事务管理细节,并为不同的事务 API 提供一致的编程模型,但有多少人真正了解它在多线程环境中的行为方式?是否可以在多个线程中打开事务并写入数据?

让我们退一步思考一下EntityManager。

EntityManager的工作是与一个会话或被它管理的对象的缓存一起进行的。这意味着它有一个状态,而在几个线程之间共享状态会导致竞争条件;

所以,第一条规则是每个线程使用一个EntityManager。

事实上,Spring负责保持每个线程的事务性上下文。

假设我们想并行处理一个对象列表并将其存储在数据库中。我们想把这些对象分成专门的小块,并把每个小块传递给一个单独线程中的处理方法。然后,每个线程中处理的结果应该被收集起来并呈现给用户。

我将从定义一个负责处理的服务接口开始:

<font><i>/**

  • Service interface defining the contract for object identifiers processing
    /</i></font><font><b>public</b> <b>interface</b> ProcessingService { </font><font><i>/*
    • Processes the list of objects identified by id and returns a an identifiers
    • list of the successfully processed objects
    • @param objectIds List of object identifiers
    • @return identifiers list of the successfully processed objects
      */</i></font><font>
      List<Integer> processObjects(List objectIds);
      }</font>

这项服务的默认实现是基于数据库存储的。然而,这个例子是非常简单的:

<font><i>/**

  • Service implementation for database related ids processing
    */</i></font><font>@Service(</font><font>"ProcessingDBService"</font><font>)<b>public</b> <b>class</b> ProcessingDBService implements ProcessingService {
    <b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    @Transactional
    @Override <b>public</b> List processObjects(List objectIds) { </font><font><i>// Process and save to DB</i></font><font>

    logger.info(</font><font>"Running in thread "</font><font> + Thread.currentThread().getName() + </font><font>" with object ids "</font><font> + objectIds.toString());
    <b>return</b> objectIds.stream().collect(Collectors.toList());
    }
    }</font>

现在,我们希望有一个选项,可以在块chunk(类似队列一样数据块)和并行进程中运行这个处理。

为了保持代码的干净和与运行时环境的解耦,我们将使用装饰器模式,如下所示。

<font><i>/**

  • Service implementation for parallel chunk processing
    */</i></font><font>
    @Service
    @Primary
    @ConditionalOnProperty(prefix = </font><font>"service"</font><font>, name = </font><font>"parallel"</font><font>, havingValue = </font><font>"true"</font><font>)
    <b>public</b> <b>class</b> ProcessingServiceParallelRunDecorator implements ProcessingService {

    <b>private</b> ProcessingService delegate;

    <b>public</b> ProcessingServiceParallelRunDecorator(ProcessingService delegate) {
    <b>this</b>.delegate = delegate;
    }

    </font><font><i>/**

    • In a real scenario it should be an external configuration
      */</i></font><font>
      <b>private</b> <b>int</b> batchSize = 10;

    @Override
    <b>public</b> List<Integer> processObjects(List objectIds) { List<List<Integer>> chuncks = getBatches(objectIds, batchSize); List<List<Integer>> processedObjectIds = chuncks.parallelStream().map(delegate::processObjects)
    .collect(Collectors.toList());

     <b>return</b> flatList(processedObjectIds);
    

    }

    <b>private</b> List<List<Integer>> getBatches(List collection, <b>int</b> batchSize) {
    <b>return</b> IntStream.iterate(0, i -> i < collection.size(), i -> i + batchSize)
    .mapToObj(i -> collection.subList(i, Math.min(i + batchSize, collection.size())))
    .collect(Collectors.toList());
    }

    <b>private</b> List<Integer> flatList(List> listOfLists) {
    <b>return</b> listOfLists.stream().collect(ArrayList::<b>new</b>, List::addAll, List::addAll);
    }
    </font>

实际的调用被委托给一个处理服务的实现,但Decorator负责跨线程的工作分配和收集结果。

装饰器模式的实现使客户端代码不知道实际的实现。可以直接注入单线程版本,也可以注入多线程版本,而无需直接改变客户端代码。

为了理解客户端代码可能是什么样子的,让我们看一下一个简单的单元测试。在此我向大家推荐一个架构学习交流圈。交流学习指导伪鑫:946009740(里面有大量的面试题及答案)里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

@RunWith( SpringJUnit4ClassRunner.<b>class</b> )
@SpringBootTest(properties = { <font>"service.parallel=false"</font><font> })<b>public</b> <b>class</b> ProcessingServiceTest {

@Autowired
ProcessingService processingService;

ProcessingService processingServiceDecorator;

@Test <b>public</b> <b>void</b> shouldRunParallelProcessingUsingDecorator() {
processingServiceDecorator = <b>new</b> ProcessingServiceParallelRunDecorator(processingService);
List objectIds = Arrays.asList(<b>new</b> Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12});

   List resultList = processingServiceDecorator.processObjects(objectIds);

   Assert.assertEquals(objectIds, resultList);

}

}</font>

该代码传递了一个objectIds的列表,并运行测试中明确创建的Decorator服务。

预计由于内部配置的chunk大小为10,两个线程将处理数据。通过检查日志,我们可以看到以下几行。

ProcessingDBService: Running in thread ForkJoinPool.commonPool-worker-3 with object ids [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]ProcessingDBService: Running in thread main with object ids [11, 12]

在正好两个线程中,并行流使用主线程进行处理,第二个线程在它们之间分配工作。

这种处理的一个重要方面是事务处理。前10个元素在一个事务中被处理,而最后2个元素在另一个事务中被处理。

如果你看一下ProcessingDBService,你会看到公共方法被注解了@Transactional注解。这就是Spring预计的工作方式:它负责在专用的ThreadLocal对象中为每个线程保存事务上下文,不支持在一个事务中运行多个线程。

本文没有涉及错误处理,但会在以后的文章中加以阐述。还有一点需要注意的是,Decorator类中的依赖注入是基于构造器注入的。为了被Spring容器所管理,你可能需要在构造器中使用@Qualifier。

相关文章

网友评论

    本文标题:面试必须要掌握的内容:多线程与Spring容器事务机制

    本文链接:https://www.haomeiwen.com/subject/dmozprtx.html