多线程常用设计

1 读写分离锁设计模式

场景:多线程对共享资源进行读写操作时同时读是可以的;某线程在进行写操作,则其他线程无法进行读写操作;某些线程在执行读操作,则其他线程无法进行写操作。

==当读操作明显多于写操作时,对读操作不加锁明显能提升性能。==

读写分离锁的实现

Lock接口以及其子类WriteLock、ReadLock,ReadWriteLock接口用于创建read lock和write lock,并提供查询reader和writer数量的方法;

ReadLock实现:

lock方法:获取排他锁,检查当前writer数量是否大于0,大于0表示有线程在进行写操作,则wait;小于等于0表示没有其他线程在写,则成功获取锁,并将reader数量加1;

1
2
3
4
5
6
7
8
public void lock() throws InterruptedException {
synchronized(readWriteLock.getMutex()) {
while(readWriteLock.getWritingWriters() > 0 || (readWriteLock.getPreferWrite() && readWriteLock.getWaitingWriters() > 0)) {
readWriteLock.getMutex().wait();
}
readWriteLock.incrementReadingReaders();
}
}

unlock方法:获取排它锁,然后将reader数量减1,并notifyAll其他等待的线程。

WriteLock实现:

lock方法:获取排它锁,将等待的writer数量加1,然后检查reader和writer的数量,大于0则wait,都小于等于0表示可进行写操作,将等待的writer数量减1,然后对writer数量加1;

1
2
3
4
5
6
7
8
9
10
11
12
13
public void lock() throws InterruptedException {
synchronized(readWriteLock.getMutex()) {
try {
readWriteLock.incrementWaitingWriters();
while(readWriteLock.getReadingReaders() > 0 || readWriteLock.getWritingWriters() > 0) {
readWriteLock.getMutex().wait();
} finally {
readWriteLock.decrementWaitingWriters();
}
readWriteLock.incrementWritingWriters();
}
}
}

unlock方法:获取排它锁,将writer数量减1,并notifyAll其他等待的线程。

1
2
3
4
5
6
7
public void unlock() {
synchronized(readWriteLock.getMutex()) {
readWriteLock.decrementWritingWriters();
readWriteLock.changePrefer(false);
readWriteLock.getMutex().notifyAll();
}
}

排它锁的使用是为了保护reader和writer计数器。

技巧:可设置一个写偏好变量,当读操作获取到锁时,将写偏好变量设置为true,表示读操作完成后写操作优先去获取锁(在读锁的lock方法中检查偏好变量是否为true);当写操作完成时释放锁,并将写操作偏好设置为false,让读操作有更多机会获取锁。

2 Future设计

核心思路:将任务提交后,返回一个Future对象。当任务执行线程在任务完成后将返回值写回到Future中,并设置完成标识,并notifyAll所有wait结果的调用方。如果任务还没有完成,调用方通过future.get获取返回值时,future内部会检查是否任务已经完成,如果没有完成,则进入wait,等待完成后的notify,已完成则返回结果。

核心代码

任务提交:

1
2
3
4
5
6
7
8
public Future<?> submit(Task<IN, OUT> task, IN input) {
final FutureTask<OUT> future = new FutureTask<>();
new Thread(() -> {
OUT result = task.get(input);
future.finish(result);
}).start();
return future;
}

Future的设置返回值finish方法实现:

1
2
3
4
5
6
7
8
protected void finish(T result) {
synchronized(LOCK) {
if(isDone) return;
this.result = result;
this.isDone = true;
LOCK.notifyAll();
}
}

Future获取返回值的get方法实现:

1
2
3
4
5
6
7
8
public T get() throws InterruptedException {
synchronized(LOCK) {
while(!isDone) {
LOCK.wait();
}
return result;
}
}

补充

  • get方法可以加一个超时功能,wait到一定时间没有返回则抛出异常。

3 上下文设计模式

