TransportConf
The source code describes it as "A central location that tracks all the settings we expose to users." In SparkContext, it supplies configuration to the RPC framework.
TransportConf has two fields:
- ConfigProvider conf: the configuration provider
- String module: the module name
ConfigProvider is fairly simple. MapConfigProvider is one implementation of it:
public class MapConfigProvider extends ConfigProvider {

  public static final MapConfigProvider EMPTY = new MapConfigProvider(Collections.emptyMap());

  private final Map<String, String> config;

  public MapConfigProvider(Map<String, String> config) {
    // Defensive copy, so later changes to the caller's map are not visible here
    this.config = new HashMap<>(config);
  }

  @Override
  public String get(String name) {
    String value = config.get(name);
    if (value == null) {
      throw new NoSuchElementException(name);
    }
    return value;
  }

  @Override
  public String get(String name, String defaultValue) {
    String value = config.get(name);
    return value == null ? defaultValue : value;
  }

  @Override
  public Iterable<Map.Entry<String, String>> getAll() {
    return config.entrySet();
  }
}
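The difference between the two get overloads is easy to try out without Spark on the classpath. The sketch below is a minimal stand-in, assuming a hypothetical class named SimpleMapConfig that copies the lookup logic shown above. It is not Spark's class, and the key names are only illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for MapConfigProvider so the example runs without Spark.
final class SimpleMapConfig {
    private final Map<String, String> config;

    SimpleMapConfig(Map<String, String> config) {
        // Defensive copy: later changes to the caller's map do not leak in.
        this.config = new HashMap<>(config);
    }

    // Strict lookup: a missing key is treated as an error.
    String get(String name) {
        String value = config.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    // Lenient lookup: a missing key falls back to the supplied default.
    String get(String name, String defaultValue) {
        String value = config.get(name);
        return value == null ? defaultValue : value;
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("spark.rpc.io.mode", "NIO");
        SimpleMapConfig conf = new SimpleMapConfig(source);

        // Not visible to conf, because the constructor copied the map.
        source.put("spark.rpc.io.mode", "EPOLL");

        System.out.println(conf.get("spark.rpc.io.mode"));
        System.out.println(conf.get("spark.rpc.io.missing", "fallback"));
        try {
            conf.get("spark.rpc.io.missing");
        } catch (NoSuchElementException e) {
            System.out.println("missing key: " + e.getMessage());
        }
    }
}
```

Callers that can tolerate an absent setting would use the two-argument form. The one-argument form suits settings that must be present.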
As the TransportConf class shows, each key is built as "spark." + module + "." + suffix, and the concrete value is then read with conf.getXxx(key) (personally, I find this indirection of limited use).
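The key construction can be sketched as follows. The class name ModuleConf and its method names are hypothetical and chosen only for illustration, and a plain Map stands in for ConfigProvider. Only the "spark." + module + "." + suffix pattern and the idea of delegating the lookup come from the text above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a module-scoped configuration view.
final class ModuleConf {
    private final String module;
    private final Map<String, String> conf; // stands in for ConfigProvider

    ModuleConf(String module, Map<String, String> conf) {
        this.module = module;
        this.conf = new HashMap<>(conf);
    }

    // Builds "spark." + module + "." + suffix, e.g. "spark.rpc.io.serverThreads".
    String confKey(String suffix) {
        return "spark." + module + "." + suffix;
    }

    // Typed accessor: build the key, look it up, fall back to a default.
    int getInt(String suffix, int defaultValue) {
        String raw = conf.get(confKey(suffix));
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    public static void main(String[] args) {
        Map<String, String> settings = new HashMap<>();
        settings.put("spark.rpc.io.serverThreads", "4");
        settings.put("spark.shuffle.io.serverThreads", "8");

        ModuleConf rpc = new ModuleConf("rpc", settings);
        ModuleConf shuffle = new ModuleConf("shuffle", settings);

        System.out.println(rpc.confKey("io.serverThreads"));       // spark.rpc.io.serverThreads
        System.out.println(rpc.getInt("io.serverThreads", 0));     // 4
        System.out.println(shuffle.getInt("io.serverThreads", 0)); // 8
    }
}
```

In this sketch the same suffix resolves to a different setting per module, so two modules can share one underlying configuration while keeping separate values.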
Spark usually creates a TransportConf through SparkTransportConf:
import scala.collection.JavaConverters._

import org.apache.spark.SparkConf
import org.apache.spark.network.util.{ConfigProvider, TransportConf}

/**
 * Provides a utility for transforming from a SparkConf inside a Spark JVM (e.g., Executor,
 * Driver, or a standalone shuffle service) into a TransportConf with details on our environment
 * like the number of cores that are allocated to this JVM.
 */
object SparkTransportConf {
  /**
   * Specifies an upper bound on the number of Netty threads that Spark requires by default.
   * In practice, only 2-4 cores should be required to transfer roughly 10 Gb/s, and each core
   * that we use will have an initial overhead of roughly 32 MB of off-heap memory, which comes
   * at a premium.
   *
   * Thus, this value should still retain maximum throughput and reduce wasted off-heap memory
   * allocation. It can be overridden by setting the number of serverThreads and clientThreads
   * manually in Spark's configuration.
   */
  private val MAX_DEFAULT_NETTY_THREADS = 8

  /**
   * Utility for creating a [[TransportConf]] from a [[SparkConf]].
   * @param _conf the [[SparkConf]]
   * @param module the module name
   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
   *                       use the given number of cores, rather than all of the machine's cores.
   *                       This restriction will only occur if these properties are not already set.
   */
  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
    val conf = _conf.clone

    // Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
    // assuming we have all the machine's cores).
    // NB: Only set if serverThreads/clientThreads not already set.
    val numThreads = defaultNumThreads(numUsableCores)
    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
      override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
      override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
        conf.getAll.toMap.asJava.entrySet()
      }
    })
  }

  /**
   * Returns the default number of threads for both the Netty client and server thread pools.
   * If numUsableCores is 0, we will use Runtime get an approximate number of available cores.
   */
  private def defaultNumThreads(numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
  }
}
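Two points stand out:
- If numUsableCores <= 0, the thread count falls back to the number of processors available to the JVM; otherwise numUsableCores is used. Since not every core can be devoted to network transfer, the default is capped at 8 (MAX_DEFAULT_NETTY_THREADS) in both cases. Values already set in SparkConf take precedence, because the defaults are applied with setIfMissing:
  conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
  conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
- An anonymous inner class implementing ConfigProvider is constructed. Its get methods simply delegate to the get methods of SparkConf (more precisely, of the cloned copy).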
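The first point, the default thread count and the precedence of explicit settings, can be sketched in plain Java. This is an illustration under stated assumptions rather than Spark's code: the class ThreadDefaults and the method withThreadDefaults are hypothetical names, a Map stands in for SparkConf, and Map.putIfAbsent plays the role of setIfMissing.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical Java rendering of the thread-count logic; a plain Map stands in for SparkConf.
final class ThreadDefaults {
    static final int MAX_DEFAULT_NETTY_THREADS = 8;

    // Use the caller's core count when positive, otherwise ask the JVM; cap at 8 either way.
    static int defaultNumThreads(int numUsableCores) {
        int availableCores = numUsableCores > 0
            ? numUsableCores
            : Runtime.getRuntime().availableProcessors();
        return Math.min(availableCores, MAX_DEFAULT_NETTY_THREADS);
    }

    // Mirrors setIfMissing: only fill in keys the user has not set explicitly.
    static Map<String, String> withThreadDefaults(
            Map<String, String> userConf, String module, int numUsableCores) {
        Map<String, String> conf = new HashMap<>(userConf); // work on a copy, like _conf.clone
        String numThreads = Integer.toString(defaultNumThreads(numUsableCores));
        conf.putIfAbsent("spark." + module + ".io.serverThreads", numThreads);
        conf.putIfAbsent("spark." + module + ".io.clientThreads", numThreads);
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> userConf = new HashMap<>();
        userConf.put("spark.rpc.io.serverThreads", "16"); // explicit user setting

        Map<String, String> conf = withThreadDefaults(userConf, "rpc", 32);
        System.out.println(conf.get("spark.rpc.io.serverThreads")); // 16: user value kept
        System.out.println(conf.get("spark.rpc.io.clientThreads")); // 8: 32 cores capped at 8
    }
}
```

Note that the cap applies only to the computed default. An explicit setting such as 16 is left untouched, which matches the source comment that the value can be overridden manually.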