Java 多线程安全的使用小结

发布时间 2023-11-16 10:07:34作者: 进击的davis

在使用多线程进行编程的过程中,难免遇到共享资源读写问题,这是为了线程安全,一种思路就是使用 来控制并发读写问题。

在通过锁来实现并发安全中,常用的有以下几种:

  • synchronized,对象锁
  • ReentrantLock,重入锁
  • ReentrantReadWriteLock,读写锁

今天从使用的角度来看看这几种锁是如何使用的。

1.synchronized

在学习这把内置重度锁之前,我们先看看在多线程并发写的安全示例:

demo

package org.example;

public class MultiThreadWithNoLock {
    private static int count = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                count++;
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                count--;
            }
        });

        t1.start();
        t2.start();

        // 等待t1,t2执行结束
        t1.join();
        t2.join();

        System.out.println(count); // -35622
    }
}

从结果来看,进行多次的静态属性 count 的写入之后,大概率得到的结果应该都不是0,了解 count++ 或者 count--都知道,这种操作并不是原子操作,其实可以分为 load -> operation -> store 三步,所以在多线程并发操作中,count 最后的值,并不是0。

如果我们使用重度锁-synchronized,看看示例:

package org.example;

public class MultiThreadWithLock {
    private static int count = 0;

    // lock
    private static Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                synchronized (lock) {
                    count++;
                }
            }
        });

        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                synchronized (lock) {
                    count--;
                }
            }
        });

        t1.start();
        t2.start();

        // 等待t1,t2执行结束
        t1.join();
        t2.join();

        System.out.println(count); // 0
    }
}

从结果来看,最后的count的数值一定是0,这就是加了锁之后,可以保证同一时刻只有一个线程拥有锁,进而才能操作 count 资源。

synchronized 是Java中的关键字,通过利用锁的机制来实现互斥同步的。synchronized 可以保证同一时刻只有一个线程可以执行某个方法或者某个代码块。虽然说 synchronized 是重度锁,但 Java 在1.6版本以后,已对其做了大量优化,性能与 JUC 中的 Lock 和 ReadWriteLock 基本持平,如感兴趣可以进一步测试其性能。[1]

用法

从用法来看,synchronized 主要有以下三种用法:

  • 1.synchronized 关键字修饰实例方法,同步实例对象,当前的实例对象即为锁
  • 2.synchronized 关键字修饰类的静态方法,同步静态方法,当前类的 Class 对象即为锁
  • 3.synchronized 关键字应用在代码块上,如 synchronized (Object) {...},Object对象即为锁,如果Object为类的Class对象,则对应为锁

下面为用法的简单示例。

同步实例方法

package org.example;

public class SyncInstanceMethod {
    private int count;
    
    public synchronized void add() {
        count++;
    }
}

同步静态方法

静态方法的同步是指,用 synchronized 修饰的静态方法,与使用所在类的 Class 对象实现的同步代码块,效果类似。因为在 JVM 中一个类只能对应一个类的 Class 对象,所以同时只允许一个线程执行同一个类中的静态同步方法。

对于同一个类中的多个静态同步方法,持有锁的线程可以执行每个类中的静态同步方法而无需等待。不管类中的哪个静态同步方法被调用,一个类只能由一个线程同时执行。[1]

package org.example;

public class SyncClassStaticMethod {
    private static int count;

    public static synchronized void add() {
        count++;
    }
}

同步代码块

package org.example;

public class SyncCodeBlock {
    private int count;

    private Object lock = new Object();

    public void add() {
        synchronized (lock) {
            count++;
        }

        System.out.println("Add count!");
    }
}

在使用 synchronized 同步实例方法 过程中要注意,如果同步中有其他实例方法,要注意我们使用的锁是只保证锁范围内的资源受到保护,所以这时应该主要有其他类的实例时,synchronized(xxxClass.class) {...} ,保证xxxClass.class 是全局唯一的。

线程挂起与唤醒-wait()/notify()/notifyAll()

在上文中,我们学习了 synchronized 可以给对象加锁,进而同一时刻已有一个线程获取锁,才能操作临界区。在涉及多线程的同步中,如何主动挂起线程、唤醒线程呢。答案就是 wait()和notify()、notifyAll()

方法解释

  • wait(), 调用wait(),会自动释放当前线程占有的对象锁,并请求 OS 挂起当前线程,该线程就从 Running -> Waitting 状态,等待 notify/notifyAll 唤醒。注意,如果没有释放锁,其他线程是无法进入对象的同步方法或者同步的代码块,也就无法执行唤醒挂起的线程,从而造成死锁
  • notify(),唤醒一个在 waitting 状态的线程,并使它获取到锁,具体唤醒哪个线程,由 JVM 控制。
  • notifyAll(),唤醒所有的在 Waitting 状态的线程,然后这些线程去竞争获取锁。

