page contents

Java多线程批量操作教程,居然有人不做事务控制

本文讲述了Java 多线程批量操作教程,居然有人不做事务控制!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

attachments-2023-08-YDFu5YTX64cb0413537b7.png本文讲述了Java 多线程批量操作教程,居然有人不做事务控制!具有很好的参考价值,希望对大家有所帮助。一起跟随六星小编过来看看吧,具体如下:

项目概况如下:

项目代码基于:MySQL 数据

开发框架为:SpringBoot、Mybatis

开发语言为:Java8

项目代码:

https://gitee.com/john273766764/springboot-mybatis-threads

公司业务中遇到一个需求,需要同时修改最多约 5 万条数据,而且还不支持批量或异步修改操作。于是只能写个 for 循环操作,但操作耗时太长,只能一步一步寻找其他解决方案。具体操作如下。

循环操作的代码

先写一个最简单的 for 循环代码,看看耗时情况怎么样:

/***

 * 一条一条依次对50000条数据进行更新操作

 * 耗时:2m27s,1m54s

 */

@Test

void updateStudent() {

    List<Student> allStudents = studentMapper.getAll();

    allStudents.forEach(s -> {

        //更新教师信息

        String teacher = s.getTeacher();

        String newTeacher = "TNO_" + new Random().nextInt(100);

        s.setTeacher(newTeacher);

        studentMapper.update(s);

    });

}

循环修改整体耗时约 1 分 54 秒,且代码中没有手动事务控制应该是自动事务提交,所以每次操作事务都会提交所以操作比较慢,我们先对代码中添加手动事务控制,看查询效率怎样。

使用手动事务的操作代码

修改后的代码如下:

@Autowired

private DataSourceTransactionManager dataSourceTransactionManager;

@Autowired

private TransactionDefinition transactionDefinition;

/**

 * 由于希望更新操作 一次性完成,需要手动控制添加事务

 * 耗时:24s

 * 从测试结果可以看出,添加事务后插入数据的效率有明显的提升

 */

@Test

void updateStudentWithTrans() {

    List<Student> allStudents = studentMapper.getAll();

    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);

    try {

        allStudents.forEach(s -> {

            //更新教师信息

            String teacher = s.getTeacher();

            String newTeacher = "TNO_" + new Random().nextInt(100);

            s.setTeacher(newTeacher);

            studentMapper.update(s);

        });

        dataSourceTransactionManager.commit(transactionStatus);

    } catch (Throwable e) {

        dataSourceTransactionManager.rollback(transactionStatus);

        throw e;

    }

}

添加手动事务操控制后,整体耗时约 24 秒,这相对于自动事务提交的代码,快了约 5 倍,对于大量循环数据库提交操作,添加手动事务可以有效提高操作效率。

尝试多线程进行数据修改

添加数据库手动事务后操作效率有明细提高,但还是比较长,接下来尝试多线程提交看是不是能够再快一些。

先添加一个 Service 将批量修改操作整合一下,具体代码如下:

StudentServiceImpl.java:

@Service

public class StudentServiceImpl implements StudentService {

    @Autowired

    private StudentMapper studentMapper;

    @Autowired

    private DataSourceTransactionManager dataSourceTransactionManager;

    @Autowired

    private TransactionDefinition transactionDefinition;

    @Override

    public void updateStudents(List<Student> students, CountDownLatch threadLatch) {

        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);

        System.out.println("子线程:" + Thread.currentThread().getName());

        try {

            students.forEach(s -> {

                // 更新教师信息

                // String teacher = s.getTeacher();

                String newTeacher = "TNO_" + new Random().nextInt(100);

                s.setTeacher(newTeacher);

                studentMapper.update(s);

            });

            dataSourceTransactionManager.commit(transactionStatus);

            threadLatch.countDown();

        } catch (Throwable e) {

            e.printStackTrace();

            dataSourceTransactionManager.rollback(transactionStatus);

        }

    }

}

批量测试代码,我们采用了多线程进行提交,修改后测试代码如下:

@Autowired

private DataSourceTransactionManager dataSourceTransactionManager;

@Autowired

private TransactionDefinition transactionDefinition;

@Autowired

