上节中,我们采用JavaAPI的方式来操作HBase,接连和访问方式都比较简单直接,而本节我们采用MapReduce的方式来操作HBase,那么就要先配置好Eclipse-Hadoop的插件。
一、安装Eclipse-Hadoop插件
由于网上这方面的资料非常全,所以本人推荐一个参考博文,照着配置就OK:
http://www.cnblogs.com/flyoung2008/archive/2011/12/09/2281400.html
二、定义Map
package txt_to_hbase; import java.io.IOException; import java.util.Random; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class THMapper extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) { long begintime = System.currentTimeMillis(); System.out.println("map开始时间:"+begintime); String[] items = value.toString().split("--"); String k = String.valueOf((new Random()).nextLong());//生成随机rowkey String v = items[1]; // System.out.println("key:" + k + "," + "value:" + v); try { context.write(new Text(k), new Text(v)); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } long endtime = System.currentTimeMillis(); // System.out.println("map耗时:"+(endtime - begintime)); } }
Map跟普通的mapreduce函数没有多大区别,正常的TextInputFormat方式输入,按行读取。
Reduce中要把处理之后的结果写入hbase的表中,所以与普通的mapreduce程序有些区别,由以上代码可以知道,reduce类继承的是TableReducer,通过查询API(如下图1)知道,它也是一种基本的Reducer类,与其他的reduce类一样,它的输入k/v对是对应Map的输出k/v对,它的输出key可以是任意的类型,但是value必须是一个put或delete实例。
三、定义reduce:
package txt_to_hbase; import java.io.IOException; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.io.Text; public class THReducer extends TableReducer<Text, Text, ImmutableBytesWritable>{ public void reduce(Text key, Iterable<Text> values, Context context) { long begintime = System.currentTimeMillis(); String k = key.toString(); String v = values.iterator().next().toString(); // 由数据知道value就只有一行 Put putrow = new Put(k.getBytes()); putrow.addColumn("fam1".getBytes(), "qualifier".getBytes(), v.getBytes()); try { context.write(new ImmutableBytesWritable(key.getBytes()), putrow); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } long endtime = System.currentTimeMillis(); if(Jishu.stac==0L) { Jishu.stac = System.currentTimeMillis(); } Jishu.endc = System.currentTimeMillis(); System.out.println("stac="+Jishu.stac+",endc="+Jishu.endc+",total="+(Jishu.endc-Jishu.stac)); } }
Reduce的输出key是ImmutableWritable类型(org.apache.hadoop.hase.io),API中的解释,它是一个可以用作key或value类型的字节序列,该类型基于BytesWritable,不能调整大小。Reduce的输出value是一个put。
四、定义drive:
package txt_to_hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.util.Tool; public class THDriver extends Configured implements Tool { @Override public int run(String[] arg0) throws Exception { // TODO Auto-generated method stub System.out.println("11111111111111111111"); Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.13.74"); conf.set("hbase.zookeeper.property.clientPort", "2181"); Job job = Job.getInstance(conf,"Txt-to-Hbase"); job.setJarByClass(TxtHbase.class); Path in = new Path("hdfs://192.168.13.74:9000/cjt/input"); job.setInputFormatClass(TextInputFormat.class); FileInputFormat.addInputPath(job, in); long begintime = System.currentTimeMillis(); System.out.println("当前时间1:"+System.currentTimeMillis()); job.setMapperClass(THMapper.class); job.setReducerClass(THReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); System.out.println("当前时间2:"+System.currentTimeMillis()); TableMapReduceUtil.initTableReducerJob("tab1", THReducer.class, job); long endtime = System.currentTimeMillis(); System.out.println("当前时间3:"+System.currentTimeMillis()); job.waitForCompletion(true); return 0; } }
跟普通的MapReduce操作基本一样,唯一不同的是,在执行Hbase写入操作时使用的是TableMapReduceUtil.initTableReducerJob,同理执行Hbase读取操作应该使用TableMapReduceUtil.initTableMapperJob。而我们的例子中,读取的是文件,非Hbase表数据,所以用到的是:
Path in = new Path("hdfs://192.168.13.74:9000/cjt/input"); job.setInputFormatClass(TextInputFormat.class); FileInputFormat.addInputPath(job, in);
五、定义主类:
package txt_to_hbase; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.ToolRunner; public class TxtHbase { public static void main(String [] args) throws Exception{ int mr; mr = ToolRunner.run(new Configuration(),new THDriver(),args); System.exit(mr); } }
ok,至此我们可以运行该测试类,首先我们把数据文件上传到HDFS上,我这边测试了1千,1万,3万,10万,15万,30万,60万条记录的入库速度,结果如下:
结论: 1、通过JavaAPI方式调用,单线程入库速度为2000条/s~7000条/s之间,而在多线程并发状态下,最高速度能达到10900条/s,明显优于Mysql单节点的入库速度。 2、以MapReduce的方式调用,速度得到进一步提升,速度区间为11500条/s~15500条/s,而且在10万量级之前,入库速度与量级成线性正相关,10万量级以上,入库速度稳定在15500条/s(Hbase未经过深度调优) 3、无论小数据量还是大数据量的入库操作,MapReduce的效率均远远高于直接JavaAPI的调用效率,小数据量时效率差达到5倍,大数据量时效率差为1.5倍 4、以上的验证,均基于同一hadoop和Hbase环境,单条记录与总数据量均保持一致,故两种调用结果可以直接进行量比。 5、综上,采用MapReduce入库Hbase是最高效的,而且一次MR操作数量级在10万条以上时,能达到最快的入库速度。 |
相关推荐
类比于传统型数据库里的一些查询方式,本文对Hbase的存储原理进行了研究,借助分布式计算框架Mapreduce在Hbase上构建了二级索引,就可以对表进行有针对性的定位和高效率的查找,同时也减轻zookeeper服务对资源调度的压力...
Hbase几种数据入库(load)方式比较
的方案出现了。 Solr Solr 是一个独立的企业级搜索应用服务器,是 Apache Lucene 项目的开源企业搜索平台, 其主要功能包括全文检索、命中标示、分面搜索、动态聚类、数据库集成,以及富文本(如 Word、PDF)的处理...
基于HBase的大数据解决方案.pdf
hbase-sdk是基于hbase-client和hbase-thrift的原生API封装的一款轻量级的HBase ORM框架。 针对HBase各版本API(1.x~2.x)间的差异,在其上剥离出了一层统一的抽象。并提供了以类SQL的方式来读写HBase表中的数据。对...
中国HBase技术社区第4届-MeetUp-上海站_基于HBase实时数仓探索实践.pptx
Nutch抓取指定网址数据,存储在HBase数据库中,存储过程由zookeeper管理。脚本调用索引器部件将数据索引化,经过索引化的数据被前端检索查询,最后前端展示查询结果,用户点击结果列表查看目标资料。
基于HBase和Spark构建企业级数据处理平台.pdf
基于HBase的海量能耗数据存储系统,姜治光,,云计算、物联网等新概念与新技术的蓬勃发展,推动了数据中心建设的高速发展,大量能源消耗设备将产生大规模的能源消耗数据。针对
基于HBase的分布式空间数据库技术,利用Hbase构建分布式空间数据库
为解决现有的HBase数据压缩策略选择方法未考虑数据的冷热性,以及在选择过程中存在片面性和不可靠性的缺陷,提出了基于HBase数据分类的压缩策略选择方法。依据数据文件的访问频度将HBase数据划分为冷热数据,并限定具体...
课时4:基于HBase Client API的CRUD实战 课时5:批处理与扫描器实战 课时6:使用Ganglia监控HBase 课时7:过滤器实战之比较过滤器 课时8:过滤器实战之专用过滤器与FilterList 课时9:过滤器实战之自定义过滤器 ...
实时数据仓库构建,基于HBase存储构建实时的数据仓库系统技术
在HBase基础上,提出了一个基于索引的气象结构化数据查询优化架构HBase4M(HBase for Meteorology)。首先,根据HBase存储特性设计表结构;然后,利用协处理器建立和维护辅助索引,将字段查询转化为对索引表的行键...
基于hadoop+hbase+springboot实现的分布式网盘系统,适合本科毕业设计 资源包含的整个demo在Hadoop,和Hbase环境搭建好了,可以启动起来。 技术选型 1.Hadoop 2.Hbase 3.SpringBoot ...... 系统实现的功能 1.用户...
基于hbase的企业级大数据平台,
本文当是一个基于HBase的海量数据的实实时查询系统的原理分析。详细的介绍了大数据查询的原理。
4,企业级方案设计,完全匹配工作场景。 适用人群 1、对大数据感兴趣的在校生及应届毕业生。 2、对目前职业有进一步提升要求,希望从事大数据行业高薪工作的在职人员。 3、对大数据行业感兴趣的相关人员。 课程...
主要是自己大学时候的毕业设计,关于Hbase下用聚类算法写的一个搜索工具,实现了将文本存入数据库,然后进行搜索的算法。其中包括了word毕业设计文档,还有答辩的ppt,还有在linux平台下的java源码,希望对这方面有...
基于HBase的图书借阅数据挖掘模型设计与实现.pdf