需要注意的是,任何 Object 对象都有上面的三种方法,但是这里的线程挂起和唤醒不能单独使用,必须结合 synchronized (Object) {},在代码块中使用,否则会报 IllegalMonitorStateException 异常。

涉及具体原因,请参考[1]中:

为什么 `wait`、`notify`、`notifyAll` 不定义在 `Thread` 类中?为什么 `wait`、`notify`、`notifyAll` 要配合 `synchronized` 使用? 理解为什么这么设计,需要了解几个基本知识点:

-   每一个 Java 对象都有一个与之对应的监视器(monitor)
-   每一个监视器里面都有一个 对象锁 、一个 等待队列、一个 同步队列

了解了以上概念,我们回过头来理解前面两个问题。

为什么这几个方法不定义在 Thread 中?

-   由于每个对象都拥有对象锁,让当前线程等待某个对象锁,自然应该基于这个对象(Object)来操作,而非使用当前线程(Thread)来操作。因为当前线程可能会等待多个线程释放锁,如果基于线程(Thread)来操作,就非常复杂了。

为什么 wait、notify、notifyAll 要配合 synchronized 使用?

-   如果调用某个对象的 wait 方法,当前线程必须拥有这个对象的对象锁,因此调用 wait 方法必须在 synchronized 方法和 synchronized 代码块中。

下面是挂起与唤醒的demo:

package org.example;

import java.util.LinkedList;
import java.util.NoSuchElementException;

public class ProducerAndConsumerModel {
    public static void main(String[] args) throws InterruptedException {
        MessageQueue queue = new MessageQueue(2);

        // 多个生产者
        for (int i = 0; i < 3; i++) {
            int id = i;
            new Thread(() -> {
                queue.put(new Message(id, "value:"+id));
            }, "Producer" + i).start();
        }

        Thread.sleep(100L);

        // 单个消费者
        new Thread(() -> {
            while (true) {
                queue.take();
            }
        }, "Consumer").start();
    }
}

// msg queue owned by producer and consumer
class MessageQueue {
    private LinkedList<Message> list = new LinkedList<>();

    // capacity
    private int capacity;

    // wait timeout
    private static long timeout = 1000L;

    public MessageQueue(int capacity) {
        this.capacity = capacity;
    }

    // produce
    public void put(Message msg) {
        synchronized(list) {
            while (list.size() == capacity) {
                System.out.println("队列已满,生产者进入等待。");
                try {
                    list.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            list.addLast(msg);
            System.out.printf("生产者生产消息: %s\n", msg.toString());
            // 生产者通知消费者
            list.notifyAll();
        }
    }

    // consume
    public Message take() throws NoSuchElementException {
        synchronized(list) {
            int retry = 3;
            while (list.isEmpty() && retry > 0) {
                System.out.println("队列为空,消费者进入等待。");
                try {
                    list.wait(timeout);
                    retry--;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // retry = 0 or ele in queue
            Message msg = list.removeFirst(); // throws Exception
            System.out.printf("消费者消费消息:%s\n", msg.toString());
            // 消费后通知生产者
            list.notifyAll();
            return msg;
        }
    }
}

class Message {
    private int id;

    private Object value;

    public Message(int id, Object value) {
        this.id = id;
        this.value = value;
    }

    @Override
    public String toString() {
        return "Message{id=" + id + ", value=" + value + "}";
    }
}

相关输出:

生产者生产消息: Message{id=0, value=value:0}
生产者生产消息: Message{id=2, value=value:2}
队列已满,生产者进入等待。
消费者消费消息:Message{id=0, value=value:0}
消费者消费消息:Message{id=2, value=value:2}
队列为空,消费者进入等待。
生产者生产消息: Message{id=1, value=value:1}
消费者消费消息:Message{id=1, value=value:1}
队列为空,消费者进入等待。
队列为空,消费者进入等待。
队列为空,消费者进入等待。
Exception in thread "Consumer" java.util.NoSuchElementException
	at java.base/java.util.LinkedList.removeFirst(LinkedList.java:281)
	at org.example.MessageQueue.take(ProducerAndConsumerModel.java:76)
	at org.example.ProducerAndConsumerModel.lambda$main$1(ProducerAndConsumerModel.java:23)
	at java.base/java.lang.Thread.run(Thread.java:1583)

在 demo 中,我们创建了一个容量为 2 的队列:

  • 如果队列满了,生产者等待,被挂起,生产者需等消费者消费后,才有空间继续放入 msg 对象。
  • 如果队列为空,消费者挂起等待,被挂起,消费者需等到生产者将 msg 对象放入队列,为了防止程序在 消费 时一直循环,设置个 retry 参数,到次数后退出,这里抛出 NoSuchElementException 可忽略。
  • 如果队列还有空间,生产者放入数据,调用 list.notifyAll() 唤醒消费者可以消费。
  • 如果队列有数据,消费者消费数据,调用 list.notifyAll() 唤醒生产者生产消息。

线程等待-join()

在结合 synchronized同步机制 下,通过调用 wait()和notify()及notifyAll() 实现不同线程间的消息同步,除此以外 join() 也可以实现子线程与主线程的同步。

先看个 demo:

package org.example;

public class JoinDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.printf("第%d次打印\n", i+1);
                    Thread.sleep(500L);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        t1.start();

        System.out.println("主线程结束!");
    }
}

运行结果:

主线程结束!
第1次打印
第2次打印
第3次打印

运行上面代码,可看到主线程和子线程各自运行,由于子线程有休眠,主线程得到执行很快就结束了,主线程和子线程各自独立运行。

在子线程加上 join() 调用后的demo:

package org.example;

public class JoinDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.printf("第%d次打印\n", i+1);
                    Thread.sleep(500L);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

        t1.start();
        t1.join();

        System.out.println("主线程结束!");
    }
}

