Presto (10): Pulling Data from Hive

Author: hello高world | Published 2017-05-05 10:56

    1. Pulling data from Hive

    1.1 Entry point: the catalog configuration file etc/catalog/hive.properties under the Presto installation root.
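    For reference, a minimal etc/catalog/hive.properties might look like the sketch below; the metastore host and port are placeholders, while connector.name and hive.metastore.uri are the two properties this walkthrough relies on:

    connector.name=hive-hadoop2
    hive.metastore.uri=thrift://metastore-host:9083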

    1.2 Reading the configuration file. The loadCatalog method below (it lives in StaticCatalogStore, not in ConnectorManager itself) parses the file and hands the result to com.facebook.presto.connector.ConnectorManager:

    private void loadCatalog(File file)
            throws Exception
    {
        // take the file name under etc/catalog and strip its extension; that becomes the catalog name
        String catalogName = Files.getNameWithoutExtension(file.getName());
        if (disabledCatalogs.contains(catalogName)) {
            log.info("Skipping disabled catalog %s", catalogName);
            return;
        }

        log.info("-- Loading catalog %s --", file);
        // load the key/value pairs from the file into a map
        Map<String, String> properties = new HashMap<>(loadProperties(file));

        // pull out connector.name; for Hive this is usually a versioned name such as hive-hadoop2
        String connectorName = properties.remove("connector.name");
        checkState(connectorName != null, "Catalog configuration %s does not contain connector.name", file.getAbsoluteFile());

        // create the connection
        connectorManager.createConnection(catalogName, connectorName, ImmutableMap.copyOf(properties));
        log.info("-- Added catalog %s using connector %s --", catalogName, connectorName);
    }
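    The loadProperties call above is a small utility (Presto takes it from Airlift's ConfigurationLoader). A minimal stand-in, assuming plain java.util.Properties plus Guava's Maps.fromProperties, would be:

    // minimal stand-in for loadProperties: read a .properties file into a Map
    // (illustration only; the real helper lives in io.airlift.configuration)
    private static Map<String, String> loadProperties(File file)
            throws IOException
    {
        Properties properties = new Properties();
        try (InputStream in = new FileInputStream(file)) {
            properties.load(in);
        }
        return Maps.fromProperties(properties); // Guava: Properties -> Map<String, String>
    }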
    

    1.3 Creating the connection to Hive

    // catalogName = "hive"; connectorName = "hive-hadoop2" in my setup; properties carries the hive.metastore.uri setting
    connectorManager.createConnection(catalogName, connectorName, ImmutableMap.copyOf(properties));
    
    public synchronized ConnectorId createConnection(String catalogName, String connectorName, Map<String, String> properties)
    {
        requireNonNull(connectorName, "connectorName is null");
        // where does connectorFactories come from? (answered below)
        ConnectorFactory connectorFactory = connectorFactories.get(connectorName);
        checkArgument(connectorFactory != null, "No factory for connector %s", connectorName);
        return createConnection(catalogName, connectorFactory, properties);
    }
    

    Question: where does the connectorFactories map above come from?

    public synchronized void addConnectorFactory(ConnectorFactory connectorFactory)
    {
        // putIfAbsent returns the previous mapping, so a non-null result means a duplicate registration
        ConnectorFactory existingConnectorFactory = connectorFactories.putIfAbsent(connectorFactory.getName(), connectorFactory);
        checkArgument(existingConnectorFactory == null, "Connector %s is already registered", connectorFactory.getName());
    }
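    A tiny self-contained demo of the putIfAbsent idiom used above (toy value types, not Presto classes):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class FactoryRegistryDemo
    {
        public static void main(String[] args)
        {
            ConcurrentMap<String, String> factories = new ConcurrentHashMap<>();
            // first registration: putIfAbsent returns null
            System.out.println(factories.putIfAbsent("hive-hadoop2", "HiveConnectorFactory"));
            // duplicate name: returns the existing value, which the real code turns into an error
            System.out.println(factories.putIfAbsent("hive-hadoop2", "AnotherFactory"));
        }
    }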
    

    And who calls that function? PluginManager has a method like this:

    public void installPlugin(Plugin plugin)
    {
        ......
        for (com.facebook.presto.spi.ConnectorFactory connectorFactory : plugin.getLegacyConnectorFactories()) {
            log.info("Registering legacy connector %s", connectorFactory.getName());
            // each factory exposed by a loaded plugin is registered with the ConnectorManager here
            connectorManager.addConnectorFactory(new LegacyTransactionConnectorFactory(connectorFactory));
        }
        ......
    }

    public PluginManager(
            /* parameters omitted */)
    {
        ......
        // read the config for the directory that holds the server's installed plugins
        installedPluginsDir = config.getInstalledPluginsDir();
        if (config.getPlugins() == null) {
            this.plugins = ImmutableList.of();
        }
        else {
            this.plugins = ImmutableList.copyOf(config.getPlugins());
        }
        ......
    }

    public void loadPlugins()
            throws Exception
    {
        ......
        // walk every directory under the installed-plugins dir and load each one as a plugin
        for (File file : listFiles(installedPluginsDir)) {
            if (file.isDirectory()) {
                loadPlugin(file.getAbsolutePath());
            }
        }

        // also load any plugins listed explicitly in the config
        for (String plugin : plugins) {
            loadPlugin(plugin);
        }
        ......
    }
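    Putting this together: a plugin only has to hand its factories back through getLegacyConnectorFactories. A hypothetical, heavily simplified plugin looks like the sketch below; myConnectorFactory() is a stand-in of my own, not a Presto helper:

    import com.facebook.presto.spi.ConnectorFactory;
    import com.facebook.presto.spi.Plugin;
    import com.google.common.collect.ImmutableList;

    public class ExamplePlugin
            implements Plugin
    {
        @Override
        public Iterable<ConnectorFactory> getLegacyConnectorFactories()
        {
            // whatever getName() returns here (e.g. "hive-hadoop2") is the
            // value that connector.name in the catalog file must match
            return ImmutableList.of(myConnectorFactory());
        }

        private static ConnectorFactory myConnectorFactory()
        {
            // stand-in: the real hive plugin constructs a HiveConnectorFactory here
            throw new UnsupportedOperationException("illustration only");
        }
    }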
    

    1.4 A look at how the data is actually loaded

    Now that the connection-creation path is clear, let's look at how Presto reads the data from Hive.

    Back to the createConnection method above:

    private synchronized ConnectorId createConnection(String catalogName, ConnectorFactory connectorFactory, Map<String, String> properties)
    {
        // omitted
        // the key call is this one:
        addCatalogConnector(catalogName, connectorId, connectorFactory, properties);
        // omitted
    }

    private synchronized void addCatalogConnector(String catalogName, ConnectorId connectorId, ConnectorFactory factory, Map<String, String> properties)
    {
        // create a Connector through the factory registered earlier
        // (for Hive that is the HiveConnectorFactory; the Connector SPI lives under presto-spi's connector package)
        Connector connector = createConnector(connectorId, factory, properties);

        // the standard connector: the user tables stored in Hive
        addConnectorInternal(ConnectorType.STANDARD, catalogName, connectorId, connector);

        // the information_schema connector: Hive table/schema metadata
        ConnectorId informationSchemaId = createInformationSchemaConnectorId(connectorId);
        addConnectorInternal(ConnectorType.INFORMATION_SCHEMA, catalogName, informationSchemaId, new InformationSchemaConnector(catalogName, nodeManager, metadataManager));

        // the system tables connector
        ConnectorId systemId = createSystemTablesConnectorId(connectorId);
        addConnectorInternal(ConnectorType.SYSTEM, catalogName, systemId, new SystemConnector(
                systemId,
                nodeManager,
                connector.getSystemTables(),
                transactionId -> transactionManager.getConnectorTransaction(transactionId, connectorId)));
    }
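    As an aside, the two helper calls above just derive ids from the base connector id. The exact prefix format below is my reading of the Presto source of this period, so treat it as an assumption:

    public class ConnectorIdDemo
    {
        public static void main(String[] args)
        {
            String connectorId = "hive";
            System.out.println(connectorId);                   // the Hive connector itself (user tables)
            System.out.println("$info_schema@" + connectorId); // information_schema connector id (assumed format)
            System.out.println("$system@" + connectorId);      // system tables connector id (assumed format)
        }
    }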
    

    addConnectorInternal then does the following:

    1. Put the connector into a map.
    2. Get the ConnectorSplitManager (the SPI implementation supplied by the concrete connector).
    3. Get the PageSourceProvider.
    4. Get the PageSinkProvider.
    5. Get the IndexProvider.
    6. Get the NodePartitioningProvider.
    7. Get the AccessControl.
    8. Register each of these objects with the corresponding Presto-side manager; for example, the PageSourceProvider is handed to the PageSourceManager (see the sketch below).
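    A hedged sketch of step 8; the add* method names follow the managers named in this article, but the exact signatures vary by version:

    // inside addConnectorInternal: hand each provider to its Presto-side manager
    splitManager.addConnectorSplitManager(connectorId, connectorSplitManager);
    pageSourceManager.addConnectorPageSourceProvider(connectorId, connectorPageSourceProvider);
    pageSinkManager.addConnectorPageSinkProvider(connectorId, connectorPageSinkProvider);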
    

    The piece we care about, where the data actually gets loaded, is the job of the PageSourceProvider.

    # PageSourceManager
    @Override
    public ConnectorPageSource createPageSource(Session session, Split split, List<ColumnHandle> columns)
    {
        // derive the per-connector session, then delegate to the provider registered for this split's connector
        ConnectorSession connectorSession = session.toConnectorSession(split.getConnectorId());
        return getPageSourceProvider(split).createPageSource(split.getTransactionHandle(), connectorSession, split.getConnectorSplit(), columns);
    }
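    The getPageSourceProvider(split) lookup is the other half of the registration in step 8 above: a map from connector id to provider. A hedged sketch of that method from the same class, simplified from the real source:

    private ConnectorPageSourceProvider getPageSourceProvider(Split split)
    {
        // the provider was registered under this connector id by addConnectorPageSourceProvider
        ConnectorPageSourceProvider provider = pageSourceProviders.get(split.getConnectorId());
        checkArgument(provider != null, "No page source provider for connector '%s'", split.getConnectorId());
        return provider;
    }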
    
    # The concrete implementation: HivePageSourceProvider
    @Override
    public ConnectorPageSource createPageSource(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorSplit split, List<ColumnHandle> columns)
    {
        // read the column information
        List<HiveColumnHandle> hiveColumns = columns.stream()
                .map(HiveColumnHandle::toHiveColumnHandle)
                .collect(toList());

        // get the path of this HiveSplit
        HiveSplit hiveSplit = checkType(split, HiveSplit.class, "split");
        Path path = new Path(hiveSplit.getPath());
        // build the page source that actually reads the Hive data
        Optional<ConnectorPageSource> pageSource = createHivePageSource(
                cursorProviders,
                pageSourceFactories,
                hiveSplit.getClientId(),
                hdfsEnvironment.getConfiguration(path),
                session,
                path,
                hiveSplit.getStart(),
                hiveSplit.getLength(),
                hiveSplit.getSchema(),
                hiveSplit.getEffectivePredicate(),
                hiveColumns,
                hiveSplit.getPartitionKeys(),
                hiveStorageTimeZone,
                typeManager);
        if (pageSource.isPresent()) {
            return pageSource.get();
        }
        throw new RuntimeException("Could not find a file reader for split " + hiveSplit);
    }
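    Inside createHivePageSource, the pageSourceFactories and cursorProviders passed in are tried in order, and the first reader that recognizes the split's file format wins. The generic shape of that selection, as a self-contained sketch (my simplification, not the real method, which threads through all of the parameters above):

    import java.util.List;
    import java.util.Optional;
    import java.util.function.Function;

    public final class FirstMatch
    {
        private FirstMatch() {}

        // return the result of the first candidate that accepts the input;
        // createHivePageSource does this over page-source factories, then cursor providers
        public static <S, R> Optional<R> firstMatch(List<Function<S, Optional<R>>> candidates, S input)
        {
            for (Function<S, Optional<R>> candidate : candidates) {
                Optional<R> result = candidate.apply(input);
                if (result.isPresent()) {
                    return result;
                }
            }
            return Optional.empty();
        }
    }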
    

    That covers how Presto obtains the data; the next article will look at when the data is actually pulled. To be continued~
