Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)

前言

ThreadLocal被ThreadLocalMap中的高进entry的key弱引用,如果出现GC的进阶情况时,

没有被其他对象引用,源码会被回收,详解修复但是缺陷ThreadLocal对应的value却不会回收,容易造成内存泄漏,高进这也间接导致了内存溢出以及数据假丢失;

那么问题来了,进阶有没有更高效的源码ThreadLocal有;

今天我们就来分析一波FastThreadLocalThread

一、FastThreadLocalThread源码分析

Netty为了在某些场景下提高性能,详解修复改进了jdk ThreadLocal,缺陷Netty实现的高进FastThreadLocal 优化了Java 原生 ThreadLocal 的访问速度,存储速度。进阶避免了检测弱引用带来的源码 value 回收难问题,和数组位置冲突带来的详解修复线性查找问题,解决这些问题并不是缺陷没有代价;

Netty实现的 FastThreadLocal 底层也是通过数组存储 value 对象,与Java原生ThreadLocal使用自身作为Entry的key不同,FastThreadLocal通过保存数组的全局唯一下标,实现了对value的快速访问。云服务器提供商同时FastThreadLocal 也实现了清理对象的方法;

1、FastThreadLocalThread

在Netty中,要使用 FastThreadLocal 实现线程本地变量需要将线程包装成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,会使用 slowThreadLocalMap的 ThreadLocal 来存储变量副本;

io.netty.util.concurrent.DefaultThreadFactory @Override public Thread newThread(Runnable r) {      Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());     // 一般daemon为false,意思是不设置为守护线程     if (t.isDaemon() != daemon) {          t.setDaemon(daemon);     }     // 优先级 默认为5     if (t.getPriority() != priority) {          t.setPriority(priority);     }     return t; } protected Thread newThread(Runnable r, String name) {      return new FastThreadLocalThread(threadGroup, r, name); } 

FastThreadLocalThread 继承自Thread类,有如下成员变量:

io.netty.util.concurrent.FastThreadLocalThread // 任务执行完,是否清除FastThreadLocal的标记 private final boolean cleanupFastThreadLocals; // 类似于Thread类中ThreadLocalMap,为了实现FastThreadLocal private InternalThreadLocalMap threadLocalMap; 

2、 InternalThreadLocalMap

FastThreadLocalThread.threadLocalMap 是 InternalThreadLocalMap 对象实例。在第一次获取FTL数据时,会初始化FastThreadLocalThread.threadLocalMap,调用的构造函数如下:

