博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flume-1.6.0 源码分析1:ExecSource分析
阅读量:6172 次
发布时间:2019-06-21

本文共 3941 字,大约阅读时间需要 13 分钟。

hot3.png

首先,这个类的位置:

144424_ZhgE_1382024.png

它的测试类:

144547_ogPZ_1382024.png

下面,我们就开始分析这个类。

首先在源代码中加入

public static void main(String[] args) {  ExecSource eSource = new ExecSource();  eSource.start(); }

 编译,没问题,抛到Linux中,然后加入以下依赖包

依赖包:   flume-ng-configuration类包  flume-ng-sdk类包

第三方jar: slf4j  guava-r07.jar

然后就可以愉快的开始 jdb了。

[root@machine1 flume-ng-exec-source]# jdb org.apache.flume.source.ExecSourceInitializing jdb ...> stop in org.apache.flume.source.ExecSource.mainDeferring breakpoint org.apache.flume.source.ExecSource.main.It will be set after the class is loaded.> runrun org.apache.flume.source.ExecSourceSet uncaught java.lang.ThrowableSet deferred uncaught java.lang.Throwable> VM Started: Set deferred breakpoint org.apache.flume.source.ExecSource.mainSLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.Breakpoint hit: "thread=main", org.apache.flume.source.ExecSource.main(), line=168 bci=0168      ExecSource eSource = new ExecSource();main[1]

 下面开始进入真正的源码分析阶段:

具体细节不一一讲述。

首先, 

if (shell != null) {

//shell="";

String[] commandArgs = formulateShellCommand(shellcommand);

process = Runtime.getRuntime().exec(commandArgs);

//process=Runtime.getRuntime().exec(command);

else {

String[] commandArgs = command.split("\\s+");

process = new ProcessBuilder(commandArgs).start();

}

通过这样的方式,打开一个子进程,

reader = new BufferedReader(new InputStreamReader(process.getInputStream(), charset));

 

// StderrLogger dies as soon as the input stream is invalid

StderrReader stderrReader = new StderrReader(

new BufferedReader(new InputStreamReader(process.getErrorStream(), charset)), logStderr);

这样拿到两个流,第一个流是内容,第2个流是进程的错误输出。

 

重点是内容怎么处理。

while ((line = reader.readLine()) != null) {

synchronized (eventList) {

sourceCounter.incrementEventReceivedCount();

eventList.add(EventBuilder.withBody(line.getBytes(charset)));

if (eventList.size() >= bufferCount || timeout()) {

flushEventBatch(eventList);

}

}

}

 

synchronized (eventList) {

if (!eventList.isEmpty()) {

flushEventBatch(eventList);

}

}

一行一行的读,然后放入eventList,必要时做刷新---大小足够多,或者超时,或者结束了。

private void flushEventBatch(List<Event> eventList) {

channelProcessor.processEventBatch(eventList);

sourceCounter.addToEventAcceptedCount(eventList.size());

eventList.clear();

lastPushToChannel = systemClock.currentTimeMillis();

}

这个就是刷新的过程,实际上就是交给 channelProcessor处理。

 

==========================================

同时还启动了一个线程做定时刷新

future = timedFlushService.scheduleWithFixedDelay(new Runnable() {

public void run() {

try {

synchronized (eventList) {

if (!eventList.isEmpty() && timeout()) {

flushEventBatch(eventList);

}

}

catch (Exception e) {

logger.error("Exception occured when processing event batch"e);

if (e instanceof InterruptedException) {

Thread.currentThread().interrupt();

}

}

}

}, batchTimeoutbatchTimeout, TimeUnit.MILLISECONDS);

没啥好说的,太简单。

================

错误处理

 

// StderrLogger dies as soon as the input stream is invalid

StderrReader stderrReader = new StderrReader(

new BufferedReader(new InputStreamReader(process.getErrorStream(), charset)), logStderr);

stderrReader.setName("StderrReader-[" + command + "]");

stderrReader.setDaemon(true);

stderrReader.start();

实际上内容为

public void run() {

try {

int i = 0;

String line = null;

while ((line = input.readLine()) != null) {

if (logStderr) {

// There is no need to read 'line' with a charset

// as we do not to propagate it.

// It is in UTF-16 and would be printed in UTF-8 format.

logger.info("StderrLogger[{}] = '{}'", ++iline);

}

}

catch (IOException e) {

logger.info("StderrLogger exiting"e);

finally {

try {

if (input != null) {

input.close();

}

catch (IOException ex) {

logger.error("Failed to close stderr reader for exec source"ex);

}

}

就是有错误输出时,打印出内容。

 

好久不玩C了,对文件描述符的切换都有点不太利落了,随便看吧。

==========================================

最后,如果

while (restart);

设置了重启的话,就再从头执行一遍,总体来说,很简单! 

转载于:https://my.oschina.net/qiangzigege/blog/523540

你可能感兴趣的文章
237. Delete Node in a Linked List
查看>>
[转] webpack之plugin内部运行机制
查看>>
宽字节与多字节之间的转换
查看>>
SEO的重要性
查看>>
ASP.NET 运行时详解 揭开请求过程神秘面纱
查看>>
Oracle 索引的失效检查
查看>>
C语言第五次作业--数据类型
查看>>
系统架构师-基础到企业应用架构-业务逻辑层
查看>>
高手详解SQL性能优化十条建议
查看>>
修改 IntelliJ IDEA 默认配置路径
查看>>
《现在的泪,都是当年脑子进的水》读书笔记
查看>>
IOSday04 UIButton使用
查看>>
铁大好青年内部分组
查看>>
unity3D ——自带寻路Navmesh入门教程(一)(转)
查看>>
判断字符串是否为数字的函数
查看>>
[emuch.net]MatrixComputations(7-12)
查看>>
linux 命令 — 文件相关
查看>>
自己空闲的时候封装一下
查看>>
Datagard產生gap
查看>>
本机web开发环境的搭建--nginx篇
查看>>