运行结果:

第1次打印
第2次打印
第3次打印
主线程结束!

从结果中可看到,主线程等待子线程运行结束后,主线程才打印最后的内容,因为加了 join() 调用后,主线程会等子线程结束后,再执行主线程后面的代码。

image.png

Thread类 中,join()有多个重载方法,即:

  • void join() 直接调用,阻塞主线程的后续执行,等待子线程执行结束
  • void join(long, int)/join(long) 最多阻塞主线程给定时间,到时间后继续执行后续主线程代码
  • boolean join(duration) 超时等待,到期后,返回子线程是否结束。结束则 true

我们看到在该方法中会抛出 InterruptedException,如果引用线程被中断,即抛出该异常。

synchronized可重入性

先看一个demo:

package org.example;

public class SyncReentrant {
    private  int count;

    public synchronized void add() {
        count++;
        addAnother();
    }

    public synchronized void addAnother() {
        count++;
    }

    public static void main(String[] args) throws InterruptedException {
        SyncReentrant syncReentrant = new SyncReentrant();

        Thread t1 = new Thread(() -> {
            syncReentrant.add();
            System.out.println(Thread.currentThread().getName() + " end");
        }, "子线程1");

        Thread t2 = new Thread(() -> {
            syncReentrant.add();
            System.out.println(Thread.currentThread().getName() + " end");
        }, "子线程2");

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        System.out.println("main Thread end");
    }
}

执行结果:

子线程2 end
子线程1 end
main Thread end

我们在实例方法中内嵌调用了实例方法,在前面的学习中,我们知道 synchronized 加在实例方法中,就相当于获取了实例对象的锁,在 add() 中再次调用 addAnother() 也是成功的,说明 synchronized 是可重入的锁。

2.ReentrantLock

ReentrantLock 顾名思义,就是可重入的锁,这个锁是 JUC(java.util.concurrent,Java版本 >1.5) 中的工具类,我们可以通过这个工具类实现共享资源的的同步控制。

先看看这个锁的定义:

public class ReentrantLock implements Lock, java.io.Serializable {}

我们顺着源码走读,这个类实际实现了 Lock接口,看看这个接口:

image.png

Lock接口 中:

  • void lock(),获取锁,如果没有获得锁就一直等待获取锁
  • boolean tryLock(),尝试获取锁,通常与 if分支 结合使用,实际获取到锁立即就返回 true,否则返回 false
  • Boolean tryLock(long time, TimeUnit unit),超时时间内尝试获取锁,如果在等待获取锁的过程中有其他线程打断,会抛异常
  • void unlock(),解锁,释放锁,一般为了保证锁不被一直占有,可加在 try {} finally {lock.unlock()}

在了解了 ReentrantLock 实现的 Lock接口 后,基本就知道这个锁该怎么用了。但需要注意一点的是,这个重入锁实现了两种构造方法,一种是公平锁(按阻塞等待的顺序获得锁),一种是非公平锁(各个线程需要抢占锁),对应的源码实现:

