前言
多线程是Java编程中的重要特性,它允许程序同时执行多个任务,充分利用CPU资源,提高程序性能。然而,多线程编程也带来了线程安全、死锁、性能优化等挑战。本文将全面介绍Java多线程的概念、使用方法、常见问题以及最佳实践,帮助您掌握Java并发编程。
1. 多线程基础概念
1.1 进程与线程
进程(Process):
- 操作系统分配资源的基本单位
- 拥有独立的内存空间
- 进程间通信需要特殊机制(IPC)
- 创建和销毁开销大
线程(Thread):
- CPU调度的基本单位
- 共享进程的内存空间
- 线程间通信简单(共享变量)
- 创建和销毁开销小
对比:
1 2 3 4 5
| 进程 = 工厂 线程 = 工厂里的工人
一个工厂(进程)可以有多个工人(线程) 工人们共享工厂的资源(内存)
|
1.2 并发与并行
并发(Concurrency):
- 多个任务交替执行
- 单核CPU通过时间片轮转实现
- 宏观上同时进行,微观上串行
并行(Parallelism):
- 多个任务真正同时执行
- 需要多核CPU支持
- 真正的同时进行
1 2 3 4 5 6 7 8 9 10 11 12
| 单核CPU: 时间片1: 线程A执行 时间片2: 线程B执行 时间片3: 线程A执行 时间片4: 线程C执行
多核CPU: 核心1: 线程A执行 核心2: 线程B执行 } 同时进行 核心3: 线程C执行
|
1.3 线程的生命周期
Java线程有6种状态:
1 2 3 4 5 6 7 8 9 10 11
| NEW (新建) ↓ start() RUNNABLE (可运行) ↓ 获得CPU时间片 RUNNING (运行中) ↓ sleep()/wait()/阻塞IO BLOCKED/WAITING/TIMED_WAITING (阻塞/等待) ↓ notify()/notifyAll()/超时/IO完成 RUNNABLE (可运行) ↓ run()方法执行完毕 TERMINATED (终止)
|
详细说明:
- NEW(新建):线程对象被创建,但未调用start()方法
- RUNNABLE(可运行):调用start()后,等待CPU调度
- BLOCKED(阻塞):等待获取锁
- WAITING(等待):调用wait()或join(),等待其他线程通知
- TIMED_WAITING(超时等待):调用sleep()或wait(timeout)
- TERMINATED(终止):线程执行完毕或异常终止
2. 创建线程的方式
2.1 继承Thread类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
public class MyThread extends Thread { private String threadName; public MyThread(String name) { this.threadName = name; } @Override public void run() { for (int i = 0; i < 5; i++) { System.out.println(threadName + " 执行:" + i); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { MyThread thread1 = new MyThread("线程1"); MyThread thread2 = new MyThread("线程2"); thread1.start(); thread2.start(); } }
|
优点:
缺点:
- Java不支持多继承,继承Thread后无法继承其他类
- 线程和任务耦合在一起
2.2 实现Runnable接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
public class MyRunnable implements Runnable { private String taskName; public MyRunnable(String name) { this.taskName = name; } @Override public void run() { for (int i = 0; i < 5; i++) { System.out.println(taskName + " 执行:" + i); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { MyRunnable task1 = new MyRunnable("任务1"); MyRunnable task2 = new MyRunnable("任务2"); Thread thread1 = new Thread(task1); Thread thread2 = new Thread(task2); thread1.start(); thread2.start(); } }
|
优点:
缺点:
2.3 实现Callable接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| import java.util.concurrent.*;
public class MyCallable implements Callable<String> { private String taskName; public MyCallable(String name) { this.taskName = name; } @Override public String call() throws Exception { int sum = 0; for (int i = 1; i <= 5; i++) { sum += i; System.out.println(taskName + " 计算:" + i); Thread.sleep(100); } return taskName + " 计算结果:" + sum; } public static void main(String[] args) { MyCallable task1 = new MyCallable("任务1"); MyCallable task2 = new MyCallable("任务2"); FutureTask<String> future1 = new FutureTask<>(task1); FutureTask<String> future2 = new FutureTask<>(task2); Thread thread1 = new Thread(future1); Thread thread2 = new Thread(future2); thread1.start(); thread2.start(); try { String result1 = future1.get(); String result2 = future2.get(); System.out.println(result1); System.out.println(result2); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
|
优点:
缺点:
2.4 使用线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import java.util.concurrent.*;
public class ThreadPoolExample { public static void main(String[] args) { ExecutorService executor = Executors.newFixedThreadPool(3); for (int i = 0; i < 5; i++) { final int taskId = i; executor.submit(() -> { System.out.println("任务" + taskId + " 在线程 " + Thread.currentThread().getName() + " 中执行"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } executor.shutdown(); } }
|
优点:
- 重用线程,减少创建销毁开销
- 控制并发数量
- 提供更多功能(定时、周期性执行等)
- 生产环境推荐使用
2.5 创建方式对比
| 方式 |
优点 |
缺点 |
适用场景 |
| 继承Thread |
简单直观 |
无法多继承 |
学习、简单场景 |
| 实现Runnable |
灵活,推荐 |
无返回值 |
一般场景 |
| 实现Callable |
有返回值 |
代码复杂 |
需要返回值 |
| 线程池 |
性能好,功能强 |
配置复杂 |
生产环境 |
3. 线程的常用方法
3.1 Thread类的常用方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| public class ThreadMethods { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { System.out.println("线程开始执行"); try { Thread.sleep(2000); } catch (InterruptedException e) { System.out.println("线程被中断"); } System.out.println("线程执行完毕"); }); thread.start(); System.out.println("线程名称:" + thread.getName()); thread.setName("MyThread"); thread.setPriority(Thread.MAX_PRIORITY); System.out.println("线程是否存活:" + thread.isAlive()); thread.join(); System.out.println("主线程继续执行"); Thread.sleep(1000); Thread.yield(); Thread thread2 = new Thread(() -> { while (!Thread.currentThread().isInterrupted()) { System.out.println("工作中..."); try { Thread.sleep(500); } catch (InterruptedException e) { System.out.println("收到中断信号"); break; } } }); thread2.start(); Thread.sleep(2000); thread2.interrupt(); Thread daemonThread = new Thread(() -> { while (true) { System.out.println("守护线程运行中"); try { Thread.sleep(1000); } catch (InterruptedException e) { break; } } }); daemonThread.setDaemon(true); daemonThread.start(); } }
|
3.2 方法详解
start() vs run():
1 2 3 4 5
| thread.start();
thread.run();
|
sleep() vs wait():
1 2 3 4 5 6 7
| Thread.sleep(1000);
synchronized (obj) { obj.wait(); }
|
join() 示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class JoinExample { public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(() -> { for (int i = 0; i < 5; i++) { System.out.println("子线程:" + i); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread.start(); thread.join(); System.out.println("主线程继续执行"); } }
|
4. 线程同步与锁
4.1 线程安全问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
|
public class UnsafeCounter { private int count = 0; public void increment() { count++; } public int getCount() { return count; } public static void main(String[] args) throws InterruptedException { UnsafeCounter counter = new UnsafeCounter(); Thread[] threads = new Thread[10]; for (int i = 0; i < 10; i++) { threads[i] = new Thread(() -> { for (int j = 0; j < 1000; j++) { counter.increment(); } }); threads[i].start(); } for (Thread thread : threads) { thread.join(); } System.out.println("最终计数:" + counter.getCount()); } }
|
4.2 synchronized关键字
方式1:同步方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class SafeCounter { private int count = 0; public synchronized void increment() { count++; } public static synchronized void staticMethod() { } public synchronized int getCount() { return count; } }
|
方式2:同步代码块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class SafeCounter { private int count = 0; private final Object lock = new Object(); public void increment() { synchronized (lock) { count++; } } public int getCount() { synchronized (lock) { return count; } } }
|
synchronized原理:
1 2 3 4
| 1. 每个对象都有一个监视器锁(monitor) 2. 线程进入synchronized块时获取锁 3. 线程退出synchronized块时释放锁 4. 其他线程必须等待锁释放才能进入
|
4.3 ReentrantLock(可重入锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;
public class LockCounter { private int count = 0; private final Lock lock = new ReentrantLock(); public void increment() { lock.lock(); try { count++; } finally { lock.unlock(); } } public int getCount() { lock.lock(); try { return count; } finally { lock.unlock(); } } public boolean tryIncrement() { if (lock.tryLock()) { try { count++; return true; } finally { lock.unlock(); } } return false; } }
|
ReentrantLock vs synchronized:
| 特性 |
synchronized |
ReentrantLock |
| 使用方式 |
关键字,自动释放 |
类,手动释放 |
| 灵活性 |
低 |
高 |
| 可中断 |
不支持 |
支持 |
| 超时获取 |
不支持 |
支持 |
| 公平锁 |
非公平 |
可选公平/非公平 |
| 条件变量 |
1个(wait/notify) |
多个(Condition) |
| 性能 |
JDK6后优化,相近 |
相近 |
4.4 ReadWriteLock(读写锁)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockExample { private int value = 0; private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); public int read() { rwLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + " 读取:" + value); Thread.sleep(100); return value; } catch (InterruptedException e) { throw new RuntimeException(e); } finally { rwLock.readLock().unlock(); } } public void write(int newValue) { rwLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + " 写入:" + newValue); this.value = newValue; Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { rwLock.writeLock().unlock(); } } public static void main(String[] args) { ReadWriteLockExample example = new ReadWriteLockExample(); for (int i = 0; i < 3; i++) { new Thread(() -> example.read(), "读线程" + i).start(); } new Thread(() -> example.write(100), "写线程").start(); } }
|
4.5 volatile关键字
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class VolatileExample { private volatile boolean flag = false; private volatile int count = 0; public void setFlag() { flag = true; } public void checkFlag() { while (!flag) { } System.out.println("Flag已设置"); } public void increment() { count++; } public static void main(String[] args) throws InterruptedException { VolatileExample example = new VolatileExample(); Thread thread1 = new Thread(() -> { try { Thread.sleep(1000); example.setFlag(); System.out.println("Flag已设置为true"); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread thread2 = new Thread(() -> { example.checkFlag(); }); thread2.start(); thread1.start(); thread1.join(); thread2.join(); } }
|
volatile使用场景:
- 状态标志(如上面的flag)
- 双重检查锁定(DCL)单例模式
- 读多写少的场景
5. 线程通信
5.1 wait() 和 notify()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| public class WaitNotifyExample { private final Object lock = new Object(); private boolean dataReady = false; public void produce() { synchronized (lock) { System.out.println("生产者:准备数据..."); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } dataReady = true; System.out.println("生产者:数据准备完毕,通知消费者"); lock.notify(); } } public void consume() { synchronized (lock) { while (!dataReady) { try { System.out.println("消费者:数据未准备好,等待..."); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("消费者:接收到数据,开始处理"); } } public static void main(String[] args) { WaitNotifyExample example = new WaitNotifyExample(); Thread consumer = new Thread(() -> example.consume()); Thread producer = new Thread(() -> example.produce()); consumer.start(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } producer.start(); } }
|
wait() vs notify() vs notifyAll():
wait():释放锁,进入等待状态
notify():随机唤醒一个等待的线程
notifyAll():唤醒所有等待的线程
5.2 生产者消费者模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| import java.util.LinkedList; import java.util.Queue;
public class ProducerConsumer { private final Queue<Integer> queue = new LinkedList<>(); private final int MAX_SIZE = 5; private final Object lock = new Object(); class Producer implements Runnable { @Override public void run() { int value = 0; while (true) { synchronized (lock) { while (queue.size() == MAX_SIZE) { try { System.out.println("队列已满,生产者等待..."); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("生产:" + value); queue.offer(value++); lock.notifyAll(); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } } } } class Consumer implements Runnable { @Override public void run() { while (true) { synchronized (lock) { while (queue.isEmpty()) { try { System.out.println("队列为空,消费者等待..."); lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int value = queue.poll(); System.out.println("消费:" + value); lock.notifyAll(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } } } public static void main(String[] args) { ProducerConsumer pc = new ProducerConsumer(); Thread producer = new Thread(pc.new Producer()); Thread consumer = new Thread(pc.new Consumer()); producer.start(); consumer.start(); } }
|
5.3 使用BlockingQueue实现生产者消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue;
public class BlockingQueueExample { public static void main(String[] args) { BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5); Thread producer = new Thread(() -> { int value = 0; while (true) { try { queue.put(value); System.out.println("生产:" + value); value++; Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread consumer = new Thread(() -> { while (true) { try { int value = queue.take(); System.out.println("消费:" + value); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); producer.start(); consumer.start(); } }
|
6. 线程池详解
6.1 为什么使用线程池
优势:
- 降低资源消耗:重用线程,减少创建销毁开销
- 提高响应速度:任务到达时,无需等待线程创建
- 提高线程可管理性:统一分配、调优和监控
- 提供更多功能:定时执行、周期执行等
6.2 ThreadPoolExecutor参数详解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import java.util.concurrent.*;
public class ThreadPoolExample { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 5, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); for (int i = 0; i < 20; i++) { final int taskId = i; executor.execute(() -> { System.out.println("任务" + taskId + " 在线程 " + Thread.currentThread().getName() + " 中执行"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); } executor.shutdown(); } }
|
参数说明:
corePoolSize(核心线程数)
- 线程池中始终保持的线程数量
- 即使线程空闲也不会被回收
maximumPoolSize(最大线程数)
- 线程池允许创建的最大线程数
- 当队列满时,会创建新线程直到达到最大值
keepAliveTime(存活时间)
- 非核心线程空闲时的存活时间
- 超过这个时间,线程会被回收
workQueue(任务队列)
ArrayBlockingQueue:有界队列
LinkedBlockingQueue:无界队列(可能导致OOM)
SynchronousQueue:不存储元素的队列
PriorityBlockingQueue:优先级队列
threadFactory(线程工厂)
handler(拒绝策略)
AbortPolicy:抛出异常(默认)
CallerRunsPolicy:调用者线程执行
DiscardPolicy:丢弃任务
DiscardOldestPolicy:丢弃最老的任务
6.3 线程池执行流程
1 2 3 4 5 6 7 8 9 10 11 12
| 提交任务 ↓ 核心线程数未满? 是 → 创建核心线程执行 否 ↓ 任务队列未满? 是 → 加入队列等待 否 ↓ 线程数未达最大值? 是 → 创建非核心线程执行 否 ↓ 执行拒绝策略
|
6.4 常用线程池类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import java.util.concurrent.*;
public class ExecutorsExample { public static void main(String[] args) { ExecutorService fixedPool = Executors.newFixedThreadPool(3); ExecutorService cachedPool = Executors.newCachedThreadPool(); ExecutorService singlePool = Executors.newSingleThreadExecutor(); ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2); scheduledPool.schedule(() -> { System.out.println("延迟3秒执行"); }, 3, TimeUnit.SECONDS); scheduledPool.scheduleAtFixedRate(() -> { System.out.println("每2秒执行一次"); }, 0, 2, TimeUnit.SECONDS); fixedPool.shutdown(); cachedPool.shutdown(); singlePool.shutdown(); } }
|
⚠️ 阿里巴巴Java开发手册建议:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder() .setNameFormat("my-pool-%d") .build(), new ThreadPoolExecutor.CallerRunsPolicy() );
|
6.5 线程池最佳实践
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger;
public class ThreadPoolBestPractice { static class CustomThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; public CustomThreadFactory(String namePrefix) { this.namePrefix = namePrefix; } @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, namePrefix + "-thread-" + threadNumber.getAndIncrement()); thread.setDaemon(false); thread.setPriority(Thread.NORM_PRIORITY); return thread; } } static class CustomRejectedHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.err.println("任务被拒绝:" + r.toString()); } } public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new CustomThreadFactory("MyPool"), new CustomRejectedHandler() ); for (int i = 0; i < 20; i++) { final int taskId = i; executor.execute(() -> { System.out.println("执行任务" + taskId); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } System.out.println("活跃线程数:" + executor.getActiveCount()); System.out.println("核心线程数:" + executor.getCorePoolSize()); System.out.println("最大线程数:" + executor.getMaximumPoolSize()); System.out.println("队列大小:" + executor.getQueue().size()); System.out.println("已完成任务数:" + executor.getCompletedTaskCount()); executor.shutdown(); try { if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); } } }
|
7. 并发工具类
7.1 CountDownLatch(倒计时门闩)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample { public static void main(String[] args) throws InterruptedException { int workerCount = 5; CountDownLatch latch = new CountDownLatch(workerCount); System.out.println("主线程等待所有工作线程完成..."); for (int i = 0; i < workerCount; i++) { final int workerId = i; new Thread(() -> { try { System.out.println("工作线程" + workerId + " 开始工作"); Thread.sleep((long) (Math.random() * 3000)); System.out.println("工作线程" + workerId + " 完成工作"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } }).start(); } latch.await(); System.out.println("所有工作线程已完成,主线程继续执行"); } }
|
使用场景:
- 等待多个线程完成初始化
- 并行计算,等待所有子任务完成
7.2 CyclicBarrier(循环栅栏)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import java.util.concurrent.CyclicBarrier; import java.util.concurrent.BrokenBarrierException;
public class CyclicBarrierExample { public static void main(String[] args) { int playerCount = 4; CyclicBarrier barrier = new CyclicBarrier(playerCount, () -> { System.out.println("所有玩家已准备就绪,游戏开始!\n"); }); for (int i = 0; i < playerCount; i++) { final int playerId = i; new Thread(() -> { try { System.out.println("玩家" + playerId + " 已准备"); barrier.await(); System.out.println("玩家" + playerId + " 开始游戏"); Thread.sleep((long) (Math.random() * 2000)); System.out.println("玩家" + playerId + " 完成第一关"); barrier.await(); System.out.println("玩家" + playerId + " 开始第二关"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }).start(); } } }
|
CountDownLatch vs CyclicBarrier:
| 特性 |
CountDownLatch |
CyclicBarrier |
| 可重用 |
不可重用 |
可重用 |
| 等待方式 |
一个线程等待多个线程 |
多个线程互相等待 |
| 计数方式 |
递减到0 |
递增到指定值 |
| 回调 |
不支持 |
支持 |
7.3 Semaphore(信号量)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import java.util.concurrent.Semaphore;
public class SemaphoreExample { public static void main(String[] args) { Semaphore parkingLot = new Semaphore(3); for (int i = 0; i < 10; i++) { final int carId = i; new Thread(() -> { try { System.out.println("车辆" + carId + " 尝试进入停车场"); parkingLot.acquire(); System.out.println("车辆" + carId + " 成功停车"); Thread.sleep((long) (Math.random() * 5000)); System.out.println("车辆" + carId + " 离开停车场"); } catch (InterruptedException e) { e.printStackTrace(); } finally { parkingLot.release(); } }).start(); } } }
|
使用场景:
7.4 Exchanger(交换器)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import java.util.concurrent.Exchanger;
public class ExchangerExample { public static void main(String[] args) { Exchanger<String> exchanger = new Exchanger<>(); new Thread(() -> { try { String data1 = "来自线程1的数据"; System.out.println("线程1准备交换数据:" + data1); String data2 = exchanger.exchange(data1); System.out.println("线程1收到数据:" + data2); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); new Thread(() -> { try { Thread.sleep(2000); String data2 = "来自线程2的数据"; System.out.println("线程2准备交换数据:" + data2); String data1 = exchanger.exchange(data2); System.out.println("线程2收到数据:" + data1); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }
|
8. 原子类
8.1 AtomicInteger
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import java.util.concurrent.atomic.AtomicInteger;
public class AtomicExample { private AtomicInteger count = new AtomicInteger(0); public void increment() { count.incrementAndGet(); } public void decrement() { count.decrementAndGet(); } public int get() { return count.get(); } public boolean compareAndSet(int expect, int update) { return count.compareAndSet(expect, update); } public static void main(String[] args) throws InterruptedException { AtomicExample example = new AtomicExample(); Thread[] threads = new Thread[10]; for (int i = 0; i < 10; i++) { threads[i] = new Thread(() -> { for (int j = 0; j < 1000; j++) { example.increment(); } }); threads[i].start(); } for (Thread thread : threads) { thread.join(); } System.out.println("最终计数:" + example.get()); } }
|
8.2 常用原子类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import java.util.concurrent.atomic.*;
public class AtomicClasses { public static void main(String[] args) { AtomicInteger atomicInt = new AtomicInteger(0); atomicInt.incrementAndGet(); atomicInt.getAndIncrement(); atomicInt.addAndGet(5); AtomicLong atomicLong = new AtomicLong(0); AtomicBoolean atomicBoolean = new AtomicBoolean(false); atomicBoolean.compareAndSet(false, true); AtomicReference<String> atomicRef = new AtomicReference<>("初始值"); atomicRef.set("新值"); atomicRef.compareAndSet("新值", "更新值"); AtomicIntegerArray atomicArray = new AtomicIntegerArray(10); atomicArray.set(0, 100); atomicArray.incrementAndGet(0); AtomicStampedReference<String> stampedRef = new AtomicStampedReference<>("初始值", 0); int[] stampHolder = new int[1]; String value = stampedRef.get(stampHolder); int stamp = stampHolder[0]; stampedRef.compareAndSet(value, "新值", stamp, stamp + 1); } }
|
9. 常见问题与解决方案
9.1 死锁
死锁示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public class DeadlockExample { private final Object lock1 = new Object(); private final Object lock2 = new Object(); public void method1() { synchronized (lock1) { System.out.println(Thread.currentThread().getName() + " 获取lock1"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock2) { System.out.println(Thread.currentThread().getName() + " 获取lock2"); } } } public void method2() { synchronized (lock2) { System.out.println(Thread.currentThread().getName() + " 获取lock2"); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (lock1) { System.out.println(Thread.currentThread().getName() + " 获取lock1"); } } } public static void main(String[] args) { DeadlockExample example = new DeadlockExample(); new Thread(() -> example.method1(), "线程1").start(); new Thread(() -> example.method2(), "线程2").start(); } }
|
死锁的四个必要条件:
- 互斥条件:资源不能被共享
- 请求与保持:持有资源的同时请求新资源
- 不剥夺条件:资源不能被强制剥夺
- 循环等待:存在资源的循环等待链
避免死锁的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class AvoidDeadlock { private final Object lock1 = new Object(); private final Object lock2 = new Object(); public void method1() { synchronized (lock1) { synchronized (lock2) { } } } public void method2() { synchronized (lock1) { synchronized (lock2) { } } } public void methodWithTimeout() { Lock lock1 = new ReentrantLock(); Lock lock2 = new ReentrantLock(); try { if (lock1.tryLock(1000, TimeUnit.MILLISECONDS)) { try { if (lock2.tryLock(1000, TimeUnit.MILLISECONDS)) { try { } finally { lock2.unlock(); } } } finally { lock1.unlock(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }
|
9.2 线程安全的单例模式
双重检查锁定(DCL):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class Singleton { private static volatile Singleton instance; private Singleton() { } public static Singleton getInstance() { if (instance == null) { synchronized (Singleton.class) { if (instance == null) { instance = new Singleton(); } } } return instance; } }
|
静态内部类(推荐):
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class Singleton { private Singleton() { } private static class SingletonHolder { private static final Singleton INSTANCE = new Singleton(); } public static Singleton getInstance() { return SingletonHolder.INSTANCE; } }
|
枚举(最安全):
1 2 3 4 5 6 7 8 9 10
| public enum Singleton { INSTANCE; public void doSomething() { } }
Singleton.INSTANCE.doSomething();
|
9.3 ThreadLocal
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public class ThreadLocalExample { private static ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0); public static void main(String[] args) { new Thread(() -> { threadLocal.set(100); System.out.println("线程1设置值:" + threadLocal.get()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程1读取值:" + threadLocal.get()); threadLocal.remove(); }).start(); new Thread(() -> { threadLocal.set(200); System.out.println("线程2设置值:" + threadLocal.get()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("线程2读取值:" + threadLocal.get()); threadLocal.remove(); }).start(); } }
|
ThreadLocal使用场景:
- 数据库连接管理
- Session管理
- 用户信息传递
- 日期格式化(SimpleDateFormat线程不安全)
⚠️ 注意事项:
- 使用完后必须调用
remove(),避免内存泄漏
- 线程池环境下要特别注意清理
10. 最佳实践
10.1 线程命名
1 2 3 4 5 6 7 8 9 10
| Thread thread = new Thread(() -> { }, "MyWorkerThread-1");
ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("my-pool-%d") .setDaemon(false) .build();
|
10.2 异常处理
1 2 3 4 5 6 7 8 9 10 11
| Thread thread = new Thread(() -> { throw new RuntimeException("线程异常"); });
thread.setUncaughtExceptionHandler((t, e) -> { System.err.println("线程 " + t.getName() + " 发生异常:" + e.getMessage()); });
thread.start();
|
10.3 避免过度同步
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public synchronized void processLargeData() { }
public void processLargeData() { synchronized (this) { } }
|
10.4 使用并发集合
1 2 3 4 5 6 7 8 9 10
| import java.util.concurrent.*;
ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>(); CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>(); ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
|
10.5 线程数量设置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
public class ThreadPoolConfig { public static ThreadPoolExecutor createThreadPool() { int processors = Runtime.getRuntime().availableProcessors(); int cpuIntensivePoolSize = processors + 1; int ioIntensivePoolSize = processors * 2; return new ThreadPoolExecutor( cpuIntensivePoolSize, cpuIntensivePoolSize * 2, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder() .setNameFormat("worker-%d") .build(), new ThreadPoolExecutor.CallerRunsPolicy() ); } }
|
11. 总结
11.1 关键要点
| 主题 |
要点 |
| 线程创建 |
推荐使用Runnable接口或线程池 |
| 线程同步 |
synchronized、Lock、volatile |
| 线程通信 |
wait/notify、BlockingQueue |
| 线程池 |
手动创建ThreadPoolExecutor |
| 并发工具 |
CountDownLatch、CyclicBarrier、Semaphore |
| 原子类 |
AtomicInteger等,无锁并发 |
| 死锁 |
固定顺序获取锁,使用超时机制 |
| 最佳实践 |
线程命名、异常处理、合理设置线程数 |
11.2 学习建议
- 理解基础概念:进程、线程、并发、并行
- 掌握同步机制:synchronized、Lock、volatile
- 熟悉线程池:ThreadPoolExecutor的参数和使用
- 学习并发工具:CountDownLatch、Semaphore等
- 实践项目:在实际项目中应用多线程
- 性能调优:监控、分析、优化线程池配置
- 阅读源码:研究JUC包的源码实现
11.3 推荐资源
书籍:
- 《Java并发编程实战》(Java Concurrency in Practice)
- 《Java并发编程的艺术》
- 《深入理解Java虚拟机》
在线资源:
工具:
- JConsole:监控线程状态
- VisualVM:性能分析
- JProfiler:专业性能分析工具
参考资源