首先编写一个创建 FTP 连接的工厂类:
FTPPoolFactory:
/**
 * Commons-pool object factory that creates, validates and destroys {@link FTPClient}
 * connections configured from an {@link FtpConfig}.
 *
 * <p>{@link #makeObject()} either returns a connected, logged-in client or throws; it never
 * returns {@code null} and never hands out a client whose login was rejected.
 */
public class FTPPoolFactory implements PooledObjectFactory<FTPClient> {
    private static final Logger logger = LoggerFactory.getLogger(FTPPoolFactory.class);

    /** Buffer size passed to {@code FTPClient.setBufferSize}. */
    private static final int BUFF_SIZE = 256000;
    /** Socket connect timeout, in milliseconds. */
    private static final int CONNECT_TIMEOUT_MILLIS = 30000;
    /** Data-connection timeout, in milliseconds. */
    private static final int DATA_TIMEOUT_MILLIS = 180000;
    /** Interval between control-channel keep-alive messages, in seconds. */
    private static final int CONTROL_KEEP_ALIVE_SECONDS = 60;
    /**
     * How long to wait for a keep-alive reply, in milliseconds.
     * NOTE(review): 60 ms looks unintentionally short (seconds may have been intended);
     * the original value is kept - confirm before changing.
     */
    private static final int CONTROL_KEEP_ALIVE_REPLY_MILLIS = 60;

    private final FtpConfig ftpConfig;

    /**
     * @param ftpConfig host, port, credentials and working directory used for every
     *                  connection this factory creates
     */
    public FTPPoolFactory(FtpConfig ftpConfig) {
        this.ftpConfig = ftpConfig;
    }

    /**
     * Opens a new connection, logs in and applies the transfer settings.
     *
     * @return a pooled wrapper around a connected, authenticated client
     * @throws Exception if the server cannot be reached, refuses the connection or rejects
     *                   the login; the half-open connection is closed before rethrowing
     */
    @Override
    public PooledObject<FTPClient> makeObject() throws Exception {
        FTPClient ftpClient = new FTPClient();
        ftpClient.setDefaultPort(ftpConfig.getPort());
        ftpClient.setBufferSize(BUFF_SIZE);
        ftpClient.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
        ftpClient.setDataTimeout(DATA_TIMEOUT_MILLIS);
        ftpClient.setControlKeepAliveTimeout(CONTROL_KEEP_ALIVE_SECONDS);
        ftpClient.setControlKeepAliveReplyTimeout(CONTROL_KEEP_ALIVE_REPLY_MILLIS);
        ftpClient.setControlEncoding("UTF-8");
        FTPClientConfig clientConfig = new FTPClientConfig(FTPClientConfig.SYST_UNIX);
        clientConfig.setServerLanguageCode("UTF-8");
        ftpClient.configure(clientConfig);

        try {
            ftpClient.connect(ftpConfig.getHost());
            int reply = ftpClient.getReplyCode();
            if (!FTPReply.isPositiveCompletion(reply)) {
                throw new IOException("FTPServer refused connection, reply code " + reply);
            }
            ftpClient.sendCommand("OPTS UTF8", "ON");
            if (!ftpClient.login(ftpConfig.getUserName(), ftpConfig.getPassWord())) {
                throw new IOException("FTP server refuse to connect. Login rejected, reply: "
                        + ftpClient.getReplyString());
            }
            ftpClient.setFileType(FTP.BINARY_FILE_TYPE);
            // Stream mode is the only transfer mode the client library decodes; the previously
            // requested compressed mode is either rejected by the server or yields corrupt data.
            ftpClient.setFileTransferMode(FTP.STREAM_TRANSFER_MODE);
            ftpClient.enterLocalPassiveMode();

            String workPath = ftpConfig.getWorkPath();
            if (workPath != null && !ftpClient.changeWorkingDirectory(workPath)) {
                // Not fatal: callers may change directory themselves after borrowing.
                logger.warn("fail to change ftp working directory to " + workPath);
            }
            return new DefaultPooledObject<FTPClient>(ftpClient);
        } catch (IOException e) {
            logger.error("fail to connect ftp server:" + e.getMessage(), e);
            closeQuietly(ftpClient);
            throw e;
        }
    }

