工学1号馆

home

HBase实战:HBase与MapReduce

Wu Yudong    March 25, 2016     Hadoop   978   

本文主要介绍一下HBase与MapReduce在一起是如何工作的

先看一个MapReduce的例子–一个应用服务器的日志文件如下:

Date              Time UserID  Activity   TimeSpent

01/01/2011 18:00 user1  load_page1     3s
01/01/2011 18:01 user1  load_page2     5s
01/01/2011 18:01 user2  load_page1     2s
01/01/2011 18:01 user3  load_page1     3s
01/01/2011 18:04 user4  load_page3    10s
01/01/2011 18:05 user1  load_page3     5s
01/01/2011 18:05 user3  load_page5     3s
01/01/2011 18:06 user4  load_page4     6s
01/01/2011 18:06 user1  purchase         5s
01/01/2011 18:10 user4  purchase         8s
01/01/2011 18:10 user1  confirm            9s
01/01/2011 18:10 user4  confirm           11s
01/01/2011 18:11 user1  load_page3     3s

问题:计算每一个用户使用该应用所花费的总时间

这里就不详细推导过程了,直接上代码

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TimeSpent {

  public static class Map extends 
  			Mapper<LongWritable, Text, Text, LongWritable> {

    private static final String splitRE = "\\W+";
    private Text user = new Text();
    private LongWritable time = new LongWritable();

    public void map(LongWritable key, Text value, Context context) 
    		throws IOException, InterruptedException {
      String line = value.toString();
      String[] splits = line.split(splitRE);
      if(null == splits || splits.length < 8)
        return;

      user.set(splits[5]);
      time.set(new Long(splits[7].substring(0, splits[7].length()-1)));
      context.write(user, time);
    }
  }

  public static class Reduce extends 
  			Reducer<Text, LongWritable, Text, LongWritable> {

    public void reduce(Text key, Iterable<LongWritable> values, 
    		Context context) throws IOException, InterruptedException {
      long sum = 0;
      for(LongWritable time : values) {
        sum += time.get();
      }
      context.write(key, new LongWritable(sum));
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      String usage =
        "TimeSpent is the log processing example app used in " +
        "Chapter 03 to demonstrate a MapReduce application.\n" +
        "Usage:\n" +
        "  TimeSpent path/to/input path/to/output\n";
      System.out.print(usage);
      System.exit(1);
    }

    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    Configuration conf = new Configuration();
    Job job = new Job(conf, "TimeSpent");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapperClass(Map.class);
    job.setCombinerClass(Reduce.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);

    FileSystem fs = outputPath.getFileSystem(conf);
    if (fs.exists(outputPath)) {
      System.out.println("Deleting output path before proceeding.");
      fs.delete(outputPath, true);
    }

    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }
}

运行:
wu@ubuntu:~/opt/twitbase$ mvn clean package
[INFO] ————————————————————————
[INFO] BUILD SUCCESS
[INFO] ————————————————————————
[INFO] Total time: 2:23.745s
[INFO] Finished at: Thu Mar 24 06:35:38 PDT 2016
[INFO] Final Memory: 15M/57M
[INFO] ————————————————————————
wu@ubuntu:~/opt/twitbase$ java -cp target/twitbase-1.0.0.jar HBaseIA.TwitBase.mapreduce.TimeSpent \
> src/test/resource/listing\ 3.3.txt ./out
16/03/24 06:38:07 INFO mapred.JobClient: map 100% reduce 100%
16/03/24 06:38:07 INFO mapred.JobClient: Job complete: job_local_0001
16/03/24 06:38:07 INFO mapred.JobClient: Counters: 20
16/03/24 06:38:07 INFO mapred.JobClient: File Output Format Counters
wu@ubuntu:~/opt/twitbase$ cat out/part-r-00000
user1 30
user2 2
user3 6
user4 35


从MapReduce应用访问HBase有三种方式:作业开始的时候可以使用hbase作为数据源;作业结束的时候可以使用hbase接收数据;任务过程中使用hbase共享资源

使用hbase作为数据源

在上面的例子中,HDFS目录作为MP(注:MapReduce)的数据源,数据源的模式把[line number:line]解释为[k1, v1]键值对。MP作业中使用TextInputFormat类来定义这种模式,相关代码如下:

Configuration conf = new Configuration();
Job job = new Job(conf, “TimeSpent”);
……
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextInputFormat把[k1, v1]键值对中的line number和line的类型分别定义为LongWritable和Text,相应的map任务定义中使用这些输入键值对类型:

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
……
}

hbase使用了类似的类来使用表中的数据,使用Scan类从hbase中取出数据。内部机制中,由Scan定义的范围取出的行会被切分并分配给所有服务器

