Hbase统计Rowkey行数的方法

Hbase如何快速统计RowKey行数
已邀请:

openinx - HBase Committer @小米

赞同来自: chang

试试这个工具:
 
 $ ./bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter
ERROR: Wrong number of parameters: 0
Usage: RowCounter [options] <tablename> [--range=[startKey],[endKey]] [<column1> <column2>...]
For performance consider the following options:
-Dhbase.client.scanner.caching=100
-Dmapred.map.tasks.speculative.execution=false

openinx - HBase Committer @小米

赞同来自: chang

自己写一个也非常简单:
 
public class TestCounter {

private static final byte[] TABLE_NAME = Bytes.toBytes("TestTable");

public static class TestMapper extends TableMapper<Text, LongWritable> {
public void map(ImmutableBytesWritable key, Result result, Context context)
throws IOException, InterruptedException {
context.getCounter("TestRowCounter", "Test Row Counter").increment(1);
}
}

public static void main(String args[]) throws Exception {
Configuration conf = HBaseConfiguration.create();
Job job = new Job(conf);

Scan scan = new Scan();
scan.setCaching(100);
scan.setCacheBlocks(false);

job.setSpeculativeExecution(false);
job.setJarByClass(TestCounter.class);
job.setMapperClass(TestMapper.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);

TableMapReduceUtil.initTableMapperJob(TABLE_NAME, scan, TestMapper.class, Text.class,
LongWritable.class, job);
TableMapReduceUtil.initTableReducerJob(Bytes.toString(TABLE_NAME), null, job);

if (!job.waitForCompletion(true)) {
System.out.println("wait job complete failed.");
}
}
}

SuperbDong

赞同来自: chang

1.使用scan 加 FirstKeyOnlyFilter 过滤器
/**
 * Counts the rows of the keyword table with a plain client-side scan.
 *
 * <p>Uses {@code FirstKeyOnlyFilter} so each row returns only its first
 * KeyValue, minimizing transferred data (the section header above
 * describes this approach; the original code mistakenly used
 * {@code KeyOnlyFilter}, which still returns every cell's key).
 *
 * @return number of rows scanned, or the partial count if an
 *         {@link IOException} interrupts the scan
 */
public int getCount1() {
    long bef = System.currentTimeMillis();
    int i = 0;
    ResultScanner rs = null;
    try {
        Scan s = new Scan();
        s.setCaching(500);
        s.setCacheBlocks(false);
        // Only the first cell of each row is needed to count it.
        s.setFilter(new FirstKeyOnlyFilter());
        rs = tableKeyword.getScanner(s);
        // Iterate inside the try: the original looped outside and would
        // NPE on rs if getScanner had thrown.
        for (org.apache.hadoop.hbase.client.Result r : rs) {
            i++;
        }
    } catch (IOException e) {
        log.warn(e);
        e.printStackTrace();
    } finally {
        // Always release the scanner lease, even on failure.
        if (rs != null) {
            rs.close();
        }
    }
    long now = System.currentTimeMillis();
    log.warn("keyword表中数据总数 :" + i + ", 所用时间 : " + (now - bef)/1000.0);
    return i;
}

2.编写MR程序
/**
 * MapReduce job (old {@code mapred} API) that counts the rows of an HBase
 * table via the {@code ROWS} job counter. Map-only: no reducers, and map
 * output is discarded — only the counter matters.
 *
 * <p>Usage: {@code rowcounter <outputdir> <tablename> <column1> [<column2>...]}
 */
public class RowCounter extends Configured implements Tool {
    // Name of this 'program'
    static final String NAME = "rowcounter";

    /**
     * Mapper that runs the count. Increments {@code Counters.ROWS} once per
     * row that has at least one non-empty cell value.
     */
    static class RowCounterMapper
            implements TableMap<ImmutableBytesWritable, Result> {
        private static enum Counters {ROWS}

        public void map(ImmutableBytesWritable row, Result values,
                OutputCollector<ImmutableBytesWritable, Result> output,
                Reporter reporter)
                throws IOException {
            boolean content = false;

            for (KeyValue value : values.list()) {
                if (value.getValue().length > 0) {
                    content = true;
                    break;
                }
            }
            if (!content) {
                // Don't count rows that are all empty values.
                return;
            }
            // Give out same value every time. We're only interested in the row/key
            reporter.incrCounter(Counters.ROWS, 1);
        }

        public void configure(JobConf jc) {
            // Nothing to do.
        }

        public void close() throws IOException {
            // Nothing to do.
        }
    }

