1 读写分离锁设计模式
场景:多线程对共享资源进行读写操作时同时读是可以的;某线程在进行写操作,则其他线程无法进行读写操作;某些线程在执行读操作,则其他线程无法进行写操作。
==当读操作明显多于写操作时,对读操作不加锁明显能提升性能。==
读写分离锁的实现
Lock接口以及其子类WriteLock、ReadLock,ReadWriteLock接口用于创建read lock和write lock,并提供查询reader和writer数量的方法;
ReadLock实现:
lock方法:获取排他锁,检查当前writer数量是否大于0,大于0表示有线程在进行写操作,则wait;小于等于0表示没有其他线程在写,则成功获取锁,并将reader数量加1;
1 | public void lock() throws InterruptedException { |
unlock方法:获取排它锁,然后将reader数量减1,并notifyAll其他等待的线程。
WriteLock实现:
lock方法:获取排它锁,将等待的writer数量加1,然后检查reader和writer的数量,大于0则wait,都小于等于0表示可进行写操作,将等待的writer数量减1,然后对writer数量加1;
1 | public void lock() throws InterruptedException { |
unlock方法:获取排它锁,将writer数量减1,并notifyAll其他等待的线程。
1 | public void unlock() { |
排它锁的使用是为了保护reader和writer计数器。
技巧:可设置一个写偏好变量,当读操作获取到锁时,将写偏好变量设置为true,表示读操作完成后写操作优先去获取锁(在读锁的lock方法中检查偏好变量是否为true);当写操作完成时释放锁,并将写操作偏好设置为false,让读操作有更多机会获取锁。
2 Future设计
核心思路:将任务提交后,返回一个Future对象。当任务执行线程在任务完成后将返回值写回到Future中,并设置完成标识,并notifyAll所有wait结果的调用方。如果任务还没有完成,调用方通过future.get获取返回值时,future内部会检查是否任务已经完成,如果没有完成,则进入wait,等待完成后的notify,已完成则返回结果。
核心代码
任务提交:
1 | public Future<?> submit(Task<IN, OUT> task, IN input) { |
Future的设置返回值finish方法实现:
1 | protected void finish(T result) { |
Future获取返回值的get方法实现:
1 | public T get() throws InterruptedException { |
补充
- get方法可以加一个超时功能,wait到一定时间没有返回则抛出异常。
3 上下文设计模式
使用ThreadLocal可以实现线程间数据隔离。
早期版本ThreadLocal内存泄漏问题:因为早起ThreadLocal内部是与当前线程实例相关联的(用HashMap),所以当线程结束后,Thread实例和存储的数据依然存在,随着系统运行时间的增长可能会越来越多,最终导致内存泄漏。
ThreadLocal内部结构:threadLocal.get时,通过当前线程实例thread获取到thread.threadLocalMap,threadLocalMap的key就是threadLocal实例,threadLocalMap.getEntry(threadLocal)就得到了封装的值entry,Entry是WeakReferencede的子类,Entry中的key就是threadLocal实例,并作为弱引用存在。
当threadLocal设置为null之后,GC会回收entry中的key,但是thread.threadLocalMap中依然有指向entry的引用,所以value不能被回收。如果thread被回收,那么后续的threadLocalMap以及entry中的value也就被回收了。
内存泄漏和内存溢出:泄漏是指应用程序中已经不再持有某个对象的引用,但是GC无法回收,因为其到引用跟root的链路是可达的,所以无法回收。溢出是指内存不足,可见内存泄漏是导致内存溢出的原因之一。
ThreadLocal如何避免内存泄漏:
- 使用WeakReference,JVM中触发任意GC都会导致WeakReference被回收;
- get或set数据时检查并清除已被GC回收的数据。
4 Latch设计模式
一句话介绍:只有所有条件都满足时,门阀才打开。
实现:属性limit记录需要满足的条件个数,当limit降为0时打开门阀。
1 | public void await() { |
线程任务调用await方法等待limit降为0,当其他线程执行完之后调用countDown方法对limit减1,然后通知其他等待线程。
带超时功能的实现:
1 | public void await(TimeUnit unit, long time) throws TimeoutException { |
记录超时时间点,每次检查limit时都重新计算剩余时间,如果剩余时间小于等于0,则超时。
4 几种引用类型介绍
SoftReference: 当内存不足时,GC会回收Soft Reference;
WeakReference: 无论是young GC还是full GC,都会被回收;
PhantomReference: 必须和ReferenceQueue配合使用,它的get方法始终返回null,当Phantom Reference被回收后,会将其插入到ReferenceQueue中,我们可以通过检查queue直到进行了一次GC。
5 Active Object模式
简介: 接收异步消息的主动对象,主动对象就是指其拥有自己的独立线程,它可以接收异步消息并返回处理的结果。
通用Active Object的设计
核心思路:服务类中以@ActiveMethod标注需要Active的方法,创建服务类的代理对象,当调用方法时,代理方法中检查是否标注了@ActiveMethod注解,如果没有则直接反射调用并返回结果,如果有,则将方法、参数以及服务对象封装成message放入队列中,并返回Future,异步线程负责消费队列中的message,调用方可通过Future的get方法拿到最终的返回值。
核心代码:
ActiveMessage的execute方法:
1 | public void execute() { |
我们将代理的对象、方法以及参数封装成ActiveMessage,执行线程取到message时调用execute方法执行,结果通过future.finish设置到future中,外部可通过get方法拿到返回值。
ActiveServiceFactory生成代理对象:
1 | public static <T> T active(T instance) { |
使用ActiveInvocationHandler代理instance实例,使用JDK动态代理技术,所以代理类必须有继承的接口。
ActiveInvocationHandler的invoke方法:
1 | public void invoke(Object proxy, Method method, Object[] args) throws Throwable { |
如果方法注解了@ActiveMethod,如果方法有返回值则必须返回Future,将method、instance、args以及future封装成ActiveMessage放到queue中,然后将future返回。如果没有标注@ActiveMethod,则直接返回调用值。
守护线程进行消息消费
1 | public void run() { |
从队列中取出message进行执行。
6 Event Bus设计模式
消息中间件主要用于解决进程之间消息异步处理的解决方案。
核心接口设计
Bus接口,对外提供post方法用来发送event到topic上,register方法用来注册订阅者subscriber;
Registry注册表,用于记录对应的订阅者subscriber,以及受理消息的回调方法,回调方法我们用注解@Subscriber标识;
Dispatcher分发器,用来将event广播给注册表中监听了topic的订阅者。
设计思路
将订阅者注册到Registry中,注册信息中有订阅者关注的topic以及对应的处理方法。Dispatcher通过topic从注册表Registry中查找对应topic的订阅者,然后调用订阅者的处理方法。Bus对外提供注册、订阅等方法,用户只需要通过Bus操作即可。
7 Event-Driven架构
EDA,事件驱动架构是一种实现组件之间松耦合、易扩展的架构方式。
主要包括:
Events: 事件处理的数据;
Event Handlers: 处理Events的方法;
Event Loop: 维护Events和Event Handlers之间的交互流程,通过对应的类型找到对应的Handler。
同步Event-Driven设计
Message接口,作为Event更高一层的抽象,提供getType方法,返回Event type;
Channel接口,提供dispatch(Message message)方法,用于处理一种类型的消息;
DynamicRouter接口,用于注册消息类型和Channel,以及dispatch方法来分发Message到对应的Channel,它的实现类要提供类型和Channel映射,具体实现如下:
1 | public class EventDispatcher implements DynamicRouter<Message> { |
测试:定义一个InputEvent、ResultEvent作为Event的实现,分别定义对应的Channel,并将Channel注册到dispatcher中,然后使用dispatcher.dispatch分发不同的Event,可以看到由对应的Channel处理。
异步Event-Driven设计
定义一个异步版的Channel - AsyncChannel,与同步版的区别在于执行的时候使用线程执行;
定义一个异步版的Dispatcher - AsyncEventDispatcher,其中map改用线程安全的ConcurrentHashMap。