每个region作为一个map任务,这些任务把对应region的键范围作为它们的输入数据切片,并在上面执行扫描

从base表中读取的作业以[rowkey:scan result]格式接收[k1,v1]键值对。

扫描器的结果和hbase api是一样的,对应的类型是ImmutableBytesWritable和Result,系统提供的TableMapper封装了这些细节,可以将它作为基类实现Map阶段的功能

public static class Map extends TableMapper<Text, LongWritable> {

}

protected void map(ImmutableBytesWritable rowkey,Result result,Context context) {  //定义map任务接收的[k1,v1]的输入类型,本例来自于扫描器

}

接着在MP中使用Scan实例,

TableMapReduceUtil.initTableMapperJob(
“twits”,
scan,
Map.class,
ImmutableBytesWritable.class,
Result.class,
job);

这一步会配置作业对象,建立hbase特有的输入格式(TableInputFormat)然后设置MP使用Scan实例读取表的记录,这些会出现在Map和Reduce类的实现里

使用HBase接收数据

reduce任务写入HBase region,reduce任务不一定写入同一台物理主机的region上,它们有可能写入任何一个包含要写入的键范围的region

在上面的例子中,聚合器生成的[k3,v3]键值对是[UserID:TotalTime]。在MP中,它们分别对应Hadoop序列化类型Text和LongWritable。配置输出类型和输入类型类似,不同之处在于[k3,v3]输出类型需要明确的定义而不是由OutputFormat默认指定

Configuration conf = new Configuration();
Job job = new Job(conf, “TimeSpent”);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

Context对象包含数据类型信息,这里定义reduce函数如下:

public void reduce(Text key, Iterable<LongWritable> values, Context context) {

}

当从MP写入hbase时,会再一次用到hbase api,假定[k3,v3]键值对的类型是一个行键和一个hbase的对象,v3的值可能是Put或Delete,使用TableReducer封装细节

protected void reduce(ImmutableBytesWritable rowkey,Iterable<Put> values,
Context context) {

}

最后一步是把reducer填入到作业配置中,你需要使用合适的类型定义目标表,再一次使用TableMapReduceUtil,它为你设置TableOutputFormat,这里使用系统提供的IdentityTableReducer类,由于不需要在Reduce阶段执行任何的计算

TableMapReduceUtil.initTableReducerJob(“users”,
IdentityTableReducer.class, job);

一个reduce任务不只对应一个region

使用HBase共享资源

一种很常见的例子是支持大型的Map侧联接(map-side join),联接的意思是位于不同的数据集上基于一个共同属性的相同值(join key)把数据记录联合起来。回到上面的MP例子,

举个例子

回到TimeSpent MP作业,生成一个数据集,包含UserID和各自花在TwitBase网站的TotalTime

UserID    TimeSpent
Yvonn66   30s
Mario23    2s
Rober4      6s
Masan46  35s

还有一个包含用户信息的TwitBase表,如下:

UserID       Name                                  Email                                TwitCount
Yvonn66 Yvonne Marc          Yvonn66@unmercantile.com         48
Masan46 Masanobu Olof        Masan46@acetylic.com                  47
Mario23   Marion Scott           Mario23@Wahima.com                  56
Rober4   Roberto Jacques      Rober4@slidage.com                        2

如果想知道用户花在网站上的总时间和总帖数的比值,这里使用联接键UserID,执行联结操作,结果如下:

UserID     TwitCount    TimeSpent
Yvonn66       48                   30s
Mario23        56                    2s
Rober4           2                     6s
Masan46       47                    35s

关系型数据库执行联结操作比mapreduce容易很多,如果在一台服务器上,效率也比较高,但是如果跨多台服务器,mapreduce就容易得多。联结实现分map侧还是reduce侧,选择哪个是由两个数据集记录联结执行的位置决定的,reduce侧更容易实现

1、reduce侧联结

原理:利用了中间洗牌阶段,把两个数据集的相关记录放在一起,对它们做map计算,输出以联结键为键的键值对,然后reduce可以处理值的所有组合

接下来编写算法

首先使用TimeSpent数据的map任务的伪代码:

map_timespent(line_num, line):
userid,timespend = split(line)
record = {“TimeSpent” : timespent, //生成复合记录作为V2输出在MP作业很常见
“type” : “TimeSpent”}
emit(userid, record)

map任务从k1输入行中取出UserID和TimeSpent值,然后构建一个包含type和TimeSpent属性的字典。生成[UserID:dictionary]作为[k2,v2]输出。使用Users数据的map也类似,只不过去掉了一些无关字段

