clickhouse-jdbc是ClickHouse的JDBC驱动,本文以clickhouse-jdbc为例,探索JDBC驱动的一般实现。
ClickHouseDriver
1 | /** |
主要是构建ClickHouseConnectionImpl,实现Driver#connect(String url, java.util.Properties info) throws SQLException 方法。
ClickHouseConnectionImpl
1 | public interface ClickHouseConnection extends Connection { |
1 | public class ClickHouseConnectionImpl implements ClickHouseConnection { |
ClickHouseStatementImpl
1 | public class ClickHouseStatementImpl implements ClickHouseStatement { |
executeQuery方法将sql语句组装成请求参数,通过httpclient传给server,返回stream(会用到FastByteArrayOutputStream,即去除了synchronized的ByteArrayOutputStream),然后把stream封装到ClickHouseResultSet中。
BalancedClickhouseDataSource
当调用getConnection方法时,会返回连接到随机主机的connection;也可以周期性地检查连接是否活跃。
110/**
* <p> Database for clickhouse jdbc connections.
* <p> It has list of database urls.
* For every {@link #getConnection() getConnection} invocation, it returns connection to random host from the list.
* Furthermore, this class has method {@link #scheduleActualization(int, TimeUnit) scheduleActualization}
* which test hosts for availability. By default, this option is turned off.
*/
public class BalancedClickhouseDataSource implements DataSource {
private static final org.slf4j.Logger log = LoggerFactory.getLogger(BalancedClickhouseDataSource.class);
private static final Pattern URL_TEMPLATE = Pattern.compile(JDBC_CLICKHOUSE_PREFIX + "" +
"//([a-zA-Z0-9_:,.-]+)" +
"(/[a-zA-Z0-9_]+" +
"([?][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+([&][a-zA-Z0-9_]+[=][a-zA-Z0-9_]+)*)?" +
")?");
private PrintWriter printWriter;
private int loginTimeoutSeconds = 0;
private final ThreadLocal<Random> randomThreadLocal = new ThreadLocal<Random>();
private final List<String> allUrls;
private volatile List<String> enabledUrls;
private final ClickHouseProperties properties;
private final ClickHouseDriver driver = new ClickHouseDriver();
private boolean ping(final String url) {
try {
driver.connect(url, properties).createStatement().execute("SELECT 1");
return true;
} catch (Exception e) {
return false;
}
}
/**
* Checks if clickhouse on url is alive, if it isn't, disable url, else enable.
*
* @return number of avaliable clickhouse urls
*/
public synchronized int actualize() {
List<String> enabledUrls = new ArrayList<String>(allUrls.size());
for (String url : allUrls) {
log.debug("Pinging disabled url: {}", url);
if (ping(url)) {
log.debug("Url is alive now: {}", url);
enabledUrls.add(url);
} else {
log.debug("Url is dead now: {}", url);
}
}
this.enabledUrls = Collections.unmodifiableList(enabledUrls);
return enabledUrls.size();
}
private String getAnyUrl() throws SQLException {
List<String> localEnabledUrls = enabledUrls;
if (localEnabledUrls.isEmpty()) {
throw new SQLException("Unable to get connection: there are no enabled urls");
}
Random random = this.randomThreadLocal.get();
if (random == null) {
this.randomThreadLocal.set(new Random(System.currentTimeMillis()));
random = this.randomThreadLocal.get();
}
int index = random.nextInt(localEnabledUrls.size());//随机选择链接
return localEnabledUrls.get(index);
}
/**
* {@inheritDoc}
*/
public ClickHouseConnection getConnection() throws SQLException {
return driver.connect(getAnyUrl(), properties);
}
/**
* {@inheritDoc}
*/
public ClickHouseConnection getConnection(String username, String password) throws SQLException {
return driver.connect(getAnyUrl(), properties.withCredentials(username, password));
}
/**
* set time period for checking availability connections
* 周期检查可用性
* @param delay value for time unit
* @param timeUnit time unit for checking
* @return this datasource with changed settings
*/
public BalancedClickhouseDataSource scheduleActualization(int delay, TimeUnit timeUnit) {
ClickHouseDriver.ScheduledConnectionCleaner.INSTANCE.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
actualize();
} catch (Exception e) {
log.error("Unable to actualize urls", e);
}
}
}, 0, delay, timeUnit);
return this;
}
}
LogProxy
LogProxy在很多地方用到,主要功能是:当log的trace级别开启时,打印sql调用的上下文。
51public class LogProxy<T> implements InvocationHandler {
private static final Logger log = LoggerFactory.getLogger(LogProxy.class);
private final T object;
private final Class<T> clazz;
public static <T> T wrap(Class<T> interfaceClass, T object) {
if (log.isTraceEnabled()) {//仅当日志级别为trace,返回代理对象
LogProxy<T> proxy = new LogProxy<T>(interfaceClass, object);
return proxy.getProxy();
}
return object;
}
private LogProxy(Class<T> interfaceClass, T object) {
if (!interfaceClass.isInterface()) {
throw new IllegalStateException("Class " + interfaceClass.getName() + " is not an interface");
}
clazz = interfaceClass;
this.object = object;
}
"unchecked") (
public T getProxy() {
//xnoinspection x
// unchecked
return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, this);
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String msg =
"Call class: " + object.getClass().getName() +
"\nMethod: " + method.getName() +
"\nObject: " + object +
"\nArgs: " + Arrays.toString(args) +
"\nInvoke result: ";
try {
final Object invokeResult = method.invoke(object, args);
msg += invokeResult;
return invokeResult;
} catch (InvocationTargetException e) {
msg += e.getMessage();
throw e.getTargetException();
} finally {
msg = "==== ClickHouse JDBC trace begin ====\n" + msg + "\n==== ClickHouse JDBC trace end ====";
log.trace(msg);
}
}
}