本文将举出一个例子来说明HBase什么时候结合使用Mapreduce
串行计算吞吐量有限
如果使用Scan命令来查找TwitBase用户的最新推贴,新建起行键和停止行键,然后执行扫描。查找一个用户是可行的
HTableInterface twits = pool.getTable(TABLE_NAME);
Scan s = new Scan();
ResultScanner results = twits.getScanner(s);
for(Result r : results) {
……//process twits
}
但是如果查找的用户有一定的规模,即使运行迭代循环的机器可以每秒处理10M的数据,搞定100G的推贴也需要将近3小时
并行计算提高吞吐量
换作并行处理,将GB级的推贴数据切片,并行处理所有数据切片。启动8个线程并行处理,处理时间会从3个小时变成25分钟,一个8核的笔记本可以轻松处理100GB的数据,只有内存够用,这一切非常简单
将工作分布到不同的线程上的代码如下:
int numSplits = 8; Split[] splits = split(startrow, endrow, numSplits); //拆分工作 List<Future<?>> works = new ArrayList<Future<?>>(numSplits); ExecutorService esExecutorService = Executors.newFixedThreadPool(numSplits); for(final Split split : splits) { //分发工作 works.add(es.submit(new Runnable() { @Override public void run() { HTableInterface twits = pool.getTanle(TABLE_NAME); Scan s = new Scan(split.start, split.end); ResultScanner results = twits.getScanner(s); for(Result r : results) { //计算工作 ......// } } })); } for(Future<?> f : workers) { f.get(); //聚合工作 ......// } es.shutdownNow();
MapReduce:用分布式计算最大化吞吐量
使用MapReduce,你只需要编写计算工作和聚合工作,其中计算(map)阶段的 代码如下:
public class Map extends Mapper<LongWritable, Text, Text, LongWritable> { protected void map(LongWritable key, Text value, Context context) throws IOException ,InterruptedException { ......//计算工作 } }
这段代码实现了map任务,这个函数的输入为LongWritable类型键和Text类型,输出为Text类型和LongWritable类型值
聚合(reduce)阶段的代码如下:
public class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> { protected void reduce(Text key, Iterable<LongWritable> vals, Context context) throws IOException ,InterruptedException { ......//聚合工作 } }
通过这篇文章,可以了解到什么时候使用MP而不是直接基于HBase API编程
Comments