private StudentService studentService;

/**

 * 对用户而言,27s 任是一个较长的时间,我们尝试用多线程的方式来经行修改操作看能否加快处理速度

 * 预计创建10个线程,每个线程进行5000条数据修改操作

 * 耗时统计

 * 1 线程数:1      耗时:25s

 * 2 线程数:2      耗时:14s

 * 3 线程数:5      耗时:15s

 * 4 线程数:10     耗时:15s

 * 5 线程数:100    耗时:15s

 * 6 线程数:200    耗时:15s

 * 7 线程数:500    耗时:17s

 * 8 线程数:1000    耗时:19s

 * 8 线程数:2000    耗时:23s

 * 8 线程数:5000    耗时:29s

 */

@Test

void updateStudentWithThreads() {

    //查询总数据

    List<Student> allStudents = studentMapper.getAll();

    // 线程数量

    final Integer threadCount = 100;

    //每个线程处理的数据量

    final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;

    // 创建多线程处理任务

    ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);

    CountDownLatch threadLatchs = new CountDownLatch(threadCount);

    for (int i = 0; i < threadCount; i++) {

        // 每个线程处理的数据

        List<Student> threadDatas = allStudents.stream()

                .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());

        studentThreadPool.execute(() -> {

            studentService.updateStudents(threadDatas, threadLatchs);

        });

    }

    try {

        // 倒计时锁设置超时时间 30s

        threadLatchs.await(30, TimeUnit.SECONDS);

    } catch (Throwable e) {

        e.printStackTrace();

    }

    System.out.println("主线程完成");

}

多线程提交修改时,我们尝试了不同线程数对提交速度的影响,具体可以看下面表格,多线程修改 50000 条数据时,不同线程数耗时对比(秒)。

attachments-2023-08-WIgNp3IW64cb03ca5f8e8.png

根据表格,我们线程数增大提交速度并非一直增大,在当前情况下约在 2-5 个线程数时,提交速度最快(实际线程数还是需要根据服务器配置实际测试)。

基于两个 CountDownLatch 控制多线程事务提交

由于多线程提交时,每个线程事务时单独的,无法保证一致性,我们尝试给多线程添加事务控制,来保证每个线程都是在插入数据完成后在提交事务。

这里我们使用两个 CountDownLatch 来控制主线程与子线程事务提交,并设置了超时时间为 30 秒。

我们对代码进行了一点修改:

@Override

public void updateStudentsThread(List<Student> students, CountDownLatch threadLatch, CountDownLatch mainLatch, StudentTaskError taskStatus) {

    TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);

    System.out.println("子线程:" + Thread.currentThread().getName());

    try {

        students.forEach(s -> {

            // 更新教师信息

            // String teacher = s.getTeacher();

            String newTeacher = "TNO_" + new Random().nextInt(100);

            s.setTeacher(newTeacher);

            studentMapper.update(s);

        });

    } catch (Throwable e) {

        taskStatus.setIsError();

    } finally {

        threadLatch.countDown(); // 切换到主线程执行

    }

    try {

        mainLatch.await();  //等待主线程执行

    } catch (Throwable e) {

        taskStatus.setIsError();

    }

    // 判断是否有错误,如有错误 就回滚事务

    if (taskStatus.getIsError()) {

        dataSourceTransactionManager.rollback(transactionStatus);

    } else {

        dataSourceTransactionManager.commit(transactionStatus);

    }

}

/**

 * 由于每个线程都是单独的事务,需要添加对线程事务的统一控制

 * 我们这边使用两个 CountDownLatch 对子线程的事务进行控制

 */

@Test