private InternalThreadLocalMap() {              //为了简便,InternalThreadLocalMap父类             //UnpaddedInternalThreadLocalMap不展开介绍             super(newIndexedVariableTable());         }         //默认的数组大小为32,且使用UNSET对象填充数组         //如果下标处数据为UNSET,则表示没有数据         private static Object[] newIndexedVariableTable() {              Object[] array = new Object[32];             Arrays.fill(array, UNSET);             return array;         } 

为了避免写时候影响同一cpu缓冲行的其他数据并发访问,其使用了缓存行填充技术 (cpu 缓冲行填充),在类定义中声明了如下long字段进行填充;

//InternalThreadLocalMap // Cache line padding (must be public)  // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.  public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9; 

FTL使用的数组下标是InternalThreadLocalMap中的静态变量nextIndex统一递增生成的:

static final AtomicInteger nextIndex = new AtomicInteger(); public static int nextVariableIndex() {      //Netty中所有FTL数组下标都是通过递增这个静态变量实现的云服务器     //采用静态变量生成所有FTL元素在数组中的下标会造成一个问题,     //会造成InternalThreadLocalMap中数组不必要的自动扩容     int index = nextIndex.getAndIncrement();     if (index < 0) {          nextIndex.decrementAndGet();         throw new IllegalStateException("too many thread-local indexed variables");     }     return index; } 

InternalThreadLocalMap.nextVariableIndex()方法获取FTL在该FastThreadLocalThread.threadLocalMap数组下标,因为InternalThreadLocalMap.nextVariableIndex() 使用静态域 nextIndex 递增维护所有FTL的下标,会造成后面实例化的 FTL 下标过大,如果FTL下标大于其对应 FastThreadLocalThread.threadLocalMap 数组的长度,会进行数组的自动扩容,如下:

private void expandIndexedVariableTableAndSet(int index, Object value) {      Object[] oldArray = indexedVariables;     final int oldCapacity = oldArray.length;     //下面复杂的实现是为了将newCapacity规范为最接近的一个2的指数,      //这段代码在早期的 jdk HashMap 中见过     int newCapacity = index;     newCapacity |= newCapacity >>>  1;     newCapacity |= newCapacity >>>  2;     newCapacity |= newCapacity >>>  4;     newCapacity |= newCapacity >>>  8;     newCapacity |= newCapacity >>> 16;     newCapacity ++;     Object[] newArray = Arrays.copyOf(oldArray, newCapacity);     Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);     newArray[index] = value;     indexedVariables = newArray; } 

3、FastThreadLocal

构造函数:

有两个重要的下标域,FTL不仅在FastThreadLocalThread.threadLocalMap中保存了用户实际使用的value(在数组中的下标为index),还在数组中保存为了实现清理记录的相关数据,也即下标variablesToRemoveIndex,一般情况 variablesToRemoveIndex = 0;因为variablesToRemoveIndex 是静态变量,所以全局唯一;

//如果在该FTL中放入了数据,也就实际调用了其set或get函数,会在        //该FastThreadLocalThread.threadLocalMap数组的        // variablesToRemoveIndex下标处放置一个IdentityHashMap,        //并将该FTL放入IdentityHashMap中,在后续清理时会取出        //variablesToRemoveIndex下标处的云南idc服务商IdentityHashMap进行清理         private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();        //在threadLocalMap数组中存放实际数据的下标         private final int index;         public FastThreadLocal() {              index = InternalThreadLocalMap.nextVariableIndex();         } 

用户可扩展的函数:

//初始化 value 函数 protected V initialValue() throws Exception {      return null; } //让使用者在该FTL被移除时可以有机会做些操作。 protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception {  } 

FastThreadLocalThread

cleanupFastThreadLocals 字段在 4.1 的最新版本中已经没有在用到了

/**  * true,表示FTL会在线程结束时被主动清理 见  FastThreadLocalRunnable 类  * false,需要将FTL放入后台清理线程的队列中  */ // This will be set to true if we have a chance to wrap the Runnable. //这个字段则用于标识该线程在结束时是否会主动清理FTL private final boolean cleanupFastThreadLocals; //次对象将在 第一次 FastThreadLocal.get 和 FastThreadLocal.set 时候创建 private InternalThreadLocalMap threadLocalMap; public FastThreadLocalThread(Runnable target) {      super(FastThreadLocalRunnable.wrap(target));     cleanupFastThreadLocals = true; } 

4、 set 方法

public final void set(V value) {      //判断设置的 value 值是否是缺省值     if (value != InternalThreadLocalMap.UNSET) {          //获取当前线程的 InternalThreadLocalMap , 如果当前线程为FastThreadLocalThread,那么直接通过threadLocalMap引用获取         //否则通过 jdk 原生的 threadLocal 获取         InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();         //FastThreadLocal 对应的 index 下标的 value 替换成新的 value         setKnownNotUnset(threadLocalMap, value);     } else {          //如果放置的对象为UNSET,则表示清理,会对该FTL进行清理,类似毒丸对象         remove();     } } 

这里扩容方会调用 InternalThreadLocalMap.expandIndexedVariableTableAndSet

private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {      //在数组下标index处放置实际对象,如果index大于数组length,会进行数组扩容.     if (threadLocalMap.setIndexedVariable(index, value)) {          //放置成功之后,将该FTL加入到 variablesToRemoveIndex 下标的         //IdentityHashMap,等待后续清理         addToVariablesToRemove(threadLocalMap, this);     } }  /**  * 该FTL加入到variablesToRemoveIndex下标的IdentityHashMap  * IdentityHashMap的特性可以保证同一个实例不会被多次加入到该位置  */ @SuppressWarnings("unchecked") private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {      //获取 variablesToRemoveIndex下标处的 IdentityHashMap     Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);     Set<FastThreadLocal<?>> variablesToRemove;     //如果是第一次获取,则 variablesToRemoveIndex下标处的值为 UNSET     if (v == InternalThreadLocalMap.UNSET || v == null) {          //新建一个新的 IdentityHashMap 并         variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());         //放入到下标variablesToRemoveIndex处         threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);     } else {          variablesToRemove = (Set<FastThreadLocal<?>>) v;     }     //将该FTL放入该IdentityHashMap中     variablesToRemove.add(variable); } 

下面看InternalThreadLocalMap.get()实现:

public static InternalThreadLocalMap get() {      Thread thread = Thread.currentThread();     //首先看当前 thread 是否为FastThreadLocalThread实例     //如果是的话,可以快速通过引用,获取到其 threadLocalMap     if (thread instanceof FastThreadLocalThread) {          return fastGet((FastThreadLocalThread) thread);     } else {          //如果不是,则 jdk 原生慢速获取到其 threadLocalMap         return slowGet();     } } 

5、 get 方法

get方法极为简单,实现如下:

===========================FastThreadLocal========================== public final V get() {          InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();         Object v = threadLocalMap.indexedVariable(index);         if (v != InternalThreadLocalMap.UNSET) {              return (V) v;         }         return initialize(threadLocalMap);     } 

首先获取当前线程的map,然后根据 FastThreadLocal的index 获取value,然后返回,如果是空对象,则通过 initialize 返回,initialize 方法会将返回值设置到 map 的槽位中,并放进 Set 中;

initialize ============================FastThreadLocal========================== private V initialize(InternalThreadLocalMap threadLocalMap) {      V v = null;     try {          //1、获取初始值         v = initialValue();     } catch (Exception e) {          throw new RuntimeException(e);     }     // 2、设置value到InternalThreadLocalMap中     threadLocalMap.setIndexedVariables(index, v);     // 3、添加当前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中     addToVariablesToRemove(threadLocalMap, this);     return v; } //初始化参数:由子类复写 protected V initialValue() throws Exception {      return null; }  获取 ThreadLocalMap 直接通过索引取出对象 如果为空那么调用初始化方法初始化

6、ftl的资源回收机制

netty中ftl的两种回收机制回收机制:

自动:使用ftlt执行一个被FastThreadLocalRunnable wrap的Runnable任务,在任务执行完毕后会自动进行ftl的清理;

手动:ftl和InternalThreadLocalMap都提供了remove方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用ftl)手动进行调用,进行显示删除;

FastThreadLocalRunnable final class FastThreadLocalRunnable implements Runnable {      private final Runnable runnable;     @Override     public void run() {          try {              runnable.run();         } finally {              FastThreadLocal.removeAll();         }     }     static Runnable wrap(Runnable runnable) {          return runnable instanceof FastThreadLocalRunnable                  ? runnable : new FastThreadLocalRunnable(runnable);     } } 

如果将线程执行的任务包装成 FastThreadLocalRunnable,那么在任务执行完后自动删除ftl的资源。

===============================FastThreadLocal=========================== public static void removeAll() {      // 获取到map     InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();     if (threadLocalMap == null) {          return;     }     try {          // 获取到Set<FastThreadLocal>集合         Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);         if (v != null && v != InternalThreadLocalMap.UNSET) {              @SuppressWarnings("unchecked")             Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;             // 将Set转换为数组             FastThreadLocal<?>[] variablesToRemoveArray =                     variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);             // 遍历数组,删除每一个FastThreadLocal对应的value             for (FastThreadLocal<?> tlv: variablesToRemoveArray) {                  tlv.remove(threadLocalMap);             }         }     } finally {          // 删除当前线程的InternalThreadLocalMap         InternalThreadLocalMap.remove();     } } public static void remove() {      Thread thread = Thread.currentThread();     if (thread instanceof FastThreadLocalThread) {           // 将FastThreadLocalThread 内部的map置位null         ((FastThreadLocalThread) thread).setThreadLocalMap(null);     } else {          // 将 ThreadLocal内部ThreadLocalMap 中的value置位null         slowThreadLocalMap.remove();     } } 

remove方法:

===============================FastThreadLocal========================== private void remove() {      remove(InternalThreadLocalMap.getIfSet()); } private void remove(InternalThreadLocalMap threadLocalMap) {      if (threadLocalMap == null) {          return;     }     // 从 InternalThreadLocalMap 中删除当前的FastThreadLocal对应的value并设UNSET     Object v = threadLocalMap.removeIndexedVariable(index);     // 从 InternalThreadLocalMap 中的Set<FastThreadLocal<?>>中删除当前的FastThreadLocal对象     removeFromVariablesToRemove(threadLocalMap, this);     // 如果删除的是有效值,则进行onRemove方法的回调     if (v != InternalThreadLocalMap.UNSET) {          try {              // 回调子类复写的onRemoved方法,默认为空实现             onRemoved((V) v);         } catch (Exception e) {              throw new RuntimeException(e);         }     } } 

总结

只有不断的学习,不断的找到自己的缺点才可以进步;

一起加油;

IT科技
上一篇:并非一个好米任何人都会给你一个好的价格。那你该如何用以有的好米卖出最理想的价格呢?
下一篇:2016年1月1日:注册价格将降至每年7欧元。