前段时间因为项目的需要,做了一下java连R语言环境执行脚本的功能。现在做下总结
我选用了Rserve工具
<dependency>
<groupId>org.rosuda.REngine</groupId>
<artifactId>Rserve</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.rosuda.REngine</groupId>
<artifactId>REngine</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.rosuda.REngine</groupId>
<artifactId>RserveEngine</artifactId>
<version>4066</version>
</dependency>
install.packages('rJava')
install.packages('DBI')
install.packages('RJDBC')
install.packages(“Rserve”, “Rserve_1.8-6.tgz”, “http://www.rforge.net/”)
执行命令 R CMD Rserve --RS-enable-remote
因为在连接Rserve需要一个共享的连接,所以用了一个包Rsession,然后改造了这个包里面的类
这是状态维持的类。
import org.math.R.RserverConf;
import org.math.R.Rsession;
import org.rosuda.REngine.REXP;
import org.rosuda.REngine.Rserve.RConnection;
import org.rosuda.REngine.Rserve.RserveException;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 加入 Rsession
* 进行状态维持
*/
public class RServeHelper {
private static RConnection rConnection = null;
private static Rsession rsession = null;
private String interpreterIp = Config.InterperterIp;
private RBaseUtils rBaseUtils = new RBaseUtils();
private RLogger rLogger = new RLogger();
private static AtomicInteger stopStatus= new AtomicInteger(0);
public synchronized Rsession getRSession() throws Exception {
if(stopStatus.intValue()==1){
System.out.println("R服务器已经终止");
rsession=null;
return null;
}
if (rsession == null||!rsession.connected ) {
String username = "";
String password = "";
RserverConf rconf = new RserverConf(interpreterIp, 6311, username, password, new Properties());
try {
rsession = Rsession.newInstanceTry(System.out, rconf);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return rsession;
}
public static void setStopStatus(){
System.out.println("设置终止标志");
stopStatus=new AtomicInteger(0);
}
/**
* 终止任务
*
* @return
*/
public synchronized static void closeRTask() {
// getRSession().end();
try {
if (rsession == null) {
return;
}
rsession.end();
rsession=null;
stopStatus=new AtomicInteger(1);
} catch (Exception ex) {
System.out.println(ex.getMessage());
}
}
public synchronized REXP runScript(String script) throws Exception {
try {
if( getRSession()==null){
System.out.println("没有连接到R服务器");
return null;
}
return getRSession().eval(script);
} catch (Exception e) {
throw e;
}
}
}
执行R的过程中需要知道里面的运行情况,有几种处理方式:
1、继承RLogger接口,
2、用socket连接
例子:
con1 <- socketConnection(host='127.0.0.1', port="9999", blocking=TRUE);
write.table(“ss”, file=con1, col.names=FALSE,row.names=FALSE);
网友评论