HBase的coprocessor分为两类,Observer和EndPoint。Observer相当于触发器,代码部署在服务端,相当于对API调用的代理。 EndPoint相当于存储过程。0.94.x之前使用EndPoint需要实现CoprocessorProtocol接口,而0.96.x的EndPoint改为用protobufs作为RPC的协议。
EndPoint协处理器
Endpoint 是一个 Server 端 Service 的具体实现。它的实现有一些框架代码,这些框架代码与具体的业务需求逻辑无关。仅仅是为了和 HBase 的运行时环境协同工作而必须遵循和完成的一些粘合代码。因此多数情况下仅仅需要从一个例子程序拷贝过来并进行命名修改即可。不过我们还是完整地对这些粘合代码进行粗略的讲解以便更好地理解代码。
例如:统计一张表的行数
首先首先编写protobuf文件并编译,protobuf的安装请见protobuf安装与使用;
option java_package = "com.cobub.protobuf"; option java_outer_classname = "ExampleProtos"; option java_generic_services = true; option java_generate_equals_and_hash = true; option optimize_for = SPEED; message CountRequest { } message CountResponse { required int64 count = 1 [default = 0]; } service RowCountService { rpc getRowCount(CountRequest) returns (CountResponse); rpc getKeyValueCount(CountRequest) returns (CountResponse); }
protoc --java_out=$PROJECT_HOME/src/main/java exampleProtos.proto
用protoc编译后,可以看到在工程的 src/main/java/com/cobub/protobuf 目录下生成了一个名为 ExampleProtos.java 的文件。这个 Java 文件就是 RPC 的 Java 代码,在后续的 Server 端代码和 Client 端代码中都要用到这个 Java 文件。
endpoint 服务端:
首先 Endpoint 协处理器是一个 Protobuf Service 的实现,因此需要它必须继承某个 Protobuf Service。其次,作为一个 HBase 的协处理器,Endpoint 还必须实现 HBase 定义的协处理器协议,用 Java 的接口来定义。具体来说就是CoprocessorService 和 Coprocessor,这些 HBase 接口负责将协处理器和 HBase 的 RegionServer 等实例联系起来,以便协同工作。
public class RowCountEndpoint extends ExampleProtos.RowCountService implements Coprocessor, CoprocessorService { private RegionCoprocessorEnvironment env; public RowCountEndpoint() { } /** * Just returns a reference to this object, which implements the RowCounterService interface. */ @Override public Service getService() { return this; } /** * Returns a count of the rows in the region where this coprocessor is loaded. */ @Override public void getRowCount(RpcController controller, ExampleProtos.CountRequest request, RpcCallback<ExampleProtos.CountResponse> done) { Scan scan = new Scan(); scan.setFilter(new FirstKeyOnlyFilter()); ExampleProtos.CountResponse response = null; InternalScanner scanner = null; try { scanner = env.getRegion().getScanner(scan); List<Cell> results = new ArrayList<Cell>(); boolean hasMore = false; byte[] lastRow = null; long count = 0; do { hasMore = scanner.next(results); for (Cell kv : results) { byte[] currentRow = CellUtil.cloneRow(kv); if (lastRow == null || !Bytes.equals(lastRow, currentRow)) { lastRow = currentRow; count++; } } results.clear(); } while (hasMore); response = ExampleProtos.CountResponse.newBuilder() .setCount(count).build(); } catch (IOException ioe) { ResponseConverter.setControllerException(controller, ioe); } finally { if (scanner != null) { try { scanner.close(); } catch (IOException ignored) {} } } done.run(response); } /** * Returns a count of all KeyValues in the region where this coprocessor is loaded. */ @Override public void getKeyValueCount(RpcController controller, ExampleProtos.CountRequest request, RpcCallback<ExampleProtos.CountResponse> done) { ExampleProtos.CountResponse response = null; InternalScanner scanner = null; try { scanner = env.getRegion().getScanner(new Scan()); List<Cell> results = new ArrayList<Cell>(); boolean hasMore = false; long count = 0; do { hasMore = scanner.next(results); for (Cell kv : results) { count++; } results.clear(); } while (hasMore); response = ExampleProtos.CountResponse.newBuilder() .setCount(count).build(); } catch (IOException ioe) { ResponseConverter.setControllerException(controller, ioe); } finally { if (scanner != null) { try { scanner.close(); } catch (IOException ignored) {} } } done.run(response); } /** * Stores a reference to the coprocessor environment provided by the * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded * on a table region, so always expects this to be an instance of * {@link RegionCoprocessorEnvironment}. * @param env the environment provided by the coprocessor host * @throws IOException if the provided environment is not an instance of * {@code RegionCoprocessorEnvironment} */ @Override public void start(CoprocessorEnvironment env) throws IOException { if (env instanceof RegionCoprocessorEnvironment) { this.env = (RegionCoprocessorEnvironment)env; } else { throw new CoprocessorException("Must be loaded on a table region!"); } } @Override public void stop(CoprocessorEnvironment env) throws IOException { // nothing to do } }
编译服务端代码,并打包存放到hdfs(方便集群所有节点共享)
动态加载coprocessor
alter 'test:test', METHOD => 'table_att', 'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint|1001|arg1=1,arg2=2'
协处理器在 Region 打开的时候被 RegionServer 自动加载,并会调用器 start 接口,完成初始化工作。一般的该接口函数中仅仅需要将协处理器的运行上下文环境变量 CoprocessorEnviorment保存到本地即可。
CoprocessorEnviorment 保存了协处理器的运行环境,每个协处理器都是在一个 RegionServer 进程内运行,并隶属于某个 Region。通过该变量,可以获取 Region 的实例等 HBase 的运行时环境对象
endpoint客户端代码
public class RowEndPointClient { private static final byte[] TEST_FAMILY = Bytes.toBytes("f"); private static final byte[] TEST_COLUMN = Bytes.toBytes("col1"); public static void main(String[] args) throws Throwable { Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "master001:2181,slave001:2181,slave003:2181"); // conf.set("hbase.master", "host_master:60000"); HTable table = new HTable(conf, "test:test"); // insert some test rows for (int i = 10; i < 15; i++) { byte[] iBytes = Bytes.toBytes(i); Put p = new Put(iBytes); p.addColumn(TEST_FAMILY, TEST_COLUMN, iBytes); table.put(p); } final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance(); Map<byte[], Long> results = table.coprocessorService(ExampleProtos.RowCountService.class, null, null, new Batch.Call<ExampleProtos.RowCountService, Long>() { public Long call(ExampleProtos.RowCountService counter) throws IOException { ServerRpcController controller = new ServerRpcController(); BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback<ExampleProtos.CountResponse>(); counter.getRowCount(controller, request, rpcCallback); ExampleProtos.CountResponse response = rpcCallback.get(); if (controller.failedOnException()) { throw controller.getFailedOn(); } return (response != null && response.hasCount()) ? response.getCount() : 0; } }); // should be one region with results Iterator<Long> iter = results.values().iterator(); long count = 0; int num = 0; while (iter.hasNext()) { num++; count += iter.next(); } System.out.println("count=" + count + ",num=" + num); } }
Observer 协处理器
参考:
HBase 官网上的编程手册,对 HBase 的编程 API 有全面的介绍;
网络文章 HBase Coprocessor 示例,是一篇非常好的 Coprocessor 实例讲解;
HBase 协处理器编程入门,详细介绍了Coprocessor服务端的使用;
相关推荐
HBaseCoprocessor的实现与应用.pdf
在hbtc2012上的发言,介绍hbase coprocessor的优化。hbase的cp是其自带的分布式并行计算框架。
讲师:陈杨——快手大数据高级研发工程师 ...内容概要:(1)讲解hbase coprocessor的原理以及使用场景,(2) coprocessor整个流程实战,包括开发,加载,运行以及管理(3)结合1,2分析coprocessor在rsgroup中的具体使用
藏经阁-HBase Coprocessor-22.pdf
HBaseCoprocessor的实现与应用.zip
使用Hbase协作器(Coprocessor)同步数据到ElasticSearch(hbase 版本 1.2.0-cdh5.8.0, es 2.4.0 版本)源代码
hbase-solr-coprocessor 测试代码,目的是借助solr实现hbase二级索引,以使hbase支持高效的多条件查询。主要通过hbase的coprocessor的Observer实现,通过coprocessor在记录插入hbase时向solr中创建索引。 项目核心为...
HBase-coprocessor
Hbase本身只有一级索引rowkey,现在通过Hbase coprocessor协处理器把Hbase的数据索引存储到Elasticsearch,从而建立二级索引;ppt中讲述了一些注意事项,挺有用的,希望能有所帮忙!
2-6+HBase+Coprocessor
javaapi实现hbase的触发器,包含hbase的工具类
增量式的Apriori算法,有点像分布式的Apriori,因为我们可以把已挖掘的事务集和新增的事务集看作两个互相独立的数据集,挖掘新增的事务集,获取所有新增频繁集,然后与已有的频繁集做并集,对于两边都同时频繁的项集...
hbaseSecondaryIndex是一个hbase二级索引的实现,基于solr+hbase coprocessor框架为hbase提供索引表支持。 此工程可打包为一个hbase的插件,通用方便。 这篇文档的目的是为了向您介绍hbaseSecondaryIndex的基本使用...
hbase-协处理器-示例此示例演示如何使用 HBase 协处理器和 Algebird monoid 实现分组聚合。 我们在这里使用的 HBase 版本...复制到HBase classpath $ sbt assembly$ cp $PWD /target/scala-2.10/hbase-coprocessor-asse
协处理器通过宏和概括的方式,我们可以将协处理器定义为一个框架,该框架提供了一种在HBase中执行自定义代码的简便方法。 代表协处理器的最常用类比是“触发器/存储过程”和AOP。 协处理器可以开发为: 观察者协处理...
16.6 HBase Coprocessor 262 16.7 布隆过滤器 262 16.8 小结 262 第17章 工具和实用程序 263 17.1 RRDTool 263 17.2 Nagios 265 17.3 Scribe 266 17.4 Flume 267 17.5 Chukwa 267 17.6 Pig 268 17.6.1 ...
手把手视频详细讲解项目开发全过程,需要的小伙伴自行百度网盘下载,链接见附件,永久有效。 课程简介 从HBase的集群搭建、HBaseshell操作、java...3. HBase的协处理器(Coprocessor) 4. HBase事务 5. HBase数据结构
Apache Phoenix是构建在HBase之上的关系型数据库层,作为内嵌的客户端JDBC驱动用以对...直接使用HBase的API,结合协处理器(coprocessor)和自定义的过滤器的话,小范围的查询在毫秒级响应,千万数据的话响应速度为秒级
timelineservice jar包
HBase 中的高效无读增量 很多时候,当用于存储计数器时,为更新它们而执行的增量操作的结果会被忽略。 然而,HBase 会完成所有必要的工作来计算和... 首先,我们需要构建一个 Coprocessor jar: $ mvn clean package