背景
之前说过,其实ES很多功能是用不到的。尤其是mapping有很大调优空间。专家和新手调配的集群,性能是截然不同的。但是再怎么调优,仍然需要走ES的很多流程,还是满足不了一些场景。
一直想弄个简化版的ES,砍掉花里胡哨的代码,压缩处理链与存储空间,做到极限的全文检索数据库。。。
这个想法要实现起来起码要大改ES源码了,不太现实。
正好有个项目需求,先尝试做到快速加载到ES,起码50MB/s单节点 约50万条每秒(100字节)
(想法参考了滴滴公司做的fast-loades 他们跑的是MR任务)
需求
- 只需要核心ES功能 索引数据以及常用检索
- 突破ES正常加载速度 30MB/s 左右
- 数据库 分区功能 SQL [待定]
思路
- 绕过正常加载流程
HTTP流程、JSON解析、请求转发、mapping检查、mvcc控制、translog等
- 走直接生成Lucene文件的方式
Kafka > Avro > Lucene > merge > add Indices > ES shard 目录的流程
技术性验证
- 可行性在于ES可以有plugin机制 能定位到内存中的shard对象 add Indices 做到实时导入目录
- 对Lucene来说add Indices不需要重新索引文档(直接拷贝文件)
且这一步操作Lucene保证了是事务的 要么成功要么不成功
给重试机制提供了可能 - ES本质上是Lucene文件 只要能定位到mapping对应产生的Fields就能抛开ES单独直接生产Lucene文件
- 前提是直接生成Lucene文件的速度几倍于ES的速度上限 否则折腾这个没必要
工程化难点和思路
-
怎么生成ES格式的indexwriter (因为ES有很多mapping参数 映射到Lucene对象上比较复杂)
考虑整理一个工具包 给定ES mapping 生成indexwriter以及add Document
(滴滴是进程启动了一个本地节点的ES实例 原生接口入ES,虽然最简单 但是不是极致性能) -
怎么在本节点直接生成Lucene添加到本地shard(kafka怎么指定节点消费 且需要动态改变)
节点驱动->不太可能(不清楚会有哪些日期分区 表)
数据驱动->节点消费kafka生成数据后 推送到目标节点? -
索引周期性生成的话怎么全局控制
-
spark streaming OR 本地kafka直接消费?
spark好处是有全局控制、可以定制metric统计、管理部署简单
kafka好处是更加稳定高效
初步方案
- 目前只考虑支持常用类型 常用分词器等
- IndexWriter的获取可以考虑参考ES测试类的构造方式。。
- 使用standalone方式的spark streaming(方便控制executor分布)driver维护产生了哪些索引(负责创建)
- spark消费kafka生成本地Lucene 不考虑目标shard所在
- 实现ES插件 周期检查生成目录 把已关闭的Lucene(达到大小或时间)按策略发往目标节点的shard 这里应该采取本地就近原则
- Lucene添加到shard 移除
TODO
- 这样直接add Field 实际上大大减少了ES的功能点。。会不会导致添加到ES的数据无法正常使用?
- 例如ES每个字段根据mapping的不同会add 多个field(indexed stored docvalues sorted ...)
- 这里可能需要对Lucene底层较了解才行 (或者直接对比正常ES流程产生的shard文件 解读)
- 测试生成导入文件后 常见检索有无异常。。
- 进一步优化 : Lucene的field能不能更简洁了
测试
-
测试Lucene生成速度 parquet -> Lucene SY数据
[SATA硬盘x10 E5 CPU 40线程] 数据30个字段 平均300字节(CSV)
测试场景 | 并发线程数 | 持续时长 | 总写入条数 | 平均速度 | 字节速度 | CPU占用 |
---|---|---|---|---|---|---|
单磁盘 | 1 | 120s | 300万 | 25000/s | 7.5MB/s | 2% ~ 5% |
单磁盘 | 2 | 200s | 1000万 | 50000/s | 15MB/s | 5% ~ 7% |
多磁盘 | 20 | 340s | 1亿 | 290000/s | 87MB/s | 30% ~ 65% |
多磁盘 | 40 | 2000s | 8亿 | 400000/s | 120MB/s | 80% ~ 100% |
-
测试生成的Lucene文件添加到ES
1)测试直接add会导致查询错误。。NPE 原因:缺少很多ES内部metadata field以及其他一些功能性field ...
- _field_names field
- _ignored field
- _id field
- _index field
- _meta field
- _routing field
- _source field
- _type field
又如 SeqNoFieldMapper 会自动生成好几个Field
- Mapper for the {@code _seq_no} field.
- We expect to use the seq# for sorting, during collision checking and for
- doing range searches. Therefore the {@code _seq_no} field is stored both
- as a numeric doc value and as numeric indexed field.
2)
结论
绕过ES本身接口进行数据加载是可行的。并且能达到3-4倍于ES的极限加载速度。
附 1 ES 的常用字段类型最简化mapping添加doc 产生的fields 以及属性 。。。
重要属性设置
参数 | 含义 | 可选项 |
---|---|---|
analyzer | text类型的分词器 建索引的时候 | 略 |
doc_values | 列存方式的数据 能快速遍历字段的terms。用于排序 聚合等。text类型不能设置 | 默认开启,可设置false |
enabled | 设置字段是否启用(通常是object字段 在索引过程可忽略)但是能在source取出 | 略 |
fielddata | 这是给text类型单独的选项 用于排序 聚合(针对text类型不能doc_values)但是需要大量内存,默认关闭 慎用。 | 略 |
format | date类型专用 设置数据解析格式 注意可以是多个 | 略 |
ignore_above | 字符串专用 设置需要索引文本长度(影响取词范围)但是_source仍可以拿到完整 | 略 |
ignore_malformed | 对数值 日期 GEO等类型 设置容忍数据格式错误 出错时会忽略掉该字段的索引 | 略 |
index | 设置字段是否索引 (即可查 但是聚合等仍开启,比enabled稍轻) | 略 |
index_options | text类型专用。设置索引中存储的内容包含级别 (词频、位置、偏移)不需要评分、短语检索、高亮功能可适当关闭 以省磁盘空间 | docs/freqs/positions(默认)/offsets |
meta | 字段可以扩展部分信息 额外存储 | 略 |
fields | 通常用于字符串类型,设置同一个字段多种索引方式 如用不同的分词器都建索引(用于不同功能) | 略 |
normalizer | 控制分词器的 (分词分为切词 过滤 统一化 等过程) | 略 |
norms | 评分相关 不需要评分可以关闭 | 略 |
properties | 嵌套类型的子字段。(object/nested) | 略 |
search_analyzer | 设置检索时使用的分词器。(这里举了个自动补全检索的例子 可以关注下) | 略 |
similarity | 相关度的算法 | bm25/classic(指TF/IDF) / boolean |
store | 字段是否在field对象里存储。注意默认是不存储的 因为通常是从_source 内部字段取值 但是有些情况是希望只取部分列 不想拿整个大的_source | 略 |
term_vector | 默认关闭。设置字段额外文件结构 存储文档中各种词的信息 如出现总频次 这里与index_options 似乎有点重复 个人理解这里是给其他API分析用的 非检索用的 | 最大级别:with_positions_offsets_payloads |
测试的mapping
StructField(mobileid,ByteType,true)
StructField(nettype,ByteType,true)
StructField(nattype,ByteType,true)
StructField(imsi,LongType,true)
StructField(imei,StringType,true)
StructField(mac,StringType,true)
StructField(account,StringType,true)
StructField(ua,StringType,true)
StructField(accounttype,ByteType,true)
StructField(logintype,ByteType,true)
StructField(linetimetype,ByteType,true)
StructField(srcipv4,StringType,true)
StructField(srcipv6,StringType,true)
StructField(srcport,IntegerType,true)
StructField(dstipv4,StringType,true)
StructField(dstipv6,StringType,true)
StructField(dstport,IntegerType,true)
StructField(protocoltype,IntegerType,true)
StructField(servicetypeid,ShortType,true)
StructField(virtualusername,StringType,true)
StructField(accesstime,TimestampType,true)
StructField(lac,StringType,true)
StructField(ci,StringType,true)
StructField(httprequesttype,IntegerType,true)
StructField(url,StringType,true)
StructField(accountflag,ByteType,true)
StructField(urlflag,ByteType,true)
StructField(priipv4,StringType,true)
StructField(startport,IntegerType,true)
StructField(endport,IntegerType,true)
StructField(url_s,StringType,true)
StructField(fileid,LongType,true)
其中URL分词 其余全部索引
附 2 ES正常加载速度测试 。。。
image.png附 3 核心代码(即Lucene文件生成)
import org.apache.lucene.document.Field.Store
import org.apache.lucene.document.{IntPoint, LongPoint, StringField, TextField}
import org.apache.lucene.index.IndexableField
import org.apache.spark.sql.types._
trait Converters {
val name:String
def tofield(a:Any):IndexableField
}
object Converters {
//注意把判null前移 顺便做 _source 字段的拼接
def toConverters(s:StructType):Array[Converters] = {
s.fields.map(convert).toArray
}
private def convert(f:StructField):Converters = {
//注:数值类型 Lucene似乎只有int long float double
f.dataType match {
case ByteType =>
new IntConverter(f.name)
case ShortType =>
new IntConverter(f.name)
case IntegerType =>
new IntConverter(f.name)
case LongType =>
new LongConverter(f.name)
case StringType if (f.name.equalsIgnoreCase("url")) =>
new TextConverter(f.name)
case StringType =>
new StringConverter(f.name)
case TimestampType =>
new TimeConverter(f.name)
case o=>
throw new Exception(s"not supported type yet ${f}")
}
}
}
class IntConverter(fieldname:String) extends Converters {
override val name: String = fieldname
override def tofield(a: Any): IndexableField = {
val v = a.asInstanceOf[Number].intValue()
new IntPoint(name,v)
}
}
class LongConverter(fieldname:String) extends Converters {
override val name: String = fieldname
override def tofield(a: Any): IndexableField = {
val v = a.asInstanceOf[Long]
new LongPoint(name,v)
}
}
class StringConverter(fieldname:String) extends Converters {
override val name: String = fieldname
override def tofield(a: Any): IndexableField = {
val v = a.asInstanceOf[String]
new StringField(name,v,Store.NO)
}
}
class TextConverter (fieldname:String) extends Converters {
override val name: String = fieldname
override def tofield(a: Any): IndexableField = {
val v = a.asInstanceOf[String]
new TextField(name,v,Store.NO)
}
}
//暂不清楚es怎么搞 当成long处理
class TimeConverter(fieldname:String) extends Converters {
override val name: String = fieldname
override def tofield(a: Any): IndexableField = {
val v = a.asInstanceOf[java.sql.Timestamp]
new LongPoint(name,v.getTime)
}
}
def newDoc(r:Row,converters: Array[Converters]):(String,Document) = {
val id = IDCreater.newid()
val doc = new Document
val source = new JSONObject()
/**
val calls = r.toSeq.zip(converters)
calls.foreach{case(v,f)=>
if(v != null){
source.put(f.name,v.toString)
doc.add(f.tofield(v))
}
}
*/
for(i <- (0 until converters.length)){
val v = r.get(i)
val f = converters(i)
if(v != null){
source.put(f.name,v.toString)
doc.add(f.tofield(v))
}
}
val json = source.toJSONString
val seqfield = SequenceIDFields.emptySeqID
doc.add(seqfield.seqNo)
doc.add(seqfield.seqNoDocValue)
doc.add(seqfield.primaryTerm)
doc.add(new NumericDocValuesField("_version",1))
doc.add(new StringField("_id",Uid.encodeId(id),Store.YES))
doc.add(new StoredField("_source",new BytesArray(json).toBytesRef))
(json,doc)
}
网友评论