美文网首页
presto操作kafka和hive,mysql

presto操作kafka和hive,mysql

作者: 会飞的蜗牛66666 | 来源:发表于2019-01-23 15:41 被阅读0次

    首先我们在Ambari上集成Presto相关的组件,具体过程不在此赘述。
    需要注意如下配置项:
    connectors.to.add:
    {'hive':[
    'connector.name=hive-hadoop2',
    'hive.metastore.uri=thrift://knowyou-hdp-02:9083'
    ],
    'kafka':[
    'connector.name=kafka',
    'kafka.table-names=ambari_kafka_service_check,rawMessage,bmMessage',
    'kafka.nodes=knowyou-hdp-01:6667,knowyou-hdp-02:6667,knowyou-hdp-03:6667'
    ],
    'mysql':[
    'connector.name=mysql',
    'connection-url=jdbc:mysql://172.16.1.208:3306',
    'connection-user=root',
    'connection-password=root'
    ]}
    discovery.uri:
    http://172.16.1.209:8285

    1,shell命令操作方式:
    连接hive
    /usr/lib/presto/bin/presto-cli --server 172.16.1.209:8285 --catalog hive
    show schemas;查询所有的库


    image.png

    use test;
    show tables;


    image.png
    我们来查询下
    image.png
    连接kafka
    /usr/lib/presto/bin/presto-cli --server 172.16.1.209:8285 --catalog kafka
    image.png

    连接mysql的方式同上,只需将 --catalog 参数改为 mysql 即可

    2,jdbc的连接方式
    读取配置文件(my.properties)的工具类PropertyUtil

    /**
     * Lazily created, process-wide {@link Properties} instance populated from the
     * {@code my.properties} classpath resource, with typed accessors that fall back to a
     * caller-supplied default when a key is missing or its value is empty.
     *
     * <p>If the resource cannot be found or read, the instance is simply empty and every
     * accessor returns its default.
     */
    public class PropertyUtil extends Properties {

        private static final long serialVersionUID = 50440463580273222L;

        /** Name of the classpath resource loaded by the constructor. */
        private static final String RESOURCE_NAME = "my.properties";

        private static PropertyUtil instance = null;

        /**
         * Returns the shared instance, creating it on first use.
         *
         * @return the singleton {@code PropertyUtil}
         */
        public static synchronized PropertyUtil getInstance() {
            if (instance == null) {
                instance = new PropertyUtil();
            }
            return instance;
        }

        /**
         * Looks up {@code key}, treating an empty value the same as a missing one.
         *
         * @param key          property name
         * @param defaultValue value returned when the property is missing or empty
         * @return the property value, or {@code defaultValue}
         */
        @Override
        public String getProperty(String key, String defaultValue) {
            String val = getProperty(key);
            return (val == null || val.isEmpty()) ? defaultValue : val;
        }

        /**
         * Same as {@link #getProperty(String, String)}.
         *
         * @param name         property name
         * @param defaultValue value returned when the property is missing or empty
         * @return the property value, or {@code defaultValue}
         */
        public String getString(String name, String defaultValue) {
            return this.getProperty(name, defaultValue);
        }

        /**
         * Reads a property as an {@code int}.
         *
         * @param name         property name
         * @param defaultValue value returned when the property is missing or empty
         * @return the parsed value, or {@code defaultValue}
         * @throws NumberFormatException if the property is present but not a valid int
         */
        public int getInt(String name, int defaultValue) {
            String val = valueOrNull(name);
            return val == null ? defaultValue : Integer.parseInt(val);
        }

        /**
         * Reads a property as a {@code long}.
         *
         * @param name         property name
         * @param defaultValue value returned when the property is missing or empty
         * @return the parsed value, or {@code defaultValue}
         * @throws NumberFormatException if the property is present but not a valid long
         */
        public long getLong(String name, long defaultValue) {
            String val = valueOrNull(name);
            // parseLong avoids the needless boxing/unboxing of Long.valueOf.
            return val == null ? defaultValue : Long.parseLong(val);
        }

        /**
         * Reads a property as a {@code float}.
         *
         * @param name         property name
         * @param defaultValue value returned when the property is missing or empty
         * @return the parsed value, or {@code defaultValue}
         * @throws NumberFormatException if the property is present but not a valid float
         */
        public float getFloat(String name, float defaultValue) {
            String val = valueOrNull(name);
            return val == null ? defaultValue : Float.parseFloat(val);
        }

        /**
         * Reads a property as a {@code double}.
         *
         * @param name         property name
         * @param defaultValue value returned when the property is missing or empty
         * @return the parsed value, or {@code defaultValue}
         * @throws NumberFormatException if the property is present but not a valid double
         */
        public double getDouble(String name, double defaultValue) {
            String val = valueOrNull(name);
            return val == null ? defaultValue : Double.parseDouble(val);
        }

        /**
         * Reads a property as a {@code byte}.
         *
         * @param name         property name
         * @param defaultValue value returned when the property is missing or empty
         * @return the parsed value, or {@code defaultValue}
         * @throws NumberFormatException if the property is present but not a valid byte
         */
        public byte getByte(String name, byte defaultValue) {
            String val = valueOrNull(name);
            return val == null ? defaultValue : Byte.parseByte(val);
        }

        /** Returns the raw value for {@code name}, or {@code null} when it is missing or empty. */
        private String valueOrNull(String name) {
            String val = this.getProperty(name);
            return (val == null || val.isEmpty()) ? null : val;
        }

        /**
         * Loads {@value #RESOURCE_NAME} from the classpath. A missing or unreadable resource is
         * reported on stderr and leaves the instance empty instead of failing construction.
         */
        private PropertyUtil() {
            // try-with-resources closes the stream even when load() throws.
            try (InputStream in = PropertyUtil.class.getClassLoader().getResourceAsStream(RESOURCE_NAME)) {
                if (in == null) {
                    // getResourceAsStream returns null for a missing resource; load(null) would throw NPE.
                    System.err.println("PropertyUtil: classpath resource '" + RESOURCE_NAME
                            + "' not found; only default values will be used");
                    return;
                }
                this.load(in);
            } catch (IOException e) {
                System.err.println("PropertyUtil: failed to read '" + RESOURCE_NAME + "': " + e);
                e.printStackTrace();
            }
        }

    }

    连接池
    /**
     * Minimal {@link DataSource} that keeps idle Presto JDBC connections in a static list.
     *
     * <p>Connection settings are read from {@link PropertyUtil} ({@code presto.url},
     * {@code presto.driver}, {@code presto.userName}, {@code presto.password},
     * {@code jdbcConnectionInitSize}). Connections handed out by {@link #getConnection()} are
     * proxies whose {@code close()} puts the underlying connection back into the idle list
     * instead of closing it. All access to the idle list is guarded by a lock on the list.
     */
    public class PrestoJdbcConnectionPool implements DataSource {
        private static PropertyUtil property = PropertyUtil.getInstance();
        private static Logger logger = Logger.getLogger(PrestoJdbcConnectionPool.class);
        private static String driver;
        private static String url;
        private static String username;
        private static String password;
        private static int jdbcConnectionInitSize;
        // Refill counter: starts at 10 and is incremented on every refill; refills stop once it exceeds 50.
        private static int max = 10;
        private static LinkedList<Connection> list = new LinkedList<>();

        static {
            url = property.getString("presto.url", "");
            driver = property.getString("presto.driver", "");
            username = property.getString("presto.userName", "");
            password = property.getString("presto.password", "");
            // Read before loading the driver so a driver failure does not leave the size unset.
            jdbcConnectionInitSize = property.getInt("jdbcConnectionInitSize", 0);
            try {
                if (!driver.isEmpty()) {
                    Class.forName(driver);
                }
                // Create the initial set of idle connections.
                for (int i = 0; i < jdbcConnectionInitSize; i++) {
                    final Connection conn = DriverManager.getConnection(url, username, password);
                    logger.info("connected to presto...");
                    list.add(conn);
                }
            } catch (SQLException | ClassNotFoundException e) {
                logger.error("failed to initialise presto connection pool", e);
            }
        }

        /**
         * Hands out an idle connection, opening a new batch first when the idle list is empty
         * and the refill limit has not been reached.
         *
         * @return a proxy around a pooled connection; calling {@code close()} on it returns the
         *         underlying connection to the pool
         * @throws SQLException if a new connection cannot be opened, or if the pool is empty and
         *                      the refill limit has been reached
         */
        @Override
        public Connection getConnection() throws SQLException {
            final Connection target;
            synchronized (list) {
                if (list.isEmpty() && max <= 50) {
                    // Always open at least one connection, otherwise a configured size of 0
                    // would make the pool permanently unusable.
                    final int batch = Math.max(1, jdbcConnectionInitSize);
                    for (int i = 0; i < batch; i++) {
                        list.add(DriverManager.getConnection(url, username, password));
                    }
                    max++;
                }
                if (list.isEmpty()) {
                    logger.error("connect database error.");
                    throw new SQLException("no presto connection available: pool is empty and refill limit reached");
                }
                target = list.removeFirst();
            }
            return (Connection) Proxy.newProxyInstance(PrestoJdbcConnectionPool.class.getClassLoader(),
                    new Class[]{Connection.class}, new PooledConnectionHandler(target));
        }

        /**
         * Invocation handler behind the connection proxies: forwards every call to the real
         * connection except {@code close()}, which returns the connection to the idle list.
         */
        private static final class PooledConnectionHandler implements InvocationHandler {
            private final Connection target;
            // Guarded by the lock on 'list'; prevents the same connection being pooled twice.
            private boolean released;

            PooledConnectionHandler(Connection target) {
                this.target = target;
            }

            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                if ("close".equals(method.getName())) {
                    synchronized (list) {
                        if (!released) {
                            released = true;
                            list.add(target);
                        }
                    }
                    return null;
                }
                try {
                    return method.invoke(target, args);
                } catch (java.lang.reflect.InvocationTargetException e) {
                    // Rethrow the driver's own exception (e.g. SQLException); otherwise the proxy
                    // would surface it as UndeclaredThrowableException.
                    final Throwable cause = e.getCause();
                    throw cause != null ? cause : e;
                }
            }
        }

        /** Not supported by this pool; always returns {@code null}. */
        @Override
        public Connection getConnection(String username, String password) {
            return null;
        }

        /** Not supported by this pool; always returns {@code null}. */
        @Override
        public <T> T unwrap(Class<T> iface) {
            return null;
        }

        /** This pool wraps nothing; always returns {@code false}. */
        @Override
        public boolean isWrapperFor(Class<?> iface) {
            return false;
        }

        /** No log writer is kept; always returns {@code null}. */
        @Override
        public PrintWriter getLogWriter() {
            return null;
        }

        /** Ignored: this pool does not use a log writer. */
        @Override
        public void setLogWriter(PrintWriter out) {

        }

        /** Ignored: this pool does not apply a login timeout. */
        @Override
        public void setLoginTimeout(int seconds) {

        }

        /** Always returns {@code 0}. */
        @Override
        public int getLoginTimeout() {
            return 0;
        }

        /** No java.util.logging parent logger is used; always returns {@code null}. */
        @Override
        public java.util.logging.Logger getParentLogger() {
            return null;
        }


    }

    连接工具
    /**
     * Static convenience facade over a shared {@link PrestoJdbcConnectionPool}.
     */
    public class PrestoJdbcConnection {
        private static PrestoJdbcConnectionPool pool = new PrestoJdbcConnectionPool();
        private static Logger logger = Logger.getLogger(PrestoJdbcConnection.class);

        /**
         * Obtains a connection from the pool.
         *
         * @return a connection, or {@code null} if the pool reported an error
         */
        public static Connection getConnection() {
            try {
                return pool.getConnection();
            } catch (SQLException e) {
                // Pass the exception itself so the stack trace ends up in the log, not on stderr.
                logger.error(String.format("get presto connection error:%s", e.getMessage()), e);
                return null;
            }
        }


        /**
         * Releases a connection by calling {@code close()} on it right away.
         *
         * <p>The release happens immediately rather than in a JVM shutdown hook: registering a
         * hook per call kept every connection checked out until process exit and accumulated one
         * thread object per invocation.
         *
         * @param conn the connection to release; {@code null} is ignored
         */
        public static void clearConnection(final Connection conn) {
            if (conn == null) {
                return;
            }
            try {
                conn.close();
            } catch (SQLException e) {
                logger.error(String.format("release presto connection error:%s", e.getMessage()), e);
            }
        }


    }
    测试
    public class Test {

    public static void main(String[] args) throws SQLException {
        final Connection conn = PrestoJdbcConnection.getConnection();
        String sql = format("select * from student");
        final Statement stat = conn.createStatement();//不可使用prepareStatement
        final ResultSet rs = stat.executeQuery(sql);
        while (rs.next()) {
            final String name = rs.getString(1);
            System.out.println(name);
        }
    }
    

    }

    相关文章

      网友评论

          本文标题:presto操作kafka和hive,mysql

          本文链接:https://www.haomeiwen.com/subject/vjzjjqtx.html