首先我们在ambari上面集成presto相关的组件,具体过程不在此说明了。
注意如下事项:
connectors.to.add:
{'hive':[
'connector.name=hive-hadoop2',
'hive.metastore.uri=thrift://knowyou-hdp-02:9083'
],
'kafka':[
'connector.name=kafka',
'kafka.table-names=ambari_kafka_service_check,rawMessage,bmMessage',
'kafka.nodes=knowyou-hdp-01:6667,knowyou-hdp-02:6667,knowyou-hdp-03:6667'
],
'mysql':[
'connector.name=mysql',
'connection-url=jdbc:mysql://172.16.1.208:3306',
'connection-user=root',
'connection-password=root'
]}
discovery.uri:
http://172.16.1.209:8285
1,shell命令操作方式:
连接hive
/usr/lib/presto/bin/presto-cli --server 172.16.1.209:8285 --catalog hive
show schemas; -- 查询所有的库(schema)
image.png
use test;
show tables;
image.png
我们来查询下
image.png
连接kafka
/usr/lib/presto/bin/presto-cli --server 172.16.1.209:8285 --catalog kafka
image.png
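连接mysql的方式同上,只需把 --catalog 参数换成 mysql:
/usr/lib/presto/bin/presto-cli --server 172.16.1.209:8285 --catalog mysql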
2,jdbc的连接方式
读取配置文件(my.properties)的工具类PropertyUtil
/**
 * Lazily created, process-wide view of the {@code my.properties} classpath resource with typed
 * accessors.
 *
 * <p>An empty value is treated like a missing one: every accessor returns the supplied default
 * when the key is absent or maps to an empty string. If the resource is not on the classpath, or
 * cannot be read, the instance simply starts out empty and all accessors return their defaults.
 */
public class PropertyUtil extends Properties {
    private static final long serialVersionUID = 50440463580273222L;

    /** Name of the classpath resource the singleton is loaded from. */
    private static final String RESOURCE_NAME = "my.properties";

    private static PropertyUtil instance = null;

    /**
     * Returns the shared instance, creating and loading it on first use.
     *
     * @return the singleton {@code PropertyUtil}
     */
    public static synchronized PropertyUtil getInstance() {
        if (instance == null) {
            instance = new PropertyUtil();
        }
        return instance;
    }

    /**
     * Looks up {@code key}, falling back to {@code defaultValue} when the key is missing or its
     * value is the empty string.
     *
     * @param key          property name
     * @param defaultValue value returned when no non-empty value is configured
     * @return the configured value or {@code defaultValue}
     */
    @Override
    public String getProperty(String key, String defaultValue) {
        String val = nonEmptyOrNull(key);
        return val == null ? defaultValue : val;
    }

    /** Same as {@link #getProperty(String, String)}. */
    public String getString(String name, String defaultValue) {
        return this.getProperty(name, defaultValue);
    }

    /**
     * @return the value parsed as {@code int}, or {@code defaultValue} when missing/empty
     * @throws NumberFormatException if a non-empty value is not a valid {@code int}
     */
    public int getInt(String name, int defaultValue) {
        String val = nonEmptyOrNull(name);
        return val == null ? defaultValue : Integer.parseInt(val);
    }

    /**
     * @return the value parsed as {@code long}, or {@code defaultValue} when missing/empty
     * @throws NumberFormatException if a non-empty value is not a valid {@code long}
     */
    public long getLong(String name, long defaultValue) {
        String val = nonEmptyOrNull(name);
        // parseLong avoids the box/unbox round trip of Long.valueOf.
        return val == null ? defaultValue : Long.parseLong(val);
    }

    /**
     * @return the value parsed as {@code float}, or {@code defaultValue} when missing/empty
     * @throws NumberFormatException if a non-empty value is not a valid {@code float}
     */
    public float getFloat(String name, float defaultValue) {
        String val = nonEmptyOrNull(name);
        return val == null ? defaultValue : Float.parseFloat(val);
    }

    /**
     * @return the value parsed as {@code double}, or {@code defaultValue} when missing/empty
     * @throws NumberFormatException if a non-empty value is not a valid {@code double}
     */
    public double getDouble(String name, double defaultValue) {
        String val = nonEmptyOrNull(name);
        return val == null ? defaultValue : Double.parseDouble(val);
    }

    /**
     * @return the value parsed as {@code byte}, or {@code defaultValue} when missing/empty
     * @throws NumberFormatException if a non-empty value is not a valid {@code byte}
     */
    public byte getByte(String name, byte defaultValue) {
        String val = nonEmptyOrNull(name);
        return val == null ? defaultValue : Byte.parseByte(val);
    }

    /** Returns the raw value for {@code name}, or {@code null} when it is missing or empty. */
    private String nonEmptyOrNull(String name) {
        String val = getProperty(name);
        return (val == null || val.isEmpty()) ? null : val;
    }

    /**
     * Loads {@code my.properties} from the classpath. A missing resource is tolerated (the
     * instance stays empty) instead of failing with a NullPointerException inside {@code load}.
     */
    private PropertyUtil() {
        // try-with-resources closes the stream even if load() throws; a null resource is legal here.
        try (InputStream in = PropertyUtil.class.getClassLoader().getResourceAsStream(RESOURCE_NAME)) {
            if (in == null) {
                System.err.println("PropertyUtil: '" + RESOURCE_NAME
                        + "' not found on the classpath, falling back to defaults");
                return;
            }
            this.load(in);
        } catch (IOException e) {
            // Best effort, as before: keep whatever was loaded and report the failure.
            System.err.println("PropertyUtil: failed to read '" + RESOURCE_NAME + "'");
            e.printStackTrace();
        }
    }
}
连接池
/**
 * Minimal Presto JDBC connection pool exposed as a {@link DataSource}.
 *
 * <p>Configuration is read once, at class-load time, from {@link PropertyUtil}:
 * {@code presto.url}, {@code presto.driver}, {@code presto.userName}, {@code presto.password}
 * and {@code jdbcConnectionInitSize}. Idle physical connections are kept in a static list shared
 * by every instance of this class; all access to that list is synchronized on the list itself.
 *
 * <p>Callers receive a dynamic proxy around the physical connection. Calling {@code close()} on
 * the proxy hands the physical connection back to the pool instead of closing it.
 */
public class PrestoJdbcConnectionPool implements DataSource {
    private static final PropertyUtil property = PropertyUtil.getInstance();
    private static final Logger logger = Logger.getLogger(PrestoJdbcConnectionPool.class);
    private static String driver;
    private static String url;
    private static String username;
    private static String password;
    private static int jdbcConnectionInitSize;
    /**
     * Growth counter: starts at 10 and is incremented after every on-demand refill; refills stop
     * once it exceeds 50. NOTE(review): the intended upper bound on connections is unclear from
     * this code - confirm before changing.
     */
    private static int max = 10;
    /** Idle physical connections; guarded by {@code synchronized (list)}. */
    private static final LinkedList<Connection> list = new LinkedList<>();

    static {
        // Read every setting before touching the driver, so a bad driver name cannot leave
        // jdbcConnectionInitSize unset.
        url = property.getString("presto.url", "");
        driver = property.getString("presto.driver", "");
        username = property.getString("presto.userName", "");
        password = property.getString("presto.password", "");
        jdbcConnectionInitSize = property.getInt("jdbcConnectionInitSize", 0);
        try {
            // Only load the driver explicitly when one is configured.
            if (!driver.isEmpty()) {
                Class.forName(driver);
            }
            // Open the initial set of connections.
            synchronized (list) {
                for (int i = 0; i < jdbcConnectionInitSize; i++) {
                    final Connection conn = DriverManager.getConnection(url, username, password);
                    logger.info("connected to presto...");
                    list.add(conn);
                }
            }
        } catch (SQLException | ClassNotFoundException e) {
            // Keep class initialization alive; getConnection() retries on demand.
            logger.error("failed to initialize presto connection pool", e);
        }
    }

    /**
     * Hands out a pooled connection, opening a new batch of physical connections first when the
     * pool is empty and the growth counter still allows it.
     *
     * @return a proxy whose {@code close()} returns the physical connection to the pool
     * @throws SQLException if a physical connection cannot be opened, or the pool is empty and
     *                      is not allowed to grow any further
     */
    @Override
    public Connection getConnection() throws SQLException {
        final Connection physical;
        synchronized (list) {
            if (list.isEmpty() && max <= 50) {
                // Always open at least one connection, even when the configured size is 0;
                // otherwise an unconfigured pool could never serve a request.
                final int batch = Math.max(1, jdbcConnectionInitSize);
                for (int i = 0; i < batch; i++) {
                    list.add(DriverManager.getConnection(url, username, password));
                }
                max++;
            }
            if (list.isEmpty()) {
                logger.error("connect database error.");
                throw new SQLException("no presto connection available: pool is exhausted");
            }
            physical = list.removeFirst();
        }
        return wrap(physical);
    }

    /** Wraps a physical connection in a handle that returns it to the pool on {@code close()}. */
    private static Connection wrap(final Connection physical) {
        return (Connection) Proxy.newProxyInstance(PrestoJdbcConnectionPool.class.getClassLoader(),
                new Class<?>[]{Connection.class}, new InvocationHandler() {
                    /** Set once this handle has given its connection back to the pool. */
                    private volatile boolean closed;

                    /** Flips {@code closed}; returns true only for the call that did the flip. */
                    private synchronized boolean markClosed() {
                        if (closed) {
                            return false;
                        }
                        closed = true;
                        return true;
                    }

                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        final String name = method.getName();
                        if ("close".equals(name)) {
                            // Idempotent: only the first close() re-pools the connection, so the
                            // same physical connection can never be queued twice.
                            if (markClosed()) {
                                synchronized (list) {
                                    list.add(physical);
                                }
                            }
                            return null;
                        }
                        if (closed && method.getDeclaringClass() != Object.class) {
                            if ("isClosed".equals(name)) {
                                return Boolean.TRUE;
                            }
                            // The physical connection may already belong to another caller.
                            throw new SQLException("connection has already been returned to the pool");
                        }
                        try {
                            return method.invoke(physical, args);
                        } catch (java.lang.reflect.InvocationTargetException e) {
                            // Surface the driver's own exception (e.g. SQLException) to the caller.
                            final Throwable cause = e.getCause();
                            throw cause != null ? cause : e;
                        }
                    }
                });
    }

    /** Not supported by this pool: credentials come from configuration; always returns null. */
    @Override
    public Connection getConnection(String username, String password) {
        return null;
    }

    /** Not supported: always returns null. */
    @Override
    public <T> T unwrap(Class<T> iface) {
        return null;
    }

    /** Not supported: always returns false. */
    @Override
    public boolean isWrapperFor(Class<?> iface) {
        return false;
    }

    /** No log writer is kept: always returns null. */
    @Override
    public PrintWriter getLogWriter() {
        return null;
    }

    /** Ignored: this pool keeps no log writer. */
    @Override
    public void setLogWriter(PrintWriter out) {
    }

    /** Ignored: this pool applies no login timeout. */
    @Override
    public void setLoginTimeout(int seconds) {
    }

    /** Always 0. */
    @Override
    public int getLoginTimeout() {
        return 0;
    }

    /** This pool does not use java.util.logging: always returns null. */
    @Override
    public java.util.logging.Logger getParentLogger() {
        return null;
    }
}
连接工具
/**
 * Static convenience facade over a single shared {@link PrestoJdbcConnectionPool}.
 */
public class PrestoJdbcConnection {
    private static final PrestoJdbcConnectionPool pool = new PrestoJdbcConnectionPool();
    private static final Logger logger = Logger.getLogger(PrestoJdbcConnection.class);

    /**
     * Obtains a connection from the shared pool.
     *
     * @return a connection, or {@code null} when the pool could not provide one (the failure is
     *         logged); callers must check for {@code null}
     */
    public static Connection getConnection() {
        try {
            return pool.getConnection();
        } catch (SQLException e) {
            // Pass the exception itself so the stack trace and cause reach the log.
            logger.error(String.format("get presto connection error:%s", e.getMessage()), e);
            return null;
        }
    }

    /**
     * Releases a connection right away by calling {@code close()} on it.
     *
     * <p>Earlier this method only registered a JVM shutdown hook per call, so the connection was
     * held until process exit and one hook thread accumulated for every call.
     *
     * @param conn the connection to release; {@code null} is ignored
     */
    public static void clearConnection(final Connection conn) {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } catch (SQLException e) {
            logger.error(String.format("release presto connection error:%s", e.getMessage()), e);
        }
    }
}
测试
public class Test {
public static void main(String[] args) throws SQLException {
final Connection conn = PrestoJdbcConnection.getConnection();
String sql = format("select * from student");
final Statement stat = conn.createStatement();//不可使用prepareStatement
final ResultSet rs = stat.executeQuery(sql);
while (rs.next()) {
final String name = rs.getString(1);
System.out.println(name);
}
}
}
网友评论