概览
度量,metric,及对系统运行过程中的状态进行收集统计,如性能、数据等,方便开发者了解状况,针对性优化等。
spark使用metrics进行监控
spark中,度量有3个概念:
- Instance:度量系统的实例名。Spark按照Instance的不同,分为Master、Worker、Application、Driver和Executor
- Source:数据来源,WorkerSource,DAGSchedulerSource,BlockManagerSource等
- Sink:数据输出,MetricsServlet,ConsoleSink,Slf4jSink等
这些概念和flume类似
1 | /** |
MetricsSystem
的注释也很好的说明了这些
下图是org.apache.spark.metrics
包下的内容
source
1
2
3
4private[spark] trait Source {
def sourceName: String // source名称
def metricRegistry: MetricRegistry // metric注册表
}
MetricRegistry是metrics
的内容,暂时只需要知道它维护了一个<String, Metric>注册表1
ConcurrentMap<String, Metric> metrics
Source功能比较简单,以DAGSchedulerSource
为例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29private[scheduler] class DAGSchedulerSource(val dagScheduler: DAGScheduler)
extends Source {
override val metricRegistry = new MetricRegistry()
override val sourceName = "DAGScheduler"
metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failedStages.size
})
metricRegistry.register(MetricRegistry.name("stage", "runningStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.runningStages.size
})
metricRegistry.register(MetricRegistry.name("stage", "waitingStages"), new Gauge[Int] {
override def getValue: Int = dagScheduler.waitingStages.size
})
metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.numTotalJobs
})
metricRegistry.register(MetricRegistry.name("job", "activeJobs"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})
/** Timer that tracks the time to process messages in the DAGScheduler's event loop */
val messageProcessingTimer: Timer =
metricRegistry.timer(MetricRegistry.name("messageProcessingTime"))
}
主要做的,就是定义sourceName
,新构造MetricRegistry
,然后注册metrics:ConcurrentMap[String, Metric]
注册表
Gauge输出的内容通常是Source
构造函数入参的属性,本例中,就是dagScheduler.numTotalJobs、dagScheduler.activeJobs.size
sink
source有了数据,还要考虑怎么输出数据,以便后续使用。在spark-shell中的控制台输出,就是一种形式。也可以保留到日志、监控系统等。
1
2
3
4
5private[spark] trait Sink {
def start(): Unit // 启动sink
def stop(): Unit // 停止sink
def report(): Unit // 输出到目的地
}
JmxSink
:通过JmxReporter
,将度量输出到MBean中.通过Java VisualVM,选择MBeans标签页可以对JmxSink所有注册到JMX中的对象进行管理。
MetricsServlet
:在Spark UI的jetty服务中创建ServletContextHandler,将度量数据通过Spark UI展示在浏览器中
Sink
的子类,除了MetricsServlet
之外,基本都是调用reporter.start
,reporter.stop
,reporter.report
来完成trait里的函数
MetricsSystem
//todo
总结
可以看到,spark-metric本质上,依赖metrics的Metric,Report进行操作.