    /** Logs out and disconnects the pooled client; failures are logged, not propagated. */
    @Override
    public void destroyObject(PooledObject<FTPClient> pooledObject) throws Exception {
        closeQuietly(pooledObject.getObject());
    }

    /**
     * Checks that the connection is still usable by sending a NOOP.
     *
     * @return {@code true} if the client is connected and the server acknowledged the NOOP
     */
    @Override
    public boolean validateObject(PooledObject<FTPClient> pooledObject) {
        FTPClient ftpClient = pooledObject.getObject();
        if (ftpClient == null || !ftpClient.isConnected()) {
            return false;
        }
        try {
            return ftpClient.sendNoOp();
        } catch (IOException e) {
            logger.error("ftp connection validation failed: " + e.getMessage(), e);
            return false;
        }
    }

    /** No per-borrow initialisation is required. */
    @Override
    public void activateObject(PooledObject<FTPClient> pooledObject) throws Exception {
    }

    /** No per-return clean-up is required. */
    @Override
    public void passivateObject(PooledObject<FTPClient> pooledObject) throws Exception {
    }

    /** Best-effort logout followed by disconnect; disconnect runs even if logout fails. */
    private static void closeQuietly(FTPClient ftpClient) {
        if (ftpClient == null || !ftpClient.isConnected()) {
            return;
        }
        try {
            ftpClient.logout();
        } catch (IOException e) {
            logger.error("ftp logout failed: " + e.getMessage(), e);
        } finally {
            try {
                ftpClient.disconnect();
            } catch (IOException e) {
                logger.error("ftp disconnect failed: " + e.getMessage(), e);
            }
        }
    }
}
接着编写 FTP 的连接池:FtpPool
/**
 * Process-wide pool of {@link FTPClient} connections, configured once from
 * {@link PropertyUtil} when the class is loaded.
 */
public class FtpPool {
    /** Port used when {@code gather.host.port} is missing or not a number. */
    private static final int DEFAULT_FTP_PORT = 21;

    private static final GenericObjectPool<FTPClient> internalPool;
    private static PropertyUtil property = PropertyUtil.getInstance();
    private static Logger logger = Logger.getLogger(FtpPool.class);

    static {
        String ip = property.getString("gather.host.ip", "");
        int port = parsePort(property.getString("gather.host.port", ""));
        String userName = property.getString("gather.host.username", "");
        String pwd = property.getString("gather.host.passwd", "");
        int maxTotal = property.getInt("maxTotal", 8);
        long maxWaitMillis = property.getLong("maxWaitMillis", 1000);

        FtpConfig ftpConfig = new FtpConfig();
        ftpConfig.setHost(ip);
        ftpConfig.setPort(port);
        ftpConfig.setPassWord(pwd);
        ftpConfig.setUserName(userName);
        ftpConfig.setMaxTotal(maxTotal);
        ftpConfig.setMaxWaitMillis(maxWaitMillis);

        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(ftpConfig.getMaxTotal()); // pool default is 8 when not set
        poolConfig.setMaxWaitMillis(ftpConfig.getMaxWaitMillis()); // pool default waits forever
        // Without this the factory's validateObject is never invoked, so connections the
        // server has already dropped would be handed to callers.
        poolConfig.setTestOnBorrow(true);
        internalPool = new GenericObjectPool<FTPClient>(new FTPPoolFactory(ftpConfig), poolConfig);
    }

    /**
     * Parses the configured port. A missing or malformed value must not abort class
     * initialisation, so it falls back to {@link #DEFAULT_FTP_PORT} and logs the problem.
     */
    private static int parsePort(String raw) {
        if (raw != null) {
            try {
                return Integer.parseInt(raw.trim());
            } catch (NumberFormatException e) {
                // fall through to the default below
            }
        }
        logger.error("invalid gather.host.port value '" + raw + "', using default port "
                + DEFAULT_FTP_PORT);
        return DEFAULT_FTP_PORT;
    }