void updateStudentWithThreadsAndTrans() {

    //查询总数据

    List<Student> allStudents = studentMapper.getAll();

    // 线程数量

    final Integer threadCount = 4;


    //每个线程处理的数据量

    final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;


    // 创建多线程处理任务

    ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);

    CountDownLatch threadLatchs = new CountDownLatch(threadCount); // 用于计算子线程提交数量

    CountDownLatch mainLatch = new CountDownLatch(1); // 用于判断主线程是否提交

    StudentTaskError taskStatus = new StudentTaskError(); // 用于判断子线程任务是否有错误


    for (int i = 0; i < threadCount; i++) {

        // 每个线程处理的数据

        List<Student> threadDatas = allStudents.stream()

                .skip(i * dataPartionLength).limit(dataPartionLength)

                .collect(Collectors.toList());

        studentThreadPool.execute(() -> {

            studentService.updateStudentsThread(threadDatas, threadLatchs, mainLatch, taskStatus);

        });

    }

    try {

        // 倒计时锁设置超时时间 30s

        boolean await = threadLatchs.await(30, TimeUnit.SECONDS);

        if (!await) { // 等待超时,事务回滚

            taskStatus.setIsError();

        }

    } catch (Throwable e) {

        e.printStackTrace();

        taskStatus.setIsError();

    }

    mainLatch.countDown(); // 切换到子线程执行

    studentThreadPool.shutdown(); //关闭线程池

    System.out.println("主线程完成");

}

本想再次测试一下不同线程数对执行效率的影响时,发现当线程数超过 10 个时,执行时就报错。

具体错误内容如下:

Exception in thread "pool-1-thread-2" org.springframework.transaction.CannotCreateTransactionException: Could not open JDBC Connection for transaction; nested exception is java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.

 at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:309)

 at org.springframework.transaction.support.AbstractPlatformTransactionManager.startTransaction(AbstractPlatformTransactionManager.java:400)

 at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:373)

 at com.example.springbootmybatis.service.Impl.StudentServiceImpl.updateStudentsThread(StudentServiceImpl.java:58)

 at com.example.springbootmybatis.StudentTest.lambda$updateStudentWithThreadsAndTrans$3(StudentTest.java:164)

 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

 at java.lang.Thread.run(Thread.java:748)

Caused by: java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30055ms.

 at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:696)

 at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:197)

 at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:162)

 at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128)

 at org.springframework.jdbc.datasource.DataSourceTransactionManager.doBegin(DataSourceTransactionManager.java:265)

 ... 7 more

错误的大致意思时,不能为数据库事务打开 jdbc Connection,连接在 30s 的时候超时了。

由于前面启动的十个线程需要等待主线程完成后才能提交,所以一直占用连接未释放,造成后面的进程创建连接超时。

看错误日志中错误的来源是 HikariPool ,我们来重新配置一下这个连接池的参数,将最大连接数修改为 100。

具体配置如下:

# 连接池中允许的最小连接数。缺省值:10

spring.datasource.hikari.minimum-idle=10

# 连接池中允许的最大连接数。缺省值:10

spring.datasource.hikari.maximum-pool-size=100

# 自动提交

spring.datasource.hikari.auto-commit=true

# 一个连接idle状态的最大时长(毫秒),超时则被释放(retired),缺省:10分钟

spring.datasource.hikari.idle-timeout=30000

# 一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒

spring.datasource.hikari.max-lifetime=1800000

# 等待连接池分配连接的最大时长(毫秒),超过这个时长还没可用的连接则发生SQLException, 缺省:30秒

再次执行测试发现没有报错,修改线程数为 20 又执行了一下,同样执行成功了。

基于 TransactionStatus 集合来控制多线程事务提交

在同事推荐下我们使用事务集合来进行多线程事务控制,主要代码如下:

@Service

public class StudentsTransactionThread {


    @Autowired

    private StudentMapper studentMapper;

    @Autowired

    private StudentService studentService;

    @Autowired

    private PlatformTransactionManager transactionManager;


    List<TransactionStatus> transactionStatuses = Collections.synchronizedList(new ArrayList<TransactionStatus>());


