当前位置:首页 > CN2资讯 > 正文内容

Java并发(线程同步、死锁、多线程编排)

1天前CN2资讯

线程同步的方式有哪些?

线程同步

线程同步,是多线程编程中的一种机制,用于协调多个线程的执行顺序,确保它们在共享资源或关键操作上按照预定的规则运行,避免因并发访问导致的数据不一致、竞态条件(Race Condition)等问题。

线程同步的方式有哪些?

  • synchronized 关键字,通过 JVM 内置的锁机制实现线程同步,确保同一时刻只有一个线程访问共享资源。既可以修饰实例方法也可以修饰静态方法,也可以锁代码块和锁住某个具体的实例对象
  • public synchronized void method() { ... } // 实例方法锁(this) public static synchronized void method() { ... } // 类方法锁(Class 对象) // 锁实例对象 synchronized (lockObject) { // 同步代码块 }
  • ReentrantLock 基于 java.util.concurrent.locks.Lock 接口实现的可重入互斥锁,需显式调用 lock() 和 unlock()进行加锁和解锁。
    支持公平锁、可中断锁、超时锁以及多条件变量(Condition),相比 synchronized 提供了更高的灵活性。
  • ReentrantLock lock = new ReentrantLock(); lock.lock(); try { // 同步代码 } finally { lock.unlock(); }
  • Semaphore(信号量),允许多个线程同时访问资源,但是限制访问线程的数量。
  • Semaphore semaphore = new Semaphore(3); // 初始许可数为3 semaphore.acquire(); // 获取许可 try { // 同步代码 } finally { semaphore.release(); // 释放许可 }
  • CountDownLatch,允许多个线程等待其他线程执行完毕之后再执行,用于线程间的协作。
  • public class LatchDemo { public static void main(String[] args) throws InterruptedException { int threadCount = 3; CountDownLatch latch = new CountDownLatch(threadCount); for (int i = 1; i <= threadCount; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName() + " 初始化完成"); latch.countDown(); // 子线程完成,计数器减 1 }, "线程-" + i).start(); } latch.await(); // 主线程等待所有子线程完成 System.out.println("所有子线程完成,主线程继续执行"); } }
  • CyclicBarrier,多个线程互相等待,所有线程都到到屏障点后,再继续执行。线程计数器可重置。
  • import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CyclicBarrierExample { // 总和变量(线程安全) private static int sum = 0; // 线程池 private static final ExecutorService executor = Executors.newFixedThreadPool(5); public static void main(String[] args) { // 定义需要等待的线程数量(5个) CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> { // 所有线程到达屏障后执行的回调(汇总结果) System.out.println("所有线程已完成计算,总和为: " + sum); }); // 启动5个线程 for (int i = 0; i < 5; i++) { executor.execute(() -> { try { // 模拟线程计算 int value = (int) (Math.random() * 100); System.out.println(Thread.currentThread().getName() + " 计算值: " + value); // 将计算结果累加到总和中 sum += value; // 调用await()等待其他线程到达屏障 cyclicBarrier.await(); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }); } // 关闭线程池 executor.shutdown(); } }
  • Phaser,和CyclicBarrier类似,但是支持更灵活的屏障操作,适用于复杂多阶段任务,支持动态注册/注销参与者,并可控制各参与者的阶段进度,并可以控制各个参与者的到达和离开。
  • Phaser phaser = new Phaser(1); // 初始参与线程 phaser.bulkRegister(3); // 动态注册3个工作线程 for (int i = 0; i < 3; i++) { new Thread(() -> { for (int phase = 0; phase < 2; phase++) { phaser.arriveAndAwaitAdvance(); // 阶段1:等待所有线程完成阶段1 // 执行阶段2任务 } phaser.arriveAndDeregister(); // 完成并注销 }).start(); } // 主线程等待所有线程完成 phaser.arriveAndAwaitAdvance();
  • 其他,另外还有volatile,这种能保证可见性和有序性,但不能保证原子性的关键字。以及基于CAS实现的Atomic 类(无锁同步),但是仅适用于简单数据类型和部分操作(如 getAndAdd)。
  • 死锁

    死锁,通常是指在两个或多个进程(或线程、事务)在执行过程中,因争夺资源而陷入相互等待的状态,导致所有进程都无法继续执行。

    什么情况下会产生死锁?

    产生死锁的四个必要条件

    • 互斥(Mutual Exclusion),一个资源只能被一个进程占用,其他进程必须等待其释放。
    • 占有并等待(Hold and Wait),进程在持有资源的同时,申请新的资源。
    • 不可剥夺(No Preemption),资源只能由持有它的进程主动释放,不能被强制剥夺。
    • 循环等待(Circular Wait),存在一个进程环,每个进程都在等待下一个进程所持有的资源。

    如何解决死锁?

    上面我们已经知道产生死锁有四个必要条件,那解决死锁,只需要破坏死锁的这些必要条件即可。
    一般从以下几方面入手即可:

    • 破坏“占有并等待”条件
  • 进程或线程一次性申请所需的所有资源,否则不分配任何资源。(可能导致资源利用率低,进程长期等待资源)
  • 进程或线程申请资源时,必须释放已持有的所有资源。(可能导致频繁的资源释放和重新申请,增加系统开销)
    • 破坏“不可剥夺”条件,允许系统强制回收资源。(可能中断进程的正常执行,导致数据不一致)
    • 破坏“循环等待”条件,要求进程或线程按顺序申请资源。保证多个进程(线程)的执行顺序相同即可避免循环等待。这是最常用的解决死锁的方法。
      例如:事务1的执行顺序是:A->B->C,事务2的执行顺序是:C->D->A,这种情况下就容易产生死锁。因为事务1占用了A,等待C,但是事务2占用了C但是等待A。因此只需要把事务2的执行顺序改成:A->D->C,这样事务2在执行时,会发现事务1占用着A呢,因此事务会先不执行,等待事务1释放A。

    死锁如何恢复?

    • 回滚进程或线程,可以执行一个或多个进程(或线程)回滚到安全状态,释放资源。一般回滚时,要遵循按优先级选择(优先级低的进程先回滚) 。按资源占用时间选择(占用时间短的进程先回滚)。
    • 终止进程或线程,直接终止全部或部分死锁进程(或线程),释放资源。(可能导致数据丢失或事务不完整)
    • 资源剥夺,从某些进程或线程中强制回收资源分配给其他进程。
    • 超时机制,为进程或线程设置等待资源的超时时间,若超时则自动放弃请求并释放已占资源。

    数据库中的死锁

    在操作数据库时,如果有多个事务并发执行,也是可能发生死锁的。当事务1持有资源A的锁,但是尝试获取资源B的锁,而事务2持有资源B的锁,尝试获取资源A的锁的时候,这时候就会发生死锁的情况。

    当数据库发生死锁的时候,会报出来如下的错误:

    Error updating database. Cause: ERR-CODE: [TDDL-4614][ERR EXECUTE ON MYSQL] Deadlock found when trying to get lock; The error occurred while setting parameters### SQL: update test_table set updated=now(),type_state = ? where test_num = 123

    数据库操作中如何避免死锁?

    一般对于数据库的死锁,主要是避免发生并发更新同一资源的操作。或者可以考虑保证操作的顺序,比如多个事务都是先操作资源A、再操作资源B,这样就能有效的避免死锁。

    还有一些其他优化措施:
    减少事务持有锁的时间:尽快提交或回滚事务。
    锁粒度控制:使用行级锁而非表级锁,减少资源竞争。
    避免嵌套事务:减少循环等待的可能性。

    多线程编排

    在 Java 中,多线程的编排可以通过多种方式实现,主要涉及 线程池同步机制并发工具类 以及 任务协调工具(如 Future、CompletableFuture)等。

    CompletableFuture怎么实现多线程异步编排?

    CompletableFuture,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。
    我在【上一篇文章】提过CompletableFuture底层就是用ForkJoinPool来实现,那么CompletableFuture如何使用来实现多线程任务编排的呢?

    单个任务
    runAsync:无返回值
    CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { System.out.println("无返回值任务执行中"); });
    supplyAsync:有返回值
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "异步任务结果"; }); System.out.println(future.get()); // 输出: 异步任务结果
    指定线程池

    因为CompletableFuture默认底层是使用的ForkJoinPool.commonPool(),但是也是可以自定义线程池,配置线程的一些指定信息。

    ExecutorService executor = Executors.newFixedThreadPool(2); CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "自定义线程池任务"; }, executor);
    两个任务编排

    thenApplyAsync:能接收上一次的执行结果,还可以有返回值

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello") .thenApplyAsync(result -> result + " World");// 运行结果 Hello World

    thenRunAsync:不能接收上一次的执行结果,并且也没返回值

    CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return "Hello"; }).thenRunAsync(() -> { System.out.println("after Hello World"); });
    组合多个任务编排
  • 串行组合(thenCompose,可以将前一个任务的结果传递给下一个任务(链式依赖)
  • CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(result -> CompletableFuture.supplyAsync(() -> result + " World")); System.out.println(future.get()); // 输出: Hello World
  • 并行组合(thenCombine),将两个独立任务的结果进行合并
  • CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World"); future1.thenCombine(future2, (result1, result2) -> result1 + " " + result2) .thenAccept(System.out::println); // 输出: Hello World
  • 多任务并行(allOf / anyOf)
    • allOf:等待所有任务完成
    CompletableFuture<Void> allFutures = CompletableFuture.allOf( CompletableFuture.runAsync(() -> System.out.println("Task 1")), CompletableFuture.runAsync(() -> System.out.println("Task 2")) ); allFutures.get(); // 等待所有任务完成
    • anyOf:任一任务完成即触发
    CompletableFuture<Object> anyFuture = CompletableFuture.anyOf( CompletableFuture.supplyAsync(() -> "Task 1"), CompletableFuture.supplyAsync(() -> "Task 2") ); System.out.println(anyFuture.get()); // 输出: Task 1 或 Task 2(取决于哪个先完成)
    任务编排异常处理
  • 捕获异常(exceptionally,在任务抛出异常时提供默认信息。
  • CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) throw new RuntimeException("失败"); return "成功"; }).exceptionally(ex -> { System.out.println("异常处理: " + ex.getMessage()); return "默认值"; }); System.out.println(future.get());
  • 全局处理(handle / whenComplete
    • handle:处理异常并返回新结果
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) throw new RuntimeException("失败"); return "成功"; }).handle((result, ex) -> { if (ex != null) { System.out.println("异常处理: " + ex.getMessage()); return "默认值"; } return result; });
    • whenComplete:无论成功或失败均执行(不可中断)
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { if (Math.random() > 0.5) throw new RuntimeException("失败"); return "成功"; }).whenComplete((result, ex) -> { if (ex != null) System.out.println("任务失败"); else System.out.println("任务成功: " + result); });
      你可能想看:

      扫描二维码推送至手机访问。

      版权声明:本文由皇冠云发布,如需转载请注明出处。

      本文链接:https://www.idchg.com/info/20340.html

      分享给朋友:

      “Java并发(线程同步、死锁、多线程编排)” 的相关文章

      HostHatch优惠活动揭秘:如何以最低价格获取优质主机服务

      当提到主机服务,HostHatch绝对是个值得信赖的品牌。作为一家成立超过十年的主机商,HostHatch专注于提供高性能的NVMe VPS和大硬盘存储型专用主机。为什么会选择HostHatch呢?除了卓越的服务和强大的基础设施外,吸引人的优惠活动也是一个重要因素。 最近,HostHatch推出了针...

      AWS注册教程:轻松创建你的AWS账户

      在当今数字化时代,云计算的广泛应用早已成为一种趋势。在这种背景下,AWS(亚马逊网络服务)以其强大的技术和丰富的服务,逐渐成为许多人选择的云平台。那么,AWS到底是什么呢?简单来说,它是一个全面的云服务平台,提供包括计算能力、存储选项、数据库、机器学习等各种服务。我一直认为,AWS之所以能够在众多云...

      如何使用DigitalOcean优惠码获取200美元免费额度

      DigitalOcean是一家备受赞誉的云计算服务提供商,专门致力于为开发者和初创公司提供一流的云基础设施解决方案。通过其简单易用的界面和高效的性能,DigitalOcean帮助用户轻松地部署和扩展应用程序。我在使用DigitalOcean时,深刻感受到了它为开发者量身定制的便捷性,毫无疑问,这使得...

      全面解读SFTP教程:安全文件传输的最佳实践

      什么是SFTP? 在计算机网络世界里,SFTP(Secure File Transfer Protocol)是一种安全的文件传输协议。它的主要用途是通过安全的方式在网络中传输数据。与传统的FTP(File Transfer Protocol)相比,SFTP引入了数据加密机制,这样一来,用户在传输文件...

      VPS Speedtest:优化虚拟专用服务器性能的必备工具和策略

      在数字时代,VPS(虚拟专用服务器)已经成为许多企业和个人建站的首选方案。为了确保VPS的性能满足需求,VPS Speedtest便显得格外重要。简单来说,VPS Speedtest就是对虚拟专用服务器的网络速度、带宽和延迟进行测试的一种方式。通过这一过程,我们不仅能了解VPS的现有性能,还能在需要...

      RFCHOST评论:高性能VPS与流媒体解锁的完美选择

      RFCHOST概述 RFCHOST是一家自2015年成立的公司,隶属于上海花卷科技。作为一家新兴的网络服务企业,RFCHOST专注于提供国际线路深层挖掘与构造网络通信服务的一体化解决方案。我一直关注着这个快速发展的品牌,尤其是它在香港和洛杉矶VPS业务上的持续投入与创新。 随着全球数字化进程的加速,...