工学1号馆

home

HBase表扫描

Wu Yudong    March 20, 2016     Hadoop   538   

你可能发现,没有查询(query)命令。到目前为止,你都找不到这样的命令。查找包含某个特定值的记录的唯一办法是,使用扫描(Scan)命令读出表的某些部分,然后再使用过滤器(filter)来得到有关记录。可以想到,扫描返回的记录是排好序的。HBase 设计上支持这种方式,因此速度很快。

要扫描得到整个表的内容,单独使用Scan 构造函数即可:
Scan s = new Scan();

但是,你经常只对整张表的一个子集感兴趣。比如,你想得到所有以字母T 开头的ID 的用户。给Scan 构造函数增加起始行和结束行的信息即可:
Scan s = new Scan(Bytes.toBytes(“T”), Bytes.toBytes(“U”));

这个例子也许有些牵强,但可以帮助你理解。一个实战的例子是什么样呢?假设你存储了推帖,你一定想进一步了解某个特定用户的最新推帖。让我们开始实现这一点。

设计用于扫描的表

就像设计关系模式一样,为HBase 表设计模式(Schema)也需要考虑数据形态和访问模式。推帖数据的访问模型不同于用户,因此我们为它们新建自己的表。为了练手,这里使用Java API 而不是Shell 来新建表。

可以使用HBaseAdmin 对象的一个实例来执行表的操作:
Configuration conf = HBaseConfiguration.create();
// create table
HBaseAdmin admin = new HBaseAdmin(conf);

创建HBaseAdmin 实例显然需要一个Configuration 实例,默认的HTableHTablePool 构造函数帮你隐藏细节。这一步很简单。现在你可以定义一个新表并且创建它:
HTableDescriptor desc = new HTableDescriptor(“twits”);
HColumnDescriptor c = new HColumnDescriptor(“twits”);
c.setMaxVersions(1);
desc.addFamily(c);
admin.createTable(desc);

HTableDescriptor 对象建立新表的描述信息,其名字是twits。同样,使用HColumnDescriptor 建立列族,名字也是twits。和users 表一样,这里只需要一个列族。你不需要推帖的多个时间版本,所以限定保留的版本数为一个。

现在可以开始存储推帖到这个有趣的新twits 表。推帖包含内容和发布的日期和时间等。你需要一个唯一值作为行键,所以我们选择用户名加上时间戳来做行键。很简单,我们存储一些推帖,如下所示:
Put put = new Put(
Bytes.toBytes(“TheRealMT” + 1329088818321L));
put.add(Bytes.toBytes(“twits”),iBytes.toBytes(“dt”), Bytes.toBytes(1329088818321L));
put.add(Bytes.toBytes(“twits”), Bytes.toBytes(“twit”), Bytes.toBytes(“Hello, TwitBase!”));