// 默认构造方法,非公平锁
public ReentrantLock() {
    sync = new NonfairSync();
}

// 通过fair参数设置true,构造公平锁,false构造非公平锁
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

比较典型的应用示例:

加锁-解锁

package org.example;

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockUsage {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        
        // 加锁
        lock.lock();
        try {
            ...
        } finally {
            lock.unlock(); // 解锁
        }
    }
}

尝试加锁-解锁

package org.example;

import java.util.concurrent.locks.ReentrantLock;

public class ReentrantLockUsage {
    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        
        // 尝试加锁
        if (lock.tryLock() ) {
            try {
                ...
            } finally {
                lock.unlock(); // 解锁
            }
        }
    }
}


// 带超时时间的尝试加锁
public class ReentrantLockUsage {
    public static void main(String[] args) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock();

        if (lock.tryLock(500L, TimeUnit.MILLISECONDS)) {
            try {
                ...
            } finally {
                lock.unlock();
            }
        }
    }
}

3.ReentrantReadWriteLock

了解过读写锁的应该知道,读写锁之间有一定的关系:

  • 读锁与读锁之间不会互斥,可以同读
  • 读锁写锁之间互斥,先读后写,或者先写后读,读写之间只有一个线程可以完成读或者写的操作
  • 写锁与写锁之间互斥,先持有写锁的线程先操作,后序的写锁线程等待前面的写锁持有的线程操作完释放锁

读写锁的应用场景适合 读多写少 的场景。

参考 ReentrantLock,这里先对源码进行简单的走读。

先看看 ReentrantReadWriteLock 的类声明:

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {...}

这里的 ReentrantReadWriteLock 实现了 ReadWriteLock接口,那就看看这个接口:

public interface ReadWriteLock {
    /**
     * Returns the lock used for reading.
     *
     * @return the lock used for reading
     */
    Lock readLock();

    /**
     * Returns the lock used for writing.
     *
     * @return the lock used for writing
     */
    Lock writeLock();
}

从接口的声明来看,一共有2个方法:

  • Lock readLock(),返回一个锁,作为读锁
  • Lock writeLock(),返回一个锁,作为写锁

我们再回到 ReentrantReadWriteLock 类结构再看看:

image.png

类似于 ReentrantLock 内部也是有 Sync类,用于同步机制,另外实现两个公平锁和非公平锁的内部类,再就是ReadLock读锁类和WriteLock写锁类,这些是内部类。

基于上面的这些内部类,在构造方法方面同样在构造时也提供了默认无参的构造方法有参的构造方法,以及获取读锁的方法readLock()写锁方法writeLock()

进入源码再看看:

image.png

这里主要看声明的两个读写锁变量。

image.png

这里的构造方法中,无参构造中默认是非公平锁,有参的可以设置公平锁,同时新建读写锁对象。

image.png

这里主要注意两个读写锁的内部类和获取读写锁的方法。

接下来看看读写锁的简单应用:

package org.example;

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MyReadWriteLock {
    private int count;
    
    public static void main(String[] args) throws InterruptedException {
        MyReadWriteLock myReadWriteLock = new MyReadWriteLock();

        ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // 公平锁
        ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
        ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();

        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                readLock.lock();
                System.out.println(Thread.currentThread().getName() + " count: " + myReadWriteLock.count);
                readLock.unlock();
            }
        }, "Read-Thread-1");

        Thread t2 = new Thread(() -> {
            try {
                for (int i = 0; i < 3; i++) {
                    writeLock.lock();
                    myReadWriteLock.count++;
                    Thread.sleep(500L);
                    System.out.println(Thread.currentThread().getName() + " count: " + myReadWriteLock.count);
                    writeLock.unlock();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }, "Write-Thread-1");

        Thread t3 = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                readLock.lock();
                System.out.println(Thread.currentThread().getName() + " count: " + myReadWriteLock.count);
                readLock.unlock();
            }
        }, "Read-Thread-2");

        t1.start();
        t2.start();
        t3.start();

        t1.join();
        t2.join();
        t3.join();

        System.out.println("Main Thread end!");

        /*
        Read-Thread-1 count: 0
        Read-Thread-2 count: 0
        Write-Thread-1 count: 1
        Read-Thread-2 count: 1
        Read-Thread-1 count: 1
        Write-Thread-1 count: 2
        Read-Thread-2 count: 2
        Read-Thread-1 count: 2
        Write-Thread-1 count: 3
        Main Thread end!
         */
    }
}

上面的 demo 中,我们新建了三个子线程,其中两个读线程,一个写线程,读读可同时进行,读写互斥。

参考: