增量索引的bash文件注释如下,更多详细配置请查阅官方文档
#!/bin/sh
# 当前脚本的绝对路径
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
bin=${DIR}/../bin
lib=${DIR}/../lib
echo '
{
"type" : "jdbc",
"jdbc" : {
# 链接 mysql 的 test 数据库
"url" : "jdbc:mysql://localhost:3306/test",
# mysql 用户
"user" : "root",
# mysql 密码
"password" : "123456",
# 计划任务状态文件
"statefile" : "statefile-article.json",
# 计划任务时间 这里是每分钟执行一次
"schedule" : "0 0-59 0-23 ? * *",
# 执行导入的sql 语句
"sql" : [
{
"statement" : "select *, id as _id from article where update_time > ?",
"parameter" : [ "$metrics.lastexecutionstart" ]
}
],
# 索引名称 jdbctest
"index" : "jdbctest",
# 类型名称 article
"type" : "article",
# 类型设置
"index_settings" : {
"analysis" : {
"analyzer" : {
"ik" : {
# 涉及到中文使用ik 分词
"tokenizer" : "ik"
}
}
}
},
# 类型中的字段映射
"type_mapping": {
# 类型名称
"article" : {
"properties" : {
# 对应的字段
"id" : {
# 字段类型
"type" : "integer",
# 当成一个准确的值进行索引(全匹配)
"index" : "not_analyzed"
},
"subject" : {
"type" : "string",
"analyzer" : "ik"
},
"author" : {
"type" : "string",
"analyzer" : "ik"
},
"create_time" : {
"type" : "date"
},
"update_time" : {
"type" : "date"
}
}
}
}
}
}
' | java \
-cp "${lib}/*" \
-Dlog4j.configurationFile=${bin}/log4j2.xml \
org.xbib.tools.Runner \
org.xbib.tools.JDBCImporter
这里选几个属性来介绍一下 :
- url:数据库链接串,所以把这个链接串改成其它数据源,这个脚本也可以使用(前提是那个数据源中有对应的 article 表)
- statefile :计划任务状态文件名称。它长这样
{
"type" : "jdbc",
"jdbc" : {
"index_settings" : {
"analysis" : {
"analyzer" : {
"ik" : {
"tokenizer" : "ik"
}
}
}
},
"index" : "jdbctest",
"schedule" : "0 0-59 0-23 ? * *",
"sql" : [ {
"statement" : "select *, id as _id from article where update_time > ?",
"parameter" : [ "$metrics.lastexecutionstart" ]
} ],
"metrics" : {
"lastexecutionend" : "2016-11-01T06:01:01.441Z",
"lastexecutionstart" : "2016-11-01T06:01:01.125Z",
"counter" : "23"
},
"type" : "article",
"statefile" : "statefile-article.json",
"user" : "root",
"password" : "123456",
"url" : "jdbc:mysql://localhost:3306/test",
"type_mapping" : {
"article" : {
"properties" : {
"create_time" : {
"type" : "date"
},
"id" : {
"type" : "integer",
"index" : "not_analyzed"
},
"author" : {
"type" : "string",
"analyzer" : "ik"
},
"update_time" : {
"type" : "date"
},
"subject" : {
"type" : "string",
"analyzer" : "ik"
}
}
}
}
}
}
其实 jdbc 每次执行的就是这个文件,执行完成后就覆盖此文件,改变的只是 metrics 属性内的时间,而 lastexecutionstart 这个时间就是我们下面 sql 语句要用到的最后更新时间
schedule : 计划任务时间表。表示多久执行一次更新。下面有几个例子
0 0-59 0-23 ? * *:每分钟执行一次
0 0/5 0-23 ? * * :每五分钟执行一次;当分钟等于 0,5,10,15…55的时候执行
我们参考官方的字段描述:
Field Name | Allowed Values | Allowed Special Characters |
---|---|---|
字段名称 | 允许的值 | 允许的特殊字符 |
Seconds | 0-59 | , - * / |
Minutes | 0-59 | , - * / |
Hours | 0-23 | , - * / |
Day-of-month | 1-31 | , - * ? / L W |
Month | 1-12 or JAN-DEC | , - * / |
Day-of-Week | 1-7 or SUN-SAT | , - * ? / L # |
Year (Optional) | empty, 1970-2199 | , - * / |
详细注释请点击查看
sql:支持两种方式,一种是直接写sql语句,一种是有条件的sql语句。一般我们会在sql语句中使用”field as _id “这样的方式来指定这条数据在ES 中的唯一标识(field字段为唯一标识)
parameter 属性中的可选的动态参数有:
$now - the current timestamp
$state - the state, one of: BEFORE_FETCH, FETCH, AFTER_FETCH, IDLE, EXCEPTION
$metrics.counter - a counter
$lastrowcount - number of rows from last statement
$lastexceptiondate - SQL timestamp of last exception
$lastexception - full stack trace of last exception
$metrics.lastexecutionstart - SQL timestamp of the time when last execution started
$metrics.lastexecutionend - SQL timestamp of the time when last execution ended
$metrics.totalrows - total number of rows fetched
$metrics.totalbytes - total number of bytes fetched
$metrics.failed - total number of failed SQL executions
$metrics.succeeded - total number of succeeded SQL executions
在上面例子中的 sql
select *, id as _id from article where update_time > ?
表示获取更新时间(update_time)大于 最后执行时间($metrics.lastexecutionstart)的所有数据
其它如 index、type_mapping 之类的属性就不一一介绍了,很容易理解
网友评论