Dubbo(三)_spi

发布时间 2023-07-29 23:52:19作者: Stitches

Dubbo SPI源码分析

Dubbo SPI 的核心实现是 ExtensionLoader,分析时先分析 ExtensionLoader 的成员变量和对公方法,依次分析扩展点的加载、扩展点的依赖注入、扩展点的自适应、扩展点的激活。

分析中的名词约定:

  • 扩展点————扩展点实例化的对象,如org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol();
  • 扩展点类型————扩展点的接口,如org.apache.dubbo.rpc.Protocol
  • 扩展点 Class————扩展点实例化对象的 Class,如org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

核心容器:

  • 配置类:SERVICES_DIRECTORYDUBBO_DIRECTORYDUBBO_INTERNAL_DIRECTORY 这三个目录下的扩展点配置都会被加载进来;
  • 全局容器类:EXTENSION_LOADERS 存储的是 ExtensionLoader 的容器,保存所有扩展点Class到 ExtensionLoader的映射、EXTENSION_INSTANCES 存储的是扩展点Class 到扩展点的映射。
  • 局部容器类:cachedInstances 保存的是扩展点对象;cachedAdaptiveInstance 存储自适应扩展点对象、cachedAdaptiveClass 存储自适应扩展点 Class。
  • 功能工具类:cachedNames 存储所有扩展点 Class 到扩展点定义 Key的映射、cachedClasses 存储所有扩展点名称到扩展点 Class的映射、cachedActivates 存储扩展点 Key到激活对象的映射。

ExtensionLoader 成员变量

public class ExtensionLoader<T> {

    private static final Logger logger = LoggerFactory.getLogger(ExtensionLoader.class);
    // 加载扩展点实例化对象的文件路径
    private static final String SERVICES_DIRECTORY = "META-INF/services/";
    private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
    private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";

    // 解析扩展点对象名称的正则表达式
    private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");

    // Key:扩展点类型  Value:ExtensionLoader对象  每种扩展点接口对应一个 ExtensionLoader对象
    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();

    // 扩展点实例容器,存储所有实例化后的扩展点对象
    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>();

    private final Class<?> type;

    private final ExtensionFactory objectFactory;

    // 扩展点Class————扩展点名
    private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<>();

    // 扩展点名————扩展点Class
    private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();

    // 激活的扩展点容器,Key:扩展点名  Value:激活的扩展点对象
    private final Map<String, Object> cachedActivates = new ConcurrentHashMap<>();

    // 扩展点对象的 Holder 容器,用于保存已经实例化的扩展点对象
    private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();

    // 自适应扩展点对象的持有者,持有了Dubbo生成的自适应扩展点对象
    private final Holder<Object> cachedAdaptiveInstance = new Holder<>();

    // 自适应扩展点Class 持有者,持有了Dubbo生成的自适应扩展点类
    private volatile Class<?> cachedAdaptiveClass = null;

    // 默认扩展点名称,@SPI注解里的 Value值
    private String cachedDefaultName;
    private volatile Throwable createAdaptiveInstanceError;

    private Set<Class<?>> cachedWrapperClasses;

    private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<>();

ExtensionLoader 对公方法

ExtensionLoader 的核心对公方法包括:

  • getExtensionLoader(Class class):获取扩展点接口 Class对应的 ExtensionLoader 对象;
  • getExtension(String key):根据 key获取扩展点;
  • getAdaptiveExtension():获取自适应的扩展点;
  • getActivateExtension(URL, String, String):获取激活的扩展点集合。

ExtensionLoader 对象初始化原理

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    // 扩展点类型不为空
    if (type == null) {
        throw new IllegalArgumentException("Extension type == null");
    }
    // 扩展点类型必须是接口
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
    }
    // 扩展点类型必须有 @SPI 注解
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type (" + type +
                ") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
    }

    // 根据扩展点类型获取 ExtensionLoader,每种扩展点都拥有一个 ExtensionLoader,全部保存在 EXTENSION_LOADERS中
    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}