    /**
     * Borrows an FTP connection from the pool.
     *
     * @return a pooled client, or {@code null} if none could be obtained (the cause is logged)
     */
    public static FTPClient getFTPClient() {
        try {
            return internalPool.borrowObject();
        } catch (Exception e) {
            logger.error("获取FTP连接异常:", e);
            return null;
        }
    }

    /**
     * Returns a borrowed connection to the pool.
     *
     * @param ftpClient the client previously obtained from {@link #getFTPClient()};
     *                  {@code null} is ignored
     */
    public static void returnFTPClient(FTPClient ftpClient) {
        if (ftpClient == null) {
            // getFTPClient() returns null on failure; there is nothing to give back.
            return;
        }
        try {
            internalPool.returnObject(ftpClient);
        } catch (Exception e) {
            logger.error("return ftp connection error:" + e.getMessage(), e);
        }
    }

    /**
     * Closes the pool and destroys its idle connections.
     */
    public static void destoryFTPClientPool() {
        try {
            internalPool.close();
        } catch (Exception e) {
            logger.error("destory the client Pool error", e);
        }
    }
}
编写FTP上传和下载的工具类:
FTPUtil:
/**
 * Static helpers for opening FTP connections, writing temporary files and forwarding the
 * lines of remote {@code .txt} files to the producer.
 */
public class FTPUtil {
    private static Logger logger = Logger.getLogger(FTPUtil.class);
    private static ProduceUtil produce = ProduceUtil.getInstance();
    // Not referenced in this class; kept because init() runs when the class is loaded.
    private static ThreadPoolUtil threadPool = ThreadPoolUtil.init();

    /**
     * Opens a stand-alone (non-pooled) FTP connection and logs in.
     *
     * @param ftpHost     FTP server host
     * @param ftpPort     FTP server port (21 is the usual default)
     * @param ftpUserName login user name
     * @param ftpPassword login password
     * @return the client object, never {@code null}; it is left disconnected when the
     *         connection or login failed, so callers should check {@code isConnected()}
     */
    public static FTPClient getFTPClient(String ftpHost, int ftpPort, String ftpUserName, String ftpPassword) {
        FTPClient ftpClient = new FTPClient();
        try {
            ftpClient.connect(ftpHost, ftpPort);
            if (!FTPReply.isPositiveCompletion(ftpClient.getReplyCode())) {
                logger.error("ftp connect error, server refused connection: " + ftpClient.getReplyString());
                ftpClient.disconnect();
                return ftpClient;
            }
            // The login result is the authoritative success signal. Do not call getReply()
            // here: it waits for a further server reply that is never sent.
            if (ftpClient.login(ftpUserName, ftpPassword)) {
                logger.info("ftp connection success");
            } else {
                logger.error("ftp connect error,check the username and password");
                ftpClient.disconnect();
            }
        } catch (SocketException e) {
            logger.error("ftp ip error", e);
            disconnectQuietly(ftpClient);
        } catch (IOException e) {
            logger.error("ftp port error", e);
            disconnectQuietly(ftpClient);
        }
        return ftpClient;
    }

    /**
     * Writes a string to the given stream and then closes the stream.
     *
     * <p>The string is encoded with the platform default charset. The stream is closed
     * whether or not the write succeeds.
     *
     * @param s   content to write
     * @param out destination stream; closed by this method
     * @return {@code true} if the content was written and flushed, {@code false} if either
     *         argument is {@code null} or an I/O error occurred
     */
    public static Boolean writeTempFile(String s, BufferedOutputStream out) {
        if (out == null) {
            logger.error("write2temp file error....output stream is null");
            return false;
        }
        boolean written = false;
        try {
            if (s == null) {
                logger.error("write2temp file error....content is null");
            } else {
                out.write(s.getBytes());
                out.flush();
                written = true;
                logger.info("write2temp file sucuss....");
            }
        } catch (IOException e) {
            logger.error("write2temp file error...." + e.getMessage(), e);
        } finally {
            try {
                out.close();
            } catch (IOException e) {
                logger.error("fail to close temp file stream", e);
            }
        }
        return written;
    }

