HBase
- 1.hbase 介绍
Apache HBase™是Hadoop数据库,是一个分布式,可扩展的大数据存储。
当您需要对大数据进行随机,实时读/写访问时,请使用Apache HBase™。该项目的目标是托
管非常大的表 - 数十亿行X百万列 - 在商品硬件集群上。Apache HBase是一个开源的,分布式
的,版本化的非关系数据库,模仿Google的Bigtable: Chang等人的结构化数据分布式存储系
统。正如Bigtable利用Google文件系统提供的分布式数据存储一样,Apache HBase在Hadoop和
HDFS之上提供类似Bigtable的功能。
2006年-google发表了bigtable的白皮书
2006年-开始开发hbase
2008年-hbase正式成为apache的子项目
2010年-正式成为apache的顶级项目
1.HBase是在HDFS上面向列的分布式的数据库;
HBase首先是数据库,分布式的,面向列的,稀疏,<首选在HDFS基础上>;
Google发布三篇论文:GFS、MapReduce、BigTable开启分布式存储和计算的纪元;
hdfs+MapRedece(Hadoop)解决离线分析;HBase解决实时处理业务需求;
2.HBase不是关系型数据库,它不支持SQL;
3.列簇(Column family):
物理上,列簇存储在文件系统中,面向列簇的存储器;
创建table时,必须制定列簇,列簇的中列可随时增加;
针对调优和存储考虑,需将列簇成员设置成相同的访问权限和大小特征;
5cf62127380d361164
- 2.hbase 特点
大:一个表可以有数十亿行,上百万列;
无模式:每行都有一个可排序的主键和任意多的列,列可以根据需要动态的增加,同一张表中不同的行可以有截然不同的列;
面向列:面向列(族)的存储和权限控制,列(族)独立检索;
稀疏:对于空(null)的列,并不占用存储空间,表可以设计的非常稀疏;
数据多版本:每个单元中的数据可以有多个版本,默认情况下版本号自动分配,是单元格插入时的时间戳;
数据类型单一:Hbase中的数据都是字符串,没有类型。
- 3.hbase 搭建(完全分布式+HA)
1) 解压
tar -zxvf hbase-1.3.1-bin.tar.gz -C /root/soft/
mv ~/soft/hbase-1.3.1 ~/soft/hbase
2) 添加环境变量
echo "#hbase install" >> /etc/profile
echo "export HBASE_HOME=/root/soft/hbase" >> /etc/profile
echo "export PATH=$HBASE_HOME/bin:$PATH" >> /etc/profile
source /etc/profile
3) 修改配置文件
(1) hbase-env.sh
export JAVA_HOME=/root/soft/java
export HBASE_CLASSPATH=/root/soft/hadoop/etc/hadoop
# 去掉后边的 -XX:PermSize=128m -XX:MaxPermSize=128m
# java1.8 后会自动调整内存 添加参数会有警告
export HBASE_MASTER_OPTS="$HBASE_MASTER_OPTS "
export HBASE_REGIONSERVER_OPTS="$HBASE_REGIONSERVER_OPTS "
# 关闭hbase自带Zookeeper
export HBASE_MANAGES_ZK=false
(2) hbase-site.xml
<property>
<name>hbase.rootdir</name>
<value>hdfs://mycluster/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>mstr,slv1,slv2,slv3,slv4</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/root/zookeeper/zkdata</value>
</property>
<property>
<name>hbase.tmp.dir</name>
<value>/root/tmp/hbase</value>
</property>
(3) regionservers
slv1
slv2
slv3
(4) backup-masters (自己在conf下创建 )
slv4
(5) 软连接hadoop的配置文件(core, hdfs)
#软连接hadoop配置
ln -s ~/soft/hadoop/core-site.xml ~/soft/hbase/conf
ln -s ~/root/hadoop/hdfs-site.xml ~/soft/hbase/conf
(6) 分发软件, 配置文件和环境变量
- 4 hbase 操作
1) 启动集群
zkServer.sh start //在Zk集群节点上执行
start-dfs.sh
start-hbase.sh
image
2) Web-UI
(mstr:16010)
image(slv4:16010)
[图片上传失败...(image-b9822d-1565087262349)]
3) hbase shell
[root@mstr ~]# hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.3.1, r930b9a55528fe45d8edce7af42fef2d35e77677a, Thu Apr 6 19:36:54 PDT 2017
1."查看服务器状态"
> status
1 active master, 1 backup masters, 3 servers, 0 dead, 2.0000 average load
2."查看表"
> list
TABLE
mfrain:student
mfrain:tt
testtable
wordcount
3."扫描"
# 全表扫描
> scan 'mfrain:tt'
ROW COLUMN+CELL
1001 column=info1:a, timestamp=1557909052757, value=aaa
1001 column=info1:b, timestamp=1557909063664, value=bbb
# 筛选扫描
> scan 'user',{STARTROW =>'101',STOPROW => '101'}
# 获取一行数据
> get 'mfrain:tt','1001'
COLUMN CELL
info1:a timestamp=1557909052757, value=aaa
info1:b timestamp=1557909063664, value=bbb
# 查看某单元格值
> get 'mfrain:tt','1001','info1:a'
COLUMN CELL
info1:a timestamp=1557909052757, value=aaa
4."创建表"
> create '表名','列族名'
5."插入数据"
> put '表名','行键','列族名:列名','值'
#在hbase中没有修改,但是可以覆盖只要保持rowkey,列族,列相同即可进行覆盖操作
6."查看表结构"
> describe 'testtable'
Table testtable is ENABLED
testtable
COLUMN FAMILIES DESCRIPTION
{NAME => 'colfaml',
BLOOMFILTER => 'ROW',
VERSIONS => '1',
IN_MEMORY => 'false',
KEEP_DELETED_CELLS => 'FALSE',
DATA_BLOCK_ENCODING => 'NONE',
TTL => 'FOREVER',
COMPRESSION => 'NONE',
MIN_VERSIONS => '0',
BLOCKCACHE => 'true',
BLOCKSIZE => '65536',
REPLICATION_SCOPE => '0'}
7."更改表信息"
# 更改列族的版本号
> alter '表名',{NAME => 'info',VERSIONS => '3'}
# 添加列族
> disable '表名'
> alter '表名',NAME=>'新增列族名'
> enable '表名'
8."删除表信息"
# 删除具体单元格值
> delete '表名','行键','列族:列'
# 删除一行值
> deleteall '表名','行键'
# 清空表信息
> truncate '表名'
# 删除列族
> alter "表名",'delete' => '列族名'
9."删除表"
# 第一步 将表设置为disable
> disable '表名'
# 第二部 删除表
> drop '表名'
10."多版本控制"
# 默认version为1,只能存1个版本
# 更改表的列族的最多版本
> alter 'mfrain:student',{NAME=>'info1',VERSIONS=>3}
# 扫描表最近三个版本信息
> scan 'mfrain:student' ,{VERSIONS=>3}
ROW COLUMN+CELL
1001 column=info1:nage, timestamp=1557901465180, value=18
1001 column=info1:name, timestamp=1557905467926, value=mfre4
1001 column=info1:name, timestamp=1557905336099, value=mfr3
1001 column=info1:name, timestamp=1557902966333, value=mfr2
1001 column=info2:addr, timestamp=1557901498209, value=JiLin
# 执行delete删除命令时,如需按照ts(timestamp)时间戳进行删除操作时,之前版本一并删除掉;
> delete 'mfrain:student','1001','info1:name',1557905336099
> scan 'mfrain:student' ,{VERSIONS=>3}
ROW COLUMN+CELL
1001 column=info1:nage, timestamp=1559645182639, value=20
1001 column=info1:nage, timestamp=1557901465180, value=18
1001 column=info1:name, timestamp=1557905467926, value=mfre4
1001 column=info2:addr, timestamp=1557901498209, value=JiLin
11."get_table指令"
# 将table映射成相对应的变量值(table实例),通过table实例对表进行相关操作
> t1 = get_table '表名'
> t1.scan
> t1.put 'row-2','info1:name','zhangsan'
> t1.get 'row-2'
12."namespace操作"
# namespace名字空间,相当于传统数据库的表空间:
# 列出名字空间
> list_namespace
# 列出名字空间下有哪些表
> list_namespace_tables 'default'
# 创建namespace
> create_namespace 'ns1',{'author'=>'zhangsan','data'=>'2018-07-31'}
# 查看namespace 信息
> describe_namespace 'mfrain'
DESCRIPTION
{NAME => 'mfrain', author => 'mfr', data => '20190515'}
# 删除namespace (先清空表)
> drop_namespace 'ns1'
4) Flush刷新表(1.3.1版本默认128M进行刷新)
(1) 插入数据
(2) 查看hbase/data/列族生成0 bytes
> hadoop fs -lsr /hbase
image(3) 刷盘/关闭Hbase
(Hbase)> flush 'mfrain:stu'
或者 > stop-hbase.sh
(4) 查看hbase/data/列族生成4918bytes
> hadoop fs -lsr /hbase
image(5) 设置刷盘阈值
<property> <name>hbase.regionserver.optionalcacheflushinterval</name> <value>3600000</value> <description> 在自动刷新之前,编辑在内存中的最长时间。 默认的1小时。将其设置为0以禁用自动刷新。 </description> </property> <property> <name>hbase.hregion.memstore.flush.size</name> <value>134217728</value> <description> 如果memstore的大小超过128M,进行刷盘操作 </description> </property>
5) 负载均衡
balance负载均衡指令,需使用开关模式,原因负载均衡比较耗费资源:
> balance_switch true //开启负载均衡
> balancer //执行负载均衡
> balance_switch false //关闭负载均衡
> balancer "force" //强制负载均衡
调节均衡器运行间隔(默认30000)
<property> <name>hbase.balancer.period</name> <value>3000</value> </property>
6) meta表
查看meta表(只有mfrain:stu表)
两条记录
[图片上传失败...(image-655311-1565087262350)]
查看WEB-UI
三个region 包括 hbase:namespace, mfrain:stu, hbase:meta
image image imageregionname格式:
<table_name>+","+<startKey>+","+<timestamp>+"."+<ENCODED>MD5生成码(生成规则由“{<table_name>+","+<startKey>+","+<timestamp>+"."}”
7) split切分
(1) 添加测试数据,使用Ruby语法添加数据
hbase(main):003:0> create_namespace 'mfrain' hbase(main):003:0> create 'mfrain:stu','info' hbase(main):003:0> for i in 'a'..'z' do for j in 'a'..'z' do \ hbase(main):009:2* put 'mfrain:stu',"row-#{i}#{j}","info:#{j}","#{j}" end end hbase(main):008:0> count 'mfrain:stu' 676 row(s) in 0.2560 seconds hbase(main):010:0> flush 'mfrain:stu' 0 row(s) in 0.9240 seconds
(2) 查看/hbase路径和meta表
① $> hadoop fs -lsr /hbase
[图片上传失败...(image-92e46c-1565087262350)]
② hbase> scan 'hbase:meta'
image③ 切分
# 切分 hbase(main):012:0> split 'mfrain:stu','row-mm' 0 row(s) in 0.2460 seconds
④ 查看切分后的meta表
切分时
切分时
切分完成
切分完成
⑤ 查看切分后的HDFS
hdfs⑥ 总结
1.rowkey区间,前包后不包 2.切分时原region不会立刻删除,等待切分完成后再删除!切分时原region有如下标记: STARTKEY => '', ENDKEY => '', OFFLINE => true, SPLIT => true
8) merge合并
(1) 合并指令
hbase(main):019:0> merge_region '53c1b5981f6c7eab63b0f260eddff79a','7c0fda1423359fd3152466935b56d8bd'
(2) 合并过程
合并时
[图片上传失败...(image-5486e5-1565087262350)]
合并完成
合并完成
(3) Web-UI merge
webUI wancheng(4) Region移动
随机移动
# move 'encoded' move '2530e3effe529c9f8d51141525d478f2'
指定移动
# move 'encoded','regionserver_hostname,16020,regionservertimestamp' move '2530e3effe529c9f8d51141525d478f2','slv1,16020,1560150516386'
9) 预分区 (事先指定分区(rowkey设计至关重要))
# 说明:在ns1命名空间下,创建t1表,列簇为f1;预分区为5个(字符串):['',10);[10,20);[20,30);[30,40);[40,''] hbase> create 'ns1:t1', 'f1', SPLITS => ['10', '20', '30', '40']
- 5. HBase API
1) 开发环境准备
hbase 对应版本相关Jar
hdfs-site.xml
core-site.xml
hbase-site.xml
//编写HBaseTool工具类
//1.定义conn对象
private static Configuration conf;
private static Connection conn;
static{
conf = HBaseConfiguration.create();
try {
conn = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
2) namespace 操作
/**
* 创建名字空间
* create_namespace 'ns1'
*/
public static void createNS(String namespace){
Admin admin = null;
try {
admin = conn.getAdmin();
NamespaceDescriptor nsdesc = NamespaceDescriptor.create(namespace).build();
admin.createNamespace(nsdesc);
System.out.println("---创建成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---创建失败!---");
}finally{
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 查询名字空间
* list_namespace
*/
public static void listNS(){
Admin admin = null;
try {
admin = conn.getAdmin();
NamespaceDescriptor[] nss = admin.listNamespaceDescriptors();
for (NamespaceDescriptor ns : nss) {
System.out.println(ns);
}
System.out.println("---查询成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---查询失败!---");
}finally{
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 删除名字空间
* drop_namespace 'ns1'
*/
public static void deleteNS(String namespace){
Admin admin = null;
try {
admin = conn.getAdmin();
admin.deleteNamespace(namespace);
System.out.println("---删除成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---删除失败!---");
} finally{
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
3) Table 操作
/**
* 创建Table
* create 'ns1:student','f1'
*/
public static void createTable(String tablename,String colunmfamily){
Admin admin = null;
try {
admin = conn.getAdmin();
TableName tname = TableName.valueOf(tablename);
HTableDescriptor htdesc = new HTableDescriptor(tname);
HColumnDescriptor hcdesc = new HColumnDescriptor(colunmfamily);
htdesc.addFamily(hcdesc);
admin.createTable(htdesc);
System.out.println("---表创建成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---表创建失败!---");
}finally{
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 查看表
* list
*/
public static void getTable() {
try {
HTableDescriptor[] listtable = admin.listTables();
for(HTableDescriptor tables :listtable) {
System.out.println(tables);
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 删除Table
* drop 'ns1:student'
*/
public static void dropTable(String tablename){
Admin admin = null;
try {
admin = conn.getAdmin();
TableName tname = TableName.valueOf(tablename);
admin.disableTable(tname);
admin.deleteTable(tname);
System.out.println("---表删除成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---表删除失败!---");
}finally{
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
4) Put 操作
/**
* 单行put
* put 'ns1:student','row-1','f1:name','zhangsan'
*/
public static void putTable(String tablename){
TableName tableName =TableName.valueOf(tablename);
try {
HTable table = new HTable(conf,tableName);
for(int i = 20; i<=23 ; i++) {
byte[] row = Bytes.toBytes("row"+i);
Put put = new Put(row);
byte[] columFamily = Bytes.toBytes("fA");
byte[] qualifier = Bytes.toBytes(String.valueOf(i));
byte[] value = Bytes.toBytes("value"+i);
put.addColumn(columFamily,qualifier,value);
table.put(put);
System.out.println("put 成功!");
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 多行put
*
*/
public static void puts(String tablename,List<Put> puts){
try {
Table _table = conn.getTable(TableName.valueOf(tablename));
for (int i = 0; i < puts.size(); i++) {
_table.put(puts.get(i));
}
System.out.println("---插入成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---插入失败!---");
}
}
//测试多行插入
public static void main(String[] args) throws IOException {
List<Put> _puts = new ArrayList<Put>();
for (int i = 3; i < 10000; i++) {
Put _put = new Put(Bytes.toBytes("row-"+i));
_put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes("asa"+i));
_put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes("23"));
_put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("sex"), Bytes.toBytes("男"+i));
_put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("class"), Bytes.toBytes("BD1709"+i));
_puts.add(_put);
}
HBaseTool.puts("ns1:t1", _puts);
}
5) Get 操作
HBase以表的方式存储数据。表是由行和列构成的,列从属于某一个列族(column family)。
行和列的交叉点称之为cell,cell是版本化的,cell的内容就是数据,cell中的数据是没有类型的,
全部是字节码形式存贮,是不可分割的字节数组。
/**
* 查询单条记录通过rowkey查询 get 'tableName','rowKey'
* @param tablename
* @param rowkey
*/
public static void getone(String tablename,String rowkey){
try {
Table _table = conn.getTable(TableName.valueOf(tablename));
//get 对象首先指定rowkey
Get _get = new Get(Bytes.toBytes(rowkey));
Result rs = _table.get(_get);
List<Cell> cells = rs.listCells();
for (Cell cell : cells) {
System.out.println("列簇为:"+Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("限定符为:"+Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("rowkey为:"+Bytes.toString(CellUtil.cloneRow(cell)));
System.out.println("value为:"+Bytes.toString(CellUtil.cloneValue(cell)));
}
System.out.println("---查询成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---查询失败!---");
}
}
/**
* 查询单条记录,并返回指定Version数
* @param tablename
* @param rowkey
*/
public static void getone(String tablename,String rowkey,int version){
try {
Table _table = conn.getTable(TableName.valueOf(tablename));
Get _get = new Get(Bytes.toBytes(rowkey));
_get.setMaxVersions(version);
Result rs = _table.get(_get);
List<Cell> cells = rs.listCells();
for (Cell cell : cells) {
System.out.println("列簇为:"+Bytes.toString(CellUtil.cloneFamily(cell)));
System.out.println("限定符为:"+Bytes.toString(CellUtil.cloneQualifier(cell)));
System.out.println("rowkey为:"+Bytes.toString(CellUtil.cloneRow(cell)));
System.out.println("value为:"+Bytes.toString(CellUtil.cloneValue(cell)));
System.out.println("timestamp为:"+cell.getTimestamp());
}
System.out.println("---查询成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---查询失败!---");
}
}
/**
* 按照hbase shell 格式输出
*
*/
private static void printOut(Result rs){
List<Cell> cells = rs.listCells();
for (Cell cell : cells) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
long timestamp = cell.getTimestamp();
System.out.println(rowkey+" column="+family+":"+qualifier+", timestamp="+timestamp+", value="+value);
}
}
/**
* 查询所有
* @param tablename
* @param rowkey
*/
public static void findAll(String tablename){
try {
Table _table = conn.getTable(TableName.valueOf(tablename));
Scan scan = new Scan();
ResultScanner rsscan = _table.getScanner(scan);
Iterator<Result> its = rsscan.iterator();
while (its.hasNext()) {
printOut(its.next());
}
System.out.println("---查询成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---查询失败!---");
}
}
/**
* 查询startkey至endkey之间的数据
* @param tablename
* @param rowkey
*/
public static void findByRowkey(String tablename,String startkey,String endkey){
try {
Table _table = conn.getTable(TableName.valueOf(tablename));
Scan scan = new Scan(Bytes.toBytes(startkey),Bytes.toBytes(endkey));
ResultScanner rsscan = _table.getScanner(scan);
Iterator<Result> its = rsscan.iterator();
while (its.hasNext()) {
printOut(its.next());
}
System.out.println("---查询成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---查询失败!---");
}
}
按rowKey删除 $> deleteall 'ts1','33'
public static void deleteRowKey(String tableName,String rowkey) throws IOException {
Table _table = conn.getTable(TableName.valueOf(tablename));
Delete delete = new Delete(Bytes.toBytes(rowkey));
table.delete(delete);
}
6) split 操作
/**
* 切分Table
* 查看HDFS hadoop fs -lsr /hbase
*/
public static void splittable(String tablename){
Admin admin = null;
try {
admin = conn.getAdmin();
admin.split(TableName.valueOf(tablename),Bytes.toBytes("5"));
System.out.println("---切分成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---切分失败!---");
}finally{
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 切分region和table 相同
*
*/
public static void splitregion(String regionname){
Admin admin = null;
try {
admin = conn.getAdmin();
admin.splitRegion(Bytes.toBytes(regionname),"row-nu".getBytes());
System.out.println("---切分成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---切分失败!---");
}finally{
try {
admin.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
7) merge 操作 / move 操作
//合并region
admin = conn.getAdmin();
admin.mergeRegions("ffd7e19bf6dabc990aa9da07cea32afc".getBytes(), "27e639cc8d5d43d4ff25d38c59a3bc1e".getBytes(), true);
//移动region到指定节点
admin = conn.getAdmin();
admin.move("ffd7e19bf6dabc990aa9da07cea32afc".getBytes(), "slave1,16020,1558184978689".getBytes());
8) Scan缓存/bacth批量查询
一、设置Scan缓存
通过Scan扫描,每一次调用next()方法都会生成一个单独的PRC请求,并且返回一条记录
缓存可以让next()一次获取多条记录
设置缓存方式有两种 :
1)表Htable层面 setScannerCaching(int)
2)Scan类 setCaching(int)
也可以通过设置配置文件hbase-site.xml
<property>
<name>hbase.client.scanner.caching</name>
<value>10</value>
</property>
优先级顺序为: scan类 -->Htable类-->配置文件
二、设置bacth批量查询
批量可以让用户选择每一次ResultScanner实例的next()操作要取回多少列
例如,setBatch(5),则一次next()返回的Resulut实例包括5列
缓存是面向行一级的操作,而批量则是面向列一级的操作
public static void findAll(String tablename){
try {
Table table = conn.getTable(TableName.valueOf(tablename));
//EQUAL 匹配等于设定值的值
Scan scan = new Scan();
//设置扫描批次,一次next()返回 Result实例会包括5列
// scan.setBatch(5);
// //hbase设置缓存
// scan.setCaching(100);
long starttime = System.currentTimeMillis();
ResultScanner scanner = table.getScanner(scan);
Iterator<Result> its = scanner.iterator();
while (its.hasNext()) {
printOut(its.next());
}
scanner.close();
System.out.println(System.currentTimeMillis()-starttime);
System.out.println("---查询成功!---");
} catch (IOException e) {
e.printStackTrace();
System.out.println("---查询失败!---");
}
}
网友评论