mirror of https://github.com/dunwu/db-tutorial.git
feat: HBase 、ES 示例更新
parent
aa66b1f3e1
commit
c82e9d737a
|
@ -1,7 +1,5 @@
|
||||||
package io.github.dunwu.javadb.elasticsearch.util;
|
package io.github.dunwu.javadb.elasticsearch.util;
|
||||||
|
|
||||||
import cn.hutool.core.bean.BeanUtil;
|
|
||||||
import cn.hutool.core.bean.copier.CopyOptions;
|
|
||||||
import cn.hutool.core.collection.CollectionUtil;
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
import cn.hutool.core.util.ArrayUtil;
|
import cn.hutool.core.util.ArrayUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
@ -35,8 +33,6 @@ import org.elasticsearch.client.RestHighLevelClient;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
|
||||||
import org.elasticsearch.index.query.QueryBuilder;
|
import org.elasticsearch.index.query.QueryBuilder;
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
import org.elasticsearch.rest.RestStatus;
|
import org.elasticsearch.rest.RestStatus;
|
||||||
|
@ -47,7 +43,6 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -170,18 +165,8 @@ public class ElasticsearchUtil {
|
||||||
|
|
||||||
public static <T extends EsEntity> String insert(RestHighLevelClient client, String index, String type, T entity)
|
public static <T extends EsEntity> String insert(RestHighLevelClient client, String index, String type, T entity)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Map<String, Object> map = new HashMap<>();
|
Map<String, Object> map = toMap(entity);
|
||||||
BeanUtil.beanToMap(entity, map, CopyOptions.create().ignoreError());
|
IndexRequest request = new IndexRequest(index, type).source(map);
|
||||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
|
||||||
builder.startObject();
|
|
||||||
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
|
||||||
String key = entry.getKey();
|
|
||||||
Object value = entry.getValue();
|
|
||||||
builder.field(key, value);
|
|
||||||
}
|
|
||||||
builder.endObject();
|
|
||||||
|
|
||||||
IndexRequest request = new IndexRequest(index, type).source(builder);
|
|
||||||
if (entity.getDocId() != null) {
|
if (entity.getDocId() != null) {
|
||||||
request.id(entity.getDocId());
|
request.id(entity.getDocId());
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@ import java.util.Map;
|
||||||
/**
|
/**
|
||||||
* JSON 工具类
|
* JSON 工具类
|
||||||
*
|
*
|
||||||
* @author Zhang Peng
|
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||||
* @date 2023-06-29
|
* @date 2023-06-29
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
<?xml version="1.0"?>
|
<?xml version="1.0"?>
|
||||||
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
|
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
|
||||||
xmlns="http://maven.apache.org/POM/4.0.0">
|
xmlns="http://maven.apache.org/POM/4.0.0">
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<groupId>io.github.dunwu</groupId>
|
<groupId>io.github.dunwu</groupId>
|
||||||
<artifactId>javadb-hbase</artifactId>
|
<artifactId>javadb-hbase</artifactId>
|
||||||
|
@ -14,7 +14,7 @@
|
||||||
<maven.compiler.source>${java.version}</maven.compiler.source>
|
<maven.compiler.source>${java.version}</maven.compiler.source>
|
||||||
<maven.compiler.target>${java.version}</maven.compiler.target>
|
<maven.compiler.target>${java.version}</maven.compiler.target>
|
||||||
|
|
||||||
<hbase.version>1.3.1</hbase.version>
|
<hbase.version>2.4.15</hbase.version>
|
||||||
<junit.version>4.13.1</junit.version>
|
<junit.version>4.13.1</junit.version>
|
||||||
<dunwu.version>0.5.7</dunwu.version>
|
<dunwu.version>0.5.7</dunwu.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
@ -25,8 +25,19 @@
|
||||||
<artifactId>hbase-client</artifactId>
|
<artifactId>hbase-client</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>io.github.dunwu</groupId>
|
<groupId>org.apache.hadoop</groupId>
|
||||||
<artifactId>dunwu-tool-core</artifactId>
|
<artifactId>hadoop-hdfs</artifactId>
|
||||||
|
<version>2.10.2</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>cn.hutool</groupId>
|
||||||
|
<artifactId>hutool-all</artifactId>
|
||||||
|
<version>5.8.18</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.projectlombok</groupId>
|
||||||
|
<artifactId>lombok</artifactId>
|
||||||
|
<version>1.18.22</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!-- test begin -->
|
<!-- test begin -->
|
||||||
|
@ -44,11 +55,12 @@
|
||||||
<artifactId>hbase-client</artifactId>
|
<artifactId>hbase-client</artifactId>
|
||||||
<version>${hbase.version}</version>
|
<version>${hbase.version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<!-- <dependency>-->
|
||||||
<groupId>io.github.dunwu</groupId>
|
<!-- <groupId>io.github.dunwu</groupId>-->
|
||||||
<artifactId>dunwu-tool-core</artifactId>
|
<!-- <artifactId>dunwu-tool-core</artifactId>-->
|
||||||
<version>${dunwu.version}</version>
|
<!-- <version>${dunwu.version}</version>-->
|
||||||
</dependency>
|
<!-- </dependency>-->
|
||||||
|
|
||||||
|
|
||||||
<!-- test begin -->
|
<!-- test begin -->
|
||||||
<dependency>
|
<dependency>
|
||||||
|
|
|
@ -0,0 +1,234 @@
|
||||||
|
package io.github.dunwu.javadb.hbase;
|
||||||
|
|
||||||
|
import cn.hutool.core.io.IoUtil;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Admin;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HBase 管理工具类
|
||||||
|
*
|
||||||
|
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||||
|
* @date 2023-03-27
|
||||||
|
*/
|
||||||
|
public class HBaseAdminHelper implements Closeable {
|
||||||
|
|
||||||
|
private final Connection connection;
|
||||||
|
private final Configuration configuration;
|
||||||
|
|
||||||
|
protected HBaseAdminHelper(Configuration configuration) throws IOException {
|
||||||
|
this.configuration = configuration;
|
||||||
|
this.connection = ConnectionFactory.createConnection(configuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected HBaseAdminHelper(Connection connection) {
|
||||||
|
this.configuration = connection.getConfiguration();
|
||||||
|
this.connection = connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized static HBaseAdminHelper newInstance(Configuration configuration) throws IOException {
|
||||||
|
if (configuration == null) {
|
||||||
|
throw new IllegalArgumentException("configuration can not be null!");
|
||||||
|
}
|
||||||
|
return new HBaseAdminHelper(configuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized static HBaseAdminHelper newInstance(Connection connection) throws IOException {
|
||||||
|
if (connection == null) {
|
||||||
|
throw new IllegalArgumentException("connection can not be null!");
|
||||||
|
}
|
||||||
|
return new HBaseAdminHelper(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 关闭内部持有的 HBase Connection 实例
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void close() {
|
||||||
|
if (null == connection || connection.isClosed()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
IoUtil.close(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取 HBase 连接实例
|
||||||
|
*
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public Connection getConnection() {
|
||||||
|
if (null == connection) {
|
||||||
|
throw new RuntimeException("HBase connection init failed...");
|
||||||
|
}
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取 HBase 配置
|
||||||
|
*
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public Configuration getConfiguration() {
|
||||||
|
return configuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建命名空间
|
||||||
|
*
|
||||||
|
* @param namespace 命名空间
|
||||||
|
*/
|
||||||
|
public void createNamespace(String namespace) throws IOException {
|
||||||
|
NamespaceDescriptor nd = NamespaceDescriptor.create(namespace).build();
|
||||||
|
Admin admin = getAdmin();
|
||||||
|
admin.createNamespace(nd);
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除命名空间
|
||||||
|
*
|
||||||
|
* @param namespace 命名空间
|
||||||
|
*/
|
||||||
|
public void dropNamespace(String namespace) throws IOException {
|
||||||
|
dropNamespace(namespace, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除命名空间
|
||||||
|
*
|
||||||
|
* @param namespace 命名空间
|
||||||
|
* @param force 是否强制删除
|
||||||
|
*/
|
||||||
|
public void dropNamespace(String namespace, boolean force) throws IOException {
|
||||||
|
Admin admin = getAdmin();
|
||||||
|
if (force) {
|
||||||
|
TableName[] tableNames = getAdmin().listTableNamesByNamespace(namespace);
|
||||||
|
for (TableName name : tableNames) {
|
||||||
|
admin.disableTable(name);
|
||||||
|
admin.deleteTable(name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
admin.deleteNamespace(namespace);
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定表是否存在
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
*/
|
||||||
|
public boolean existsTable(TableName tableName) throws IOException {
|
||||||
|
Admin admin = getAdmin();
|
||||||
|
boolean result = admin.tableExists(tableName);
|
||||||
|
admin.close();
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建表
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param families 列族
|
||||||
|
*/
|
||||||
|
public void createTable(TableName tableName, String... families) throws IOException {
|
||||||
|
createTable(tableName, null, families);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 创建表
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param splitKeys 表初始区域的拆分关键字
|
||||||
|
* @param families 列族
|
||||||
|
*/
|
||||||
|
public void createTable(TableName tableName, byte[][] splitKeys, String... families) throws IOException {
|
||||||
|
|
||||||
|
List<ColumnFamilyDescriptor> columnFamilyDescriptorList = new ArrayList<>();
|
||||||
|
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
|
||||||
|
for (String cf : families) {
|
||||||
|
ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder.of(cf);
|
||||||
|
columnFamilyDescriptorList.add(columnFamilyDescriptor);
|
||||||
|
}
|
||||||
|
builder.setColumnFamilies(columnFamilyDescriptorList);
|
||||||
|
|
||||||
|
TableDescriptor td = builder.build();
|
||||||
|
Admin admin = getAdmin();
|
||||||
|
if (splitKeys != null) {
|
||||||
|
admin.createTable(td, splitKeys);
|
||||||
|
} else {
|
||||||
|
admin.createTable(td);
|
||||||
|
}
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 删除表
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
*/
|
||||||
|
public void dropTable(TableName tableName) throws IOException {
|
||||||
|
if (existsTable(tableName)) {
|
||||||
|
Admin admin = getAdmin();
|
||||||
|
if (admin.isTableEnabled(tableName)) {
|
||||||
|
disableTable(tableName);
|
||||||
|
}
|
||||||
|
admin.deleteTable(tableName);
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 禁用表
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
*/
|
||||||
|
public void disableTable(TableName tableName) throws IOException {
|
||||||
|
Admin admin = getAdmin();
|
||||||
|
admin.disableTable(tableName);
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 启用表
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
*/
|
||||||
|
public void enableTable(TableName tableName) throws IOException {
|
||||||
|
Admin admin = getAdmin();
|
||||||
|
admin.enableTable(tableName);
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取 {@link Table} 实例
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public Table getTable(TableName tableName) throws IOException {
|
||||||
|
return getConnection().getTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取 {@link Admin} 实例
|
||||||
|
*
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public Admin getAdmin() throws IOException {
|
||||||
|
return getConnection().getAdmin();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,25 +0,0 @@
|
||||||
package io.github.dunwu.javadb.hbase;
|
|
||||||
|
|
||||||
public enum HBaseConstant {
|
|
||||||
|
|
||||||
HBASE_ZOOKEEPER_QUORUM("hbase.zookeeper.quorum"),
|
|
||||||
HBASE_ENABLE("hbase.enable"),
|
|
||||||
HBASE_MASTER("hbase.master"),
|
|
||||||
HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT("hbase.zookeeper.property.clientPort"),
|
|
||||||
HBASE_HCONNECTION_THREADS_MAX("hbase.hconnection.threads.max"),
|
|
||||||
HBASE_HCONNECTION_THREADS_CORE("hbase.hconnection.threads.core"),
|
|
||||||
ZOOKEEPER_ZNODE_PARENT("zookeeper.znode.parent"),
|
|
||||||
HBASE_COLUMN_FAMILY("hbase.column.family"),
|
|
||||||
HBASE_EXECUTOR_NUM("hbase.executor.num"),
|
|
||||||
HBASE_IPC_POOL_SIZE("hbase.client.ipc.pool.size");
|
|
||||||
|
|
||||||
private String key;
|
|
||||||
|
|
||||||
HBaseConstant(String key) {
|
|
||||||
this.key = key;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String key() {
|
|
||||||
return key;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
package io.github.dunwu.javadb.hbase;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
public class HBaseDemo {
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
|
||||||
|
// 请改为配置的方式
|
||||||
|
// String zkHosts = "192.168.31.127";
|
||||||
|
String zkHosts = "192.168.31.255";
|
||||||
|
// 请改为配置的方式
|
||||||
|
String zkPort = "2181";
|
||||||
|
// 请改为配置的方式
|
||||||
|
String namespace = "test";
|
||||||
|
String tablename = "test";
|
||||||
|
Configuration conf = HBaseConfiguration.create();
|
||||||
|
conf.set("hbase.zookeeper.quorum", zkHosts);
|
||||||
|
conf.set("hbase.zookeeper.port", zkPort);
|
||||||
|
|
||||||
|
// 创建命名空间和表
|
||||||
|
TableName tableName = TableName.valueOf(namespace, tablename);
|
||||||
|
HBaseAdminHelper hbaseAdminHelper = HBaseAdminHelper.newInstance(conf);
|
||||||
|
hbaseAdminHelper.enableTable(tableName);
|
||||||
|
// hbaseAdminHelper.createNamespace(namespace);
|
||||||
|
// hbaseAdminHelper.createTable(tableName, "c1");
|
||||||
|
//
|
||||||
|
// String rowKey = IdUtil.fastSimpleUUID();
|
||||||
|
// HBaseHelper hbaseHelper = HBaseHelper.newInstance(hbaseAdminHelper.getConnection());
|
||||||
|
// hbaseHelper.put(tableName, rowKey, "c1", "name", "jack");
|
||||||
|
// String value = hbaseHelper.get(tableName, rowKey, "c1", "name");
|
||||||
|
// System.out.println("value = " + value);
|
||||||
|
|
||||||
|
hbaseAdminHelper.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,920 @@
|
||||||
|
package io.github.dunwu.javadb.hbase;
|
||||||
|
|
||||||
|
import cn.hutool.core.bean.BeanUtil;
|
||||||
|
import cn.hutool.core.bean.copier.CopyOptions;
|
||||||
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
|
import cn.hutool.core.io.IoUtil;
|
||||||
|
import cn.hutool.core.map.MapUtil;
|
||||||
|
import cn.hutool.core.util.ArrayUtil;
|
||||||
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
|
import cn.hutool.core.util.ReflectUtil;
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import io.github.dunwu.javadb.hbase.entity.HBaseFamilyRequest;
|
||||||
|
import io.github.dunwu.javadb.hbase.entity.HBaseMultiFamilyRequest;
|
||||||
|
import io.github.dunwu.javadb.hbase.entity.HBaseRowData;
|
||||||
|
import io.github.dunwu.javadb.hbase.entity.PageData;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.Cell;
|
||||||
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
|
import org.apache.hadoop.hbase.filter.FilterList;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HBase CRUD 工具类
|
||||||
|
*
|
||||||
|
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||||
|
* @date 2023-03-27
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class HBaseHelper implements Closeable {
|
||||||
|
|
||||||
|
private final Connection connection;
|
||||||
|
private final Configuration configuration;
|
||||||
|
|
||||||
|
protected HBaseHelper(Configuration configuration) throws IOException {
|
||||||
|
this.configuration = configuration;
|
||||||
|
this.connection = ConnectionFactory.createConnection(configuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static synchronized HBaseHelper newInstance(Configuration configuration) throws IOException {
|
||||||
|
if (configuration == null) {
|
||||||
|
throw new IllegalArgumentException("configuration can not be null!");
|
||||||
|
}
|
||||||
|
return new HBaseHelper(configuration);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 关闭内部持有的 HBase Connection 实例
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void close() {
|
||||||
|
if (null == connection || connection.isClosed()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
IoUtil.close(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取 HBase 连接实例
|
||||||
|
*
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public Connection getConnection() {
|
||||||
|
if (null == connection) {
|
||||||
|
throw new RuntimeException("HBase connection init failed...");
|
||||||
|
}
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取 HBase 配置
|
||||||
|
*
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public Configuration getConfiguration() {
|
||||||
|
return configuration;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取 {@link Table} 实例
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public Table getTable(String tableName) throws Exception {
|
||||||
|
return getTable(TableName.valueOf(tableName));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取 {@link Table} 实例
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
|
||||||
|
public synchronized Table getTable(TableName tableName) throws Exception {
|
||||||
|
return connection.getTable(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(TableName tableName, String row, String family, String column, String value) throws Exception {
|
||||||
|
put(tableName, row, family, null, column, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(TableName tableName, String row, String family, Long timestamp, String column, String value)
|
||||||
|
throws Exception {
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
Put put = new Put(Bytes.toBytes(row));
|
||||||
|
if (timestamp != null) {
|
||||||
|
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp, Bytes.toBytes(value));
|
||||||
|
} else {
|
||||||
|
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
|
||||||
|
}
|
||||||
|
table.put(put);
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(TableName tableName, String row, String family, Object obj) throws Exception {
|
||||||
|
put(tableName, row, family, null, obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void put(TableName tableName, String row, String family, Long timestamp, Object obj) throws Exception {
|
||||||
|
Map<String, Object> map;
|
||||||
|
if (obj instanceof Map) {
|
||||||
|
map = (Map<String, Object>) obj;
|
||||||
|
} else {
|
||||||
|
map = BeanUtil.beanToMap(obj);
|
||||||
|
}
|
||||||
|
put(tableName, row, family, timestamp, map);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(TableName tableName, String row, String family, Map<String, Object> columnMap) throws Exception {
|
||||||
|
put(tableName, row, family, null, columnMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(TableName tableName, String row, String family, Long timestamp, Map<String, Object> columnMap)
|
||||||
|
throws Exception {
|
||||||
|
Put put = new Put(Bytes.toBytes(row));
|
||||||
|
columnMap.forEach((column, value) -> {
|
||||||
|
if (value != null) {
|
||||||
|
if (timestamp != null) {
|
||||||
|
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp,
|
||||||
|
Bytes.toBytes(String.valueOf(value)));
|
||||||
|
} else {
|
||||||
|
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(String.valueOf(value)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
table.put(put);
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(TableName tableName, String row, Long timestamp, Map<String, Map<String, Object>> familyMap)
|
||||||
|
throws Exception {
|
||||||
|
Put put = new Put(Bytes.toBytes(row));
|
||||||
|
for (Map.Entry<String, Map<String, Object>> e : familyMap.entrySet()) {
|
||||||
|
String family = e.getKey();
|
||||||
|
Map<String, Object> columnMap = e.getValue();
|
||||||
|
if (MapUtil.isNotEmpty(columnMap)) {
|
||||||
|
for (Map.Entry<String, Object> entry : columnMap.entrySet()) {
|
||||||
|
String column = entry.getKey();
|
||||||
|
Object value = entry.getValue();
|
||||||
|
|
||||||
|
if (ObjectUtil.isEmpty(value)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (timestamp != null) {
|
||||||
|
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp,
|
||||||
|
Bytes.toBytes(String.valueOf(value)));
|
||||||
|
} else {
|
||||||
|
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column),
|
||||||
|
Bytes.toBytes(String.valueOf(value)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
table.put(put);
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void deleteRow(TableName tableName, String row) throws Exception {
|
||||||
|
Delete delete = new Delete(Bytes.toBytes(row));
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
table.delete(delete);
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long incrementColumnValue(TableName tableName, String row, String family, String column, long amount)
|
||||||
|
throws Exception {
|
||||||
|
return incrementColumnValue(tableName, row, family, column, amount, Durability.SYNC_WAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
public long incrementColumnValue(TableName tableName, String row, String family, String column, long amount,
|
||||||
|
Durability durability) throws Exception {
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
return table.incrementColumnValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(column), amount,
|
||||||
|
durability);
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void dump(TableName tableName, String[] rows, String[] families, String[] columns) throws Exception {
|
||||||
|
|
||||||
|
List<Get> gets = new ArrayList<>();
|
||||||
|
for (String row : rows) {
|
||||||
|
Get get = new Get(Bytes.toBytes(row));
|
||||||
|
if (families != null) {
|
||||||
|
for (String family : families) {
|
||||||
|
for (String column : columns) {
|
||||||
|
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
gets.add(get);
|
||||||
|
}
|
||||||
|
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
Result[] results = table.get(gets);
|
||||||
|
for (Result result : results) {
|
||||||
|
for (Cell cell : result.rawCells()) {
|
||||||
|
System.out.println(
|
||||||
|
"Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
|
||||||
|
cell.getValueLength()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void dump(TableName tableName) throws Exception {
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
ResultScanner scanner = table.getScanner(new Scan());
|
||||||
|
for (Result result : scanner) {
|
||||||
|
dumpResult(result);
|
||||||
|
}
|
||||||
|
scanner.close();
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定行、列族、列,返回相应单元中的值
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param row 指定行
|
||||||
|
* @param family 列族
|
||||||
|
* @param column 列
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public String getColumn(TableName tableName, String row, String family, String column) throws Exception {
|
||||||
|
Get get = new Get(Bytes.toBytes(row));
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
Result result = table.get(get);
|
||||||
|
return Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 返回指定行、列族,列的数据,以实体 {@link T} 形式返回数据
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param row 指定行
|
||||||
|
* @param family 列族
|
||||||
|
* @param clazz 返回实体类型
|
||||||
|
* @param <T> 实体类型
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public <T> T getFamilyMap(TableName tableName, String row, String family, Class<T> clazz) throws Exception {
|
||||||
|
Map<String, Field> fieldMap = ReflectUtil.getFieldMap(clazz);
|
||||||
|
Set<String> columns = fieldMap.keySet();
|
||||||
|
Map<String, String> map = getFamilyMap(tableName, row, family, columns);
|
||||||
|
if (MapUtil.isEmpty(map)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return BeanUtil.mapToBean(map, clazz, true, CopyOptions.create().ignoreError());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 返回指定行、列族,列的数据,并以 {@link Map} 形式返回
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param row 指定行
|
||||||
|
* @param family 列族
|
||||||
|
* @param columns 指定列
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public Map<String, String> getFamilyMap(TableName tableName, String row, String family,
|
||||||
|
Collection<String> columns) throws Exception {
|
||||||
|
|
||||||
|
if (CollectionUtil.isEmpty(columns)) {
|
||||||
|
return getFamilyMap(tableName, row, family);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Get> gets = new ArrayList<>();
|
||||||
|
Get get = new Get(Bytes.toBytes(row));
|
||||||
|
for (String column : columns) {
|
||||||
|
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
|
||||||
|
}
|
||||||
|
gets.add(get);
|
||||||
|
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
Result[] results = table.get(gets);
|
||||||
|
Map<String, String> map = new HashMap<>(columns.size());
|
||||||
|
for (Result result : results) {
|
||||||
|
for (String column : columns) {
|
||||||
|
String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
|
||||||
|
map.put(column, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 返回指定行、列族的所有列数据,并以 {@link Map} 形式返回
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param row 指定行
|
||||||
|
* @param family 列族
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public Map<String, String> getFamilyMap(TableName tableName, String row, String family) throws Exception {
|
||||||
|
List<Get> gets = new ArrayList<>();
|
||||||
|
Get get = new Get(Bytes.toBytes(row));
|
||||||
|
gets.add(get);
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
Result[] results = table.get(gets);
|
||||||
|
Map<String, Map<String, String>> familyColumnMap = getAllFamilyMap(results, row);
|
||||||
|
return familyColumnMap.get(family);
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定多个row进行批量查询
|
||||||
|
*
|
||||||
|
* @param tableName
|
||||||
|
* @param rows
|
||||||
|
* @param family
|
||||||
|
* @param columns
|
||||||
|
* @return
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, String>> getFamilyMapInRows(TableName tableName, List<String> rows, String family,
|
||||||
|
Collection<String> columns) throws Exception {
|
||||||
|
|
||||||
|
List<Get> gets = new ArrayList<>();
|
||||||
|
for (String row : rows) {
|
||||||
|
Get get = new Get(Bytes.toBytes(row));
|
||||||
|
for (String column : columns) {
|
||||||
|
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
|
||||||
|
}
|
||||||
|
gets.add(get);
|
||||||
|
}
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
Result[] results = table.get(gets);
|
||||||
|
Map<String, Map<String, String>> resultMap = new LinkedHashMap<>(gets.size());
|
||||||
|
for (Result result : results) {
|
||||||
|
Map<String, String> map = new HashMap<>(columns.size());
|
||||||
|
for (String column : columns) {
|
||||||
|
String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
|
||||||
|
map.put(column, value);
|
||||||
|
}
|
||||||
|
resultMap.put(Bytes.toString(result.getRow()), map);
|
||||||
|
}
|
||||||
|
return resultMap;
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 返回指定行、列族,列的数据,并以 {@link Map} 形式返回
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param row 指定行
|
||||||
|
* @param familyColumns <列族, 要查询的列>
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, String>> getMultiFamilyMap(TableName tableName, String row,
|
||||||
|
Map<String, Collection<String>> familyColumns) throws Exception {
|
||||||
|
|
||||||
|
if (MapUtil.isEmpty(familyColumns)) {
|
||||||
|
return getMultiFamilyMap(tableName, row);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Get> gets = new ArrayList<>();
|
||||||
|
Get get = new Get(Bytes.toBytes(row));
|
||||||
|
for (Map.Entry<String, Collection<String>> entry : familyColumns.entrySet()) {
|
||||||
|
String family = entry.getKey();
|
||||||
|
Collection<String> columns = entry.getValue();
|
||||||
|
if (CollectionUtil.isNotEmpty(columns)) {
|
||||||
|
for (String column : columns) {
|
||||||
|
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
gets.add(get);
|
||||||
|
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
Result[] results = table.get(gets);
|
||||||
|
Map<String, Map<String, String>> map = new HashMap<>(familyColumns.size());
|
||||||
|
for (Result result : results) {
|
||||||
|
if (result == null || result.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
familyColumns.forEach((family, columns) -> {
|
||||||
|
Map<String, String> kvMap = new HashMap<>();
|
||||||
|
if (CollectionUtil.isNotEmpty(columns)) {
|
||||||
|
for (String column : columns) {
|
||||||
|
String value =
|
||||||
|
Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
|
||||||
|
kvMap.put(column, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
map.put(family, kvMap);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 返回指定行所有列族的数据,并以 {@link Map} 形式返回
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param row 指定行
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, String>> getMultiFamilyMap(TableName tableName, String row) throws Exception {
|
||||||
|
|
||||||
|
List<Get> gets = new ArrayList<>();
|
||||||
|
Get get = new Get(Bytes.toBytes(row));
|
||||||
|
gets.add(get);
|
||||||
|
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
try {
|
||||||
|
Result[] results = table.get(gets);
|
||||||
|
return getAllFamilyMap(results, row);
|
||||||
|
} finally {
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Map<String, String>> getAllFamilyMap(Result[] results, String row) {
|
||||||
|
Map<String, Map<String, Map<String, String>>> rowFamilyColumnMap = getAllFamilyMapInRows(results);
|
||||||
|
if (MapUtil.isEmpty(rowFamilyColumnMap)) {
|
||||||
|
return new HashMap<>(0);
|
||||||
|
}
|
||||||
|
return rowFamilyColumnMap.get(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
public HBaseRowData getRowData(TableName tableName, String row, Map<String, Collection<String>> familyColumns)
|
||||||
|
throws Exception {
|
||||||
|
Map<String, Map<String, String>> familyColumnMap = getMultiFamilyMap(tableName, row, familyColumns);
|
||||||
|
return HBaseRowData.buildByMap(row, null, familyColumnMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定起止行、列族、多个列、{@link Filter},进行范围查询
|
||||||
|
*
|
||||||
|
* @param tableName 表
|
||||||
|
* @param startRow 起始行
|
||||||
|
* @param stopRow 结束行
|
||||||
|
* @param stopRow rowInclude 控制范围闭合条件[startRowInclude, endRowInclude],默认左闭右开
|
||||||
|
* @param family 列族
|
||||||
|
* @param columns 将要返回的列(未指定的列不会返回)
|
||||||
|
* @param operator filter执行条件
|
||||||
|
* @param filters {@link Filter} 实体
|
||||||
|
* @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String startRow, String stopRow,
|
||||||
|
boolean[] rowInclude, String family, Collection<String> columns, FilterList.Operator operator,
|
||||||
|
Filter... filters) throws Exception {
|
||||||
|
Scan scan = new Scan();
|
||||||
|
fillColumnsToScan(family, columns, scan);
|
||||||
|
boolean startRowInclude = true, endRowInclude = false;
|
||||||
|
if (null != rowInclude || rowInclude.length == 2) {
|
||||||
|
startRowInclude = rowInclude[0];
|
||||||
|
endRowInclude = rowInclude[1];
|
||||||
|
}
|
||||||
|
scan.withStartRow(Bytes.toBytes(startRow), startRowInclude);
|
||||||
|
scan.withStopRow(Bytes.toBytes(stopRow), endRowInclude);
|
||||||
|
if (ArrayUtil.isNotEmpty(filters)) {
|
||||||
|
FilterList filterList = new FilterList(filters);
|
||||||
|
scan.setFilter(filterList);
|
||||||
|
}
|
||||||
|
return scanFamilyMap(tableName, scan, family, columns);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定列族、多个列,进行全表范围查询
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param family 列族
|
||||||
|
* @param columns 将要返回的列(未指定的列不会返回)
|
||||||
|
* @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String family,
|
||||||
|
Collection<String> columns)
|
||||||
|
throws Exception {
|
||||||
|
HBaseFamilyRequest request = new HBaseFamilyRequest();
|
||||||
|
request.setFamily(family)
|
||||||
|
.setColumns(columns)
|
||||||
|
.setTableName(tableName.getNameAsString())
|
||||||
|
.setReversed(true);
|
||||||
|
return scanFamilyMap(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定起止行、列族、多个列,进行范围查询
|
||||||
|
*
|
||||||
|
* @param startRow 起始行
|
||||||
|
* @param stopRow 结束行
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param family 列族
|
||||||
|
* @param columns 将要返回的列(未指定的列不会返回)
|
||||||
|
* @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String startRow, String stopRow,
|
||||||
|
String family, Collection<String> columns) throws Exception {
|
||||||
|
HBaseFamilyRequest request = new HBaseFamilyRequest();
|
||||||
|
request.setFamily(family)
|
||||||
|
.setColumns(columns)
|
||||||
|
.setTableName(tableName.getNameAsString())
|
||||||
|
.setStartRow(startRow)
|
||||||
|
.setStopRow(stopRow)
|
||||||
|
.setReversed(true);
|
||||||
|
return scanFamilyMap(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定列族、多个列、{@link Filter},进行全表范围查询
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param family 列族
|
||||||
|
* @param columns 将要返回的列(未指定的列不会返回)
|
||||||
|
* @param filter {@link Filter} 实体
|
||||||
|
* @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String family,
|
||||||
|
Collection<String> columns,
|
||||||
|
Filter filter) throws Exception {
|
||||||
|
HBaseFamilyRequest request = new HBaseFamilyRequest();
|
||||||
|
request.setFamily(family)
|
||||||
|
.setColumns(columns)
|
||||||
|
.setTableName(tableName.getNameAsString())
|
||||||
|
.setReversed(true)
|
||||||
|
.addFilter(filter);
|
||||||
|
return scanFamilyMap(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定起止行、列族、多个列、{@link Filter},进行范围查询
|
||||||
|
*
|
||||||
|
* @param startRow 起始行
|
||||||
|
* @param stopRow 结束行
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param family 列族
|
||||||
|
* @param columns 将要返回的列(未指定的列不会返回)
|
||||||
|
* @param filter {@link Filter} 实体
|
||||||
|
* @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String startRow, String stopRow,
|
||||||
|
String family, Collection<String> columns, Filter filter) throws Exception {
|
||||||
|
HBaseFamilyRequest request = new HBaseFamilyRequest();
|
||||||
|
request.setFamily(family)
|
||||||
|
.setColumns(columns)
|
||||||
|
.setTableName(tableName.getNameAsString())
|
||||||
|
.setStartRow(startRow)
|
||||||
|
.setStopRow(stopRow)
|
||||||
|
.setReversed(true)
|
||||||
|
.addFilter(filter);
|
||||||
|
return scanFamilyMap(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定起止写入时间、列族、多个列、{@link Filter},进行范围查询
|
||||||
|
* <p>
|
||||||
|
* 注:根据时间范围查询时,会强制按时序倒序排列
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param family 列族
|
||||||
|
* @param columns 将要返回的列(未指定的列不会返回)
|
||||||
|
* @param minStamp 起始写入时间
|
||||||
|
* @param maxStamp 结束写入时间
|
||||||
|
* @param filter {@link Filter} 实体
|
||||||
|
* @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String family,
|
||||||
|
Collection<String> columns,
|
||||||
|
long minStamp, long maxStamp, Filter filter) throws Exception {
|
||||||
|
HBaseFamilyRequest request = new HBaseFamilyRequest();
|
||||||
|
request.setFamily(family)
|
||||||
|
.setColumns(columns)
|
||||||
|
.setTableName(tableName.getNameAsString())
|
||||||
|
.setMinTimeStamp(minStamp)
|
||||||
|
.setMaxTimeStamp(maxStamp)
|
||||||
|
.setReversed(true)
|
||||||
|
.addFilter(filter);
|
||||||
|
return scanFamilyMap(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 返回匹配请求条件的数据、{@link Filter},进行范围查询
|
||||||
|
* <p>
|
||||||
|
* 注:根据时间范围查询时,会强制按时序倒序排列
|
||||||
|
*
|
||||||
|
* @param request {@link HBaseFamilyRequest} 请求条件
|
||||||
|
* @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, String>> scanFamilyMap(HBaseFamilyRequest request) throws Exception {
|
||||||
|
return scanFamilyMap(TableName.valueOf(request.getTableName()),
|
||||||
|
request.toScan(), request.getFamily(), request.getColumns());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定列族、多个列、{@link Scan},进行范围查询
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param scan {@link Scan} 实体
|
||||||
|
* @param family 列族
|
||||||
|
* @param columns 将要返回的列(未指定的列不会返回)
|
||||||
|
* @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, Scan scan,
|
||||||
|
String family, Collection<String> columns) throws Exception {
|
||||||
|
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
ResultScanner scanner = null;
|
||||||
|
try {
|
||||||
|
scanner = table.getScanner(scan);
|
||||||
|
Map<String, Map<String, String>> map = new LinkedHashMap<>();
|
||||||
|
for (Result result : scanner) {
|
||||||
|
Map<String, String> columnMap = new HashMap<>(columns.size());
|
||||||
|
for (String column : columns) {
|
||||||
|
String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
|
||||||
|
columnMap.put(column, value);
|
||||||
|
}
|
||||||
|
map.put(Bytes.toString(result.getRow()), columnMap);
|
||||||
|
}
|
||||||
|
return map;
|
||||||
|
} finally {
|
||||||
|
IoUtil.close(scanner);
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void fillColumnsToScan(String family, Collection<String> columns, Scan scan) {
|
||||||
|
if (StrUtil.isNotBlank(family) && CollectionUtil.isNotEmpty(columns)) {
|
||||||
|
for (String column : columns) {
|
||||||
|
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 指定多个列族,每个列族包含的列、{@link Scan},进行范围查询
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param scan {@link Scan} 实体
|
||||||
|
* @param familyColumns 列族, 列族所包含的列
|
||||||
|
* @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列族;三级 Map 的 key 是列,value 是列值
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, Map<String, String>>> scanMultiFamilyMap(TableName tableName, Scan scan,
|
||||||
|
Map<String, Collection<String>> familyColumns) throws Exception {
|
||||||
|
|
||||||
|
if (MapUtil.isEmpty(familyColumns)) {
|
||||||
|
return scanMultiFamilyMap(tableName, scan);
|
||||||
|
}
|
||||||
|
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
ResultScanner scanner = null;
|
||||||
|
try {
|
||||||
|
scanner = table.getScanner(scan);
|
||||||
|
Map<String, Map<String, Map<String, String>>> familyKvDataMap = new LinkedHashMap<>();
|
||||||
|
for (Result result : scanner) {
|
||||||
|
Map<String, Map<String, String>> familyMap = new HashMap<>();
|
||||||
|
familyColumns.forEach((family, columns) -> {
|
||||||
|
Map<String, String> columnMap = new HashMap<>();
|
||||||
|
if (CollectionUtil.isNotEmpty(columns)) {
|
||||||
|
for (String column : columns) {
|
||||||
|
String value = Bytes.toString(result.getValue(Bytes.toBytes(family),
|
||||||
|
Bytes.toBytes(column)));
|
||||||
|
columnMap.put(column, value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
familyMap.put(family, columnMap);
|
||||||
|
});
|
||||||
|
familyKvDataMap.put(Bytes.toString(result.getRow()), familyMap);
|
||||||
|
}
|
||||||
|
return familyKvDataMap;
|
||||||
|
} finally {
|
||||||
|
IoUtil.close(scanner);
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 返回匹配 {@link Scan} 的所有列族的数据
|
||||||
|
*
|
||||||
|
* @param tableName 表名
|
||||||
|
* @param scan {@link Scan} 实体
|
||||||
|
* @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列族;三级 Map 的 key 是列,value 是列值
|
||||||
|
*/
|
||||||
|
public Map<String, Map<String, Map<String, String>>> scanMultiFamilyMap(TableName tableName, Scan scan)
|
||||||
|
throws Exception {
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
ResultScanner scanner = null;
|
||||||
|
try {
|
||||||
|
scanner = table.getScanner(scan);
|
||||||
|
Result[] results = ArrayUtil.toArray(scanner, Result.class);
|
||||||
|
return getAllFamilyMapInRows(results);
|
||||||
|
} finally {
|
||||||
|
IoUtil.close(scanner);
|
||||||
|
recycle(table);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Map<String, Map<String, String>>> scanMultiFamilyMap(HBaseMultiFamilyRequest request)
|
||||||
|
throws Exception {
|
||||||
|
return scanMultiFamilyMap(TableName.valueOf(request.getTableName()), request.toScan(),
|
||||||
|
request.getFamilyColumns());
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Map<String, Map<String, String>>> getAllFamilyMapInRows(Result[] results) {
|
||||||
|
Map<String, Map<String, Map<String, String>>> rowFamilyColumnMap = new HashMap<>();
|
||||||
|
for (Result result : results) {
|
||||||
|
if (result == null || result.isEmpty()) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Map<String, Map<String, String>> familyColumnMap = new HashMap<>();
|
||||||
|
for (Cell cell : result.listCells()) {
|
||||||
|
String family = Bytes.toString(CellUtil.cloneFamily(cell));
|
||||||
|
if (!familyColumnMap.containsKey(family)) {
|
||||||
|
familyColumnMap.put(family, new HashMap<>());
|
||||||
|
}
|
||||||
|
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
|
||||||
|
String value = Bytes.toString(CellUtil.cloneValue(cell));
|
||||||
|
familyColumnMap.get(family).put(column, value);
|
||||||
|
}
|
||||||
|
rowFamilyColumnMap.put(Bytes.toString(result.getRow()), familyColumnMap);
|
||||||
|
}
|
||||||
|
return rowFamilyColumnMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 扫描(scan)一个列族的数据,并返回列表记录
|
||||||
|
*
|
||||||
|
* @param request 单列族请求
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public List<HBaseRowData> listRowData(HBaseFamilyRequest request) throws Exception {
|
||||||
|
Map<String, Map<String, String>> rowColumnMap = scanFamilyMap(request);
|
||||||
|
return HBaseRowData.toRowList(request.getFamily(), rowColumnMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 扫描(scan)多个列族的数据,并返回列表记录
|
||||||
|
*
|
||||||
|
* @param request 多列族请求
|
||||||
|
* @return /
|
||||||
|
*/
|
||||||
|
public List<HBaseRowData> listRowData(HBaseMultiFamilyRequest request) throws Exception {
|
||||||
|
Map<String, Map<String, Map<String, String>>> map = scanMultiFamilyMap(request);
|
||||||
|
return HBaseRowData.toRowList(map);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PageData<HBaseRowData> pageRowData(HBaseMultiFamilyRequest request) throws Exception {
|
||||||
|
return pageRowData(TableName.valueOf(request.getTableName()), request.getFamilyColumns(),
|
||||||
|
request.getPageNo(), request.getPageSize(), request.toScan());
|
||||||
|
}
|
||||||
|
|
||||||
|
public PageData<HBaseRowData> pageRowData(TableName tableName,
|
||||||
|
Map<String, Collection<String>> familyColumns, Integer pageNo, Integer pageSize, Scan scan) throws Exception {
|
||||||
|
|
||||||
|
Table table = getTable(tableName);
|
||||||
|
Map<String, Map<String, Map<String, String>>> rowMap = new HashMap<>();
|
||||||
|
|
||||||
|
int page = 1;
|
||||||
|
byte[] lastRow = null;
|
||||||
|
long total = 0L;
|
||||||
|
while (true) {
|
||||||
|
if (lastRow != null) {
|
||||||
|
scan.withStartRow(lastRow, false);
|
||||||
|
}
|
||||||
|
ResultScanner rs = table.getScanner(scan);
|
||||||
|
Iterator<Result> it = rs.iterator();
|
||||||
|
int count = 0;
|
||||||
|
while (it.hasNext()) {
|
||||||
|
Result result = it.next();
|
||||||
|
if (pageNo == page) {
|
||||||
|
fillRowMap(result, familyColumns, rowMap);
|
||||||
|
}
|
||||||
|
lastRow = result.getRow();
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
|
||||||
|
page++;
|
||||||
|
rs.close();
|
||||||
|
total += count;
|
||||||
|
if (count == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
recycle(table);
|
||||||
|
List<HBaseRowData> content = HBaseRowData.toRowList(rowMap);
|
||||||
|
return new PageData<>(pageNo, pageSize, total, content);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fillRowMap(Result result, Map<String, Collection<String>> familyColumns,
|
||||||
|
Map<String, Map<String, Map<String, String>>> rowMap) {
|
||||||
|
|
||||||
|
String row = Bytes.toString(result.getRow());
|
||||||
|
if (row == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Map<String, String>> familyMap;
|
||||||
|
if (MapUtil.isEmpty(familyColumns)) {
|
||||||
|
familyMap = new HashMap<>();
|
||||||
|
for (Cell cell : result.listCells()) {
|
||||||
|
String family = Bytes.toString(CellUtil.cloneFamily(cell));
|
||||||
|
if (!familyMap.containsKey(family)) {
|
||||||
|
familyMap.put(family, new HashMap<>());
|
||||||
|
}
|
||||||
|
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
|
||||||
|
String value = Bytes.toString(CellUtil.cloneValue(cell));
|
||||||
|
familyMap.get(family).put(column, value);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
familyMap = new HashMap<>(familyColumns.size());
|
||||||
|
familyColumns.forEach((family, columns) -> {
|
||||||
|
if (CollectionUtil.isNotEmpty(columns)) {
|
||||||
|
Map<String, String> columnMap = new HashMap<>(columns.size());
|
||||||
|
for (String column : columns) {
|
||||||
|
String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
|
||||||
|
columnMap.put(column, value);
|
||||||
|
}
|
||||||
|
familyMap.put(family, columnMap);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
rowMap.put(row, familyMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void dumpResult(Result result) {
|
||||||
|
for (Cell cell : result.rawCells()) {
|
||||||
|
String msg = StrUtil.format("Cell: {}, Value: {}", cell,
|
||||||
|
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
|
||||||
|
System.out.println(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void recycle(Table table) {
|
||||||
|
if (null == table) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
IoUtil.close(table);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,84 +0,0 @@
|
||||||
package io.github.dunwu.javadb.hbase;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* HBase Cell 实体
|
|
||||||
* @author Zhang Peng
|
|
||||||
* @since 2019-03-04
|
|
||||||
*/
|
|
||||||
public class HbaseCellEntity {
|
|
||||||
|
|
||||||
private String table;
|
|
||||||
|
|
||||||
private String row;
|
|
||||||
|
|
||||||
private String colFamily;
|
|
||||||
|
|
||||||
private String col;
|
|
||||||
|
|
||||||
private String val;
|
|
||||||
|
|
||||||
public HbaseCellEntity() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public HbaseCellEntity(String row, String colFamily, String col, String val) {
|
|
||||||
this.row = row;
|
|
||||||
this.colFamily = colFamily;
|
|
||||||
this.col = col;
|
|
||||||
this.val = val;
|
|
||||||
}
|
|
||||||
|
|
||||||
public HbaseCellEntity(String table, String row, String colFamily, String col, String val) {
|
|
||||||
this.table = table;
|
|
||||||
this.row = row;
|
|
||||||
this.colFamily = colFamily;
|
|
||||||
this.col = col;
|
|
||||||
this.val = val;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getTable() {
|
|
||||||
return table;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTable(String table) {
|
|
||||||
this.table = table;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getRow() {
|
|
||||||
return row;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setRow(String row) {
|
|
||||||
this.row = row;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getColFamily() {
|
|
||||||
return colFamily;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setColFamily(String colFamily) {
|
|
||||||
this.colFamily = colFamily;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getCol() {
|
|
||||||
return col;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setCol(String col) {
|
|
||||||
this.col = col;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getVal() {
|
|
||||||
return val;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setVal(String val) {
|
|
||||||
this.val = val;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "HbaseCellEntity{" + "table='" + table + '\'' + ", row='" + row + '\'' + ", colFamily='" + colFamily
|
|
||||||
+ '\'' + ", col='" + col + '\'' + ", val='" + val + '\'' + '}';
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,369 +0,0 @@
|
||||||
package io.github.dunwu.javadb.hbase;
|
|
||||||
|
|
||||||
import io.github.dunwu.tool.util.PropertiesUtil;
|
|
||||||
import org.apache.commons.lang.StringUtils;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.*;
|
|
||||||
import org.apache.hadoop.hbase.client.*;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Properties;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* HBase 服务实现类
|
|
||||||
* @author Zhang Peng
|
|
||||||
* @since 2019-03-01
|
|
||||||
*/
|
|
||||||
public class HbaseHelper {
|
|
||||||
|
|
||||||
private static final String FIRST_CONFIG = "classpath://config//hbase.properties";
|
|
||||||
|
|
||||||
private static final String SECOND_CONFIG = "classpath://application.properties";
|
|
||||||
|
|
||||||
private HbaseProperties hbaseProperties;
|
|
||||||
|
|
||||||
private Connection connection;
|
|
||||||
|
|
||||||
public HbaseHelper() throws Exception {
|
|
||||||
// 初始化参数
|
|
||||||
Properties properties = loadConfigFile();
|
|
||||||
if (properties == null) {
|
|
||||||
throw new Exception("读取 Hbase 配置失败,无法建立连接");
|
|
||||||
}
|
|
||||||
Boolean enable = PropertiesUtil.getBoolean(properties, HBaseConstant.HBASE_ENABLE.key(), true);
|
|
||||||
if (!enable) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
String quorum = PropertiesUtil.getString(properties, HBaseConstant.HBASE_ZOOKEEPER_QUORUM.key(), "");
|
|
||||||
String hbaseMaster = PropertiesUtil.getString(properties, HBaseConstant.HBASE_MASTER.key(), "");
|
|
||||||
String clientPort =
|
|
||||||
PropertiesUtil.getString(properties, HBaseConstant.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT.key(), "");
|
|
||||||
String znodeParent = PropertiesUtil.getString(properties, HBaseConstant.ZOOKEEPER_ZNODE_PARENT.key(), "");
|
|
||||||
String maxThreads = PropertiesUtil.getString(properties, HBaseConstant.HBASE_HCONNECTION_THREADS_MAX.key(), "");
|
|
||||||
String coreThreads =
|
|
||||||
PropertiesUtil.getString(properties, HBaseConstant.HBASE_HCONNECTION_THREADS_CORE.key(), "");
|
|
||||||
String columnFamily = PropertiesUtil.getString(properties, HBaseConstant.HBASE_COLUMN_FAMILY.key(), "");
|
|
||||||
String hbaseExecutorsNum = PropertiesUtil.getString(properties, HBaseConstant.HBASE_EXECUTOR_NUM.key(), "10");
|
|
||||||
String ipcPoolSize = PropertiesUtil.getString(properties, HBaseConstant.HBASE_IPC_POOL_SIZE.key(), "1");
|
|
||||||
|
|
||||||
hbaseProperties =
|
|
||||||
new HbaseProperties(hbaseMaster, quorum, clientPort, znodeParent, maxThreads, coreThreads, columnFamily,
|
|
||||||
hbaseExecutorsNum, ipcPoolSize);
|
|
||||||
init(hbaseProperties);
|
|
||||||
}
|
|
||||||
|
|
||||||
private Properties loadConfigFile() {
|
|
||||||
Properties properties = null;
|
|
||||||
try {
|
|
||||||
properties = PropertiesUtil.loadFromFile(FIRST_CONFIG);
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (properties == null) {
|
|
||||||
try {
|
|
||||||
properties = PropertiesUtil.loadFromFile(SECOND_CONFIG);
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return properties;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void init(HbaseProperties hbaseProperties) throws Exception {
|
|
||||||
try {
|
|
||||||
// @formatter:off
|
|
||||||
Configuration configuration = HBaseConfiguration.create();
|
|
||||||
configuration.set(HBaseConstant.HBASE_ZOOKEEPER_QUORUM.key(), hbaseProperties.getQuorum());
|
|
||||||
configuration.set(HBaseConstant.HBASE_MASTER.key(), hbaseProperties.getHbaseMaster());
|
|
||||||
configuration.set(HBaseConstant.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT.key(),
|
|
||||||
hbaseProperties.getClientPort());
|
|
||||||
configuration.set(HBaseConstant.HBASE_HCONNECTION_THREADS_MAX.key(),
|
|
||||||
hbaseProperties.getMaxThreads());
|
|
||||||
configuration.set(HBaseConstant.HBASE_HCONNECTION_THREADS_CORE.key(),
|
|
||||||
hbaseProperties.getCoreThreads());
|
|
||||||
configuration.set(HBaseConstant.ZOOKEEPER_ZNODE_PARENT.key(), hbaseProperties.getZnodeParent());
|
|
||||||
configuration.set(HBaseConstant.HBASE_COLUMN_FAMILY.key(), hbaseProperties.getColumnFamily());
|
|
||||||
configuration.set(HBaseConstant.HBASE_IPC_POOL_SIZE.key(), hbaseProperties.getIpcPoolSize());
|
|
||||||
// @formatter:on
|
|
||||||
connection = ConnectionFactory.createConnection(configuration);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new Exception("hbase链接未创建", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public HbaseHelper(HbaseProperties hbaseProperties) throws Exception {
|
|
||||||
this.hbaseProperties = hbaseProperties;
|
|
||||||
init(hbaseProperties);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void destory() {
|
|
||||||
if (connection != null) {
|
|
||||||
try {
|
|
||||||
connection.close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public HTableDescriptor[] listTables() throws Exception {
|
|
||||||
return listTables(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public HTableDescriptor[] listTables(String tableName) throws Exception {
|
|
||||||
if (connection == null) {
|
|
||||||
throw new Exception("hbase链接未创建");
|
|
||||||
}
|
|
||||||
|
|
||||||
HTableDescriptor[] hTableDescriptors = new HTableDescriptor[0];
|
|
||||||
try {
|
|
||||||
if (StringUtils.isEmpty(tableName)) {
|
|
||||||
hTableDescriptors = connection.getAdmin().listTables();
|
|
||||||
} else {
|
|
||||||
hTableDescriptors = connection.getAdmin().listTables(tableName);
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new Exception("执行失败", e);
|
|
||||||
}
|
|
||||||
return hTableDescriptors;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 创建表
|
|
||||||
* <p>
|
|
||||||
* 等价于:
|
|
||||||
* <ul>
|
|
||||||
* <li>create 'tablename','family1','family2','family3'...</li>
|
|
||||||
* </ul>
|
|
||||||
*/
|
|
||||||
public void createTable(String tableName) throws Exception {
|
|
||||||
createTable(tableName, new String[] {hbaseProperties.getColumnFamily()});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 创建表
|
|
||||||
* <p>
|
|
||||||
* 等价于:
|
|
||||||
* <ul>
|
|
||||||
* <li>create 'tablename','family1','family2','family3'...</li>
|
|
||||||
* </ul>
|
|
||||||
*/
|
|
||||||
public void createTable(String tableName, String[] colFamilies) throws Exception {
|
|
||||||
if (connection == null) {
|
|
||||||
throw new Exception("hbase链接未创建");
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
TableName tablename = TableName.valueOf(tableName);
|
|
||||||
// 如果表存在,先删除
|
|
||||||
if (connection.getAdmin().isTableAvailable(tablename)) {
|
|
||||||
dropTable(tableName);
|
|
||||||
}
|
|
||||||
HTableDescriptor tableDescriptor = new HTableDescriptor(tablename);
|
|
||||||
for (String famliy : colFamilies) {
|
|
||||||
tableDescriptor.addFamily(new HColumnDescriptor(famliy));
|
|
||||||
}
|
|
||||||
|
|
||||||
connection.getAdmin().createTable(tableDescriptor);
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 删除表
|
|
||||||
* <p>
|
|
||||||
* 等价于:
|
|
||||||
* <ul>
|
|
||||||
* <li>disable 'tablename'</li>
|
|
||||||
* <li>drop 't1'</li>
|
|
||||||
* </ul>
|
|
||||||
* @param name
|
|
||||||
*/
|
|
||||||
public void dropTable(String name) throws Exception {
|
|
||||||
if (connection == null) {
|
|
||||||
throw new Exception("hbase链接未创建");
|
|
||||||
}
|
|
||||||
|
|
||||||
Admin admin = null;
|
|
||||||
try {
|
|
||||||
admin = connection.getAdmin();
|
|
||||||
TableName tableName = TableName.valueOf(name);
|
|
||||||
// 如果表存在,先删除
|
|
||||||
if (admin.isTableAvailable(tableName)) {
|
|
||||||
admin.disableTable(tableName);
|
|
||||||
admin.deleteTable(tableName);
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private Put toPut(HbaseCellEntity hBaseTableDTO) throws Exception {
|
|
||||||
if (connection == null) {
|
|
||||||
throw new Exception("hbase链接未创建");
|
|
||||||
}
|
|
||||||
|
|
||||||
Put put = new Put(Bytes.toBytes(hBaseTableDTO.getRow()));
|
|
||||||
put.addColumn(Bytes.toBytes(hBaseTableDTO.getColFamily()), Bytes.toBytes(hBaseTableDTO.getCol()),
|
|
||||||
Bytes.toBytes(hBaseTableDTO.getVal()));
|
|
||||||
return put;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void delete(String tableName, String rowKey) throws Exception {
|
|
||||||
if (connection == null) {
|
|
||||||
throw new Exception("hbase链接未创建");
|
|
||||||
}
|
|
||||||
|
|
||||||
Table table = null;
|
|
||||||
try {
|
|
||||||
table = connection.getTable(TableName.valueOf(tableName));
|
|
||||||
Delete delete = new Delete(Bytes.toBytes(rowKey));
|
|
||||||
table.delete(delete);
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
throw new Exception("delete失败");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public String resultToString(Result result) {
|
|
||||||
if (result == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
Cell[] cells = result.rawCells();
|
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
for (Cell cell : cells) {
|
|
||||||
sb.append("{ ");
|
|
||||||
sb.append("RowName -> ").append(new String(CellUtil.cloneRow(cell)));
|
|
||||||
sb.append(", Timetamp -> ").append(cell.getTimestamp());
|
|
||||||
sb.append(", Column Family -> ").append(new String(CellUtil.cloneFamily(cell)));
|
|
||||||
sb.append(", Row Name -> ").append(new String(CellUtil.cloneQualifier(cell)));
|
|
||||||
sb.append(", value -> ").append(new String(CellUtil.cloneValue(cell)));
|
|
||||||
sb.append(" }\n");
|
|
||||||
}
|
|
||||||
return sb.toString();
|
|
||||||
}
|
|
||||||
|
|
||||||
public Result get(String tableName, String rowKey) throws Exception {
|
|
||||||
return get(tableName, rowKey, null, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Result get(String tableName, String rowKey, String colFamily, String qualifier) throws Exception {
|
|
||||||
if (connection == null) {
|
|
||||||
throw new Exception("hbase链接未创建");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (connection.isClosed()) {
|
|
||||||
throw new Exception("hbase 连接已关闭");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (StringUtils.isEmpty(tableName) || StringUtils.isEmpty(rowKey)) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
Result result = null;
|
|
||||||
try {
|
|
||||||
Table table = connection.getTable(TableName.valueOf(tableName));
|
|
||||||
Get get = new Get(Bytes.toBytes(rowKey));
|
|
||||||
if (StringUtils.isNotEmpty(colFamily)) {
|
|
||||||
if (StringUtils.isNotEmpty(qualifier)) {
|
|
||||||
get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(qualifier));
|
|
||||||
} else {
|
|
||||||
get.addFamily(Bytes.toBytes(colFamily));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
result = table.get(get);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new Exception("查询时发生异常");
|
|
||||||
}
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Result get(String tableName, String rowKey, String colFamily) throws Exception {
|
|
||||||
return get(tableName, rowKey, colFamily, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Result[] scan(String tableName) throws Exception {
|
|
||||||
return scan(tableName, null, null, null, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Result[] scan(String tableName, String colFamily, String qualifier, String startRow, String stopRow)
|
|
||||||
throws Exception {
|
|
||||||
if (connection == null) {
|
|
||||||
throw new Exception("hbase链接未创建");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (StringUtils.isEmpty(tableName)) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
ResultScanner resultScanner = null;
|
|
||||||
List<Result> list = new ArrayList<>();
|
|
||||||
try {
|
|
||||||
Table table = connection.getTable(TableName.valueOf(tableName));
|
|
||||||
Scan scan = new Scan();
|
|
||||||
if (StringUtils.isNotEmpty(colFamily)) {
|
|
||||||
if (StringUtils.isNotEmpty(qualifier)) {
|
|
||||||
scan.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(qualifier));
|
|
||||||
}
|
|
||||||
scan.addFamily(Bytes.toBytes(colFamily));
|
|
||||||
}
|
|
||||||
if (StringUtils.isNotEmpty(startRow)) {
|
|
||||||
scan.setStartRow(Bytes.toBytes(startRow));
|
|
||||||
}
|
|
||||||
if (StringUtils.isNotEmpty(stopRow)) {
|
|
||||||
scan.setStopRow(Bytes.toBytes(stopRow));
|
|
||||||
}
|
|
||||||
resultScanner = table.getScanner(scan);
|
|
||||||
Result result = resultScanner.next();
|
|
||||||
while (result != null) {
|
|
||||||
list.add(result);
|
|
||||||
result = resultScanner.next();
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
} finally {
|
|
||||||
if (resultScanner != null) {
|
|
||||||
resultScanner.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return list.toArray(new Result[0]);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Result[] scan(String tableName, String colFamily) throws Exception {
|
|
||||||
return scan(tableName, colFamily, null, null, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Result[] scan(String tableName, String colFamily, String qualifier) throws Exception {
|
|
||||||
return scan(tableName, colFamily, qualifier, null, null);
|
|
||||||
}
|
|
||||||
|
|
||||||
private List<Result> resultScannerToResults(ResultScanner resultScanner) {
|
|
||||||
if (resultScanner == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
List<Result> list = new ArrayList<>();
|
|
||||||
Result result = null;
|
|
||||||
try {
|
|
||||||
result = resultScanner.next();
|
|
||||||
while (result != null) {
|
|
||||||
list.add(result);
|
|
||||||
result = resultScanner.next();
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
return list;
|
|
||||||
}
|
|
||||||
|
|
||||||
public HbaseProperties getHbaseProperties() {
|
|
||||||
return hbaseProperties;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,127 +0,0 @@
|
||||||
package io.github.dunwu.javadb.hbase;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Hbase 配置参数管理对象
|
|
||||||
* @author Zhang Peng
|
|
||||||
*/
|
|
||||||
public class HbaseProperties implements Serializable {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 2930639554689310736L;
|
|
||||||
|
|
||||||
private String hbaseMaster;
|
|
||||||
|
|
||||||
private String quorum;
|
|
||||||
|
|
||||||
private String clientPort;
|
|
||||||
|
|
||||||
private String znodeParent;
|
|
||||||
|
|
||||||
private String maxThreads;
|
|
||||||
|
|
||||||
private String coreThreads;
|
|
||||||
|
|
||||||
private String columnFamily;
|
|
||||||
|
|
||||||
private String hbaseExecutorsNum = "10";
|
|
||||||
|
|
||||||
private String ipcPoolSize;
|
|
||||||
|
|
||||||
public HbaseProperties() {
|
|
||||||
}
|
|
||||||
|
|
||||||
public HbaseProperties(String hbaseMaster, String quorum, String clientPort, String znodeParent, String maxThreads,
|
|
||||||
String coreThreads, String columnFamily, String hbaseExecutorsNum, String ipcPoolSize) {
|
|
||||||
this.hbaseMaster = hbaseMaster;
|
|
||||||
this.quorum = quorum;
|
|
||||||
this.clientPort = clientPort;
|
|
||||||
this.znodeParent = znodeParent;
|
|
||||||
this.maxThreads = maxThreads;
|
|
||||||
this.coreThreads = coreThreads;
|
|
||||||
this.columnFamily = columnFamily;
|
|
||||||
this.hbaseExecutorsNum = hbaseExecutorsNum;
|
|
||||||
this.ipcPoolSize = ipcPoolSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getHbaseMaster() {
|
|
||||||
return hbaseMaster;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setHbaseMaster(String hbaseMaster) {
|
|
||||||
this.hbaseMaster = hbaseMaster;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getQuorum() {
|
|
||||||
return quorum;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setQuorum(String quorum) {
|
|
||||||
this.quorum = quorum;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getClientPort() {
|
|
||||||
return clientPort;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setClientPort(String clientPort) {
|
|
||||||
this.clientPort = clientPort;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getZnodeParent() {
|
|
||||||
return znodeParent;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setZnodeParent(String znodeParent) {
|
|
||||||
this.znodeParent = znodeParent;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getMaxThreads() {
|
|
||||||
return maxThreads;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setMaxThreads(String maxThreads) {
|
|
||||||
this.maxThreads = maxThreads;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getCoreThreads() {
|
|
||||||
return coreThreads;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setCoreThreads(String coreThreads) {
|
|
||||||
this.coreThreads = coreThreads;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getColumnFamily() {
|
|
||||||
return columnFamily;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setColumnFamily(String columnFamily) {
|
|
||||||
this.columnFamily = columnFamily;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getHbaseExecutorsNum() {
|
|
||||||
return hbaseExecutorsNum;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setHbaseExecutorsNum(String hbaseExecutorsNum) {
|
|
||||||
this.hbaseExecutorsNum = hbaseExecutorsNum;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getIpcPoolSize() {
|
|
||||||
return ipcPoolSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setIpcPoolSize(String ipcPoolSize) {
|
|
||||||
this.ipcPoolSize = ipcPoolSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "HbaseProperties{" + "quorum='" + quorum + '\'' + ", clientPort='" + clientPort + '\''
|
|
||||||
+ ", znodeParent='" + znodeParent + '\'' + ", maxThreads='" + maxThreads + '\'' + ", coreThreads='"
|
|
||||||
+ coreThreads + '\'' + ", columnFamily='" + columnFamily + '\'' + ", hbaseExecutorsNum='"
|
|
||||||
+ hbaseExecutorsNum + '\'' + '}';
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
package io.github.dunwu.javadb.hbase.entity;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.experimental.Accessors;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
|
import org.apache.hadoop.hbase.filter.FilterList;
|
||||||
|
import org.apache.hadoop.hbase.filter.PageFilter;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HBase 封装请求参数
|
||||||
|
*
|
||||||
|
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||||
|
* @date 2023-05-19
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Accessors(chain = true)
|
||||||
|
public class BaseFamilyRequest {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 表名
|
||||||
|
*/
|
||||||
|
protected String tableName;
|
||||||
|
/**
|
||||||
|
* 起始 row
|
||||||
|
*/
|
||||||
|
protected String startRow;
|
||||||
|
/**
|
||||||
|
* 结束 row
|
||||||
|
*/
|
||||||
|
protected String stopRow;
|
||||||
|
/**
|
||||||
|
* 起始时间
|
||||||
|
*/
|
||||||
|
protected Long minTimeStamp;
|
||||||
|
/**
|
||||||
|
* 结束时间
|
||||||
|
*/
|
||||||
|
protected Long maxTimeStamp;
|
||||||
|
/**
|
||||||
|
* 是否降序
|
||||||
|
*/
|
||||||
|
protected boolean reversed = false;
|
||||||
|
/**
|
||||||
|
* 页号
|
||||||
|
*/
|
||||||
|
protected Integer pageNo;
|
||||||
|
/**
|
||||||
|
* 默认的每页记录数大小(pageNo!=null时才使用)
|
||||||
|
*/
|
||||||
|
protected Integer pageSize = 10;
|
||||||
|
/**
|
||||||
|
* 过滤器列表
|
||||||
|
*/
|
||||||
|
protected List<Filter> filters = new ArrayList<>();
|
||||||
|
|
||||||
|
public void addFilter(Filter filter) {
|
||||||
|
this.filters.add(filter);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Scan toScan() throws IOException {
|
||||||
|
Scan scan = new Scan();
|
||||||
|
scan.setReversed(reversed);
|
||||||
|
if (StrUtil.isNotBlank(startRow)) {
|
||||||
|
scan.withStartRow(Bytes.toBytes(startRow), true);
|
||||||
|
}
|
||||||
|
if (StrUtil.isNotBlank(stopRow)) {
|
||||||
|
scan.withStartRow(Bytes.toBytes(stopRow), false);
|
||||||
|
}
|
||||||
|
if (minTimeStamp != null && maxTimeStamp != null) {
|
||||||
|
scan.setTimeRange(minTimeStamp, maxTimeStamp);
|
||||||
|
}
|
||||||
|
if (pageNo != null) {
|
||||||
|
PageFilter pageFilter = new PageFilter(pageSize);
|
||||||
|
filters.add(pageFilter);
|
||||||
|
// 缓存1000条数据
|
||||||
|
scan.setCaching(1000);
|
||||||
|
scan.setCacheBlocks(false);
|
||||||
|
}
|
||||||
|
FilterList filterList = new FilterList();
|
||||||
|
for (Filter filter : filters) {
|
||||||
|
filterList.addFilter(filter);
|
||||||
|
}
|
||||||
|
scan.setFilter(filterList);
|
||||||
|
return scan;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
package io.github.dunwu.javadb.hbase.entity;
|
||||||
|
|
||||||
|
import cn.hutool.core.map.MapUtil;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HBase 列族数据结构
|
||||||
|
*
|
||||||
|
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||||
|
* @date 2023-05-19
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class HBaseFamilyData {
|
||||||
|
|
||||||
|
private String family;
|
||||||
|
private Map<String, String> columnMap;
|
||||||
|
|
||||||
|
public static Map<String, HBaseFamilyData> toFamilyMap(Map<String, Map<String, String>> map) {
|
||||||
|
if (MapUtil.isEmpty(map)) {
|
||||||
|
return new HashMap<>(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, HBaseFamilyData> familyMap = new HashMap<>(map.size());
|
||||||
|
map.forEach((family, columnMap) -> {
|
||||||
|
familyMap.put(family, new HBaseFamilyData(family, columnMap));
|
||||||
|
});
|
||||||
|
return familyMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,37 @@
|
||||||
|
package io.github.dunwu.javadb.hbase.entity;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.experimental.Accessors;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HBase 封装请求参数
|
||||||
|
*
|
||||||
|
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||||
|
* @date 2023-05-19
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Accessors(chain = true)
|
||||||
|
public class HBaseFamilyRequest extends BaseFamilyRequest {
|
||||||
|
|
||||||
|
private String family;
|
||||||
|
private Collection<String> columns = new ArrayList<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Scan toScan() throws IOException {
|
||||||
|
Scan scan = super.toScan();
|
||||||
|
if (CollectionUtil.isNotEmpty(this.getColumns())) {
|
||||||
|
for (String column : columns) {
|
||||||
|
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return scan;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,59 @@
|
||||||
|
package io.github.dunwu.javadb.hbase.entity;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
|
import cn.hutool.core.map.MapUtil;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.experimental.Accessors;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HBase 封装请求参数
|
||||||
|
*
|
||||||
|
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||||
|
* @date 2023-05-19
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@Accessors(chain = true)
|
||||||
|
public class HBaseMultiFamilyRequest extends BaseFamilyRequest {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 列族, 列族所包含的列(不可为空)
|
||||||
|
*/
|
||||||
|
private final Map<String, Collection<String>> familyColumns = new HashMap<>();
|
||||||
|
|
||||||
|
public HBaseMultiFamilyRequest addFamilyColumn(String family, Collection<String> columns) {
|
||||||
|
this.familyColumns.put(family, columns);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HBaseMultiFamilyRequest addFamilyColumns(Map<String, Collection<String>> familyColumns) {
|
||||||
|
if (MapUtil.isNotEmpty(familyColumns)) {
|
||||||
|
this.familyColumns.putAll(familyColumns);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Scan toScan() throws IOException {
|
||||||
|
Scan scan = super.toScan();
|
||||||
|
if (MapUtil.isNotEmpty(familyColumns)) {
|
||||||
|
for (Map.Entry<String, Collection<String>> entry : familyColumns.entrySet()) {
|
||||||
|
String family = entry.getKey();
|
||||||
|
Collection<String> columns = entry.getValue();
|
||||||
|
if (CollectionUtil.isNotEmpty(columns)) {
|
||||||
|
for (String column : columns) {
|
||||||
|
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return scan;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
package io.github.dunwu.javadb.hbase.entity;
|
||||||
|
|
||||||
|
import cn.hutool.core.map.MapUtil;
|
||||||
|
import lombok.AllArgsConstructor;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.NoArgsConstructor;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* HBase 行数据结构
|
||||||
|
*
|
||||||
|
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||||
|
* @date 2023-05-19
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
@NoArgsConstructor
|
||||||
|
@AllArgsConstructor
|
||||||
|
public class HBaseRowData {
|
||||||
|
|
||||||
|
private String row;
|
||||||
|
private Long timestamp;
|
||||||
|
private Map<String, HBaseFamilyData> familyMap = new HashMap<>();
|
||||||
|
|
||||||
|
public Map<String, Map<String, String>> toMap() {
|
||||||
|
return toMap(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HBaseRowData build(String row, Long timestamp, Map<String, HBaseFamilyData> familyMap) {
|
||||||
|
return new HBaseRowData(row, timestamp, familyMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static HBaseRowData buildByMap(String row, Long timestamp, Map<String, Map<String, String>> map) {
|
||||||
|
return new HBaseRowData(row, timestamp, HBaseFamilyData.toFamilyMap(map));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Map<String, Map<String, String>> toMap(HBaseRowData data) {
|
||||||
|
|
||||||
|
if (data == null || MapUtil.isEmpty(data.getFamilyMap())) {
|
||||||
|
return new HashMap<>(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Map<String, String>> map = new HashMap<>(data.getFamilyMap().size());
|
||||||
|
data.getFamilyMap().forEach((family, familyData) -> {
|
||||||
|
map.put(family, familyData.getColumnMap());
|
||||||
|
});
|
||||||
|
return map;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Map<String, HBaseRowData> toRowMap(Map<String, Map<String, Map<String, String>>> rowMultiFamilyMap) {
|
||||||
|
if (MapUtil.isEmpty(rowMultiFamilyMap)) {
|
||||||
|
return new HashMap<>(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, HBaseRowData> rowDataMap = new HashMap<>(rowMultiFamilyMap.size());
|
||||||
|
rowMultiFamilyMap.forEach((row, familyDataMap) -> {
|
||||||
|
Map<String, HBaseFamilyData> familyMap = HBaseFamilyData.toFamilyMap(familyDataMap);
|
||||||
|
rowDataMap.put(row, new HBaseRowData(row, null, familyMap));
|
||||||
|
});
|
||||||
|
return rowDataMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<HBaseRowData> toRowList(Map<String, Map<String, Map<String, String>>> rowMultiFamilyMap) {
|
||||||
|
Map<String, HBaseRowData> rowMap = toRowMap(rowMultiFamilyMap);
|
||||||
|
if (MapUtil.isEmpty(rowMap)) {
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
return new ArrayList<>(rowMap.values());
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Map<String, HBaseRowData> toRowMap(String family, Map<String, Map<String, String>> rowColumnMap) {
|
||||||
|
if (MapUtil.isEmpty(rowColumnMap)) {
|
||||||
|
return new HashMap<>(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, HBaseRowData> rowDataMap = new HashMap<>(rowColumnMap.size());
|
||||||
|
|
||||||
|
rowColumnMap.forEach((row, columnMap) -> {
|
||||||
|
HBaseFamilyData familyData = new HBaseFamilyData(family, columnMap);
|
||||||
|
Map<String, HBaseFamilyData> familyMap = new HashMap<>();
|
||||||
|
familyMap.put(family, familyData);
|
||||||
|
rowDataMap.put(row, new HBaseRowData(row, null, familyMap));
|
||||||
|
});
|
||||||
|
return rowDataMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static List<HBaseRowData> toRowList(String family, Map<String, Map<String, String>> rowColumnMap) {
|
||||||
|
Map<String, HBaseRowData> rowMap = toRowMap(family, rowColumnMap);
|
||||||
|
if (MapUtil.isEmpty(rowMap)) {
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
return new ArrayList<>(rowMap.values());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
package io.github.dunwu.javadb.hbase.entity;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class PageData<T> {
|
||||||
|
|
||||||
|
private Integer number;
|
||||||
|
private Integer size;
|
||||||
|
private Long total;
|
||||||
|
private Integer totalPages;
|
||||||
|
private Collection<T> content;
|
||||||
|
|
||||||
|
public PageData() { }
|
||||||
|
|
||||||
|
public PageData(Integer number, Integer size, Long total, Collection<T> content) {
|
||||||
|
this.number = number;
|
||||||
|
this.size = size;
|
||||||
|
this.total = total;
|
||||||
|
this.content = content;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getTotalPages() {
|
||||||
|
return this.getSize() == 0 ? 0 : (int) Math.ceil((double) this.total / (double) this.getSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -1,109 +0,0 @@
|
||||||
package io.github.dunwu.javadb.hbase;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.junit.Assert;
|
|
||||||
import org.junit.BeforeClass;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @author Zhang Peng
|
|
||||||
* @since 2019-03-29
|
|
||||||
*/
|
|
||||||
public class HbaseHelperTest {
|
|
||||||
|
|
||||||
private static HbaseHelper hbaseHelper;
|
|
||||||
|
|
||||||
@BeforeClass
|
|
||||||
public static void BeforeClass() {
|
|
||||||
try {
|
|
||||||
hbaseHelper = new HbaseHelper();
|
|
||||||
} catch (Exception e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void listTable() throws Exception {
|
|
||||||
HTableDescriptor[] hTableDescriptors = hbaseHelper.listTables();
|
|
||||||
if (hTableDescriptors == null || hTableDescriptors.length <= 0) {
|
|
||||||
Assert.fail();
|
|
||||||
}
|
|
||||||
|
|
||||||
System.out.println("Tables:");
|
|
||||||
for (HTableDescriptor item : hTableDescriptors) {
|
|
||||||
System.out.println(item.getTableName());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void createTable() throws Exception {
|
|
||||||
hbaseHelper.createTable("table1", new String[] {"columnFamliy1", "columnFamliy2"});
|
|
||||||
HTableDescriptor[] table1s = hbaseHelper.listTables("table1");
|
|
||||||
if (table1s == null || table1s.length <= 0) {
|
|
||||||
Assert.fail();
|
|
||||||
}
|
|
||||||
|
|
||||||
hbaseHelper.createTable("table2", new String[] {"columnFamliy1", "columnFamliy2"});
|
|
||||||
table1s = hbaseHelper.listTables("table2");
|
|
||||||
if (table1s == null || table1s.length <= 0) {
|
|
||||||
Assert.fail();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void dropTable() throws Exception {
|
|
||||||
hbaseHelper.dropTable("table1");
|
|
||||||
HTableDescriptor[] table1s = hbaseHelper.listTables("table1");
|
|
||||||
if (table1s != null && table1s.length > 0) {
|
|
||||||
Assert.fail();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void get() throws Exception {
|
|
||||||
Result result = hbaseHelper.get("table1", "row1");
|
|
||||||
System.out.println(hbaseHelper.resultToString(result));
|
|
||||||
|
|
||||||
result = hbaseHelper.get("table1", "row2", "columnFamliy1");
|
|
||||||
System.out.println(hbaseHelper.resultToString(result));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void scan() throws Exception {
|
|
||||||
Result[] results = hbaseHelper.scan("table1");
|
|
||||||
System.out.println("HbaseUtil.scan(\"table1\") result: ");
|
|
||||||
if (results.length > 0) {
|
|
||||||
for (Result r : results) {
|
|
||||||
System.out.println(hbaseHelper.resultToString(r));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
results = hbaseHelper.scan("table1", "columnFamliy1");
|
|
||||||
System.out.println("HbaseUtil.scan(\"table1\", \"columnFamliy1\" result: ");
|
|
||||||
if (results.length > 0) {
|
|
||||||
for (Result r : results) {
|
|
||||||
System.out.println(hbaseHelper.resultToString(r));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
results = hbaseHelper.scan("table1", "columnFamliy1", "a");
|
|
||||||
System.out.println("HbaseUtil.scan(\"table1\", \"columnFamliy1\", \"a\") result: ");
|
|
||||||
if (results.length > 0) {
|
|
||||||
for (Result r : results) {
|
|
||||||
System.out.println(hbaseHelper.resultToString(r));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void delete() throws Exception {
|
|
||||||
Result result = hbaseHelper.get("table1", "row1");
|
|
||||||
System.out.println(result.toString());
|
|
||||||
|
|
||||||
hbaseHelper.delete("table1", "row1");
|
|
||||||
result = hbaseHelper.get("table1", "row1");
|
|
||||||
System.out.println(result.toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -1,7 +0,0 @@
|
||||||
hbase.enable = true
|
|
||||||
hbase.zookeeper.quorum = localhost,xxxx,xxxx
|
|
||||||
hbase.zookeeper.property.clientPort = 2181
|
|
||||||
zookeeper.znode.parent = /hbase
|
|
||||||
hbase.hconnection.threads.max = 256
|
|
||||||
hbase.hconnection.threads.core = 32
|
|
||||||
hbase.column.family = F
|
|
Loading…
Reference in New Issue