    /**
     * Reads every {@code .txt} file in the configured {@code workPath} directory, sends each
     * line to the configured {@code topic}, and deletes a file once its transfer completed.
     *
     * @return {@code true} if every matching file was forwarded and deleted; {@code false} if
     *         no connection was available, the directory does not exist, or any file failed
     */
    public static boolean downloadFile() {
        String topic = PropertyUtil.getInstance().getString("topic", "");
        String workPath = PropertyUtil.getInstance().getString("workPath", "");

        FTPClient ftpClient = FtpPool.getFTPClient();
        if (ftpClient == null) {
            logger.error("fail to read ftp file....no ftp connection available");
            return false;
        }

        boolean result = true;
        try {
            if (!ftpClient.changeWorkingDirectory(workPath)) {
                logger.error("读取的目录不存在....");
                return false;
            }
            // List the current directory: files are retrieved by bare name relative to it,
            // and re-applying workPath here would be wrong for a relative path.
            FTPFile[] ftpFiles = ftpClient.listFiles();
            for (FTPFile ftpFile : ftpFiles) {
                if (ftpFile == null || !ftpFile.isFile()) {
                    continue;
                }
                String fileName = ftpFile.getName();
                if (!fileName.endsWith(".txt")) {
                    continue;
                }
                if (!forwardFile(ftpClient, fileName, topic)) {
                    result = false;
                }
            }
        } catch (Exception e) {
            result = false;
            logger.error("fail to read ftp file....", e);
        } finally {
            FtpPool.returnFTPClient(ftpClient);
        }
        return result;
    }

    /**
     * Streams one remote file line by line to the producer and deletes it afterwards.
     * The file is deleted only when the server confirms the transfer completed, so a
     * partially read file is never lost.
     *
     * @return {@code true} if the file was fully forwarded and deleted
     */
    private static boolean forwardFile(FTPClient ftpClient, String fileName, String topic) throws Exception {
        InputStream in = ftpClient.retrieveFileStream(fileName);
        if (in == null) {
            logger.error("fail to open ftp file " + fileName + ": " + ftpClient.getReplyString());
            return false;
        }
        // Content is decoded with the platform default charset.
        BufferedReader reader = new BufferedReader(new InputStreamReader(in));
        try {
            String line;
            while ((line = reader.readLine()) != null) {
                logger.debug(line);
                produce.send(topic, line);
            }
        } catch (Exception e) {
            closeQuietly(reader);
            // Consume the transfer reply so the pooled control channel stays in sync.
            drainPendingReply(ftpClient);
            throw e;
        }
        reader.close();

        // The server sends the final transfer status only after the data stream is closed.
        if (!ftpClient.completePendingCommand()) {
            logger.error("ftp transfer of " + fileName + " did not complete: " + ftpClient.getReplyString());
            return false;
        }
        // Remove the source file once its content has been sent.
        if (!ftpClient.deleteFile(fileName)) {
            logger.error("fail to delete ftp file " + fileName + ": " + ftpClient.getReplyString());
            return false;
        }
        return true;
    }

    /** Reads and discards the reply of an aborted transfer; failures are only logged. */
    private static void drainPendingReply(FTPClient ftpClient) {
        try {
            ftpClient.completePendingCommand();
        } catch (IOException e) {
            logger.error("fail to complete pending ftp command", e);
        }
    }

    /** Closes the reader, logging instead of propagating any failure. */
    private static void closeQuietly(BufferedReader reader) {
        try {
            reader.close();
        } catch (IOException e) {
            logger.error("fail to close ftp file stream", e);
        }
    }

    /** Disconnects the client if it is connected, logging instead of propagating any failure. */
    private static void disconnectQuietly(FTPClient ftpClient) {
        if (!ftpClient.isConnected()) {
            return;
        }
        try {
            ftpClient.disconnect();
        } catch (IOException e) {
            logger.error("fail to disconnect ftp client", e);
        }
    }
}
网友评论