首先,这个类的位置:
它的测试类:
下面,我们就开始分析这个类。
首先在源代码中加入
public static void main(String[] args) { ExecSource eSource = new ExecSource(); eSource.start(); }
编译,没问题,抛到Linux中,然后加入以下依赖包
依赖包: flume-ng-configuration类包 flume-ng-sdk类包
第三方jar: slf4j guava-r07.jar
然后就可以愉快的开始 jdb了。
[root@machine1 flume-ng-exec-source]# jdb org.apache.flume.source.ExecSourceInitializing jdb ...> stop in org.apache.flume.source.ExecSource.mainDeferring breakpoint org.apache.flume.source.ExecSource.main.It will be set after the class is loaded.> runrun org.apache.flume.source.ExecSourceSet uncaught java.lang.ThrowableSet deferred uncaught java.lang.Throwable> VM Started: Set deferred breakpoint org.apache.flume.source.ExecSource.mainSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.Breakpoint hit: "thread=main", org.apache.flume.source.ExecSource.main(), line=168 bci=0168 ExecSource eSource = new ExecSource();main[1]
下面开始进入真正的源码分析阶段:
具体细节不一一讲述。
首先,
if (shell != null) {
//shell="";
String[] commandArgs = formulateShellCommand(shell, command);
process = Runtime.getRuntime().exec(commandArgs);
//process=Runtime.getRuntime().exec(command);
} else {
String[] commandArgs = command.split("\\s+");
process = new ProcessBuilder(commandArgs).start();
}
通过这样的方式,打开一个子进程,
reader = new BufferedReader(new InputStreamReader(process.getInputStream(), charset));
// StderrLogger dies as soon as the input stream is invalid
StderrReader stderrReader = new StderrReader(
new BufferedReader(new InputStreamReader(process.getErrorStream(), charset)), logStderr);
这样拿到两个流,第一个流是内容,第2个流是进程的错误输出。
重点是内容怎么处理。
while ((line = reader.readLine()) != null) {
synchronized (eventList) {
sourceCounter.incrementEventReceivedCount();
eventList.add(EventBuilder.withBody(line.getBytes(charset)));
if (eventList.size() >= bufferCount || timeout()) {
flushEventBatch(eventList);
}
}
}
synchronized (eventList) {
if (!eventList.isEmpty()) {
flushEventBatch(eventList);
}
}
一行一行的读,然后放入eventList,必要时做刷新---大小足够多,或者超时,或者结束了。
private void flushEventBatch(List<Event> eventList) {
channelProcessor.processEventBatch(eventList);
sourceCounter.addToEventAcceptedCount(eventList.size());
eventList.clear();
lastPushToChannel = systemClock.currentTimeMillis();
}
这个就是刷新的过程,实际上就是交给 channelProcessor处理。
==========================================
同时还启动了一个线程做定时刷新
future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
public void run() {
try {
synchronized (eventList) {
if (!eventList.isEmpty() && timeout()) {
flushEventBatch(eventList);
}
}
} catch (Exception e) {
logger.error("Exception occured when processing event batch", e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
}
}, batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
没啥好说的,太简单。
================
错误处理
// StderrLogger dies as soon as the input stream is invalid
StderrReader stderrReader = new StderrReader(
new BufferedReader(new InputStreamReader(process.getErrorStream(), charset)), logStderr);
stderrReader.setName("StderrReader-[" + command + "]");
stderrReader.setDaemon(true);
stderrReader.start();
实际上内容为
public void run() {
try {
int i = 0;
String line = null;
while ((line = input.readLine()) != null) {
if (logStderr) {
// There is no need to read 'line' with a charset
// as we do not to propagate it.
// It is in UTF-16 and would be printed in UTF-8 format.
logger.info("StderrLogger[{}] = '{}'", ++i, line);
}
}
} catch (IOException e) {
logger.info("StderrLogger exiting", e);
} finally {
try {
if (input != null) {
input.close();
}
} catch (IOException ex) {
logger.error("Failed to close stderr reader for exec source", ex);
}
}
就是有错误输出时,打印出内容。
好久不玩C了,对文件描述符的切换都有点不太利落了,随便看吧。
==========================================
最后,如果
} while (restart);
设置了重启的话,就再从头执行一遍,总体来说,很简单!