好了,基本如此。首先请注意,用户ID 是个变长字符串。当你使用复合行键时这会带来一些麻烦,因为你需要某种分隔符来切分出用户ID。一种变通的办法是对行键的变长类型部分做散列(hash)处理。选择一种散列算法生成固定长度的值。因为你想基于用户分组存储不同用户的推帖,MD5 算法是一种好选择。这些组按序存储。在组内,推帖是基于发布日期时间先后顺序存储的。MD5 是一种单向散列算法,所以不要忘了把未经编码处理的用户ID 另外存储在一个列里,以防后面用到。如下所示,向twits表中写入数据。
int longLength = Long.SIZE / 8;
byte[] userHash = Md5Utils.md5sum(“TheRealMT”);
byte[] timestamp = Bytes.toBytes(-1 * 1329088818321L);
byte[] rowKey = new byte[Md5Utils.MD5_LENGTH + longLength];
int offset = 0;
offset = Bytes.putBytes(rowKey, offset, userHash, 0, userHash.length);
Bytes.putBytes(rowKey, offset, timestamp, 0, timestamp.length);
Put put = new Put(rowKey);
put.add(
Bytes.toBytes(“twits”),
Bytes.toBytes(“user”),
Bytes.toBytes(“TheRealMT”);
put.add(
Bytes.toBytes(“twits”),
Bytes.toBytes(“twit”),
Bytes.toBytes(“Hello, TwitBase!));

一般来说,你会先用到最新推帖。HBase 在物理数据模型里按照行键顺序存储行。你可以利用这个特性。在行键里包括推帖的时间戳,并且乘以-1,就可以先得到最新的推帖。

注意:在HBase 模式中行键设计至关重要

这一点如何强调都不为过:HBase 的行键在设计表时是第一重要的考量因素。

执行扫描

使用用户ID 作为twits 表行键的第一部分证明是好办法。它可以基于用户以自然行的顺序有效地生成数据桶(bucket)。来自同一用户的数据以连续行的形式存储在一起。现在Scan 命令如何使用呢?或多或少和之前介绍的类似,只是计算停止键时复杂一点:

byte[] userHash = Md5Utils.md5sum(user);
byte[] startRow = Bytes.padTail(userHash, longLength); // 212d…866f00…
byte[] stopRow = Bytes.padTail(userHash, longLength);
stopRow[Md5Utils.MD5_LENGTH-1]++; // 212d…867000…
Scan s = new Scan(startRow, stopRow);
ResultsScanner rs = twits.getScanner(s);

本例中,你可以通过对行键中用户ID 部分的最后字符加1 来生成停止键。扫描器返回包括起始键但是不包括停止键的记录,因此你只得到了匹配用户的推帖。

再通过一个简单的循环从ResultScanner 中读出推帖:

for(Result r : rs) {
// extract the username
byte[] b = r.getValue(
Bytes.toBytes(“twits”),
Bytes.toBytes(“user”));
String user = Bytes.toString(b);
// extract the twit
b = r.getValue(
Bytes.toBytes(“twits”),
Bytes.toBytes(“twit”));
String message = Bytes.toString(b);
// extract the timestamp
b = Arrays.copyOfRange(
r.getRow(),
Md5Utils.MD5_LENGTH,
Md5Utils.MD5_LENGTH + longLength);
DateTime dt = new DateTime(-1 * Bytes.toLong(b));
}

循环中唯一需要处理的是分离出时间戳,并且把字节数组byte[]转换成合适的数据类型。你会得到如下数据:

<Twit: TheRealMT 2012-02-20T00:13:27.931-08:00 Hello, TwitBase!>

扫描器缓存

在HBase 的设置里扫描每次RPC 调用得到一批行数据。这可以在扫描对象上使用setCaching(int)在每个扫描器(scanner)层次上设置,也可以在hbasesite.xml 配置文件里使用HBase.client.scanner.caching 属性来设置。如果缓存值设置为n,每次RPC 调用扫描器返回n 行,然后这些数据缓存在客户端。这个设置的默认值是1,这意味着客户端对HBase 的每次RPC 调用在扫描整张表后仅仅返回一行。这个数字很保守,你可以调整它以获得更好的性能。但是该值设置过高意味着客户端和HBase 的交互会出现较长暂停,这会导致HBase 端的超时。

ResultScanner 接口也有一个next(int)调用,你可以用来要求返回扫描的下面n 行。这是在API 层面提供的便利,与为了获得那n 行数据客户端对HBase 的RPC 调用次数无关。在内部机制中,ResultScanner 使用了多次RPC 调用来满足这个请求,每次RPC调用返回的行数只取决于你为扫描器设置的缓存值。

使用过滤器

并不总能设计一个行键来完美地匹配你的访问模式。有时你的使用场景需要扫描HBase 的一组数据但是只返回它的子集给客户端。这时需要使用过滤器(filter)。为你的Scan 对象增加过滤器,如下所示:

Filter f = …
Scan s = new Scan();
s.setFilter(f);

过滤器是在HBase 服务器端上而不是在客户端执行判断动作。当你在Scan 里设定Filter 时,HBase 使用它来决定一个记录是否返回。这样避免了许多不必要的数据传输。这个特性在服务器上执行过滤动作而不是把负担放在客户端。

使用过滤器需要实现org.apache.hadoop.hbase.filter.filter 接口。

HBase 提供了许多种过滤器,但实现你自己的过滤器也很容易。

为了过滤所有提到TwitBase 的推帖,你可以结合RegexStringComparator 使用ValueFilter:

Scan s = new Scan();
s.addColumn(Bytes.toBytes(“twits”), Bytes.toByes(“twit”));
Filter f = new ValueFilter(
CompareOp.EQUAL,
new RegexStringComparator(“.*TwitBase.*”));
s.setFilter(f);

HBase 也提供了一个过滤器构造类。ParseFilter 对象实现了一种查询语言,可以用来构造Filter 实例。可以用一个表达式构造同样的TwitBase 过滤器:

Scan s = new Scan();
s.addColumn(TWITS_FAM, TWIT_COL);
String expression = “ValueFilter(=,’regexString:.*TwitBase.*’)”;
ParseFilter p = new ParseFilter();
Filter f = p.parseSimpleFilterExpression(Bytes.toBytes(expression));
s.setFilter(f);

这两个例子中,数据在到达客户端之前在region 中编译和使用了正则表达式。

上面是一个在应用中使用过滤器的简单例子。HBase 中过滤器可以应用到行键、列限定符或者数据值。你也可以使用FilterList 和WhileMatchFilter 对象组合多个过滤器。过滤器允许对数据分页处理,限制扫描器返回的行数。

如果文章对您有帮助,欢迎点击下方按钮打赏作者

Comments

No comments yet.
To verify that you are human, please fill in "七"(required)