所以 ExtensionLoader 的初始化包括:

  1. 校验参数,必须是带有 @SPI 注解的接口类型;
  2. 实例化 ExtensionLoader并且保存到全局变量 EXTENSION_LOADERS中;

获取扩展点的原理

第一次获取扩展点主要对应了 public T getExtension(String name) 方法。

public T getExtension(String name) {
    // 扩展点名称非空
    if (StringUtils.isEmpty(name)) {
        throw new IllegalArgumentException("Extension name == null");
    }
    // true代表获取默认扩展点
    if ("true".equals(name)) {
        return getDefaultExtension();
    }
    //从 cachedInstances中获取扩展点Holder,如果没有就创建新的
    final Holder<Object> holder = getOrCreateHolder(name);
    Object instance = holder.get();
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                //根据名称创建扩展点
                instance = createExtension(name);
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}
//ExtensionLoader.getOrCreateHolder()
private Holder<Object> getOrCreateHolder(String name) {
    //先从cachedInstances里获取Holder
    Holder<Object> holder = cachedInstances.get(name);
    //没有就创建一个
    if (holder == null) {
        cachedInstances.putIfAbsent(name, new Holder<>());
        holder = cachedInstances.get(name);
    }
    return holder;
}

加载扩展点的核心方法:

private T createExtension(String name) {
    // 加载扩展点Class,包含了解析配置文件、加载类等逻辑
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }
    try {
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        // 如果扩展点不存在,就创建一个新的对象
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        // 依赖注入,后面单独讲
        injectExtension(instance);
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (CollectionUtils.isNotEmpty(wrapperClasses)) {
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
                type + ") couldn't be instantiated: " + t.getMessage(), t);
    }
}

// 先从 cachedClasses 中获取扩展点 Class对象
private Map<String, Class<?>> getExtensionClasses() {
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                // cachedClasses 中没有扩展点对象就设置
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}
private Map<String, Class<?>> loadExtensionClasses() {
    cacheDefaultExtensionName();
    Map<String, Class<?>> extensionClasses = new HashMap<>();
    // 从配置目录里加载扩展点配置,并解析为 Map<扩展点名,扩展点Class>
    // 注意 1、META-INF/services/;2、META-INF/dubbo/;3、META-INF/dubbo/internal/  目录的配置文件都会被加载
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());
    loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));
    return extensionClasses;
}

解析指定配置文件,并加载对应的类

private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type) {
    String fileName = dir + type;
    try {
        Enumeration<java.net.URL> urls;
        ClassLoader classLoader = findClassLoader();
        if (classLoader != null) {
            urls = classLoader.getResources(fileName);
        } else {
            urls = ClassLoader.getSystemResources(fileName);
        }
        if (urls != null) {
            while (urls.hasMoreElements()) {
                java.net.URL resourceURL = urls.nextElement();
                // 解析文件,并加载扩展点Class
                // 1、逐行读取文件
                // 2、解析出key和Class
                // 3、加载Class,并保存到各个缓存容器里,如cacheNames、cachedAdaptiveClass等
                loadResource(extensionClasses, classLoader, resourceURL);
            }
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: " +
                type + ", description file: " + fileName + ").", t);
    }
}

注意可以借鉴代码中加载资源的写法:

  1. 获取当前的类加载器,先从当前线程获取,没有再从当前类对象 Class 中获取;
  2. classLoader.getResources(xxx) 方法获取对应文件的 URL 资源集合;
  3. 遍历该资源集合,逐个解析每个资源,加载类。
