Spark源码解析(二):Spark闭包检查

发布时间 2023-03-31 19:55:00作者: 打卡从这里开始

一、理解 Scala 闭包:Closures

1.1 闭包的定义

闭包就是一个函数和与其相关的引用环境组合的一个整体(实体)。进一步说,闭包是绑定了自由变量的函数实例。

通常来讲,闭包的实现机制是定义一个特殊的数据结构,保存了函数地址指针与闭包创建时的函数的词法环境以及绑定自由变量。

对于闭包最好的解释,莫过于《流程的Python》里给出的“它是延伸了作用域的函数,其中包括函数定义体引用,以及不在定义体定义的非全局变量。核心在于闭包能够访问定义体之外定义的非全局变量。”

在Scala中,函数引入传入的参数是再正常不过的事情了,比如 (x: Int) => x > 0中,唯一在函数体x > 0中用到的变量是x,即这个函数的唯一参数。

除此之外,Scala还支持引用其他地方定义的变量: (x: Int) => x + more,这个函数将more也作为入参,不过这个参数是哪里来的?从这个函数的角度来看,more是一个 自由变量 ,因为函数字面量本身并没有给more赋予任何含义。相反,x是一个绑定变量,因为它在该函数的上下文里有明确的定义:它被定义为该函数的唯一参数。如果单独使用这个函数,而没有在任何处于作用域内的地方定more,编译器将报错:

image-20230224152448542

另一方面,只要能找到名为more的变量,同样的函数就能正常工作:

image-20230224153042153

运行时从任何带有自由变量的函数字面量,比如(x: Int) => x + more创建的函数,按照定义,要求捕获到它的自由变量more的绑定。相应的函数值结果(包含指向被捕获的more变量的引用)就被称为闭包,该名称源于“捕获”其自由变量从而“闭合”该函数字面量的动作

1.2 修改自由变量

上面的例子可能会使我们出现一个疑问:如果more在闭包创建以后被改变会发生什么?

在scala中,答案是闭包可以看到这个改变。

我们继续上面的代码,例子如下:

image-20230224160121423

Scala的闭包捕获的是变量本身,而不是变量引用的值。正如前面示例所展示的,为(x: Int) => x + more创建的闭包能够看到闭包外对more的修改。

反之也是成立的:闭包对于自由变量的修改,闭包外也是可以看到的。

1.3 自由变量多副本

那么,如果一个闭包访问了某个随着程序运行会产生多个副本的变量会如何呢?

例如,如果一个闭包使用了某个函数的局部变量,而这个函数又被调用了多次,会怎么样?闭包每次访问到的是这个变量的哪一个实例呢?

我们通过代码来进行验证:

// 第一次声明 more 变量
scala> var more = 10
more: Int = 10

// 创建闭包函数
scala> val addMore10 = (x: Int) => {x + more}
addMore10: Int => Int = $Lambda$1223/0x000000080079f840@36810c06

// 调用闭包函数
scala> addMore10(9)
res13: Int = 19

// 重新声明 more 变量
scala> var more = 100
more: Int = 100

// 创建新的闭包函数
scala> val addMore100 = (x: Int) => {x + more}
addMore100: Int => Int =  $Lambda$1237/0x00000008007b1840@ac94285

// 引用的是重新声明 more 变量
scala> addMore100(9)
res14: Int = 109

// 引用的还是第一次声明的 more 变量
scala> addMore10(9)
res15: Int = 19

// 对于全局而言 more 还是 100
scala> more
res16: Int = 100

从上面的示例可以看出重新声明 more 后,全局的 more 的值是 100,但是对于闭包函数 addMore10 还是引用的是值为 10 的 more,这是由虚拟机来实现的,虚拟机会保证 more 变量在重新声明后,原来的被捕获的变量副本继续在堆上保持存活。

1.4 闭包的实现

具体的实现细节较为复杂(其实是自己也不是很懂,看字节码有点头疼),这里简单介绍一下

对于普通方法,局部变量在调用的时候是压入栈中的,计算完成之后就会 pop 出,所以在函数的调用完成后就不能再访问这个变量