    /**
     * Builds the map-only counting job from the CLI arguments.
     *
     * @param args {@code [outputdir, tablename, column...]}
     * @return the JobConf
     * @throws IOException if job setup fails
     */
    public JobConf createSubmittableJob(String[] args) throws IOException {
        JobConf c = new JobConf(getConf(), getClass());
        c.setJobName(NAME);
        // Columns are space delimited
        StringBuilder sb = new StringBuilder();
        final int columnoffset = 2;
        for (int i = columnoffset; i < args.length; i++) {
            if (i > columnoffset) {
                sb.append(" ");
            }
            sb.append(args[i]);
        }
        // Second argument is the table name.
        TableMapReduceUtil.initTableMapJob(args[1], sb.toString(),
                RowCounterMapper.class, ImmutableBytesWritable.class, Result.class, c);
        // Map-only: the ROWS counter carries the result, not the output.
        c.setNumReduceTasks(0);
        // First arg is the output directory.
        FileOutputFormat.setOutputPath(c, new Path(args[0]));
        return c;
    }

    /** Prints usage and returns the error exit code. */
    static int printUsage() {
        System.out.println(NAME +
                " <outputdir> <tablename> <column1> [<column2>...]");
        return -1;
    }

    public int run(final String[] args) throws Exception {
        // Make sure there are at least 3 parameters
        if (args.length < 3) {
            System.err.println("ERROR: Wrong number of parameters: " + args.length);
            return printUsage();
        }
        JobClient.runJob(createSubmittableJob(args));
        return 0;
    }

    /**
     * CLI entry point; exits with the tool's return code.
     *
     * @param args see {@link #printUsage()}
     * @throws Exception on job failure
     */
    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() replaces the deprecated
        // new HBaseConfiguration() constructor (consistent with the
        // TestCounter example above).
        Configuration c = HBaseConfiguration.create();
        int errCode = ToolRunner.run(c, new RowCounter(), args);
        System.exit(errCode);
    }
}

3.为表添加协处理器
    /**
     * Attaches the aggregation coprocessor to the table so that
     * {@code AggregationClient.rowCount} can run server-side.
     *
     * <p>The table is disabled, its descriptor updated, then re-enabled;
     * this briefly takes the table offline.
     *
     * @param tableName name of the table to modify
     */
    private static void initCountRowkey(String tableName) {
        // Fixed typo: the real class is AggregateImplementation, not
        // "AggregateImpletation" — the original name would fail to load
        // and leave the table unable to enable.
        String coprocessClassName = "org.apache.hadoop.hbase.coprocessor.AggregateImplementation";
        HBaseAdmin admin = null;
        try {
            admin = new HBaseAdmin(configuration);
            admin.disableTable(tableName.getBytes());
            HTableDescriptor hTableDescriptor = admin.getTableDescriptor(tableName.getBytes());
            hTableDescriptor.addCoprocessor(coprocessClassName);
            admin.modifyTable(tableName, hTableDescriptor);
            admin.enableTable(tableName.getBytes());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the admin connection (was leaked in the original).
            if (admin != null) {
                try {
                    admin.close();
                } catch (IOException ignored) {
                    // Best-effort close; nothing useful to do here.
                }
            }
        }
    }

private static void countRowkey(String tableName) {
AggregationClient aggregationClient = new AggregationClient(configuration);
Scan scan = new Scan();
scan.addFamily("value".getBytes());
scan.setFilter(new FirstKeyOnlyFilter());
try {
long startTime = new Date().getTime();
long rowCount = aggregationClient.rowCount(tableName.getBytes(), new , scan);
long endTime = new Date().getTime();
System.out.println(endTime - startTime);
System.out.println(rowCount);
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}

4.(预估)使用BloomFilter => row 在 HFile文件中的No of Keys in bloom
 $ ./bin/hbase org.apache.hadoop.hbase.io.hfile.HFile -f /hbase/hfile -m
Bloom filter:
BloomSize: 262144
No of Keys in bloom: 217851
Max Keys for bloom: 218612
Percentage filled: 100%
Number of chunks: 2
Comparator: ByteArrayComparator

qilangye

赞同来自:

以上这些代码是要在哪里运行呢?

要回复问题请先登录注册