map_users(line_num, line):
userid, name, email, twitcount = split(line)
record = {“TwitCount” : twitcount,
“type” : “TwitCount”}
emit(userid, record)

两个map任务都使用UserID作为k2的值,reduce任务就有了完成联结所需要的所有内容:

reduce(userid, records):
	timespent_recs = []
	twitcount_recs = []
	for rec in records:
		if rec.type == "TimeSpent":
			rec.del("type")
			timespent_recs.push(rec)
		else:
			rec.del("type")
			twitcount_recs.push(rec)
	for timespent in timespent_recs:
		for twitcount in twitcount_recs:
			emit(userid, merge(timespent, twitcount)

reduce任务把所有相同的类型的记录都组在一起,生成两种类型的所有可能组合作为k3输出。也可以在任务中直接生成想计算的比例:

reduce(userid, records):
	for rec in records:
		rec.del("type")
	merge(records)
	emit(userid, ratio(Receiver["TimeSpent"], rec["TwitCount"]))

reduce侧联结需要在map和reduce任务之间对数据进行洗牌和排序,带来IO开销

2、map侧联结

map侧联结没有reduce侧联结那么普遍适用,加载较小的数据集到内存中的散列表,map任务在遍历另一个数据集时可以访问第一个,此时你可以跳开洗牌阶段和排序阶段,在map阶段直接输出结果,回到上面的例子

map_timespent(line_num, line):
	users_recs = read_timespent("/path/to/users.csv")
	userid, timespent = split(line)
	record = {"TimeSpent" : timespent}
	record = merge(record, users_recs(userid))
	emit(userid, ratio(record["TimeSpent"], record["TwitCount"]))

3、使用hbase的map侧联结

把map侧联结的users_recs用hbase中的Users表代替,现在可以联结巨大的Users表和巨大的TimeSpent数据集了:

map_timespent(line_num, line):
	users_table = HBase.connent("Users")
	userid, timespent = split(line)
	record = {"TimeSpent" : timespent}
	record = merge(record, users_table.get(userid, "info:twitcount"))
	emit(userid, ratio(record["TimeSpent"], record["info:twitcount"]))

使用hbase存储查找表,供map任务用于执行map侧联结

hadoop提供了一个叫做hadoop-datajoin的contrib JAR来简化使用联结

实战–编写Mapreduce应用

莎士比亚作品计数

public class CountShakespeare {

  public static class Map
    extends TableMapper<Text, LongWritable> {

    public static enum Counters {ROWS, SHAKESPEAREAN};
    private Random rand;

    /**
     * Determines if the message pertains to Shakespeare.
     */
    private boolean containsShakespear(String msg) {
      return rand.nextBoolean();
    }

    @Override
    protected void setup(Context context) {
      rand = new Random(System.currentTimeMillis());
    }

    @Override
      protected void map(
          ImmutableBytesWritable rowkey,
          Result result,
          Context context) {
      byte[] b = result.getColumnLatest(
        TwitsDAO.TWITS_FAM,
        TwitsDAO.TWIT_COL).getValue();
      if (b == null) return;

      String msg = Bytes.toString(b);
      if (msg.isEmpty()) return;

      context.getCounter(Counters.ROWS).increment(1);
      if (containsShakespear(msg))
        context.getCounter(Counters.SHAKESPEAREAN).increment(1);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "TwitBase Shakespeare counter");
    job.setJarByClass(CountShakespeare.class);

    Scan scan = new Scan();
    scan.addColumn(TwitsDAO.TWITS_FAM, TwitsDAO.TWIT_COL);
    TableMapReduceUtil.initTableMapperJob(
      Bytes.toString(TwitsDAO.TABLE_NAME),
      scan,
      Map.class,
      ImmutableBytesWritable.class,
      Result.class,
      job);

    job.setOutputFormatClass(NullOutputFormat.class);
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

运行的时候出现错误:

java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:599)
at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:286)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1035)

原因是没有启动hbase

执行:wu@ubuntu:~/opt/hbase-0.92.1$ bin/hbase shell
wu@ubuntu:~/opt/twitbase$ java -cp target/twitbase-1.0.0.jar HBaseIA.TwitBase.LoadUsers 100
wu@ubuntu:~/opt/twitbase$ java -cp target/twitbase-1.0.0.jar HBaseIA.TwitBase.LoadTwits 100

现在有数据了,可以运行CountShakespeare应用:

wu@ubuntu:~/opt/twitbase$ java -cp target/twitbase-1.0.0.jar HBaseIA.TwitBase.mapreduce.CountShakespeare

如果文章对您有帮助,欢迎点击下方按钮打赏作者

Comments

不错,不错,看看了!


To verify that you are human, please fill in "七"(required)