ListenerBus
Spark has a trait, ListenerBus: an event bus that posts events to its listeners.
package org.apache.spark.util

import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.codahale.metrics.Timer

import org.apache.spark.internal.Logging

/**
 * An event bus which posts events to its listeners.
 */
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  // Marked `private[spark]` for access in tests.
  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava

  /**
   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
   * This method is intended to be overridden by subclasses.
   */
  protected def getTimer(listener: L): Option[Timer] = None

  /**
   * Add a listener to listen events. This method is thread-safe and can be called in any thread.
   */
  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, getTimer(listener)))
  }

  /**
   * Remove a listener and it won't receive any events. This method is thread-safe and can be called
   * in any thread.
   */
  final def removeListener(listener: L): Unit = {
    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
      listenersPlusTimers.remove(listenerAndTimer)
    }
  }

  /**
   * This can be overridden by subclasses if there is any extra cleanup to do when removing a
   * listener. In particular AsyncEventQueues can clean up queues in the LiveListenerBus.
   */
  def removeListenerOnError(listener: L): Unit = {
    removeListener(listener)
  }

  /**
   * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
   * `postToAll` in the same thread for all events.
   */
  def postToAll(event: E): Unit = {
    // JavaConverters can create a JIterableWrapper if we use asScala.
    // However, this method will be called frequently. To avoid the wrapper cost, here we use
    // Java Iterator directly.
    val iter = listenersPlusTimers.iterator
    while (iter.hasNext) {
      val listenerAndMaybeTimer = iter.next()
      val listener = listenerAndMaybeTimer._1
      val maybeTimer = listenerAndMaybeTimer._2
      val maybeTimerContext = if (maybeTimer.isDefined) {
        maybeTimer.get.time()
      } else {
        null
      }
      try {
        doPostEvent(listener, event)
        if (Thread.interrupted()) {
          // We want to throw the InterruptedException right away so we can associate the interrupt
          // with this listener, as opposed to waiting for a queue.take() etc. to detect it.
          throw new InterruptedException()
        }
      } catch {
        case ie: InterruptedException =>
          logError(s"Interrupted while posting to ${Utils.getFormattedClassName(listener)}. " +
            s"Removing that listener.", ie)
          removeListenerOnError(listener) // removes the listener; thread-safe
        case NonFatal(e) =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      } finally {
        if (maybeTimerContext != null) {
          maybeTimerContext.stop()
        }
      }
    }
  }

  /**
   * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
   * thread for all listeners.
   * The concrete handling of a posted event is left to subclasses to implement.
   */
  protected def doPostEvent(listener: L, event: E): Unit

  private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
    val c = implicitly[ClassTag[T]].runtimeClass
    listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
  }
}
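Thread safety relies mainly on CopyOnWriteArrayList. The bus also checks for an interrupt proactively, right after each listener call, instead of waiting passively for a later blocking call to notice it. The concrete handling is left to subclasses, which implement doPostEvent.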
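To make these three points concrete, here is a minimal sketch of the same pattern outside Spark. `MiniBus`, `StringBus`, and `Handler` are hypothetical names invented for this illustration; timers and logging are left out, and removal uses `equals` rather than the reference comparison in the real trait.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

object MiniBusDemo {
  // Hypothetical, simplified stand-in for ListenerBus (no timers, no logging).
  trait MiniBus[L <: AnyRef, E] {
    private val listeners = new CopyOnWriteArrayList[L]

    final def addListener(l: L): Unit = { listeners.add(l); () }
    final def removeListener(l: L): Unit = { listeners.remove(l); () }
    def listenerCount: Int = listeners.size

    def postToAll(event: E): Unit = {
      // The iterator walks a snapshot of the backing array, so removing a
      // listener during iteration does not throw ConcurrentModificationException.
      val iter = listeners.iterator
      while (iter.hasNext) {
        val listener = iter.next()
        try {
          doPostEvent(listener, event)
          // Check (and clear) the interrupt flag right after this listener ran,
          // so the interrupt can be attributed to it.
          if (Thread.interrupted()) throw new InterruptedException()
        } catch {
          case _: InterruptedException => removeListener(listener)
          case NonFatal(_) => // a failing listener must not stop the others
        }
      }
    }

    // Left to subclasses, as in the real trait.
    protected def doPostEvent(listener: L, event: E): Unit
  }

  type Handler = String => Unit

  final class StringBus extends MiniBus[Handler, String] {
    override protected def doPostEvent(listener: Handler, event: String): Unit =
      listener(event)
  }

  def run(): (List[String], Int) = {
    val seen = ArrayBuffer.empty[String]
    val bus = new StringBus
    val failing: Handler = _ => throw new RuntimeException("boom")
    val interrupting: Handler = _ => Thread.currentThread().interrupt()
    val good: Handler = e => { seen += s"good:$e"; () }
    Seq(failing, interrupting, good).foreach(bus.addListener)
    bus.postToAll("a") // interrupting listener is removed here
    bus.postToAll("b")
    (seen.toList, bus.listenerCount)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this sketch the listener that throws stays registered, while the one that interrupts the posting thread is dropped after the first event; the well-behaved listener receives both events either way.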
The ListenerBus inheritance hierarchy
SparkListenerBus
private[spark] trait SparkListenerBus
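SparkListenerBus pattern-matches on the concrete type of each SparkListenerEvent and calls the corresponding method of SparkListenerInterface. doPostEvent is invoked synchronously, so when events are posted frequently this can lead to write delays and lost events.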
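The dispatch-by-pattern-match idea can be sketched as follows. The event classes and the `Listener` trait are hypothetical stand-ins for SparkListenerEvent and SparkListenerInterface, reduced to three callbacks; this is not Spark's actual code.

```scala
import scala.collection.mutable.ArrayBuffer

object DispatchDemo {
  // Hypothetical event hierarchy, standing in for SparkListenerEvent.
  sealed trait Event
  final case class JobStart(jobId: Int) extends Event
  final case class JobEnd(jobId: Int, succeeded: Boolean) extends Event
  final case class Other(payload: String) extends Event

  // Hypothetical listener interface, standing in for SparkListenerInterface.
  trait Listener {
    def onJobStart(e: JobStart): Unit
    def onJobEnd(e: JobEnd): Unit
    def onOtherEvent(e: Event): Unit
  }

  // One callback per event type; anything unrecognized goes to a catch-all.
  // The call is synchronous: the caller waits until the listener returns.
  def doPostEvent(listener: Listener, event: Event): Unit = event match {
    case e: JobStart => listener.onJobStart(e)
    case e: JobEnd => listener.onJobEnd(e)
    case e => listener.onOtherEvent(e)
  }

  final class Recorder extends Listener {
    val log = ArrayBuffer.empty[String]
    private def record(s: String): Unit = { log += s; () }
    override def onJobStart(e: JobStart): Unit = record(s"start ${e.jobId}")
    override def onJobEnd(e: JobEnd): Unit =
      record(s"end ${e.jobId} " + (if (e.succeeded) "ok" else "failed"))
    override def onOtherEvent(e: Event): Unit = record("other")
  }

  def run(): List[String] = {
    val recorder = new Recorder
    val events = List(JobStart(1), Other("heartbeat"), JobEnd(1, succeeded = true))
    events.foreach(doPostEvent(recorder, _))
    recorder.log.toList
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Because each call blocks until the listener returns, one slow callback holds up every event behind it, which is the latency concern described above.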
SparkListenerInterface
private[spark] trait SparkListenerInterface {
The official recommendation is to extend SparkListener or SparkFirehoseListener rather than implement SparkListenerInterface directly.
// empty (no-op) implementations
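The benefit of extending a base class with empty implementations can be shown with a small sketch. `ListenerInterface`, `BaseListener`, and `JobEndCounter` are hypothetical names for this illustration; the real interface has many more callbacks with different signatures.

```scala
object NoOpBaseDemo {
  // Hypothetical interface: implementing it directly means writing every callback.
  trait ListenerInterface {
    def onJobStart(jobId: Int): Unit
    def onJobEnd(jobId: Int): Unit
    def onTaskEnd(taskId: Long): Unit
  }

  // Base class with empty bodies, playing the role the text gives to SparkListener.
  abstract class BaseListener extends ListenerInterface {
    override def onJobStart(jobId: Int): Unit = {}
    override def onJobEnd(jobId: Int): Unit = {}
    override def onTaskEnd(taskId: Long): Unit = {}
  }

  // A concrete listener overrides only the callback it cares about.
  final class JobEndCounter extends BaseListener {
    var ended: Int = 0
    override def onJobEnd(jobId: Int): Unit = ended += 1
  }

  def run(): Int = {
    val counter = new JobEndCounter
    counter.onJobStart(1) // no-op inherited from BaseListener
    counter.onJobEnd(1)
    counter.onTaskEnd(42L) // no-op inherited from BaseListener
    counter.onJobEnd(2)
    counter.ended
  }

  def main(args: Array[String]): Unit = println(run())
}
```

A further advantage of this design is that new callbacks can be added to the interface later without breaking listeners that extend the base class.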
SparkListenerEvent
AsyncEventQueue
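The dispatch method of AsyncEventQueue runs its loop inside LiveListenerBus.withinListenerThread.withValue, and removeListenerOnError actually calls LiveListenerBus#removeListener. The stop method does not interrupt the dispatch thread: it waits for the thread to finish its work, but no new events are accepted.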
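The stop behaviour described above (no interrupt, drain what is queued, reject new events) is commonly built with a sentinel "poison pill" value. The sketch below illustrates that technique under simplified assumptions; `MiniAsyncQueue` and all of its members are hypothetical and much smaller than the real AsyncEventQueue.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import scala.collection.mutable.ArrayBuffer

object AsyncQueueDemo {
  // Hypothetical, simplified queue; not Spark's AsyncEventQueue.
  final class MiniAsyncQueue(capacity: Int, handle: String => Unit) {
    // A distinct String instance, recognized by reference, that ends the loop.
    private val PoisonPill = new String("poison")
    private val queue = new LinkedBlockingQueue[String](capacity)
    private val stopped = new AtomicBoolean(false)
    val dropped = new AtomicLong(0)

    private val dispatchThread = new Thread("mini-dispatch") {
      setDaemon(true)
      override def run(): Unit = {
        var next = queue.take()
        while (next ne PoisonPill) {
          handle(next)
          next = queue.take()
        }
      }
    }

    def start(): Unit = dispatchThread.start()

    // Non-blocking: ignored after stop, counted as dropped if the queue is full.
    def post(event: String): Unit =
      if (!stopped.get() && !queue.offer(event)) { dropped.incrementAndGet(); () }

    // No interrupt: enqueue the pill behind pending events, then wait for the thread.
    def stop(): Unit =
      if (stopped.compareAndSet(false, true)) {
        queue.put(PoisonPill)
        dispatchThread.join()
      }
  }

  def run(): (List[String], Long) = {
    // Written only by the dispatch thread and read after join(), so a plain buffer is safe.
    val handled = ArrayBuffer.empty[String]
    val q = new MiniAsyncQueue(capacity = 2, handle = e => { handled += e; () })
    q.post("e1"); q.post("e2")
    q.post("e3") // dropped: the queue is full and the thread has not started yet
    q.start()
    q.stop()     // e1 and e2 are still delivered before the thread exits
    q.post("e4") // ignored: the queue no longer accepts events
    (handled.toList, q.dropped.get())
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Posting before `start()` is only a trick to make the dropped event deterministic in this demo; with a live dispatch thread, drops depend on how fast the handler drains the queue.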
LiveListenerBus
The way withValue manipulates a thread-local works on the same principle as RDD.withScope.
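As a rough illustration of the withValue mechanism, the sketch below uses `scala.util.DynamicVariable` from the standard library, which sets a thread-local value for the duration of a block and restores the previous value afterwards. The `stopBus` guard is a hypothetical example of what such a flag can be used for, not Spark's code.

```scala
import scala.util.DynamicVariable

object WithValueDemo {
  // Hypothetical flag modelled on the idea of withinListenerThread; default is false.
  val withinListenerThread = new DynamicVariable[Boolean](false)

  // Hypothetical guard: refuse an operation when called from inside the block.
  def stopBus(): String =
    if (withinListenerThread.value) "refused" else "stopped"

  def run(): (Boolean, Boolean, Boolean, String, String) = {
    val before = withinListenerThread.value
    var inside = false
    var stopInside = ""
    withinListenerThread.withValue(true) {
      // Within this block, and only on this thread, the flag reads true.
      inside = withinListenerThread.value
      stopInside = stopBus()
    }
    // The previous value is restored when the block exits, even on exceptions.
    val after = withinListenerThread.value
    (before, inside, after, stopInside, stopBus())
  }

  def main(args: Array[String]): Unit = println(run())
}
```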
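DAGScheduler, SparkContext, and other components are event sources for LiveListenerBus. They all call LiveListenerBus's post method, which hands the event over to the asynchronous thread listenerThread for processing.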