美文网首页Hadoop大数据
Presto Event Listener开发

Presto Event Listener开发

作者: 叁金 | 来源:发表于2019-07-30 22:12 被阅读0次

    简介

    同Hive Hook一样,Presto也支持自定义实现Event Listener,用于侦听Presto引擎执行查询时发生的事件,并作出相应的处理。我们可以利用该功能实现诸如自定义日志记录、调试和性能分析插件,帮助我们更好的运维Presto集群。但是不同于Hive Hook的是,在Presto集群中,一次只能有一个Event Listener处于活动状态。

    Event Listener作为Plugin监听以下事件:

    Query Creation(查询建立相关信息)

    Query completion (success or failure)(查询执行相关信息,包含成功查询的细节信息,失败查询的错误码等信息)

    Split completion (success or failure)(split执行信息,同理包含成功和失败的细节信息)

    了解Hook及Listener模式的朋友对于其步骤应该很清楚了,我们只需要:

    实现Presto Event Listener和EventListenerFactory接口。

    正确的打包我们的jar。

    部署,放到Presto指定目录,修改配置文件。

    接口

    实现EventListener,该类是我们的核心逻辑所在,供包含上面所说的三个事件:

    public interface EventListener

    {

    //query创建的详细信息

        default void queryCreated(QueryCreatedEvent queryCreatedEvent)

        {

        }

    //query执行的详细信息

        default void queryCompleted(QueryCompletedEvent queryCompletedEvent)

        {

        }

    //split执行的详细信息

        default void splitCompleted(SplitCompletedEvent splitCompletedEvent)

        {

        }

    }

    实现EventListenerFactory创建我们自己实现的EventListener

    实现Plugin接口,实现getEventListenerFactories()方法,获取我们自己实现的EventListenerFactory

    添加配置信息,为etc/event-listener.properties。其中event-listener.name为必备属性,其他属性为我们plugin所需要的信息。

    示例

    由于集群运维的需要,先需要将用户的查询历史、查询花费的时间等信息进行统计,以便于后续对各个业务的查询进行优先级分级和评分,方便后续Presto集群稳定性易用性的维护。这里给出一个简单的将这些信息存储到Mysql数据库的样例。

    Maven Pom

    <dependency>

          <groupid>com.facebook.presto</groupid>

          <artifactid>presto-spi</artifactid>

          <version>0.220</version>

          <scope>compile</scope>

        </dependency>

    QueryEventListenerFactory

    public class QueryEventListenerFactory implements EventListenerFactory {

      @Override

      public String getName() {

        return "query-event-listener";

      }

      @Override

      public EventListener create(Map<string, string=""> config) {

        if (!config.containsKey("jdbc.uri")) {

          throw new RuntimeException("/etc/event-listener.properties file missing jdbc.uri");

        }

        if (!config.containsKey("jdbc.user")) {

          throw new RuntimeException("/etc/event-listener.properties file missing jdbc.user");

        }

        if (!config.containsKey("jdbc.pwd")) {

          throw new RuntimeException("/etc/event-listener.properties file missing jdbc.pwd");

        }

        return new QueryEventListener(config);

      }

    }

    QueryEventPlugin

    public class QueryEventPlugin implements Plugin {

      @Override

      public Iterable<eventlistenerfactory> getEventListenerFactories() {

        EventListenerFactory listenerFactory = new QueryEventListenerFactory();

        return Arrays.asList(listenerFactory);

      }

    }

    QueryEventListener

    public class QueryEventListener implements EventListener {

      private Map<string, string=""> config;

      private Connection connection;

      public QueryEventListener(Map<string, string=""> config) {

        this.config = new HashMap&lt;&gt;();

        this.config.putAll(config);

        init();

      }

      private void init() {

        try {

          if (connection == null || !connection.isValid(10)) {

            Class.forName("com.mysql.jdbc.Driver");

            connection = DriverManager

                .getConnection(config.get("jdbc.uri"), config.get("jdbc.user"), config.get("jdbc.pwd"));

          }

        } catch (SQLException | ClassNotFoundException e) {

          e.printStackTrace();

        }

      }

      @Override

      public void queryCreated(QueryCreatedEvent queryCreatedEvent) {

      }

      @Override

      public void queryCompleted(QueryCompletedEvent queryCompletedEvent) {

        String queryId = queryCompletedEvent.getMetadata().getQueryId();

        String querySql = queryCompletedEvent.getMetadata().getQuery();

        String queryState = queryCompletedEvent.getMetadata().getQueryState();

        String queryUser = queryCompletedEvent.getContext().getUser();

        long createTime = queryCompletedEvent.getCreateTime().toEpochMilli();

        long endTime = queryCompletedEvent.getEndTime().toEpochMilli();

        long startTime = queryCompletedEvent.getExecutionStartTime().toEpochMilli();

        //insert into query execution table

        long analysisTime = queryCompletedEvent.getStatistics().getAnalysisTime().orElse(Duration.ZERO)

            .toMillis();

        long cpuTime = queryCompletedEvent.getStatistics().getCpuTime().toMillis();

        long queuedTime = queryCompletedEvent.getStatistics().getQueuedTime().toMillis();

        long wallTime = queryCompletedEvent.getStatistics().getWallTime().toMillis();

        int completedSplits = queryCompletedEvent.getStatistics().getCompletedSplits();

        double cumulativeMemory = queryCompletedEvent.getStatistics().getCumulativeMemory();

        long outputBytes = queryCompletedEvent.getStatistics().getOutputBytes();

        long outputRows = queryCompletedEvent.getStatistics().getOutputRows();

        long totalBytes = queryCompletedEvent.getStatistics().getTotalBytes();

        long totalRows = queryCompletedEvent.getStatistics().getTotalRows();

        long writtenBytes = queryCompletedEvent.getStatistics().getWrittenBytes();

        long writtenRows = queryCompletedEvent.getStatistics().getWrittenRows();

    //insert into query info table

        queryCompletedEvent.getFailureInfo().ifPresent(queryFailureInfo -&gt; {

          int code = queryFailureInfo.getErrorCode().getCode();

          String name = queryFailureInfo.getErrorCode().getName();

          String failureType = queryFailureInfo.getFailureType().orElse("").toUpperCase();

          String failureHost = queryFailureInfo.getFailureHost().orElse("").toUpperCase();

          String failureMessage = queryFailureInfo.getFailureMessage().orElse("").toUpperCase();

          String failureTask = queryFailureInfo.getFailureTask().orElse("").toUpperCase();

          String failuresJson = queryFailureInfo.getFailuresJson();

          // insert into failed query table

        });

      }

      @Override

      public void splitCompleted(SplitCompletedEvent splitCompletedEvent) {

        long createTime = splitCompletedEvent.getCreateTime().toEpochMilli();

        long endTime = splitCompletedEvent.getEndTime().orElse(Instant.MIN).toEpochMilli();

        String payload = splitCompletedEvent.getPayload();

        String queryId = splitCompletedEvent.getQueryId();

        String stageId = splitCompletedEvent.getStageId();

        long startTime = splitCompletedEvent.getStartTime().orElse(Instant.MIN).toEpochMilli();

        String taskId = splitCompletedEvent.getTaskId();

        long completedDataSizeBytes = splitCompletedEvent.getStatistics().getCompletedDataSizeBytes();

        long completedPositions = splitCompletedEvent.getStatistics().getCompletedPositions();

        long completedReadTime = splitCompletedEvent.getStatistics().getCompletedReadTime().toMillis();

        long cpuTime = splitCompletedEvent.getStatistics().getCpuTime().toMillis();

        long queuedTime = splitCompletedEvent.getStatistics().getQueuedTime().toMillis();

        long wallTime = splitCompletedEvent.getStatistics().getWallTime().toMillis();

        //insert into stage info table

      }

    }

    打包

    Presto使用服务提供者接口(SPI)来扩展Presto。Presto使用SPI加载连接器功能类型系统访问控制。SPI通过元数据文件加载。我们还需要创建src/main/resources/META-INF/services/com.facebook.presto.spi.Plugin元数据文件。该文件应包含我们插件的类名如:com.ji3jin.presto.listener.QueryEventListener

    执行mvn clean install打包

    部署

    创建配置文件etc/event-listener.properties

    event-listener.name=query-event-listener

    jdbc.uri=jdbc:mysql://localhost:3306/presto_monitor

    jdbc.user=presto

    jdbc.pwd=presto123

    在presto根目录下创建query-event-listener目录,名称与我们上面event listener的name一致

    将我们的jar包和mysql connector的jar包拷贝到上面创建的目录

    重新启动Presto服务即可

    好了,现在你可以执行查询,然后就可以在Mysql中看到你的查询历史和相关时间的统计信息了。如果你目前的工作对此也有需要,还等什么,快动手实现一个吧。

    欢迎关注我的公众号:叁金大数据

    </string,></string,></string,>

    相关文章

      网友评论

        本文标题:Presto Event Listener开发

        本文链接:https://www.haomeiwen.com/subject/njoorctx.html