// 获取类加载,当前线程——>Class对象——>系统类加载器
public static ClassLoader getClassLoader(Class<?> clazz) {
    ClassLoader cl = null;
    try {
        cl = Thread.currentThread().getContextClassLoader();
    } catch (Throwable ex) {
        // Cannot access thread context ClassLoader - falling back to system class loader...
    }
    if (cl == null) {
        // No thread context class loader -> use class loader of this class.
        cl = clazz.getClassLoader();
        if (cl == null) {
            // getClassLoader() returning null indicates the bootstrap ClassLoader
            try {
                cl = ClassLoader.getSystemClassLoader();
            } catch (Throwable ex) {
                // Cannot access system ClassLoader - oh well, maybe the caller can live with null...
            }
        }
    }
    return cl;
}
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, java.net.URL resourceURL) {
    try {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceURL.openStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                final int ci = line.indexOf('#');
                if (ci >= 0) {
                    line = line.substring(0, ci);
                }
                line = line.trim();
                if (line.length() > 0) {
                    try {
                        String name = null;
                        int i = line.indexOf('=');
                        if (i > 0) {
                            name = line.substring(0, i).trim();
                            line = line.substring(i + 1).trim();
                        }
                        if (line.length() > 0) {
                            // Class.forName() 加载对应的类
                            loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name);
                        }
                    } catch (Throwable t) {
                        IllegalStateException e = new IllegalStateException("Failed to load extension class (interface: " + type + ", class line: " + line + ") in " + resourceURL + ", cause: " + t.getMessage(), t);
                        exceptions.put(line, e);
                    }
                }
            }
        }
    } catch (Throwable t) {
        logger.error("Exception occurred when loading extension class (interface: " +
                type + ", class file: " + resourceURL + ") in " + resourceURL, t);
    }
}

注意区分 Class.forName()classloader 的区别:Class.forName() 不仅加载类还进行类的初始化,classloader仅仅加载类。

@CallerSensitive
public static Class<?> forName(String className) 
                    throws ClassNotFoundException {
   Class<?> caller = Reflection.getCallerClass();
   // 第二个参数 true表示类加载后是否必须初始化,不初始化就是不执行static 的代码,即静态代码;
   return forName0(className, 
                   true, 
                   ClassLoader.getClassLoader(caller), 
                   caller);
}
public Class<?> loadClass(String name) 
            throws ClassNotFoundException {
    return loadClass(name, false);
}

protected Class<?> loadClass(String name, boolean resolve)
    throws ClassNotFoundException{
    synchronized (getClassLoadingLock(name)) {
        // 首先检查这个类是否已经被加载
        Class<?> c = findLoadedClass(name);
        if (c == null) {
            long t0 = System.nanoTime();
            try {
               //没有被加载使用父加载器继续加载
                if (parent != null) {
                    c = parent.loadClass(name, false);
                } else {
                    c = findBootstrapClassOrNull(name);
                }
            } catch (ClassNotFoundException e) {}
            if (c == null) {
                long t1 = System.nanoTime();
                c = findClass(name);
                sun.misc.PerfCounter.getParentDelegationTime().addTime(t1 - t0);        
                sun.misc.PerfCounter.getFindClassTime().addElapsedTimeFrom(t1);
                sun.misc.PerfCounter.getFindClasses().increment();
            }
        }
        //如果已经加载了,那就重新加载
       if (resolve) {
            resolveClass(c);
        }
        return c;
    }
}

Dubbo SPI扩展点的依赖注入实现原理

扩展点的依赖注入在方法 private T createExtension(String name) 中的 injectExtension(instance) 被调用。

