`

HBase Coprocessor

 
阅读更多

HBase的coprocessor分为两类,Observer和EndPoint。Observer相当于触发器,代码部署在服务端,相当于对API调用的代理。 EndPoint相当于存储过程。0.94.x之前使用EndPoint需要实现CoprocessorProtocol接口,而0.96.x的EndPoint改为用protobufs作为RPC的协议。

 

EndPoint协处理器 

Endpoint 是一个 Server 端 Service 的具体实现。它的实现有一些框架代码,这些框架代码与具体的业务需求逻辑无关。仅仅是为了和 HBase 的运行时环境协同工作而必须遵循和完成的一些粘合代码。因此多数情况下仅仅需要从一个例子程序拷贝过来并进行命名修改即可。不过我们还是完整地对这些粘合代码进行粗略的讲解以便更好地理解代码。

例如:统计一张表的行数

首先首先编写protobuf文件并编译,protobuf的安装请见protobuf安装与使用;

 

option java_package = "com.cobub.protobuf";
option java_outer_classname = "ExampleProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service RowCountService {
  rpc getRowCount(CountRequest)
    returns (CountResponse);
  rpc getKeyValueCount(CountRequest)
    returns (CountResponse);
}

 

 

protoc --java_out=$PROJECT_HOME/src/main/java  exampleProtos.proto

 

用protoc编译后,可以看到在工程的 src/main/java/com/cobub/protobuf 目录下生成了一个名为 ExampleProtos.java 的文件。这个 Java 文件就是 RPC 的 Java 代码,在后续的 Server 端代码和 Client 端代码中都要用到这个 Java 文件。

 

endpoint 服务端:

首先 Endpoint 协处理器是一个 Protobuf Service 的实现,因此需要它必须继承某个 Protobuf Service。其次,作为一个 HBase 的协处理器,Endpoint 还必须实现 HBase 定义的协处理器协议,用 Java 的接口来定义。具体来说就是CoprocessorService 和 Coprocessor,这些 HBase 接口负责将协处理器和 HBase 的 RegionServer 等实例联系起来,以便协同工作。

public class RowCountEndpoint extends ExampleProtos.RowCountService
    implements Coprocessor, CoprocessorService {
  private RegionCoprocessorEnvironment env;

  public RowCountEndpoint() {
  }

  /**
   * Just returns a reference to this object, which implements the RowCounterService interface.
   */
  @Override
  public Service getService() {
    return this;
  }

  /**
   * Returns a count of the rows in the region where this coprocessor is loaded.
   */
  @Override
  public void getRowCount(RpcController controller, ExampleProtos.CountRequest request,
                          RpcCallback<ExampleProtos.CountResponse> done) {
    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter());
    ExampleProtos.CountResponse response = null;
    InternalScanner scanner = null;
    try {
      scanner = env.getRegion().getScanner(scan);
      List<Cell> results = new ArrayList<Cell>();
      boolean hasMore = false;
      byte[] lastRow = null;
      long count = 0;
      do {
        hasMore = scanner.next(results);
        for (Cell kv : results) {
          byte[] currentRow = CellUtil.cloneRow(kv);
          if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
            lastRow = currentRow;
            count++;
          }
        }
        results.clear();
      } while (hasMore);

      response = ExampleProtos.CountResponse.newBuilder()
          .setCount(count).build();
    } catch (IOException ioe) {
      ResponseConverter.setControllerException(controller, ioe);
    } finally {
      if (scanner != null) {
        try {
          scanner.close();
        } catch (IOException ignored) {}
      }
    }
    done.run(response);
  }

  /**
   * Returns a count of all KeyValues in the region where this coprocessor is loaded.
   */
  @Override
  public void getKeyValueCount(RpcController controller, ExampleProtos.CountRequest request,
                               RpcCallback<ExampleProtos.CountResponse> done) {
    ExampleProtos.CountResponse response = null;
    InternalScanner scanner = null;
    try {
      scanner = env.getRegion().getScanner(new Scan());
      List<Cell> results = new ArrayList<Cell>();
      boolean hasMore = false;
      long count = 0;
      do {
        hasMore = scanner.next(results);
        for (Cell kv : results) {
          count++;
        }
        results.clear();
      } while (hasMore);

      response = ExampleProtos.CountResponse.newBuilder()
          .setCount(count).build();
    } catch (IOException ioe) {
      ResponseConverter.setControllerException(controller, ioe);
    } finally {
      if (scanner != null) {
        try {
          scanner.close();
        } catch (IOException ignored) {}
      }
    }
    done.run(response);
  }

  /**
   * Stores a reference to the coprocessor environment provided by the
   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
   * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
   * on a table region, so always expects this to be an instance of
   * {@link RegionCoprocessorEnvironment}.
   * @param env the environment provided by the coprocessor host
   * @throws IOException if the provided environment is not an instance of
   * {@code RegionCoprocessorEnvironment}
   */
  @Override
  public void start(CoprocessorEnvironment env) throws IOException {
    if (env instanceof RegionCoprocessorEnvironment) {
      this.env = (RegionCoprocessorEnvironment)env;
    } else {
      throw new CoprocessorException("Must be loaded on a table region!");
    }
  }

  @Override
  public void stop(CoprocessorEnvironment env) throws IOException {
    // nothing to do
  }
} 

编译服务端代码,并打包存放到hdfs(方便集群所有节点共享)

动态加载coprocessor

alter 'test:test', METHOD => 'table_att', 'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint|1001|arg1=1,arg2=2'

 

 

协处理器在 Region 打开的时候被 RegionServer 自动加载,并会调用器 start 接口,完成初始化工作。一般的该接口函数中仅仅需要将协处理器的运行上下文环境变量 CoprocessorEnviorment保存到本地即可。

CoprocessorEnviorment 保存了协处理器的运行环境,每个协处理器都是在一个 RegionServer 进程内运行,并隶属于某个 Region。通过该变量,可以获取 Region 的实例等 HBase 的运行时环境对象 

 

endpoint客户端代码

public class RowEndPointClient {

    private static final byte[] TEST_FAMILY = Bytes.toBytes("f");
    private static final byte[] TEST_COLUMN = Bytes.toBytes("col1");

    public static void main(String[] args) throws Throwable {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master001:2181,slave001:2181,slave003:2181");
        // conf.set("hbase.master", "host_master:60000");
        HTable table = new HTable(conf, "test:test");
        // insert some test rows
        for (int i = 10; i < 15; i++) {
            byte[] iBytes = Bytes.toBytes(i);
            Put p = new Put(iBytes);
            p.addColumn(TEST_FAMILY, TEST_COLUMN, iBytes);
            table.put(p);
        }

        final ExampleProtos.CountRequest request = ExampleProtos.CountRequest.getDefaultInstance();
        Map<byte[], Long> results = table.coprocessorService(ExampleProtos.RowCountService.class,
                null, null,
                new Batch.Call<ExampleProtos.RowCountService, Long>() {
                    public Long call(ExampleProtos.RowCountService counter) throws IOException {
                        ServerRpcController controller = new ServerRpcController();
                        BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback =
                                new BlockingRpcCallback<ExampleProtos.CountResponse>();
                        counter.getRowCount(controller, request, rpcCallback);
                        ExampleProtos.CountResponse response = rpcCallback.get();
                        if (controller.failedOnException()) {
                            throw controller.getFailedOn();
                        }
                        return (response != null && response.hasCount()) ? response.getCount() : 0;
                    }
                });
        // should be one region with results
        Iterator<Long> iter = results.values().iterator();
        long count = 0;
        int num = 0;
        while (iter.hasNext()) {
            num++;
            count += iter.next();
        }
        System.out.println("count=" + count + ",num=" + num);
    }
}

 

Observer 协处理器

 

参考:

HBase 官网上的编程手册,对 HBase 的编程 API 有全面的介绍;

网络文章 HBase Coprocessor 示例,是一篇非常好的 Coprocessor 实例讲解;

HBase 协处理器编程入门,详细介绍了Coprocessor服务端的使用;

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics