guangzhou



shanghai

Recent posts:
Blog index
About
RSS

HBase实战:一个MapReduce的例子

March 24, 2016     Hadoop   626   

本文将举出一个例子来说明HBase什么时候结合使用Mapreduce

串行计算吞吐量有限

如果使用Scan命令来查找TwitBase用户的最新推贴,新建起行键和停止行键,然后执行扫描。查找一个用户是可行的
HTableInterface twits = pool.getTable(TABLE_NAME);
Scan s = new Scan();
ResultScanner results = twits.getScanner(s);
for(Result r : results) {
……//process twits
}

但是如果查找的用户有一定的规模,即使运行迭代循环的机器可以每秒处理10M的数据,搞定100G的推贴也需要将近3小时

并行计算提高吞吐量

换作并行处理,将GB级的推贴数据切片,并行处理所有数据切片。启动8个线程并行处理,处理时间会从3个小时变成25分钟,一个8核的笔记本可以轻松处理100GB的数据,只有内存够用,这一切非常简单

将工作分布到不同的线程上的代码如下:

int numSplits = 8;
Split[] splits = split(startrow, endrow, numSplits); //拆分工作
List<Future<?>> works = new ArrayList<Future<?>>(numSplits);
ExecutorService esExecutorService = Executors.newFixedThreadPool(numSplits);
for(final Split split : splits) {  //分发工作
	works.add(es.submit(new Runnable() {
		@Override
		public void run() {
			HTableInterface twits = pool.getTanle(TABLE_NAME);
			Scan s = new Scan(split.start, split.end);
			ResultScanner results = twits.getScanner(s);
			for(Result r : results) {  //计算工作
				......//
			}
			
		}
	}));
}
for(Future<?> f : workers) {
	f.get();  //聚合工作
	......//
}
es.shutdownNow();

MapReduce:用分布式计算最大化吞吐量

使用MapReduce,你只需要编写计算工作和聚合工作,其中计算(map)阶段的 代码如下:

public class Map extends Mapper<LongWritable, Text, Text, LongWritable> {
	protected void map(LongWritable key, Text value, Context context) 
			throws IOException ,InterruptedException {
		......//计算工作
	}
}

这段代码实现了map任务,这个函数的输入为LongWritable类型键和Text类型,输出为Text类型和LongWritable类型值

聚合(reduce)阶段的代码如下:

public class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
	protected void reduce(Text key, Iterable<LongWritable> vals, Context context) 
			throws IOException ,InterruptedException {
		......//聚合工作
	}
}

通过这篇文章,可以了解到什么时候使用MP而不是直接基于HBase API编程

如果文章对您有帮助,欢迎点击下方按钮打赏作者

Comments

No comments yet.
To verify that you are human, please fill in "七"(required)