    @Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})

    public void updateStudentWithThreadsAndTrans() throws InterruptedException {


        //查询总数据

        List<Student> allStudents = studentMapper.getAll();


        // 线程数量

        final Integer threadCount = 2;


        //每个线程处理的数据量

        final Integer dataPartionLength = (allStudents.size() + threadCount - 1) / threadCount;


        // 创建多线程处理任务

        ExecutorService studentThreadPool = Executors.newFixedThreadPool(threadCount);

        CountDownLatch threadLatchs = new CountDownLatch(threadCount);

        AtomicBoolean isError = new AtomicBoolean(false);

        try {

            for (int i = 0; i < threadCount; i++) {

                // 每个线程处理的数据

                List<Student> threadDatas = allStudents.stream()

                        .skip(i * dataPartionLength).limit(dataPartionLength).collect(Collectors.toList());

                studentThreadPool.execute(() -> {

                    try {

                        try {

                            studentService.updateStudentsTransaction(transactionManager, transactionStatuses, threadDatas);

                        } catch (Throwable e) {

                            e.printStackTrace();

                            isError.set(true);

                        }finally {

                            threadLatchs.countDown();

                        }

                    } catch (Exception e) {

                        e.printStackTrace();

                        isError.set(true);

                    }

                });

            }


            // 倒计时锁设置超时时间 30s

            boolean await = threadLatchs.await(30, TimeUnit.SECONDS);

            // 判断是否超时

            if (!await) {

                isError.set(true);

            }

        } catch (Throwable e) {

            e.printStackTrace();

            isError.set(true);

        }


        if (!transactionStatuses.isEmpty()) {

            if (isError.get()) {

                transactionStatuses.forEach(s -> transactionManager.rollback(s));

            } else {

                transactionStatuses.forEach(s -> transactionManager.commit(s));

            }

        }


        System.out.println("主线程完成");

    }

}

@Override

@Transactional(propagation = Propagation.REQUIRED, rollbackFor = {Exception.class})

public void updateStudentsTransaction(PlatformTransactionManager transactionManager, List<TransactionStatus> transactionStatuses, List<Student> students) {

    // 使用这种方式将事务状态都放在同一个事务里面

    DefaultTransactionDefinition def = new DefaultTransactionDefinition();

    def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); // 事物隔离级别,开启新事务,这样会比较安全些。

    TransactionStatus status = transactionManager.getTransaction(def); // 获得事务状态

    transactionStatuses.add(status);


    students.forEach(s -> {

        // 更新教师信息

        // String teacher = s.getTeacher();

        String newTeacher = "TNO_" + new Random().nextInt(100);

        s.setTeacher(newTeacher);

        studentMapper.update(s);

    });

    System.out.println("子线程:" + Thread.currentThread().getName());

}

由于这个中方式去前面方式相同,需要等待线程执行完成后才会提交事务,所有任会占用 Jdbc 连接池,如果线程数量超过连接池最大数量会产生连接超时。所以在使用过程中任要控制线程数量。

使用 union 连接多个 select 实现批量 update

有些情况写不支持,批量 update,但支持 insert 多条数据,这个时候可尝试将需要更新的数据拼接成多条 select 语句,然后使用 union 连接起来,再使用 update 关联这个数据进行 update。

具体代码演示如下:

update student,(

 (select  1 as id,'teacher_A' as teacher) union

 (select  2 as id,'teacher_A' as teacher) union

 (select  3 as id,'teacher_A' as teacher) union

 (select  4 as id,'teacher_A' as teacher)

    /* ....more data ... */

    ) as new_teacher

set

 student.teacher=new_teacher.teacher

where

 student.id=new_teacher.id

这种方式在 MySQL 数据库没有配置 allowMultiQueries=true 也可以实现批量更新。

总结

如下:

对于大批量数据库操作,使用手动事务提交可以很多程度上提高操作效率

多线程对数据库进行操作时,并非线程数越多操作时间越快,按上述示例大约在 2-5 个线程时操作时间最快。

对于多线程阻塞事务提交时,线程数量不能过多

如果能有办法实现批量更新那是最好。

更多相关技术内容咨询欢迎前往并持续关注六星社区了解详情。

想高效系统的学习Java编程语言,推荐大家关注一个微信公众号:Java圈子。每天分享行业资讯、技术干货供大家阅读,关注即可免费领取整套Java入门到进阶的学习资料以及教程,感兴趣的小伙伴赶紧行动起来吧。

attachments-2023-03-2AoKIjPQ64014b4ad30a3.jpg

  • 发表于 2023-08-03 09:34
  • 阅读 ( 219 )
  • 分类:Java开发

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
轩辕小不懂
轩辕小不懂

2403 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 1658 文章
  3. Pack 1135 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章