一、需求背景
客服团队在接到客户电话后,会根据通话内容,判断客户的诉求,为用户创建工单并在内部系统系统流转,对工单分类因存在人为错误概率而影响客服质量。质检团队会在监控客服人员的用户交谈内容是否合乎服务标准,同时对工单的分类进行审计,纠正存在错误的分类。
算法团队尝试对工单内容进行建模以及分类,最终尝试能在客户挂完电话后,即可预测出该通话是投诉、咨询诉求类型,并判断客服在通话过程中的行为是否合乎服务标准。实时团队需要对接质检团队发送的Kafka消息(完整的工单信息含语音文本内容),并存至Hive仓库中,为提供AI算法团队进行分析需要的基础数据。
二、总体设计
总体设计.png
二、实现过程
1、Hive建表
(a) 投诉工单基础表
CREATE TABLE st.zto_complaint_record_text_base_info
(`bill_code` string comment '运单号',
`business_id` string comment '工单号',
`score` int comment '评价',
`model_analyze_results` string comment '质检标签',
`agt_id` string comment '客服工号',
`batch_id` string comment '普强系统批次ID',
`data_source_type` string comment '普强系统数据源类型',
`id` string comment '普强系统记录ID',
`name` string comment '客服姓名',
`silence_mix` string comment '静音时长',
`task_number` string comment '会话ID',
`big_channel` string comment '会话来源渠道',
`business_type` string comment '工单大类型',
`call_center_name` string comment '网点名称',
`called` string comment '被叫号码',
`caller` string comment '用户手机号',
`calling_identity` string comment '来电客户身份',
`call_type` string comment '呼叫类型',
`complain_type` string comment '业务类型',
`org_code` string comment '网点ID',
`region` string comment 'orgName',
`service_group_user_id` string comment '组长工号',
`service_group_user_name` string comment '组长姓名',
`start_time` string comment '会话开始时间',
`talking_time` int comment '通话时长',
`user_id` string comment '用户ID',
`receive_date` timestamp comment '接收时间')
COMMENT '投诉记录基础信息'
PARTITIONED BY ( `ds` string COMMENT '分区时间')
STORED AS ORC
(b) 投诉工单语音数据表
CREATE TABLE st.zto_complaint_record_text_speech_info
(`task_number` string comment '会话ID',
`speecher` string comment '说话者',
`content` string comment '内容',
`start_time` string comment '开始时间',
`end_time` string comment '结束时间')
COMMENT '投诉记录基础信息'
PARTITIONED BY ( `ds` string COMMENT '分区时间')
STORED AS ORC
2、Streaming代码片段
//存储投诉工单文本基础信息
def complaintBaseInfoInsertHive(spark:SparkSession, df: DataFrame): Unit ={
df.createOrReplaceTempView("tmp_zto_complaint_base_table")
spark.sql("set hive.exec.dynamici.partition = true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql(
s"""
|insert into table st.zto_complaint_record_text_base_info5
|partition(ds)
|select
| bill_code,
| business_id,
| score,
| model_analyze_results,
| agt_id,
| batch_id,
| data_source_type,
| id,
| name,
| silence_mix,
| task_number,
| big_channel,
| business_type,
| call_center_name,
| called,
| caller,
| calling_identity,
| call_type,
| complain_type,
| org_code,
| region,
| service_group_user_id,
| service_group_user_name,
| start_time,
| talking_time,
| user_id,
| receive_date,
| ds
|from tmp_zto_complaint_base_table
|""".stripMargin)
}
三、存在问题
1、小文件过多
因为Streaming每批次都会在hive的分区文件夹下产生一个文件,每批次写入数据量并不会太大(5M以内),频繁执行会在hdfs产生过多小文件
解决方案:
a:调整批次时间窗口
通过观察每批次文件大小,适当拉大批次时间尽量使单次写入的文件大小接近128M
b:定时合并分区表
Streaming写入hive临时表,分钟调度(SparkSql)定时合并分区表
网友评论