使用ThreadLocal可以实现线程间数据隔离。

早期版本ThreadLocal内存泄漏问题:因为早起ThreadLocal内部是与当前线程实例相关联的(用HashMap),所以当线程结束后,Thread实例和存储的数据依然存在,随着系统运行时间的增长可能会越来越多,最终导致内存泄漏。

ThreadLocal内部结构:threadLocal.get时,通过当前线程实例thread获取到thread.threadLocalMap,threadLocalMap的key就是threadLocal实例,threadLocalMap.getEntry(threadLocal)就得到了封装的值entry,Entry是WeakReferencede的子类,Entry中的key就是threadLocal实例,并作为弱引用存在。

当threadLocal设置为null之后,GC会回收entry中的key,但是thread.threadLocalMap中依然有指向entry的引用,所以value不能被回收。如果thread被回收,那么后续的threadLocalMap以及entry中的value也就被回收了。

内存泄漏和内存溢出:泄漏是指应用程序中已经不再持有某个对象的引用,但是GC无法回收,因为其到引用跟root的链路是可达的,所以无法回收。溢出是指内存不足,可见内存泄漏是导致内存溢出的原因之一。

ThreadLocal如何避免内存泄漏:

  1. 使用WeakReference,JVM中触发任意GC都会导致WeakReference被回收;
  2. get或set数据时检查并清除已被GC回收的数据。

4 Latch设计模式

一句话介绍:只有所有条件都满足时,门阀才打开。

实现:属性limit记录需要满足的条件个数,当limit降为0时打开门阀。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void await() {
synchronized(this) {
while(limit > 0) {
this.wait();
}
}
}

public void countDown() {
synchronized(this) {
if(limit <= 0) {
throw new IllegalStateException("");
}
limit--;
this.notifyAll();
}
}

线程任务调用await方法等待limit降为0,当其他线程执行完之后调用countDown方法对limit减1,然后通知其他等待线程。

带超时功能的实现:

1
2
3
4
5
6
7
8
9
10
11
12
public void await(TimeUnit unit, long time) throws TimeoutException {
if(time <= 0) throw new IllegalArgumentException("参数不合法")
long remainingNanos = unit.toNanos(time);
final long endNanos = System.nanoTime() + remainingNanos;
synchronized(this) {
while(limit > 0) {
if(TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0) throw new TimeoutException("time out");
this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
remainingNanos = endNanos - System.nanoTime();
}
}
}

记录超时时间点,每次检查limit时都重新计算剩余时间,如果剩余时间小于等于0,则超时。

4 几种引用类型介绍

SoftReference: 当内存不足时,GC会回收Soft Reference;

WeakReference: 无论是young GC还是full GC,都会被回收;

PhantomReference: 必须和ReferenceQueue配合使用,它的get方法始终返回null,当Phantom Reference被回收后,会将其插入到ReferenceQueue中,我们可以通过检查queue直到进行了一次GC。

5 Active Object模式

简介: 接收异步消息的主动对象,主动对象就是指其拥有自己的独立线程,它可以接收异步消息并返回处理的结果。

通用Active Object的设计

核心思路:服务类中以@ActiveMethod标注需要Active的方法,创建服务类的代理对象,当调用方法时,代理方法中检查是否标注了@ActiveMethod注解,如果没有则直接反射调用并返回结果,如果有,则将方法、参数以及服务对象封装成message放入队列中,并返回Future,异步线程负责消费队列中的message,调用方可通过Future的get方法拿到最终的返回值。

核心代码:

ActiveMessage的execute方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void execute() {
try {
Object result = method.invoke(instance, args);
if(future != null) {
Object realResult = ((Future<?>)result).get();
future.finish(realResult);
}
} catch(Exception e) {
if(future != null) {
future.finish(null);
}
}
}

我们将代理的对象、方法以及参数封装成ActiveMessage,执行线程取到message时调用execute方法执行,结果通过future.finish设置到future中,外部可通过get方法拿到返回值。

