1.背景
Flume1.7有一个叫taildir的source,可以监控一个或者多个文件夹里面的文本文件,有新增文件或者文件有追加内容,都会被监测到然后按行把数据发送到channel。为了唯一标示一个文件,该source利用操作系统inode的方式获得文件的一个id,目前仅采用unix的方式获取,不支持window,需要更改源码添加window获取inode的方法。
[inode]
操作系统读取硬盘的时候,不会一个个扇区地读取,而是一次性连续读取多个扇区,即一次性读取一个"块"(block)。这种由多个扇区组成的"块",是文件存取的最小单位。"块"的大小,最常见的是4KB,即连续八个 sector组成一个 block。
文件数据都储存在"块"中,还必须找到一个地方储存文件的元信息,比如文件的创建者、文件的创建日期、文件的大小等等。这种储存文件元信息的区域就叫做inode,中文译名为"索引节点"。
2.Flume源码编译
Github上选择1.7版本的branch,下载源码。
(https://github.com/apache/flume)
由于有些依赖包下不到,修改根目录的pom.xml文件,在根<project>标签下添加如下的maven库:
<repositories>
<repository>
<id>nexus.axiomalaska.com</id>
<url>http://nexus.axiomalaska.com/nexus/content/repositories/public</url>
</repository>
<repository>
<id>maven.aliyun.com</id>
<url>http://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
<repository>
<id>maven.tempo-db.com</id>
<url>http://maven.oschina.net/service/local/repositories/sonatype-public-grid/content/</url>
</repository>
<repository>
<id>p2.jfrog.org</id>
<url>http://p2.jfrog.org/libs-releases</url>
</repository>
</repositories>
用如下命令编译flume源码:
mvn clean install -DskipTests
flume编译结果
编译完成会在每个模块下看到各自相应的target文件夹,里面有编译之后的jar包。
3.新添加source模块
在flume-ng-sources模块中添加flume-win-taildir-source模块。新建package:org.apache.flume.source.wintaildir,把flume-taildir-source中的类考到该包下。
新添加source模块pom.xml中的内容也拷贝过来,并做如下更改:
pom.xml修改添加如下依赖:
win-taildir-source依赖添加在编译环境下搜索“flume-taildir-source”, 含有该字符串的的pom.xml文件中仿照flume-taildir-source模块添加flume-win-taildir-source的相应描述。
4.添加windows获取inode的代码
新建util包,添加如下两个类。
新添加两个类Kernel32.java:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.source.wintaildir.util;
/**
* Created by caolch on 2017/4/25.
*/
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.sun.jna.Library;
import com.sun.jna.Native;
import com.sun.jna.Structure;
import com.sun.jna.platform.win32.WinBase.FILETIME;
import com.sun.jna.platform.win32.WinDef.DWORD;
import com.sun.jna.platform.win32.WinNT.HANDLE;
import com.sun.jna.win32.StdCallLibrary;
import com.sun.jna.win32.W32APIFunctionMapper;
import com.sun.jna.win32.W32APITypeMapper;
public interface Kernel32 extends StdCallLibrary {
final static Map<String, Object> WIN32API_OPTIONS = new HashMap<String, Object>() {
private static final long serialVersionUID = 1L;
{
put(Library.OPTION_FUNCTION_MAPPER, W32APIFunctionMapper.UNICODE);
put(Library.OPTION_TYPE_MAPPER, W32APITypeMapper.UNICODE);
}
};
public Kernel32 INSTANCE = (Kernel32) Native.loadLibrary("Kernel32",
Kernel32.class, WIN32API_OPTIONS);
public int GetLastError();
public class BY_HANDLE_FILE_INFORMATION extends Structure {
public DWORD dwFileAttributes;
public FILETIME ftCreationTime;
public FILETIME ftLastAccessTime;
public FILETIME ftLastWriteTime;
public DWORD dwVolumeSerialNumber;
public DWORD nFileSizeHigh;
public DWORD nFileSizeLow;
public DWORD nNumberOfLinks;
public DWORD nFileIndexHigh;
public DWORD nFileIndexLow;
public static class ByReference extends BY_HANDLE_FILE_INFORMATION
implements Structure.ByReference {
};
public static class ByValue extends BY_HANDLE_FILE_INFORMATION
implements Structure.ByValue {
}
@Override
protected List getFieldOrder() {
List<String> fields = new ArrayList<String>();
fields.addAll(Arrays.asList(new String[] { "dwFileAttributes",
"ftCreationTime", "ftLastAccessTime", "ftLastWriteTime",
"dwVolumeSerialNumber", "nFileSizeHigh", "nFileSizeLow",
"nNumberOfLinks", "nFileIndexHigh", "nFileIndexLow" }));
return fields;
};
};
boolean GetFileInformationByHandle(HANDLE hFile,
BY_HANDLE_FILE_INFORMATION lpFileInformation);
}
WinFileUtil.java:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flume.source.wintaildir.util;
/**
* Created by caolch on 2017/4/25.
*/
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.sun.jna.platform.win32.Kernel32;
import com.sun.jna.platform.win32.WinBase;
import com.sun.jna.platform.win32.WinNT.HANDLE;
//import org.apache.flume.source.wintaildir.util.Kernel32;
public class WinFileUtil {
private static Logger logger = LoggerFactory.getLogger(WinFileUtil.class);
public static String getFileId(String filepath) {
final int FILE_SHARE_READ = (0x00000001);
final int OPEN_EXISTING = (3);
final int GENERIC_READ = (0x80000000);
final int FILE_ATTRIBUTE_ARCHIVE = (0x20);
WinBase.SECURITY_ATTRIBUTES attr = null;
org.apache.flume.source.wintaildir.util.Kernel32.BY_HANDLE_FILE_INFORMATION lpFileInformation = new org.apache.flume.source.wintaildir.util.Kernel32.BY_HANDLE_FILE_INFORMATION();
HANDLE hFile = null;
hFile = Kernel32.INSTANCE.CreateFile(filepath, 0,
FILE_SHARE_READ, attr, OPEN_EXISTING, FILE_ATTRIBUTE_ARCHIVE,
null);
String ret = "0";
if (Kernel32.INSTANCE.GetLastError() == 0) {
org.apache.flume.source.wintaildir.util.Kernel32.INSTANCE
.GetFileInformationByHandle(hFile, lpFileInformation);
ret = lpFileInformation.dwVolumeSerialNumber.toString()
+ lpFileInformation.nFileIndexLow.toString();
Kernel32.INSTANCE.CloseHandle(hFile);
if (Kernel32.INSTANCE.GetLastError() == 0) {
logger.debug("inode:" + ret);
return ret;
} else {
logger.error("关闭文件发生错误:{}", filepath);
throw new RuntimeException("关闭文件发生错误:" + filepath);
}
} else {
if (hFile != null) {
Kernel32.INSTANCE.CloseHandle(hFile);
}
logger.error("打开文件发生错误:{}", filepath);
throw new RuntimeException("打开文件发生错误:" + filepath);
}
}
}
代码文件头一定要添加licenses声明,否则编译不通过。
在类RelialeTailderEventReader中找到getInode方法,修改方法如下:
5.打包测试
Mvn重新编译,找到flume-win-taildir-source-1.7.0.jar包,拷贝到flume的lib下。
在MVN本地库中找到如下两个jar包,也拷贝到flume的lib目录下,这两个包是获取windows操作系统inode的依赖的包。
编辑flume配置文件,采用自定义的source,sink采用file_roll,详细内容如下:
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Describe the source
a1.sources.r1.type = org.apache.flume.source.wintaildir.TaildirSource
#org.apache.flume.source.wintaildir.TaildirSource
a1.sources.r1.positionFile = E:\\tmp\\.meta\\taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 =E:\\tmp\\.*\.txt
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = file_roll
#a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = E:\\output
a1.sinks.k1.sink.useFileSuffix = true
a1.sinks.k1.sink.fileSuffix = .completed
a1.sinks.k1.sink.rollInterval = 0
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
cmd中定位到flume的安装目录的bin下,执行如下命令:
flume-ng.cmd agent -conf ../conf -conf-file ../conf/wintest.properties -n a1
6.windows操作系统支持
flume中的flume-ng.cmd脚本采用PowserShell2.0以上版本编写,理论上有PowserShell2.0以上版本的操作系统都支持。
亲测以下几个版本的操作系统可用,仅window server 2003 Sp1不支持flume。
版本 | 是否支持 |
---|---|
Window Server 2003 sp1 | 仅支持powershell1.0,不支持flume |
Window Server 2003 sp2 x86 | 可安装powershell2.0 |
Window Server 2003 sp2 x64 | 可安装powershell2.0 |
Window Server 2008 x32 | 可安装powershell2.0,3.0, 4.0 |
Window Server 2008 x64 | 可安装powershell2.0,3.0, 4.0 |
Window 10 | 内置powershell5.0 |
网友评论