而Scala中的闭包:实际上是在Heap上创建了一个闭包函数的相关对象,在传入参数后进行初始化,并将最终得到的实例对象的引用压入到栈中

总结:Scala 实现闭包的方法是在 heap 中保存了使用不同参数初始化而产生的不同对象,对象中保存了变量的状态,然后调用具体对象的 apply 方法而最后产生不同的结果。

编译后会生成一个Closures$$anonfun$addMore$1的类文件

二、Spark中对于闭包的处理

Spark中闭包检测的目的:

为了防止spark程序中将闭包发送到Executor节点时发生序列化失败,因为闭包函数有可能引用了未序列化的外部变量

调用链

在进入转换算子(例如map)的时候,会先调用

val cleanF = sc.clean(f)

进入clean函数:

private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
  ClosureCleaner.clean(f, checkSerializable)
  f
}

闭包检测的核心类:ClosureCleaner

最终调用的clean方法:

def clean(
    closure: AnyRef,  //这个就是我们检测的函数
    checkSerializable: Boolean = true,	//在闭包clean后对闭包能否序列化进行检测
    cleanTransitively: Boolean = true): Unit = {	//是否传递性的清理闭包
  clean(closure, checkSerializable, cleanTransitively, Map.empty)
}

核心方法:clean()

首先通过isClosure方法判断是否为闭包类

if (!isClosure(func.getClass) && maybeIndylambdaProxy.isEmpty) {
  logDebug(s"Expected a closure; got ${func.getClass.getName}")
  return
}

isClosure这个函数怎么判断是否为闭包类呢?

发现和我们上文scala的class文件对应起来了

private def isClosure(cls: Class[_]): Boolean = {
  cls.getName.contains("$anonfun$")
}

然后通过getOuterClassesAndObejct()函数获取func中所有对于外部闭包对象的引用

// A list of classes that represents closures enclosed in the given one
val innerClasses = getInnerClosureClasses(func)

// A list of enclosing objects and their respective classes, from innermost to outermost
// An outer object at a given index is of type outer class at the same index
val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)

遍历class文件中的所有field,找到$outer的就是外部的闭包引用,同时,如果外部闭包引用也是闭包类,那么同样获取其外部,直到最外层的非闭包类。

private def getOuterClassesAndObjects(obj: AnyRef): (List[Class[_]], List[AnyRef]) = {
  for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
    f.setAccessible(true)
    val outer = f.get(obj)
    // The outer pointer may be null if we have cleaned this closure before
    if (outer != null) {
      if (isClosure(f.getType)) {
        val recurRet = getOuterClassesAndObjects(outer)
        return (f.getType :: recurRet._1, outer :: recurRet._2)
      } else {
        return (f.getType :: Nil, outer :: Nil) // Stop at the first $outer that is not a closure
      }
    }
  }
  (Nil, Nil)
}

而后,禁止func中出现return返回值,在这里扫描,如果出现return直接报错。

// Fail fast if we detect return statements in closures
getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)

private class ReturnStatementFinder(targetMethodName: Option[String] = None)
  extends ClassVisitor(ASM9) {
  override def visitMethod(access: Int, name: String, desc: String,
      sig: String, exceptions: Array[String]): MethodVisitor = {

    // $anonfun$ covers indylambda closures
    if (name.contains("apply") || name.contains("$anonfun$")) {
      // A method with suffix "$adapted" will be generated in cases like
      // { _:Int => return; Seq()} but not { _:Int => return; true}
      // closure passed is $anonfun$t$1$adapted while actual code resides in $anonfun$s$1
      // visitor will see only $anonfun$s$1$adapted, so we remove the suffix, see
      // https://github.com/scala/scala-dev/issues/109
      val isTargetMethod = targetMethodName.isEmpty ||
        name == targetMethodName.get || name == targetMethodName.get.stripSuffix("$adapted")

      new MethodVisitor(ASM9) {
        override def visitTypeInsn(op: Int, tp: String): Unit = {
          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl") && isTargetMethod) {
            throw new ReturnStatementInClosureException
          }
        }
      }
    } else {
      new MethodVisitor(ASM9) {}
    }
  }
}

todo:后面理解深了继续看,已经不是很能看得懂了