ActiveServiceFactory生成代理对象:

1
2
3
4
public static <T> T active(T instance) {
Object proxy = Proxy.newInstance(instance.getClass().getClassLoader(), instance.getClass().getInterfaces(), new ActiveInvocationHandler<>(instance));
return (T) proxy;
}

使用ActiveInvocationHandler代理instance实例,使用JDK动态代理技术,所以代理类必须有继承的接口。

ActiveInvocationHandler的invoke方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(method.isAnnotationPresent(ActiveMethod.class)) {
this.checkMethod(method);
ActiveMessage.Builder builder = new ActiveMessage.Builder();
builder.useMethod(method).withArgs(args).forService(instance);
Object result = null;
if(this.isReturnFutureType(method)) {
result = new ActiveFuture<>();
builder.returnFuture((ActiveFuture)result);
}
queue.offer(builder.build());
return result;
} else {
return method.invoke(instance, args);
}
}

如果方法注解了@ActiveMethod,如果方法有返回值则必须返回Future,将method、instance、args以及future封装成ActiveMessage放到queue中,然后将future返回。如果没有标注@ActiveMethod,则直接返回调用值。

守护线程进行消息消费

1
2
3
4
5
6
public void run() {
for(;;) {
ActiveMessage activeMessage = this.queue.take();
activeMessage.execute();
}
}

从队列中取出message进行执行。

6 Event Bus设计模式

消息中间件主要用于解决进程之间消息异步处理的解决方案。

核心接口设计

Bus接口,对外提供post方法用来发送event到topic上,register方法用来注册订阅者subscriber;

Registry注册表,用于记录对应的订阅者subscriber,以及受理消息的回调方法,回调方法我们用注解@Subscriber标识;

Dispatcher分发器,用来将event广播给注册表中监听了topic的订阅者。

设计思路

将订阅者注册到Registry中,注册信息中有订阅者关注的topic以及对应的处理方法。Dispatcher通过topic从注册表Registry中查找对应topic的订阅者,然后调用订阅者的处理方法。Bus对外提供注册、订阅等方法,用户只需要通过Bus操作即可。

7 Event-Driven架构

EDA,事件驱动架构是一种实现组件之间松耦合、易扩展的架构方式。

主要包括:

Events: 事件处理的数据;

Event Handlers: 处理Events的方法;

Event Loop: 维护Events和Event Handlers之间的交互流程,通过对应的类型找到对应的Handler。

同步Event-Driven设计

Message接口,作为Event更高一层的抽象,提供getType方法,返回Event type;

Channel接口,提供dispatch(Message message)方法,用于处理一种类型的消息;

DynamicRouter接口,用于注册消息类型和Channel,以及dispatch方法来分发Message到对应的Channel,它的实现类要提供类型和Channel映射,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class EventDispatcher implements DynamicRouter<Message> {
private final Map<Class<? extends Message>, Channel> routerTable;

public EventDispatcher() {
routerTable = new HashMap<>();
}

public void dispatch(Message message) {
if(routerTable.containsKey(message.getType())) {
routerTable.get(message.getType()).dispatch(message);
} else {
throw new MessageMatcherException("Can't match the channel for [" + message.getType() + "] type");
}
}

public void registerChannel(Class<? extends Message> messageType, Channel<? extends Message> channel) {
this.routerTable.put(messageType, channel);
}

}

测试:定义一个InputEvent、ResultEvent作为Event的实现,分别定义对应的Channel,并将Channel注册到dispatcher中,然后使用dispatcher.dispatch分发不同的Event,可以看到由对应的Channel处理。

异步Event-Driven设计

定义一个异步版的Channel - AsyncChannel,与同步版的区别在于执行的时候使用线程执行;

定义一个异步版的Dispatcher - AsyncEventDispatcher,其中map改用线程安全的ConcurrentHashMap。