美文网首页
Streaming写Hive仓库

Streaming写Hive仓库

作者: ryancao_b9b9 | 来源:发表于2020-04-16 19:21 被阅读0次

一、需求背景
  客服团队在接到客户电话后,会根据通话内容,判断客户的诉求,为用户创建工单并在内部系统系统流转,对工单分类因存在人为错误概率而影响客服质量。质检团队会在监控客服人员的用户交谈内容是否合乎服务标准,同时对工单的分类进行审计,纠正存在错误的分类。
  算法团队尝试对工单内容进行建模以及分类,最终尝试能在客户挂完电话后,即可预测出该通话是投诉、咨询诉求类型,并判断客服在通话过程中的行为是否合乎服务标准。实时团队需要对接质检团队发送的Kafka消息(完整的工单信息含语音文本内容),并存至Hive仓库中,为提供AI算法团队进行分析需要的基础数据。

二、总体设计


总体设计.png

二、实现过程
1、Hive建表
(a) 投诉工单基础表

CREATE TABLE st.zto_complaint_record_text_base_info
(`bill_code` string comment '运单号',
`business_id` string comment '工单号',
`score` int comment '评价',
`model_analyze_results` string comment '质检标签',
`agt_id` string comment '客服工号',
`batch_id` string comment '普强系统批次ID',
`data_source_type` string comment '普强系统数据源类型',
`id` string comment '普强系统记录ID',
`name` string comment '客服姓名',
`silence_mix` string comment '静音时长',
`task_number` string comment '会话ID',
`big_channel` string comment '会话来源渠道',
`business_type` string comment '工单大类型',
`call_center_name` string comment '网点名称',
`called` string comment '被叫号码',
`caller` string comment '用户手机号',
`calling_identity` string comment '来电客户身份',
`call_type` string comment '呼叫类型',
`complain_type` string comment '业务类型',
`org_code` string comment '网点ID',
`region` string comment 'orgName',
`service_group_user_id` string comment '组长工号',
`service_group_user_name` string comment '组长姓名',
`start_time` string comment '会话开始时间',
`talking_time` int comment '通话时长',
`user_id` string comment '用户ID',
`receive_date` timestamp comment '接收时间')
COMMENT '投诉记录基础信息'
PARTITIONED BY ( `ds` string COMMENT '分区时间')
STORED AS ORC

(b) 投诉工单语音数据表

CREATE TABLE st.zto_complaint_record_text_speech_info
(`task_number` string comment '会话ID',
`speecher` string comment '说话者',
`content` string comment '内容',
`start_time` string comment '开始时间',
`end_time` string comment '结束时间') 
COMMENT '投诉记录基础信息'
PARTITIONED BY ( `ds` string COMMENT '分区时间')
STORED AS ORC

2、Streaming代码片段

//存储投诉工单文本基础信息
  def complaintBaseInfoInsertHive(spark:SparkSession, df: DataFrame): Unit ={
    df.createOrReplaceTempView("tmp_zto_complaint_base_table")
    spark.sql("set hive.exec.dynamici.partition = true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql(
      s"""
         |insert into table st.zto_complaint_record_text_base_info5
         |partition(ds)
         |select
         | bill_code,
         | business_id,
         | score,
         | model_analyze_results,
         | agt_id,
         | batch_id,
         | data_source_type,
         | id,
         | name,
         | silence_mix,
         | task_number,
         | big_channel,
         | business_type,
         | call_center_name,
         | called,
         | caller,
         | calling_identity,
         | call_type,
         | complain_type,
         | org_code,
         | region,
         | service_group_user_id,
         | service_group_user_name,
         | start_time,
         | talking_time,
         | user_id,
         | receive_date,
         | ds
         |from tmp_zto_complaint_base_table
         |""".stripMargin)
  }

三、存在问题
1、小文件过多
因为Streaming每批次都会在hive的分区文件夹下产生一个文件,每批次写入数据量并不会太大(5M以内),频繁执行会在hdfs产生过多小文件
解决方案:
a:调整批次时间窗口
通过观察每批次文件大小,适当拉大批次时间尽量使单次写入的文件大小接近128M
b:定时合并分区表
Streaming写入hive临时表,分钟调度(SparkSql)定时合并分区表

相关文章

网友评论

      本文标题:Streaming写Hive仓库

      本文链接:https://www.haomeiwen.com/subject/lhmdvhtx.html