private T injectExtension(T instance) {

    if (objectFactory == null) {
        return instance;
    }

    try {
        for (Method method : instance.getClass().getMethods()) {
            // 判断是不是 setter 方法
            if (!isSetter(method)) {
                continue;
            }

            if (method.getAnnotation(DisableInject.class) != null) {
                continue;
            }
            Class<?> pt = method.getParameterTypes()[0];
            if (ReflectUtils.isPrimitives(pt)) {
                continue;
            }

            try {
                // 获取需要set的属性名,例如 setUser,就是 User
                String property = getSetterProperty(method);
                // 通过 objectFactory 获取依赖的扩展点
                Object object = objectFactory.getExtension(pt, property);
                if (object != null) {
                    // 根据返回的依赖扩展点对象、method方法对象通过反射调用 setter 方法,将依赖的扩展点注入进去
                    method.invoke(instance, object);
                }
            } catch (Exception e) {
                logger.error("Failed to inject via method " + method.getName()
                        + " of interface " + type.getName() + ": " + e.getMessage(), e);
            }

        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

总的来说,Dubbo SPI 在获取对应扩展点时的依赖注入逻辑如下:

  1. 遍历扩展点的所有方法,并筛选出依赖扩展点的 setter 方法;
  2. 通过 objectFactory 获取依赖的扩展点;
  3. 通过反射,调用 setter 方法,将依赖的扩展点注入进去。

Dubbo SPI 自适应扩展点的实现原理

Dubbo 在运行时,直到扩展点被执行时才知道调用哪一个扩展点实现,要做到这一点就不能指定某个扩展点,而是依赖一个代理,由代理决定运行时应该调用哪个扩展点。

Dubbo 实现自适应扩展点的原理如下:

  • 定义了一个 @Adaptive 注解,可以标记在类或方法上,有此标记的都支持自适应扩展点,反之不支持;
  • 通过 ExtensionLoader.getAdaptiveExtension() 获取自适应扩展点,在获取过程中拼接自适应 Class文件,然后编译成 Class并实例化,得到自适应扩展点对象。
  • 对自适应扩展点依赖注入;
public T getAdaptiveExtension() {
    // 先从缓存中获取
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        if (createAdaptiveInstanceError != null) {
            throw new IllegalStateException("Failed to create adaptive instance: " +
                    createAdaptiveInstanceError.toString(),
                    createAdaptiveInstanceError);
        }
        // 注意并发
        synchronized (cachedAdaptiveInstance) {
            instance = cachedAdaptiveInstance.get();
            if (instance == null) {
                try {
                    // 创建自适应扩展点
                    instance = createAdaptiveExtension();
                    cachedAdaptiveInstance.set(instance);
                } catch (Throwable t) {
                    createAdaptiveInstanceError = t;
                    throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
                }
            }
        }
    }
    return (T) instance;
}

缓存中没有自适应扩展点时,创建一个自适应扩展点:

private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}


private Class<?> getAdaptiveExtensionClass() {
    // 加载扩展点Class,包含了配置文件解析,并将解析得到的扩展点Class缓存到 cachedClasses中
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    // 核心方法,创建自适应扩展点 Class
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

private Class<?> createAdaptiveExtensionClass() {
    // 生成Class字符串,通过拼接字符串实现,Class字符串就是我们编写的java文件的 Class内容
    String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
    ClassLoader classLoader = findClassLoader();
    org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
    // 将Class字符串编译为 Class
    return compiler.compile(code, classLoader);
}

生成 Class代码,拼接包信息、import信息、类信息、方法信息

public String generate() {
    // no need to generate adaptive class since there's no adaptive method found.
    if (!hasAdaptiveMethod()) {
        throw new IllegalStateException("No adaptive method exist on extension " + type.getName() + ", refuse to create the adaptive class!");
    }

    StringBuilder code = new StringBuilder();
    code.append(generatePackageInfo());
    code.append(generateImports());
    code.append(generateClassDeclaration());

    Method[] methods = type.getMethods();
    for (Method method : methods) {
        code.append(generateMethod(method));
    }
    code.append("}");

    if (logger.isDebugEnabled()) {
        logger.debug(code.toString());
    }
    return code.toString();
}

生成的自适应扩展点 Class内容有以下特别:

  • 带有 @Adaptive 标记的方法会生成方法内容;
  • 不带 @Adaptive 标记的方法,在调用时直接抛出异常;
  • 自适应方法里的核心内容是:解析URL里的 protocol 参数,然后根据解析结果从 ExtensionLoader 获取指定的扩展点对象。 这样在依赖 Protocol的地方都是依赖的 Protocol$Adaptive ,调用 export() 方法时是调用的 Protocol$Adaptive.export(),然后就实现了运行时指定扩展点的目的。