spark-TransportServer

TransportContext 有四个重载的 createServer 方法,用来创建 TransportServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Creates a server which will attempt to bind to a specific port, without naming a host.
 *
 * @param port port handed to the new server as the port to bind
 * @param bootstraps bootstraps forwarded unchanged to the new server
 * @return a new {@link TransportServer} built from this context and its {@code rpcHandler}
 */
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
  // No particular host is requested, so null is passed in the host position.
  final String unspecifiedHost = null;
  TransportServer server =
      new TransportServer(this, unspecifiedHost, port, rpcHandler, bootstraps);
  return server;
}

/**
 * Creates a server which will attempt to bind to a specific host and port.
 *
 * @param host host handed to the new server as the host to bind
 * @param port port handed to the new server as the port to bind
 * @param bootstraps bootstraps forwarded unchanged to the new server
 * @return a new {@link TransportServer} built from this context and its {@code rpcHandler}
 */
public TransportServer createServer(
    String host,
    int port,
    List<TransportServerBootstrap> bootstraps) {
  TransportServer server = new TransportServer(this, host, port, rpcHandler, bootstraps);
  return server;
}

/**
 * Creates a new server, binding to any available ephemeral port.
 *
 * @param bootstraps bootstraps forwarded to {@code createServer(int, List)}
 * @return the server produced by {@code createServer(0, bootstraps)}
 */
public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
  // Port 0 is the value used here to request "any available port".
  final int anyAvailablePort = 0;
  return createServer(anyAvailablePort, bootstraps);
}

/**
 * Creates a new server with port 0 and a fresh, empty list of bootstraps.
 *
 * @return the server produced by {@code createServer(0, <empty list>)}
 */
public TransportServer createServer() {
  List<TransportServerBootstrap> noBootstraps = new ArrayList<>();
  return createServer(0, noBootstraps);
}

TransportServer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/**
 * Server for the efficient, low-level streaming service.
 *
 * <p>The constructor configures a Netty {@link ServerBootstrap} and binds it immediately;
 * {@link #close()} closes the bound channel and shuts the event loop group(s) down.
 */
public class TransportServer implements Closeable {
  private static final Logger logger = LoggerFactory.getLogger(TransportServer.class);

  private final TransportContext context;
  private final TransportConf conf;
  private final RpcHandler appRpcHandler;
  private final List<TransportServerBootstrap> bootstraps;

  private ServerBootstrap bootstrap;
  private ChannelFuture channelFuture;
  // -1 means "not bound yet"; replaced by the real local port at the end of init().
  private int port = -1;
  private NettyMemoryMetrics metrics;

  /**
   * Creates a TransportServer that binds to the given host and the given port, or to any available
   * if 0. If you don't want to bind to any special host, set "hostToBind" to null.
   *
   * <p>If initialization fails for any reason, the partially initialized server is closed
   * before the failure propagates to the caller.
   *
   * @param context transport context; supplies the configuration and initializes each
   *     accepted channel's pipeline
   * @param hostToBind host to bind to, or null for no specific host
   * @param portToBind port to bind to, or 0 for any available port
   * @param appRpcHandler handler for sendRPC() messages sent by TransportClients
   * @param bootstraps bootstraps applied, in order, to every accepted channel; must not be
   *     null (a copy of the list is kept)
   */
  public TransportServer(
      TransportContext context,
      String hostToBind,
      int portToBind,
      RpcHandler appRpcHandler,
      List<TransportServerBootstrap> bootstraps) {
    this.context = context;
    this.conf = context.getConf();
    this.appRpcHandler = appRpcHandler; // Handler for sendRPC() messages sent by TransportClients.
    this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

    // Clean up on ANY failure, not only on RuntimeException: the cause rethrown by
    // syncUninterruptibly() in init() may be a checked exception (for example a bind failure
    // reported as an IOException subtype), which a catch (RuntimeException) clause would miss,
    // leaving the event loop threads running.
    boolean shouldClose = true;
    try {
      init(hostToBind, portToBind);
      shouldClose = false;
    } finally {
      if (shouldClose) {
        JavaUtils.closeQuietly(this);
      }
    }
  }

  /**
   * Returns the local port this server is bound to.
   *
   * @return the bound port
   * @throws IllegalStateException if the server has not been bound yet
   */
  public int getPort() {
    if (port == -1) {
      throw new IllegalStateException("Server not initialized");
    }
    return port;
  }

  /**
   * Builds the Netty server bootstrap, binds it to the requested address, waits for the bind
   * to complete and records the resulting local port.
   *
   * @param hostToBind host to bind to, or null for no specific host
   * @param portToBind port to bind to
   */
  private void init(String hostToBind, int portToBind) {

    // I/O mode name comes from the configuration (e.g. NIO or EPOLL).
    IOMode ioMode = IOMode.valueOf(conf.ioMode());
    EventLoopGroup bossGroup =
      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
    // A single event loop group serves as both boss and worker group.
    EventLoopGroup workerGroup = bossGroup;

    // Pooled ByteBuf allocator; note that caching is allowed here (allowCache = true).
    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup) // the server side takes both a boss and a worker group
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, allocator)
      .childOption(ChannelOption.ALLOCATOR, allocator);

    // Memory metrics for the allocator; exposed through getAllMetrics().
    this.metrics = new NettyMemoryMetrics(
      allocator, conf.getModuleName() + "-server", conf);

    // Socket options are only applied when the configured value is positive.
    if (conf.backLog() > 0) {
      bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
    }

    if (conf.receiveBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
    }

    if (conf.sendBuf() > 0) {
      bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
    }

    // Pipeline initialization callback for child channels.
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) {
        // Each bootstrap receives the handler produced by the previous one and may replace it.
        RpcHandler rpcHandler = appRpcHandler;
        for (TransportServerBootstrap customBootstrap : bootstraps) {
          rpcHandler = customBootstrap.doBootstrap(ch, rpcHandler);
        }
        // initializePipeline is also used by TransportClientFactory#createClient.
        context.initializePipeline(ch, rpcHandler);
      }
    });

    InetSocketAddress address = hostToBind == null ?
      new InetSocketAddress(portToBind) : new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
    // Block until the bind completes; a failed bind is rethrown from here.
    channelFuture.syncUninterruptibly();

    // Read the port back from the channel: it differs from portToBind when 0 was requested.
    port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
    logger.debug("Shuffle server started on port: {}", port);
  }

  /**
   * Returns the memory metrics created for this server's allocator.
   *
   * @return the metric set, or null if initialization did not get far enough to create it
   */
  public MetricSet getAllMetrics() {
    return metrics;
  }

  /**
   * Closes the bound channel (waiting up to 10 seconds) and requests a graceful shutdown of the
   * bootstrap's event loop groups. Fields are cleared afterwards, so repeated calls do nothing.
   */
  @Override
  public void close() {
    if (channelFuture != null) {
      // close is a local operation and should finish within milliseconds; timeout just to be safe
      channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
      channelFuture = null;
    }
    if (bootstrap != null && bootstrap.group() != null) {
      bootstrap.group().shutdownGracefully();
    }
    // init() registers the same group as boss and worker, so this addresses the same group again.
    if (bootstrap != null && bootstrap.childGroup() != null) {
      bootstrap.childGroup().shutdownGracefully();
    }
    bootstrap = null;
  }
}

可以看到,无论是 TransportClient 还是 TransportServer,都大量使用了 Netty。

Spark、Hadoop 等大数据技术栈都用到了 metrics-core,后面争取学一学。

metrics-core github

metrics-core官网