feat: 更新 hbase 示例

master
dunwu 2023-11-17 07:04:54 +08:00
parent 088d49c81f
commit 7e344cd073
29 changed files with 2355 additions and 1209 deletions

View File

@ -23,10 +23,11 @@
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<artifactId>hadoop-auth</artifactId>
<version>2.10.2</version>
</dependency>
<dependency>
@ -39,37 +40,23 @@
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<!-- test begin -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>5.1.10.RELEASE</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.6.3</version>
<scope>test</scope>
</dependency>
<!-- test end -->
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.github.dunwu</groupId>-->
<!-- <artifactId>dunwu-tool-core</artifactId>-->
<!-- <version>${dunwu.version}</version>-->
<!-- </dependency>-->
<!-- test begin -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- test end -->
</dependencies>
</dependencyManagement>
</project>

View File

@ -0,0 +1,155 @@
package io.github.dunwu.javadb.hbase;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import io.github.dunwu.javadb.hbase.entity.ScrollData;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* HBase Mapper
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15
*/
@Slf4j
@RequiredArgsConstructor
public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements HbaseMapper<T> {
protected final HbaseTemplate hbaseTemplate;
protected final HbaseAdmin hbaseAdmin;
@Override
public Connection getClient() {
return hbaseTemplate.getConnection();
}
@Override
public String getNamespace() {
return "default";
}
@Override
public boolean existsTable() {
byte[] namespace = Bytes.toBytes(getNamespace());
byte[] tableName = Bytes.toBytes(getTableName());
try {
return hbaseAdmin.existsTable(TableName.valueOf(namespace, tableName));
} catch (IOException e) {
log.error("【Hbase】existsTable 异常", e);
return false;
}
}
@Override
public String getFamily() {
return "f";
}
@Override
public T pojoById(String id) {
if (StrUtil.isBlank(id)) {
return null;
}
try {
return hbaseTemplate.getEntity(getFullTableName(), id, getFamily(), getEntityClass());
} catch (IOException e) {
log.error("【Hbase】pojoById 异常", e);
return null;
}
}
@Override
public List<T> pojoListByIds(Collection<String> ids) {
if (CollectionUtil.isEmpty(ids)) {
return null;
}
try {
return hbaseTemplate.getEntityList(getFullTableName(), ids.toArray(new String[0]),
getFamily(), getEntityClass());
} catch (IOException e) {
log.error("【Hbase】getEntityList 异常", e);
return new ArrayList<>();
}
}
@Override
public List<T> scroll(String scrollId, int size) {
try {
ScrollData<T> scrollData =
hbaseTemplate.getEntityScroll(getFullTableName(), getFamily(), scrollId, size, getEntityClass());
if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) {
return new ArrayList<>();
}
return new ArrayList<>(scrollData.getContent());
} catch (IOException e) {
log.error("【Hbase】getEntityScroll 异常", e);
return new ArrayList<>();
}
}
@Override
public T save(T entity) {
try {
hbaseTemplate.put(getFullTableName(), entity.getId(), getFamily(), entity);
return entity;
} catch (IOException e) {
log.error("【Hbase】put 异常", e);
return null;
}
}
@Override
public boolean batchSave(Collection<T> list) {
try {
hbaseTemplate.batchPut(getFullTableName(), getFamily(), list);
return true;
} catch (IOException | InterruptedException e) {
log.error("【Hbase】batchPut 异常", e);
return false;
}
}
@Override
public boolean deleteById(String id) {
if (StrUtil.isBlank(id)) {
return true;
}
try {
hbaseTemplate.delete(getFullTableName(), id);
return true;
} catch (IOException e) {
log.error("【Hbase】delete 异常", e);
return false;
}
}
@Override
public boolean batchDeleteById(Collection<String> ids) {
if (CollectionUtil.isEmpty(ids)) {
return true;
}
try {
hbaseTemplate.batchDelete(getFullTableName(), ids.toArray(new String[0]));
return true;
} catch (IOException | InterruptedException e) {
log.error("【Hbase】batchDelete 异常", e);
return false;
}
}
protected String getFullTableName() {
return StrUtil.format("{}:{}", getNamespace(), getTableName());
}
}

View File

@ -1,41 +0,0 @@
package io.github.dunwu.javadb.hbase;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@Slf4j
public class HBaseDemo {
public static void main(String[] args) throws Exception {
// 请改为配置的方式
// String zkHosts = "192.168.31.127";
String zkHosts = "192.168.31.255";
// 请改为配置的方式
String zkPort = "2181";
// 请改为配置的方式
String namespace = "test";
String tablename = "test";
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", zkHosts);
conf.set("hbase.zookeeper.port", zkPort);
// 创建命名空间和表
TableName tableName = TableName.valueOf(namespace, tablename);
HBaseAdminHelper hbaseAdminHelper = HBaseAdminHelper.newInstance(conf);
hbaseAdminHelper.enableTable(tableName);
// hbaseAdminHelper.createNamespace(namespace);
// hbaseAdminHelper.createTable(tableName, "c1");
//
// String rowKey = IdUtil.fastSimpleUUID();
// HBaseHelper hbaseHelper = HBaseHelper.newInstance(hbaseAdminHelper.getConnection());
// hbaseHelper.put(tableName, rowKey, "c1", "name", "jack");
// String value = hbaseHelper.get(tableName, rowKey, "c1", "name");
// System.out.println("value = " + value);
hbaseAdminHelper.close();
}
}

View File

@ -1,920 +0,0 @@
package io.github.dunwu.javadb.hbase;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import io.github.dunwu.javadb.hbase.entity.HBaseFamilyRequest;
import io.github.dunwu.javadb.hbase.entity.HBaseMultiFamilyRequest;
import io.github.dunwu.javadb.hbase.entity.HBaseRowData;
import io.github.dunwu.javadb.hbase.entity.PageData;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* HBase CRUD
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-03-27
*/
@Slf4j
public class HBaseHelper implements Closeable {
private final Connection connection;
private final Configuration configuration;
protected HBaseHelper(Configuration configuration) throws IOException {
this.configuration = configuration;
this.connection = ConnectionFactory.createConnection(configuration);
}
public static synchronized HBaseHelper newInstance(Configuration configuration) throws IOException {
if (configuration == null) {
throw new IllegalArgumentException("configuration can not be null!");
}
return new HBaseHelper(configuration);
}
/**
* HBase Connection
*/
@Override
public synchronized void close() {
if (null == connection || connection.isClosed()) {
return;
}
IoUtil.close(connection);
}
/**
* HBase
*
* @return /
*/
public Connection getConnection() {
if (null == connection) {
throw new RuntimeException("HBase connection init failed...");
}
return connection;
}
/**
* HBase
*
* @return /
*/
public Configuration getConfiguration() {
return configuration;
}
/**
* {@link Table}
*
* @param tableName
* @return /
*/
public Table getTable(String tableName) throws Exception {
return getTable(TableName.valueOf(tableName));
}
/**
* {@link Table}
*
* @param tableName
* @return /
*/
public synchronized Table getTable(TableName tableName) throws Exception {
return connection.getTable(tableName);
}
public void put(TableName tableName, String row, String family, String column, String value) throws Exception {
put(tableName, row, family, null, column, value);
}
public void put(TableName tableName, String row, String family, Long timestamp, String column, String value)
throws Exception {
Table table = getTable(tableName);
try {
Put put = new Put(Bytes.toBytes(row));
if (timestamp != null) {
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp, Bytes.toBytes(value));
} else {
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
}
table.put(put);
} finally {
recycle(table);
}
}
public void put(TableName tableName, String row, String family, Object obj) throws Exception {
put(tableName, row, family, null, obj);
}
@SuppressWarnings("unchecked")
public void put(TableName tableName, String row, String family, Long timestamp, Object obj) throws Exception {
Map<String, Object> map;
if (obj instanceof Map) {
map = (Map<String, Object>) obj;
} else {
map = BeanUtil.beanToMap(obj);
}
put(tableName, row, family, timestamp, map);
}
public void put(TableName tableName, String row, String family, Map<String, Object> columnMap) throws Exception {
put(tableName, row, family, null, columnMap);
}
public void put(TableName tableName, String row, String family, Long timestamp, Map<String, Object> columnMap)
throws Exception {
Put put = new Put(Bytes.toBytes(row));
columnMap.forEach((column, value) -> {
if (value != null) {
if (timestamp != null) {
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp,
Bytes.toBytes(String.valueOf(value)));
} else {
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(String.valueOf(value)));
}
}
});
Table table = getTable(tableName);
try {
table.put(put);
} finally {
recycle(table);
}
}
public void put(TableName tableName, String row, Long timestamp, Map<String, Map<String, Object>> familyMap)
throws Exception {
Put put = new Put(Bytes.toBytes(row));
for (Map.Entry<String, Map<String, Object>> e : familyMap.entrySet()) {
String family = e.getKey();
Map<String, Object> columnMap = e.getValue();
if (MapUtil.isNotEmpty(columnMap)) {
for (Map.Entry<String, Object> entry : columnMap.entrySet()) {
String column = entry.getKey();
Object value = entry.getValue();
if (ObjectUtil.isEmpty(value)) {
continue;
}
if (timestamp != null) {
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp,
Bytes.toBytes(String.valueOf(value)));
} else {
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column),
Bytes.toBytes(String.valueOf(value)));
}
}
}
}
Table table = getTable(tableName);
try {
table.put(put);
} finally {
recycle(table);
}
}
public void deleteRow(TableName tableName, String row) throws Exception {
Delete delete = new Delete(Bytes.toBytes(row));
Table table = getTable(tableName);
try {
table.delete(delete);
} finally {
recycle(table);
}
}
public long incrementColumnValue(TableName tableName, String row, String family, String column, long amount)
throws Exception {
return incrementColumnValue(tableName, row, family, column, amount, Durability.SYNC_WAL);
}
public long incrementColumnValue(TableName tableName, String row, String family, String column, long amount,
Durability durability) throws Exception {
Table table = getTable(tableName);
try {
return table.incrementColumnValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(column), amount,
durability);
} finally {
recycle(table);
}
}
public void dump(TableName tableName, String[] rows, String[] families, String[] columns) throws Exception {
List<Get> gets = new ArrayList<>();
for (String row : rows) {
Get get = new Get(Bytes.toBytes(row));
if (families != null) {
for (String family : families) {
for (String column : columns) {
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
}
}
}
gets.add(get);
}
Table table = getTable(tableName);
try {
Result[] results = table.get(gets);
for (Result result : results) {
for (Cell cell : result.rawCells()) {
System.out.println(
"Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(),
cell.getValueLength()));
}
}
} finally {
recycle(table);
}
}
public void dump(TableName tableName) throws Exception {
Table table = getTable(tableName);
try {
ResultScanner scanner = table.getScanner(new Scan());
for (Result result : scanner) {
dumpResult(result);
}
scanner.close();
} finally {
recycle(table);
}
}
/**
*
*
* @param tableName
* @param row
* @param family
* @param column
* @return /
*/
public String getColumn(TableName tableName, String row, String family, String column) throws Exception {
Get get = new Get(Bytes.toBytes(row));
Table table = getTable(tableName);
try {
Result result = table.get(get);
return Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
} finally {
recycle(table);
}
}
/**
* {@link T}
*
* @param tableName
* @param row
* @param family
* @param clazz
* @param <T>
* @return /
*/
public <T> T getFamilyMap(TableName tableName, String row, String family, Class<T> clazz) throws Exception {
Map<String, Field> fieldMap = ReflectUtil.getFieldMap(clazz);
Set<String> columns = fieldMap.keySet();
Map<String, String> map = getFamilyMap(tableName, row, family, columns);
if (MapUtil.isEmpty(map)) {
return null;
}
return BeanUtil.mapToBean(map, clazz, true, CopyOptions.create().ignoreError());
}
/**
* {@link Map}
*
* @param tableName
* @param row
* @param family
* @param columns
* @return /
*/
public Map<String, String> getFamilyMap(TableName tableName, String row, String family,
Collection<String> columns) throws Exception {
if (CollectionUtil.isEmpty(columns)) {
return getFamilyMap(tableName, row, family);
}
List<Get> gets = new ArrayList<>();
Get get = new Get(Bytes.toBytes(row));
for (String column : columns) {
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
}
gets.add(get);
Table table = getTable(tableName);
try {
Result[] results = table.get(gets);
Map<String, String> map = new HashMap<>(columns.size());
for (Result result : results) {
for (String column : columns) {
String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
map.put(column, value);
}
}
return map;
} finally {
recycle(table);
}
}
/**
* {@link Map}
*
* @param tableName
* @param row
* @param family
* @return /
*/
public Map<String, String> getFamilyMap(TableName tableName, String row, String family) throws Exception {
List<Get> gets = new ArrayList<>();
Get get = new Get(Bytes.toBytes(row));
gets.add(get);
Table table = getTable(tableName);
try {
Result[] results = table.get(gets);
Map<String, Map<String, String>> familyColumnMap = getAllFamilyMap(results, row);
return familyColumnMap.get(family);
} finally {
recycle(table);
}
}
/**
* row
*
* @param tableName
* @param rows
* @param family
* @param columns
* @return
* @throws Exception
*/
public Map<String, Map<String, String>> getFamilyMapInRows(TableName tableName, List<String> rows, String family,
Collection<String> columns) throws Exception {
List<Get> gets = new ArrayList<>();
for (String row : rows) {
Get get = new Get(Bytes.toBytes(row));
for (String column : columns) {
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
}
gets.add(get);
}
Table table = getTable(tableName);
try {
Result[] results = table.get(gets);
Map<String, Map<String, String>> resultMap = new LinkedHashMap<>(gets.size());
for (Result result : results) {
Map<String, String> map = new HashMap<>(columns.size());
for (String column : columns) {
String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
map.put(column, value);
}
resultMap.put(Bytes.toString(result.getRow()), map);
}
return resultMap;
} finally {
recycle(table);
}
}
/**
* {@link Map}
*
* @param tableName
* @param row
* @param familyColumns <, >
* @return /
*/
public Map<String, Map<String, String>> getMultiFamilyMap(TableName tableName, String row,
Map<String, Collection<String>> familyColumns) throws Exception {
if (MapUtil.isEmpty(familyColumns)) {
return getMultiFamilyMap(tableName, row);
}
List<Get> gets = new ArrayList<>();
Get get = new Get(Bytes.toBytes(row));
for (Map.Entry<String, Collection<String>> entry : familyColumns.entrySet()) {
String family = entry.getKey();
Collection<String> columns = entry.getValue();
if (CollectionUtil.isNotEmpty(columns)) {
for (String column : columns) {
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
}
}
}
gets.add(get);
Table table = getTable(tableName);
try {
Result[] results = table.get(gets);
Map<String, Map<String, String>> map = new HashMap<>(familyColumns.size());
for (Result result : results) {
if (result == null || result.isEmpty()) {
continue;
}
familyColumns.forEach((family, columns) -> {
Map<String, String> kvMap = new HashMap<>();
if (CollectionUtil.isNotEmpty(columns)) {
for (String column : columns) {
String value =
Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
kvMap.put(column, value);
}
}
map.put(family, kvMap);
});
}
return map;
} finally {
recycle(table);
}
}
/**
* {@link Map}
*
* @param tableName
* @param row
* @return /
*/
public Map<String, Map<String, String>> getMultiFamilyMap(TableName tableName, String row) throws Exception {
List<Get> gets = new ArrayList<>();
Get get = new Get(Bytes.toBytes(row));
gets.add(get);
Table table = getTable(tableName);
try {
Result[] results = table.get(gets);
return getAllFamilyMap(results, row);
} finally {
recycle(table);
}
}
private Map<String, Map<String, String>> getAllFamilyMap(Result[] results, String row) {
Map<String, Map<String, Map<String, String>>> rowFamilyColumnMap = getAllFamilyMapInRows(results);
if (MapUtil.isEmpty(rowFamilyColumnMap)) {
return new HashMap<>(0);
}
return rowFamilyColumnMap.get(row);
}
public HBaseRowData getRowData(TableName tableName, String row, Map<String, Collection<String>> familyColumns)
throws Exception {
Map<String, Map<String, String>> familyColumnMap = getMultiFamilyMap(tableName, row, familyColumns);
return HBaseRowData.buildByMap(row, null, familyColumnMap);
}
/**
* {@link Filter}
*
* @param tableName
* @param startRow
* @param stopRow
* @param stopRow rowInclude [startRowInclude, endRowInclude],
* @param family
* @param columns
* @param operator filter
* @param filters {@link Filter}
* @return Map key Row Key Map key value
*/
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String startRow, String stopRow,
boolean[] rowInclude, String family, Collection<String> columns, FilterList.Operator operator,
Filter... filters) throws Exception {
Scan scan = new Scan();
fillColumnsToScan(family, columns, scan);
boolean startRowInclude = true, endRowInclude = false;
if (null != rowInclude || rowInclude.length == 2) {
startRowInclude = rowInclude[0];
endRowInclude = rowInclude[1];
}
scan.withStartRow(Bytes.toBytes(startRow), startRowInclude);
scan.withStopRow(Bytes.toBytes(stopRow), endRowInclude);
if (ArrayUtil.isNotEmpty(filters)) {
FilterList filterList = new FilterList(filters);
scan.setFilter(filterList);
}
return scanFamilyMap(tableName, scan, family, columns);
}
/**
*
*
* @param tableName
* @param family
* @param columns
* @return Map key Row Key Map key value
*/
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String family,
Collection<String> columns)
throws Exception {
HBaseFamilyRequest request = new HBaseFamilyRequest();
request.setFamily(family)
.setColumns(columns)
.setTableName(tableName.getNameAsString())
.setReversed(true);
return scanFamilyMap(request);
}
/**
*
*
* @param startRow
* @param stopRow
* @param tableName
* @param family
* @param columns
* @return Map key Row Key Map key value
*/
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String startRow, String stopRow,
String family, Collection<String> columns) throws Exception {
HBaseFamilyRequest request = new HBaseFamilyRequest();
request.setFamily(family)
.setColumns(columns)
.setTableName(tableName.getNameAsString())
.setStartRow(startRow)
.setStopRow(stopRow)
.setReversed(true);
return scanFamilyMap(request);
}
/**
* {@link Filter}
*
* @param tableName
* @param family
* @param columns
* @param filter {@link Filter}
* @return Map key Row Key Map key value
*/
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String family,
Collection<String> columns,
Filter filter) throws Exception {
HBaseFamilyRequest request = new HBaseFamilyRequest();
request.setFamily(family)
.setColumns(columns)
.setTableName(tableName.getNameAsString())
.setReversed(true)
.addFilter(filter);
return scanFamilyMap(request);
}
/**
* {@link Filter}
*
* @param startRow
* @param stopRow
* @param tableName
* @param family
* @param columns
* @param filter {@link Filter}
* @return Map key Row Key Map key value
*/
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String startRow, String stopRow,
String family, Collection<String> columns, Filter filter) throws Exception {
HBaseFamilyRequest request = new HBaseFamilyRequest();
request.setFamily(family)
.setColumns(columns)
.setTableName(tableName.getNameAsString())
.setStartRow(startRow)
.setStopRow(stopRow)
.setReversed(true)
.addFilter(filter);
return scanFamilyMap(request);
}
/**
* {@link Filter}
* <p>
*
*
* @param tableName
* @param family
* @param columns
* @param minStamp
* @param maxStamp
* @param filter {@link Filter}
* @return Map key Row Key Map key value
*/
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, String family,
Collection<String> columns,
long minStamp, long maxStamp, Filter filter) throws Exception {
HBaseFamilyRequest request = new HBaseFamilyRequest();
request.setFamily(family)
.setColumns(columns)
.setTableName(tableName.getNameAsString())
.setMinTimeStamp(minStamp)
.setMaxTimeStamp(maxStamp)
.setReversed(true)
.addFilter(filter);
return scanFamilyMap(request);
}
/**
* {@link Filter}
* <p>
*
*
* @param request {@link HBaseFamilyRequest}
* @return Map key Row Key Map key value
*/
public Map<String, Map<String, String>> scanFamilyMap(HBaseFamilyRequest request) throws Exception {
return scanFamilyMap(TableName.valueOf(request.getTableName()),
request.toScan(), request.getFamily(), request.getColumns());
}
/**
* {@link Scan}
*
* @param tableName
* @param scan {@link Scan}
* @param family
* @param columns
* @return Map key Row Key Map key value
*/
public Map<String, Map<String, String>> scanFamilyMap(TableName tableName, Scan scan,
String family, Collection<String> columns) throws Exception {
Table table = getTable(tableName);
ResultScanner scanner = null;
try {
scanner = table.getScanner(scan);
Map<String, Map<String, String>> map = new LinkedHashMap<>();
for (Result result : scanner) {
Map<String, String> columnMap = new HashMap<>(columns.size());
for (String column : columns) {
String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
columnMap.put(column, value);
}
map.put(Bytes.toString(result.getRow()), columnMap);
}
return map;
} finally {
IoUtil.close(scanner);
recycle(table);
}
}
private static void fillColumnsToScan(String family, Collection<String> columns, Scan scan) {
if (StrUtil.isNotBlank(family) && CollectionUtil.isNotEmpty(columns)) {
for (String column : columns) {
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
}
}
}
/**
* {@link Scan}
*
* @param tableName
* @param scan {@link Scan}
* @param familyColumns ,
* @return Map key Row Key Map key Map key value
*/
public Map<String, Map<String, Map<String, String>>> scanMultiFamilyMap(TableName tableName, Scan scan,
Map<String, Collection<String>> familyColumns) throws Exception {
if (MapUtil.isEmpty(familyColumns)) {
return scanMultiFamilyMap(tableName, scan);
}
Table table = getTable(tableName);
ResultScanner scanner = null;
try {
scanner = table.getScanner(scan);
Map<String, Map<String, Map<String, String>>> familyKvDataMap = new LinkedHashMap<>();
for (Result result : scanner) {
Map<String, Map<String, String>> familyMap = new HashMap<>();
familyColumns.forEach((family, columns) -> {
Map<String, String> columnMap = new HashMap<>();
if (CollectionUtil.isNotEmpty(columns)) {
for (String column : columns) {
String value = Bytes.toString(result.getValue(Bytes.toBytes(family),
Bytes.toBytes(column)));
columnMap.put(column, value);
}
}
familyMap.put(family, columnMap);
});
familyKvDataMap.put(Bytes.toString(result.getRow()), familyMap);
}
return familyKvDataMap;
} finally {
IoUtil.close(scanner);
recycle(table);
}
}
/**
* {@link Scan}
*
* @param tableName
* @param scan {@link Scan}
* @return Map key Row Key Map key Map key value
*/
public Map<String, Map<String, Map<String, String>>> scanMultiFamilyMap(TableName tableName, Scan scan)
throws Exception {
Table table = getTable(tableName);
ResultScanner scanner = null;
try {
scanner = table.getScanner(scan);
Result[] results = ArrayUtil.toArray(scanner, Result.class);
return getAllFamilyMapInRows(results);
} finally {
IoUtil.close(scanner);
recycle(table);
}
}
public Map<String, Map<String, Map<String, String>>> scanMultiFamilyMap(HBaseMultiFamilyRequest request)
throws Exception {
return scanMultiFamilyMap(TableName.valueOf(request.getTableName()), request.toScan(),
request.getFamilyColumns());
}
private Map<String, Map<String, Map<String, String>>> getAllFamilyMapInRows(Result[] results) {
Map<String, Map<String, Map<String, String>>> rowFamilyColumnMap = new HashMap<>();
for (Result result : results) {
if (result == null || result.isEmpty()) {
continue;
}
Map<String, Map<String, String>> familyColumnMap = new HashMap<>();
for (Cell cell : result.listCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
if (!familyColumnMap.containsKey(family)) {
familyColumnMap.put(family, new HashMap<>());
}
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
familyColumnMap.get(family).put(column, value);
}
rowFamilyColumnMap.put(Bytes.toString(result.getRow()), familyColumnMap);
}
return rowFamilyColumnMap;
}
/**
* scan
*
* @param request
* @return /
*/
public List<HBaseRowData> listRowData(HBaseFamilyRequest request) throws Exception {
Map<String, Map<String, String>> rowColumnMap = scanFamilyMap(request);
return HBaseRowData.toRowList(request.getFamily(), rowColumnMap);
}
/**
* scan
*
* @param request
* @return /
*/
public List<HBaseRowData> listRowData(HBaseMultiFamilyRequest request) throws Exception {
Map<String, Map<String, Map<String, String>>> map = scanMultiFamilyMap(request);
return HBaseRowData.toRowList(map);
}
public PageData<HBaseRowData> pageRowData(HBaseMultiFamilyRequest request) throws Exception {
return pageRowData(TableName.valueOf(request.getTableName()), request.getFamilyColumns(),
request.getPageNo(), request.getPageSize(), request.toScan());
}
public PageData<HBaseRowData> pageRowData(TableName tableName,
Map<String, Collection<String>> familyColumns, Integer pageNo, Integer pageSize, Scan scan) throws Exception {
Table table = getTable(tableName);
Map<String, Map<String, Map<String, String>>> rowMap = new HashMap<>();
int page = 1;
byte[] lastRow = null;
long total = 0L;
while (true) {
if (lastRow != null) {
scan.withStartRow(lastRow, false);
}
ResultScanner rs = table.getScanner(scan);
Iterator<Result> it = rs.iterator();
int count = 0;
while (it.hasNext()) {
Result result = it.next();
if (pageNo == page) {
fillRowMap(result, familyColumns, rowMap);
}
lastRow = result.getRow();
count++;
}
page++;
rs.close();
total += count;
if (count == 0) {
break;
}
}
recycle(table);
List<HBaseRowData> content = HBaseRowData.toRowList(rowMap);
return new PageData<>(pageNo, pageSize, total, content);
}
private void fillRowMap(Result result, Map<String, Collection<String>> familyColumns,
Map<String, Map<String, Map<String, String>>> rowMap) {
String row = Bytes.toString(result.getRow());
if (row == null) {
return;
}
Map<String, Map<String, String>> familyMap;
if (MapUtil.isEmpty(familyColumns)) {
familyMap = new HashMap<>();
for (Cell cell : result.listCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
if (!familyMap.containsKey(family)) {
familyMap.put(family, new HashMap<>());
}
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
familyMap.get(family).put(column, value);
}
} else {
familyMap = new HashMap<>(familyColumns.size());
familyColumns.forEach((family, columns) -> {
if (CollectionUtil.isNotEmpty(columns)) {
Map<String, String> columnMap = new HashMap<>(columns.size());
for (String column : columns) {
String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column)));
columnMap.put(column, value);
}
familyMap.put(family, columnMap);
}
});
}
rowMap.put(row, familyMap);
}
public void dumpResult(Result result) {
for (Cell cell : result.rawCells()) {
String msg = StrUtil.format("Cell: {}, Value: {}", cell,
Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
System.out.println(msg);
}
}
private void recycle(Table table) {
if (null == table) {
return;
}
IoUtil.close(table);
}
}

View File

@ -24,33 +24,37 @@ import java.util.List;
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-03-27
*/
public class HBaseAdminHelper implements Closeable {
public class HbaseAdmin implements Closeable {
private final Connection connection;
private final Configuration configuration;
protected HBaseAdminHelper(Configuration configuration) throws IOException {
protected HbaseAdmin(Configuration configuration) throws IOException {
this.configuration = configuration;
// 无需鉴权连接
this.connection = ConnectionFactory.createConnection(configuration);
// 鉴权连接
// this.connection = ConnectionFactory.createConnection(configuration, null,
// new User.SecureHadoopUser(UserGroupInformation.createRemoteUser("test")));
}
protected HBaseAdminHelper(Connection connection) {
protected HbaseAdmin(Connection connection) {
this.configuration = connection.getConfiguration();
this.connection = connection;
}
public synchronized static HBaseAdminHelper newInstance(Configuration configuration) throws IOException {
public synchronized static HbaseAdmin newInstance(Configuration configuration) throws IOException {
if (configuration == null) {
throw new IllegalArgumentException("configuration can not be null!");
}
return new HBaseAdminHelper(configuration);
return new HbaseAdmin(configuration);
}
public synchronized static HBaseAdminHelper newInstance(Connection connection) throws IOException {
public synchronized static HbaseAdmin newInstance(Connection connection) throws IOException {
if (connection == null) {
throw new IllegalArgumentException("connection can not be null!");
}
return new HBaseAdminHelper(connection);
return new HbaseAdmin(connection);
}
/**
@ -91,10 +95,14 @@ public class HBaseAdminHelper implements Closeable {
* @param namespace
*/
public void createNamespace(String namespace) throws IOException {
NamespaceDescriptor nd = NamespaceDescriptor.create(namespace).build();
Admin admin = getAdmin();
admin.createNamespace(nd);
admin.close();
Admin admin = null;
try {
admin = getAdmin();
NamespaceDescriptor nd = NamespaceDescriptor.create(namespace).build();
admin.createNamespace(nd);
} finally {
recycle(admin);
}
}
/**
@ -113,16 +121,33 @@ public class HBaseAdminHelper implements Closeable {
* @param force
*/
public void dropNamespace(String namespace, boolean force) throws IOException {
Admin admin = getAdmin();
if (force) {
TableName[] tableNames = getAdmin().listTableNamesByNamespace(namespace);
for (TableName name : tableNames) {
admin.disableTable(name);
admin.deleteTable(name);
Admin admin = null;
try {
admin = getAdmin();
if (force) {
TableName[] tableNames = admin.listTableNamesByNamespace(namespace);
for (TableName name : tableNames) {
admin.disableTable(name);
admin.deleteTable(name);
}
}
admin.deleteNamespace(namespace);
} finally {
recycle(admin);
}
}
/**
*
*/
public String[] listNamespaces() throws IOException {
Admin admin = null;
try {
admin = getAdmin();
return admin.listNamespaces();
} finally {
recycle(admin);
}
admin.deleteNamespace(namespace);
admin.close();
}
/**
@ -213,7 +238,33 @@ public class HBaseAdminHelper implements Closeable {
}
/**
* {@link Table}
*
*/
public TableName[] listTableNames() throws IOException {
Admin admin = null;
try {
admin = getAdmin();
return admin.listTableNames();
} finally {
recycle(admin);
}
}
/**
*
*/
public TableName[] listTableNamesByNamespace(String namespace) throws IOException {
Admin admin = null;
try {
admin = getAdmin();
return admin.listTableNamesByNamespace(namespace);
} finally {
recycle(admin);
}
}
/**
* {@link org.apache.hadoop.hbase.client.Table}
*
* @param tableName
* @return /
@ -223,7 +274,7 @@ public class HBaseAdminHelper implements Closeable {
}
/**
* {@link Admin}
* {@link org.apache.hadoop.hbase.client.Admin}
*
* @return /
*/
@ -231,4 +282,11 @@ public class HBaseAdminHelper implements Closeable {
return getConnection().getAdmin();
}
private void recycle(Admin admin) {
if (null == admin) {
return;
}
IoUtil.close(admin);
}
}

View File

@ -0,0 +1,35 @@
package io.github.dunwu.javadb.hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import java.io.IOException;
/**
* HBase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-07-05
*/
public class HbaseFactory {
public static HbaseTemplate newHbaseTemplate() throws IOException {
return HbaseTemplate.newInstance(newHbaseConfiguration());
}
public static HbaseAdmin newHbaseAdmin() throws IOException {
return HbaseAdmin.newInstance(newHbaseConfiguration());
}
public static Configuration newHbaseConfiguration() {
Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "10.101.129.74,10.101.129.76,10.101.129.77");
configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("hbase.rootdir", "/hbase");
configuration.set("hbase.meta.replicas.use", "true");
configuration.set("hbase.client.retries.number", "5");
configuration.set("hbase.rpc.timeout", "600000");
return configuration;
}
}

View File

@ -0,0 +1,104 @@
package io.github.dunwu.javadb.hbase;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import org.apache.hadoop.hbase.client.Connection;
import java.util.Collection;
import java.util.List;
/**
* Hbase Mapper
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15
*/
public interface HbaseMapper<T extends BaseHbaseEntity> {
/**
* Hbase
*/
Connection getClient();
/**
*
*/
String getNamespace();
/**
*
*/
String getTableName();
/**
*
*/
boolean existsTable();
/**
*
*/
String getFamily();
/**
*
*/
Class<T> getEntityClass();
/**
* ID
*
* @param id Hbase rowkey
* @return /
*/
T pojoById(String id);
/**
* ID
*
* @param ids Hbase rowkey
* @return /
*/
List<T> pojoListByIds(Collection<String> ids);
/**
* ID
*
* @param scrollId
* @param size
* @return /
*/
List<T> scroll(String scrollId, int size);
/**
*
*
* @param entity Hbase f
* @return /
*/
T save(T entity);
/**
*
*
* @param list Hbase f
* @return /
*/
boolean batchSave(Collection<T> list);
/**
* ID
*
* @param id Hbase rowkey
* @return /
*/
boolean deleteById(String id);
/**
* ID
*
* @param ids Hbase rowkey
* @return /
*/
boolean batchDeleteById(Collection<String> ids);
}

View File

@ -0,0 +1,919 @@
package io.github.dunwu.javadb.hbase;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import io.github.dunwu.javadb.hbase.entity.ColumnDo;
import io.github.dunwu.javadb.hbase.entity.FamilyDo;
import io.github.dunwu.javadb.hbase.entity.PageData;
import io.github.dunwu.javadb.hbase.entity.RowDo;
import io.github.dunwu.javadb.hbase.entity.ScrollData;
import io.github.dunwu.javadb.hbase.entity.scan.MultiFamilyScan;
import io.github.dunwu.javadb.hbase.entity.scan.SingleFamilyScan;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* HBase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-03-27
*/
@Slf4j
public class HbaseTemplate implements Closeable {
private final Connection connection;
private final Configuration configuration;
protected HbaseTemplate(Configuration configuration) throws IOException {
this.configuration = configuration;
// 无需鉴权连接
this.connection = ConnectionFactory.createConnection(configuration);
// 鉴权连接
// this.connection = ConnectionFactory.createConnection(configuration, null,
// new User.SecureHadoopUser(UserGroupInformation.createRemoteUser("test")));
}
protected HbaseTemplate(Connection connection) {
this.configuration = connection.getConfiguration();
this.connection = connection;
}
public static synchronized HbaseTemplate newInstance(Configuration configuration) throws IOException {
if (configuration == null) {
throw new IllegalArgumentException("configuration can not be null!");
}
return new HbaseTemplate(configuration);
}
public synchronized static HbaseTemplate newInstance(Connection connection) {
if (connection == null) {
throw new IllegalArgumentException("connection can not be null!");
}
return new HbaseTemplate(connection);
}
/**
* HBase Connection
*/
@Override
public synchronized void close() {
if (null == connection || connection.isClosed()) {
return;
}
IoUtil.close(connection);
}
/**
* HBase
*
* @return /
*/
public Connection getConnection() {
if (null == connection) {
throw new RuntimeException("HBase connection init failed...");
}
return connection;
}
/**
* HBase
*
* @return /
*/
public Configuration getConfiguration() {
return configuration;
}
/**
* {@link org.apache.hadoop.hbase.client.Table}
*
* @param tableName
* @return /
*/
public Table getTable(String tableName) throws IOException {
return getTable(TableName.valueOf(tableName));
}
/**
* {@link org.apache.hadoop.hbase.client.Table}
*
* @param tableName
* @return /
*/
public synchronized Table getTable(TableName tableName) throws IOException {
return connection.getTable(tableName);
}
// =====================================================================================
// put 操作封装
// =====================================================================================
public void put(String tableName, Put put) throws IOException {
if (StrUtil.isBlank(tableName) || put == null) {
return;
}
Table table = getTable(tableName);
try {
table.put(put);
} finally {
recycle(table);
}
}
public void put(String tableName, String row, String family, String column, String value)
throws IOException {
Put put = newPut(row, null, family, column, value);
put(tableName, put);
}
public void put(String tableName, String row, Long timestamp, String family, String column, String value)
throws IOException {
Put put = newPut(row, timestamp, family, column, value);
put(tableName, put);
}
public void put(String tableName, String row, String family, Object obj) throws IOException {
put(tableName, row, null, family, obj);
}
public void put(String tableName, String row, Long timestamp, String family, Object obj) throws IOException {
Put put = newPut(row, timestamp, family, obj);
put(tableName, put);
}
public void put(String tableName, String row, String family, Map<String, Object> columnMap)
throws IOException {
Put put = newPut(row, null, family, columnMap);
put(tableName, put);
}
public void put(String tableName, String row, Long timestamp, String family, Map<String, Object> columnMap)
throws IOException {
Put put = newPut(row, timestamp, family, columnMap);
put(tableName, put);
}
public void put(String tableName, String row, Long timestamp, Map<String, Map<String, Object>> familyMap)
throws IOException {
Put put = newPut(row, timestamp, familyMap);
put(tableName, put);
}
public void batchPut(String tableName, Collection<Put> list) throws IOException, InterruptedException {
batch(tableName, list);
}
public <T extends BaseHbaseEntity> void batchPut(String tableName, String family, Collection<T> list)
throws IOException, InterruptedException {
if (StrUtil.isBlank(tableName) || StrUtil.isBlank(family) || CollectionUtil.isEmpty(list)) {
return;
}
List<Put> puts = newPutList(family, list);
batchPut(tableName, puts);
}
public static Put newPut(String row, Long timestamp, String family, String column, String value) {
if (StrUtil.isBlank(row) || StrUtil.isBlank(family) || StrUtil.isBlank(column) || StrUtil.isBlank(value)) {
return null;
}
Map<String, Object> columnMap = new HashMap<>(1);
columnMap.put(column, value);
return newPut(row, timestamp, family, columnMap);
}
public static Put newPut(String row, Long timestamp, String family, Map<String, Object> columnMap) {
if (StrUtil.isBlank(row) || StrUtil.isBlank(family) || MapUtil.isEmpty(columnMap)) {
return null;
}
Map<String, Map<String, Object>> familyMap = new HashMap<>(1);
familyMap.put(family, columnMap);
return newPut(row, timestamp, familyMap);
}
@SuppressWarnings("unchecked")
public static Put newPut(String row, Long timestamp, String family, Object obj) {
if (obj == null) {
return null;
}
Map<String, Object> columnMap;
if (obj instanceof Map) {
columnMap = (Map<String, Object>) obj;
} else {
columnMap = BeanUtil.beanToMap(obj);
}
return newPut(row, timestamp, family, columnMap);
}
public static Put newPut(String row, Long timestamp, Map<String, Map<String, Object>> familyMap) {
if (StrUtil.isBlank(row) || MapUtil.isEmpty(familyMap)) {
return null;
}
if (timestamp == null) {
timestamp = System.currentTimeMillis();
}
Put put = new Put(Bytes.toBytes(row));
for (Map.Entry<String, Map<String, Object>> e : familyMap.entrySet()) {
String family = e.getKey();
Map<String, Object> columnMap = e.getValue();
if (MapUtil.isNotEmpty(columnMap)) {
for (Map.Entry<String, Object> entry : columnMap.entrySet()) {
String column = entry.getKey();
Object value = entry.getValue();
if (ObjectUtil.isEmpty(value)) {
continue;
}
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp,
Bytes.toBytes(String.valueOf(value)));
}
}
}
return put;
}
private static <T extends BaseHbaseEntity> List<Put> newPutList(String family, Collection<T> list) {
long timestamp = System.currentTimeMillis();
return list.stream()
.map(entity -> newPut(entity.getId(), timestamp, family, entity))
.collect(Collectors.toList());
}
// =====================================================================================
// delete 操作封装
// =====================================================================================
public void delete(String tableName, Delete delete) throws IOException {
if (StrUtil.isBlank(tableName) || delete == null) {
return;
}
Table table = getTable(tableName);
try {
table.delete(delete);
} finally {
recycle(table);
}
}
public void delete(String tableName, String row) throws IOException {
Delete delete = new Delete(Bytes.toBytes(row));
delete(tableName, delete);
}
public void batchDelete(String tableName, String... rows) throws IOException, InterruptedException {
if (ArrayUtil.isEmpty(rows)) {
return;
}
List<Delete> deletes = Stream.of(rows)
.map(row -> new Delete(Bytes.toBytes(row)))
.distinct().collect(Collectors.toList());
batchDelete(tableName, deletes);
}
public void batchDelete(String tableName, List<Delete> deletes) throws IOException, InterruptedException {
batch(tableName, deletes);
}
// =====================================================================================
// get 操作封装
// =====================================================================================
public Result get(String tableName, String row) throws IOException {
if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row)) {
return null;
}
Get get = newGet(row);
return get(tableName, get);
}
public Result get(String tableName, Get get) throws IOException {
if (StrUtil.isBlank(tableName) || get == null) {
return null;
}
Table table = getTable(tableName);
try {
return table.get(get);
} finally {
recycle(table);
}
}
public Result[] batchGet(String tableName, String[] rows) throws IOException {
if (StrUtil.isBlank(tableName) || ArrayUtil.isEmpty(rows)) {
return null;
}
List<Get> gets = newGetList(rows);
return batchGet(tableName, gets);
}
public Result[] batchGet(String tableName, List<Get> gets) throws IOException {
if (StrUtil.isBlank(tableName) || CollectionUtil.isEmpty(gets)) {
return null;
}
Table table = getTable(tableName);
try {
return table.get(gets);
} finally {
recycle(table);
}
}
/**
* {@link T}
*
* @param tableName
* @param row
* @param family
* @param clazz
* @param <T>
* @return /
*/
public <T> T getEntity(String tableName, String row, String family, Class<T> clazz) throws IOException {
if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row) || StrUtil.isBlank(family) || clazz == null) {
return null;
}
Map<String, Field> fieldMap = ReflectUtil.getFieldMap(clazz);
String[] columns = fieldMap.keySet().toArray(new String[0]);
Map<String, ColumnDo> columnMap = getColumnMap(tableName, row, family, columns);
return toEntity(columnMap, clazz);
}
/**
* {@link T}
*
* @param tableName
* @param rows
* @param family
* @param clazz
* @param <T>
* @return /
*/
public <T> List<T> getEntityList(String tableName, String[] rows, String family, Class<T> clazz)
throws IOException {
if (StrUtil.isBlank(tableName) || ArrayUtil.isEmpty(rows) || StrUtil.isBlank(family) || clazz == null) {
return null;
}
Map<String, Field> fieldMap = ReflectUtil.getFieldMap(clazz);
String[] columns = fieldMap.keySet().toArray(new String[0]);
List<Get> gets = newGetList(rows, family, columns);
Result[] results = batchGet(tableName, gets);
if (ArrayUtil.isEmpty(results)) {
return new ArrayList<>();
}
List<T> list = new ArrayList<>();
for (Result result : results) {
Map<String, ColumnDo> columnMap =
getColumnsFromResult(result, tableName, family, CollectionUtil.newArrayList(columns));
T entity = toEntity(columnMap, clazz);
list.add(entity);
}
return list;
}
private <T> T toEntity(Map<String, ColumnDo> columnMap, Class<T> clazz) {
if (MapUtil.isEmpty(columnMap)) {
return null;
}
return BeanUtil.mapToBean(ColumnDo.toKvMap(columnMap), clazz, true, CopyOptions.create().ignoreError());
}
/**
*
*
* @param tableName
* @param row
* @param family
* @param column
* @return /
*/
public ColumnDo getColumn(String tableName, String row, String family, String column) throws IOException {
if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row) || StrUtil.isBlank(family) || StrUtil.isBlank(column)) {
return null;
}
Result result = get(tableName, row);
if (result == null) {
return null;
}
return getColumnFromResult(result, tableName, family, column);
}
/**
*
*
* @param tableName
* @param row
* @param family
* @param columns
* @return /
*/
public Map<String, ColumnDo> getColumnMap(String tableName, String row, String family, String... columns)
throws IOException {
if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row) || StrUtil.isBlank(family)) {
return null;
}
Get get = newGet(row, family, columns);
Result result = get(tableName, get);
if (result == null) {
return null;
}
return getColumnsFromResult(result, tableName, family, Arrays.asList(columns));
}
/**
*
*
* @param tableName
* @param row
* @param family
* @return /
*/
public FamilyDo getFamily(String tableName, String row, String family) throws IOException {
Map<String, ColumnDo> columnMap = getColumnMap(tableName, row, family);
return new FamilyDo(tableName, row, family, columnMap);
}
/**
*
*
* @param tableName
* @param row
* @param familyColumnMap <, >
* @return /
*/
public Map<String, FamilyDo> getFamilyMap(String tableName, String row,
Map<String, Collection<String>> familyColumnMap) throws IOException {
if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row)) {
return new HashMap<>(0);
}
if (MapUtil.isEmpty(familyColumnMap)) {
RowDo rowDo = getRow(tableName, row);
if (rowDo == null) {
return new HashMap<>(0);
}
return rowDo.getFamilyMap();
}
Get get = newGet(row);
for (Map.Entry<String, Collection<String>> entry : familyColumnMap.entrySet()) {
String family = entry.getKey();
Collection<String> columns = entry.getValue();
if (CollectionUtil.isNotEmpty(columns)) {
for (String column : columns) {
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
}
}
}
Result result = get(tableName, get);
if (result == null) {
return null;
}
return getFamiliesFromResult(result, tableName, familyColumnMap);
}
/**
*
*
* @param tableName
* @param row
* @return /
*/
public RowDo getRow(String tableName, String row) throws IOException {
if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row)) {
return null;
}
Result result = get(tableName, row);
if (result == null) {
return null;
}
return getRowFromResult(result, tableName);
}
/**
*
*
* @param tableName
* @param rows
* @return /
*/
public Map<String, RowDo> getRowMap(String tableName, String... rows) throws IOException {
if (StrUtil.isBlank(tableName) || ArrayUtil.isEmpty(rows)) {
return null;
}
Result[] results = batchGet(tableName, rows);
if (ArrayUtil.isEmpty(results)) {
return new HashMap<>(0);
}
Map<String, RowDo> map = new HashMap<>(results.length);
for (Result result : results) {
String row = Bytes.toString(result.getRow());
RowDo rowDo = getRowFromResult(result, tableName);
map.put(row, rowDo);
}
return map;
}
private static Get newGet(String row) {
return new Get(Bytes.toBytes(row));
}
private static Get newGet(String row, String family, String... columns) {
Get get = newGet(row);
get.addFamily(Bytes.toBytes(family));
if (ArrayUtil.isNotEmpty(columns)) {
for (String column : columns) {
get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
}
}
return get;
}
private static List<Get> newGetList(String[] rows) {
if (ArrayUtil.isEmpty(rows)) {
return new ArrayList<>();
}
return Stream.of(rows).map(HbaseTemplate::newGet).collect(Collectors.toList());
}
private static List<Get> newGetList(String[] rows, String family, String[] columns) {
if (ArrayUtil.isEmpty(rows)) {
return new ArrayList<>();
}
return Stream.of(rows).map(row -> newGet(row, family, columns)).collect(Collectors.toList());
}
// =====================================================================================
// scan 操作封装
// =====================================================================================
/**
* {@link org.apache.hadoop.hbase.client.Scan}
*
* @param tableName
* @param scan {@link org.apache.hadoop.hbase.client.Scan}
* @return /
*/
public Result[] scan(String tableName, Scan scan) throws IOException {
Table table = getTable(tableName);
ResultScanner scanner = null;
try {
scanner = table.getScanner(scan);
return ArrayUtil.toArray(scanner, Result.class);
} finally {
IoUtil.close(scanner);
recycle(table);
}
}
public PageData<RowDo> page(SingleFamilyScan scan) throws IOException {
if (scan == null) {
return null;
}
return getPageData(scan.getTableName(), scan.getPage(), scan.getSize(), scan.toScan(),
scan.getFamilyColumnMap());
}
public PageData<RowDo> page(MultiFamilyScan scan) throws IOException {
if (scan == null) {
return null;
}
return getPageData(scan.getTableName(), scan.getPage(), scan.getSize(), scan.toScan(),
scan.getFamilyColumnMap());
}
public ScrollData<RowDo> scroll(SingleFamilyScan scan) throws IOException {
if (scan == null) {
return null;
}
return getScrollData(scan.getTableName(), scan.getSize(), scan.toScan(), scan.getFamilyColumnMap());
}
public ScrollData<RowDo> scroll(MultiFamilyScan scan) throws IOException {
if (scan == null) {
return null;
}
return getScrollData(scan.getTableName(), scan.getSize(), scan.toScan(), scan.getFamilyColumnMap());
}
public <T> PageData<T> getEntityPage(SingleFamilyScan scan, Class<T> clazz) throws IOException {
Map<String, Field> fieldMap = ReflectUtil.getFieldMap(clazz);
Set<String> columns = fieldMap.keySet();
scan.setColumns(columns);
PageData<RowDo> data = page(scan);
if (data == null || CollectionUtil.isEmpty(data.getContent())) {
return new PageData<>(scan.getPage(), scan.getSize(), 0L, new ArrayList<>());
}
List<T> list = data.getContent().stream().map(rowDo -> {
Map<String, Map<String, String>> familyKvMap = rowDo.getFamilyKvMap();
Map<String, String> columnKvMap = familyKvMap.get(scan.getFamily());
return BeanUtil.mapToBean(columnKvMap, clazz, true, CopyOptions.create().ignoreError());
}).collect(Collectors.toList());
return new PageData<>(scan.getPage(), scan.getSize(), data.getTotal(), list);
}
public <T> ScrollData<T> getEntityScroll(SingleFamilyScan scan, Class<T> clazz) throws IOException {
Map<String, Field> fieldMap = ReflectUtil.getFieldMap(clazz);
Set<String> columns = fieldMap.keySet();
scan.setColumns(columns);
ScrollData<RowDo> data = scroll(scan);
if (data == null || CollectionUtil.isEmpty(data.getContent())) {
return new ScrollData<>(scan.getStartRow(), scan.getStopRow(), null, 0, new ArrayList<>());
}
List<T> list = data.getContent().stream().map(rowDo -> {
Map<String, Map<String, String>> familyKvMap = rowDo.getFamilyKvMap();
Map<String, String> columnKvMap = familyKvMap.get(scan.getFamily());
return BeanUtil.mapToBean(columnKvMap, clazz, true, CopyOptions.create().ignoreError());
}).collect(Collectors.toList());
return new ScrollData<>(data.getStartRow(), data.getStopRow(), data.getScrollRow(), 0, list);
}
public <T> ScrollData<T> getEntityScroll(String tableName, String family, String scrollRow, int size,
Class<T> clazz) throws IOException {
SingleFamilyScan scan = new SingleFamilyScan();
scan.setFamily(family)
.setScrollRow(scrollRow)
.setTableName(tableName)
.setSize(size)
.setReversed(false);
return getEntityScroll(scan, clazz);
}
private PageData<RowDo> getPageData(String tableName, Integer page, Integer size, Scan scan,
Map<String, Collection<String>> familyColumnMap) throws IOException {
Table table = getTable(tableName);
Map<String, RowDo> rowMap = new HashMap<>(size);
try {
int pageIndex = 1;
byte[] lastRow = null;
long total = 0L;
while (true) {
if (lastRow != null) {
scan.withStartRow(lastRow, false);
}
ResultScanner rs = table.getScanner(scan);
Iterator<Result> it = rs.iterator();
int count = 0;
while (it.hasNext()) {
Result result = it.next();
if (page == pageIndex) {
RowDo rowDo = getRowFromResult(result, tableName, familyColumnMap);
rowMap.put(rowDo.getRow(), rowDo);
}
lastRow = result.getRow();
count++;
}
pageIndex++;
rs.close();
total += count;
if (count == 0) {
break;
}
}
return new PageData<>(page, size, total, rowMap.values());
} finally {
recycle(table);
}
}
private ScrollData<RowDo> getScrollData(String tableName, int size, Scan scan,
Map<String, Collection<String>> familyColumnMap) throws IOException {
Table table = getTable(tableName);
ResultScanner scanner = null;
Map<String, RowDo> rowMap = new HashMap<>(size);
try {
scanner = table.getScanner(scan);
for (Result result : scanner) {
RowDo rowDo = getRowFromResult(result, tableName, familyColumnMap);
rowMap.put(rowDo.getRow(), rowDo);
}
String scrollRow = null;
if (MapUtil.isNotEmpty(rowMap)) {
List<String> rows = rowMap.values().stream()
.map(RowDo::getRow)
.collect(Collectors.toList());
if (scan.isReversed()) {
scrollRow = CollectionUtil.min(rows);
} else {
scrollRow = CollectionUtil.max(rows);
}
}
return new ScrollData<>(Bytes.toString(scan.getStartRow()), Bytes.toString(scan.getStopRow()),
scrollRow, size, rowMap.values());
} finally {
IoUtil.close(scanner);
recycle(table);
}
}
// =====================================================================================
// 其他操作封装
// =====================================================================================
public long incrementColumnValue(String tableName, String row, String family, String column, long amount)
throws IOException {
return incrementColumnValue(tableName, row, family, column, amount, Durability.SYNC_WAL);
}
public long incrementColumnValue(String tableName, String row, String family, String column, long amount,
Durability durability) throws IOException {
if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row) || StrUtil.isBlank(family) || StrUtil.isBlank(column)) {
return -1L;
}
Table table = getTable(tableName);
try {
return table.incrementColumnValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(column), amount,
durability);
} finally {
recycle(table);
}
}
private void batch(String tableName, Collection<? extends Row> list)
throws IOException, InterruptedException {
if (StrUtil.isBlank(tableName) || CollectionUtil.isEmpty(list)) {
return;
}
Object[] results = new Object[list.size()];
Table table = getTable(tableName);
try {
table.batch(new ArrayList<>(list), results);
} finally {
recycle(table);
}
}
private void recycle(Table table) {
if (null == table) {
return;
}
IoUtil.close(table);
}
private static RowDo getRowFromResult(Result result, String tableName) {
if (result == null || result.isEmpty()) {
return null;
}
String row = Bytes.toString(result.getRow());
Map<String, Map<String, ColumnDo>> familyColumnMap = new HashMap<>(result.size());
for (Cell cell : result.listCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell));
if (!familyColumnMap.containsKey(family)) {
familyColumnMap.put(family, new HashMap<>(0));
}
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
long timestamp = cell.getTimestamp();
ColumnDo columnDo = new ColumnDo(tableName, row, family, timestamp, column, value);
familyColumnMap.get(family).put(column, columnDo);
}
Map<String, FamilyDo> familyMap = new HashMap<>(familyColumnMap.size());
familyColumnMap.forEach((family, columnMap) -> {
FamilyDo familyDo = new FamilyDo(tableName, row, family, columnMap);
familyMap.put(family, familyDo);
});
return new RowDo(tableName, row, familyMap);
}
private static RowDo getRowFromResult(Result result, String tableName,
Map<String, Collection<String>> familyColumnMap) {
if (MapUtil.isEmpty(familyColumnMap)) {
return getRowFromResult(result, tableName);
}
String row = Bytes.toString(result.getRow());
Map<String, FamilyDo> familyMap = getFamiliesFromResult(result, tableName, familyColumnMap);
return new RowDo(tableName, row, familyMap);
}
private static FamilyDo getFamilyFromResult(Result result, String tableName, String family) {
if (result == null || result.isEmpty()) {
return null;
}
RowDo rowDo = getRowFromResult(result, tableName);
if (rowDo == null || MapUtil.isEmpty(rowDo.getFamilyMap())) {
return null;
}
return rowDo.getFamilyMap().get(family);
}
private static Map<String, FamilyDo> getFamiliesFromResult(Result result, String tableName,
Map<String, Collection<String>> familyColumnMap) {
if (result == null || StrUtil.isBlank(tableName) || MapUtil.isEmpty(familyColumnMap)) {
return new HashMap<>(0);
}
String row = Bytes.toString(result.getRow());
Map<String, FamilyDo> familyMap = new HashMap<>(familyColumnMap.size());
familyColumnMap.forEach((family, columns) -> {
Map<String, ColumnDo> columnMap;
if (CollectionUtil.isNotEmpty(columns)) {
columnMap = new HashMap<>(columns.size());
for (String column : columns) {
ColumnDo columnDo = getColumnFromResult(result, tableName, family, column);
columnMap.put(column, columnDo);
}
} else {
columnMap = new HashMap<>(0);
}
familyMap.put(family, new FamilyDo(tableName, row, family, columnMap));
});
return familyMap;
}
private static ColumnDo getColumnFromResult(Result result, String tableName, String family, String column) {
if (result == null || StrUtil.isBlank(tableName) || StrUtil.isBlank(family) || StrUtil.isBlank(column)) {
return null;
}
Cell cell = result.getColumnLatestCell(Bytes.toBytes(family), Bytes.toBytes(column));
if (cell == null) {
return null;
}
String row = Bytes.toString(result.getRow());
String value = Bytes.toString(CellUtil.cloneValue(cell));
long timestamp = cell.getTimestamp();
return new ColumnDo(tableName, row, family, timestamp, column, value);
}
private static Map<String, ColumnDo> getColumnsFromResult(Result result, String tableName, String family,
Collection<String> columns) {
if (CollectionUtil.isEmpty(columns)) {
RowDo rowDo = getRowFromResult(result, tableName);
return rowDo.getFamilyMap().get(family).getColumnMap();
}
Map<String, ColumnDo> columnMap = new HashMap<>(columns.size());
for (String column : columns) {
ColumnDo columnDo = getColumnFromResult(result, tableName, family, column);
columnMap.put(column, columnDo);
}
return columnMap;
}
}

View File

@ -0,0 +1,26 @@
package io.github.dunwu.javadb.hbase.config;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.context.annotation.Import;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* HBase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-06-30
*/
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@EnableAspectJAutoProxy(
proxyTargetClass = false
)
@Import({ HbaseConfiguration.class })
@Documented
public @interface EnableHbase {
}

View File

@ -0,0 +1,29 @@
package io.github.dunwu.javadb.hbase.config;
import io.github.dunwu.javadb.hbase.HbaseAdmin;
import io.github.dunwu.javadb.hbase.HbaseFactory;
import io.github.dunwu.javadb.hbase.HbaseTemplate;
import org.springframework.context.annotation.Bean;
import java.io.IOException;
/**
* HBase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-07-04
*/
@org.springframework.context.annotation.Configuration
public class HbaseConfiguration {
@Bean("hbaseTemplate")
public HbaseTemplate hbaseTemplate() throws IOException {
return HbaseFactory.newHbaseTemplate();
}
@Bean("hbaseAdmin")
public HbaseAdmin hbaseAdmin() throws IOException {
return HbaseFactory.newHbaseAdmin();
}
}

View File

@ -0,0 +1,25 @@
package io.github.dunwu.javadb.hbase.entity;
import java.io.Serializable;
/**
* HBase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15
*/
public abstract class BaseHbaseEntity implements Serializable {
/**
*
*/
public abstract String getId();
/**
*
*/
public abstract String getIdKey();
private static final long serialVersionUID = 5075127328254616085L;
}

View File

@ -0,0 +1,82 @@
package io.github.dunwu.javadb.hbase.entity;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* HBase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-05-19
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ColumnDo {
/** 表名 */
private String tableName;
/** 行 */
private String row;
/** 列族 */
private String family;
/** 时间戳 */
private Long timestamp;
/** 列 */
private String column;
/** 列值 */
private String value;
public boolean check() {
return check(this);
}
public static boolean check(ColumnDo columnDo) {
return columnDo != null
&& StrUtil.isNotBlank(columnDo.getTableName())
&& StrUtil.isNotBlank(columnDo.getRow())
&& StrUtil.isNotBlank(columnDo.getFamily())
&& StrUtil.isNotEmpty(columnDo.getColumn());
}
public static Map<String, ColumnDo> toColumnMap(String tableName, String row, String family,
Map<String, String> columnValueMap) {
if (MapUtil.isEmpty(columnValueMap)) {
return new HashMap<>(0);
}
Map<String, ColumnDo> map = new HashMap<>(columnValueMap.size());
columnValueMap.forEach((column, value) -> {
ColumnDo columnDo = new ColumnDo(tableName, row, family, null, column, value);
if (columnDo.check()) {
map.put(column, columnDo);
}
});
return map;
}
public static Map<String, String> toKvMap(Map<String, ColumnDo> columnMap) {
if (MapUtil.isEmpty(columnMap)) {
return new HashMap<>(0);
}
Collection<ColumnDo> columns = columnMap.values().stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
Map<String, String> map = new HashMap<>(columns.size());
for (ColumnDo columnDo : columns) {
if (columnDo.check()) {
map.put(columnDo.getColumn(), columnDo.getValue());
}
}
return map;
}
}

View File

@ -0,0 +1,73 @@
package io.github.dunwu.javadb.hbase.entity;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
/**
* HBase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-05-19
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class FamilyDo {
/** 表名 */
private String tableName;
/** 行 */
private String row;
/** 列族 */
private String family;
/** 列 Mapkey 为 columnvalue 为列详细信息) */
private Map<String, ColumnDo> columnMap;
public boolean check() {
return check(this);
}
public Map<String, String> getColumnKvMap() {
return FamilyDo.getColumnKvMap(this);
}
public static Map<String, String> getColumnKvMap(FamilyDo familyDo) {
if (familyDo == null || MapUtil.isEmpty(familyDo.getColumnMap())) {
return new HashMap<>(0);
}
return ColumnDo.toKvMap(familyDo.getColumnMap());
}
public static boolean check(FamilyDo familyDo) {
return familyDo != null
&& StrUtil.isNotBlank(familyDo.getTableName())
&& StrUtil.isNotBlank(familyDo.getRow())
&& StrUtil.isNotBlank(familyDo.getFamily())
&& MapUtil.isNotEmpty(familyDo.getColumnMap());
}
public static Map<String, FamilyDo> toFamilyMap(String tableName, String row,
Map<String, Map<String, String>> familyColumnValueMap) {
if (MapUtil.isEmpty(familyColumnValueMap)) {
return new HashMap<>(0);
}
Map<String, FamilyDo> familyMap = new HashMap<>(familyColumnValueMap.size());
familyColumnValueMap.forEach((family, columnMap) -> {
familyMap.put(family, toFamily(tableName, row, family, columnMap));
});
return familyMap;
}
public static FamilyDo toFamily(String tableName, String row, String family, Map<String, String> columnValueMap) {
Map<String, ColumnDo> columnMap = ColumnDo.toColumnMap(tableName, row, family, columnValueMap);
return new FamilyDo(tableName, row, family, columnMap);
}
}

View File

@ -1,37 +0,0 @@
package io.github.dunwu.javadb.hbase.entity;
import cn.hutool.core.map.MapUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.HashMap;
import java.util.Map;
/**
* HBase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-05-19
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class HBaseFamilyData {
private String family;
private Map<String, String> columnMap;
public static Map<String, HBaseFamilyData> toFamilyMap(Map<String, Map<String, String>> map) {
if (MapUtil.isEmpty(map)) {
return new HashMap<>(0);
}
Map<String, HBaseFamilyData> familyMap = new HashMap<>(map.size());
map.forEach((family, columnMap) -> {
familyMap.put(family, new HBaseFamilyData(family, columnMap));
});
return familyMap;
}
}

View File

@ -1,98 +0,0 @@
package io.github.dunwu.javadb.hbase.entity;
import cn.hutool.core.map.MapUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* HBase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-05-19
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class HBaseRowData {
private String row;
private Long timestamp;
private Map<String, HBaseFamilyData> familyMap = new HashMap<>();
public Map<String, Map<String, String>> toMap() {
return toMap(this);
}
public static HBaseRowData build(String row, Long timestamp, Map<String, HBaseFamilyData> familyMap) {
return new HBaseRowData(row, timestamp, familyMap);
}
public static HBaseRowData buildByMap(String row, Long timestamp, Map<String, Map<String, String>> map) {
return new HBaseRowData(row, timestamp, HBaseFamilyData.toFamilyMap(map));
}
public static Map<String, Map<String, String>> toMap(HBaseRowData data) {
if (data == null || MapUtil.isEmpty(data.getFamilyMap())) {
return new HashMap<>(0);
}
Map<String, Map<String, String>> map = new HashMap<>(data.getFamilyMap().size());
data.getFamilyMap().forEach((family, familyData) -> {
map.put(family, familyData.getColumnMap());
});
return map;
}
public static Map<String, HBaseRowData> toRowMap(Map<String, Map<String, Map<String, String>>> rowMultiFamilyMap) {
if (MapUtil.isEmpty(rowMultiFamilyMap)) {
return new HashMap<>(0);
}
Map<String, HBaseRowData> rowDataMap = new HashMap<>(rowMultiFamilyMap.size());
rowMultiFamilyMap.forEach((row, familyDataMap) -> {
Map<String, HBaseFamilyData> familyMap = HBaseFamilyData.toFamilyMap(familyDataMap);
rowDataMap.put(row, new HBaseRowData(row, null, familyMap));
});
return rowDataMap;
}
public static List<HBaseRowData> toRowList(Map<String, Map<String, Map<String, String>>> rowMultiFamilyMap) {
Map<String, HBaseRowData> rowMap = toRowMap(rowMultiFamilyMap);
if (MapUtil.isEmpty(rowMap)) {
return new ArrayList<>();
}
return new ArrayList<>(rowMap.values());
}
public static Map<String, HBaseRowData> toRowMap(String family, Map<String, Map<String, String>> rowColumnMap) {
if (MapUtil.isEmpty(rowColumnMap)) {
return new HashMap<>(0);
}
Map<String, HBaseRowData> rowDataMap = new HashMap<>(rowColumnMap.size());
rowColumnMap.forEach((row, columnMap) -> {
HBaseFamilyData familyData = new HBaseFamilyData(family, columnMap);
Map<String, HBaseFamilyData> familyMap = new HashMap<>();
familyMap.put(family, familyData);
rowDataMap.put(row, new HBaseRowData(row, null, familyMap));
});
return rowDataMap;
}
public static List<HBaseRowData> toRowList(String family, Map<String, Map<String, String>> rowColumnMap) {
Map<String, HBaseRowData> rowMap = toRowMap(family, rowColumnMap);
if (MapUtil.isEmpty(rowMap)) {
return new ArrayList<>();
}
return new ArrayList<>(rowMap.values());
}
}

View File

@ -4,10 +4,16 @@ import lombok.Data;
import java.util.Collection;
/**
* HBase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-05-19
*/
@Data
public class PageData<T> {
private Integer number;
private Integer page;
private Integer size;
private Long total;
private Integer totalPages;
@ -15,8 +21,8 @@ public class PageData<T> {
public PageData() { }
public PageData(Integer number, Integer size, Long total, Collection<T> content) {
this.number = number;
public PageData(Integer page, Integer size, Long total, Collection<T> content) {
this.page = page;
this.size = size;
this.total = total;
this.content = content;

View File

@ -0,0 +1,87 @@
package io.github.dunwu.javadb.hbase.entity;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* HBase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-05-19
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class RowDo {
/** 表名 */
private String tableName;
/** 行 */
private String row;
/** 列族 Mapkey 为 familyvalue 为列族详细信息) */
private Map<String, FamilyDo> familyMap;
public boolean check() {
return check(this);
}
public Map<String, Map<String, String>> getFamilyKvMap() {
return RowDo.getFamilyKvMap(this);
}
public static boolean check(RowDo rowDo) {
return rowDo != null
&& StrUtil.isNotBlank(rowDo.getTableName())
&& StrUtil.isNotBlank(rowDo.getRow())
&& MapUtil.isNotEmpty(rowDo.getFamilyMap());
}
public static Map<String, Map<String, String>> getFamilyKvMap(RowDo rowDo) {
if (rowDo == null || MapUtil.isEmpty(rowDo.getFamilyMap())) {
return new HashMap<>(0);
}
Map<String, Map<String, String>> kvMap = new HashMap<>(rowDo.getFamilyMap().size());
rowDo.getFamilyMap().forEach((family, familyDo) -> {
kvMap.put(family, familyDo.getColumnKvMap());
});
return kvMap;
}
public static Map<String, RowDo> toRowMap(String tableName, Map<String, Map<String, Map<String, String>>> map) {
if (MapUtil.isEmpty(map)) {
return new HashMap<>(0);
}
Map<String, RowDo> rowMap = new HashMap<>(map.size());
map.forEach((row, familyMap) -> {
RowDo rowDo = new RowDo(tableName, row, FamilyDo.toFamilyMap(tableName, row, familyMap));
rowMap.put(row, rowDo);
});
return rowMap;
}
public static List<RowDo> toRowList(String tableName, Map<String, Map<String, Map<String, String>>> map) {
Map<String, RowDo> rowMap = toRowMap(tableName, map);
if (MapUtil.isEmpty(rowMap)) {
return new ArrayList<>();
}
return new ArrayList<>(rowMap.values());
}
public static RowDo toRow(String tableName, String row, Map<String, Map<String, String>> familyColumnMap) {
if (MapUtil.isEmpty(familyColumnMap)) {
return null;
}
Map<String, FamilyDo> familyMap = FamilyDo.toFamilyMap(tableName, row, familyColumnMap);
return new RowDo(tableName, row, familyMap);
}
}

View File

@ -0,0 +1,26 @@
package io.github.dunwu.javadb.hbase.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Collection;
/**
* Hbase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-16
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class ScrollData<T> {
private String startRow;
private String stopRow;
private String scrollRow;
private Integer size;
private Collection<T> content;
}

View File

@ -1,4 +1,4 @@
package io.github.dunwu.javadb.hbase.entity;
package io.github.dunwu.javadb.hbase.entity.scan;
import cn.hutool.core.util.StrUtil;
import lombok.Data;
@ -14,50 +14,32 @@ import java.util.ArrayList;
import java.util.List;
/**
* HBase
* HBase scan
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-05-19
*/
@Data
@Accessors(chain = true)
public class BaseFamilyRequest {
public class BaseScan {
/**
*
*/
/** 表名 */
protected String tableName;
/**
* row
*/
/** 起始 row */
protected String startRow;
/**
* row
*/
/** 结束 row */
protected String stopRow;
/**
*
*/
/** 起始时间 */
protected Long minTimeStamp;
/**
*
*/
/** 结束时间 */
protected Long maxTimeStamp;
/**
*
*/
/** 是否降序true: 降序false正序 */
protected boolean reversed = false;
/**
*
*/
protected Integer pageNo;
/**
* pageNo!=null使
*/
protected Integer pageSize = 10;
/**
*
*/
/** 页号 */
protected Integer page;
/** 每页记录数大小 */
protected Integer size = 100;
/** 过滤器列表 */
protected List<Filter> filters = new ArrayList<>();
public void addFilter(Filter filter) {
@ -66,22 +48,31 @@ public class BaseFamilyRequest {
public Scan toScan() throws IOException {
Scan scan = new Scan();
// 缓存1000条数据
scan.setCaching(1000);
scan.setCacheBlocks(false);
scan.setReversed(reversed);
if (StrUtil.isNotBlank(startRow)) {
scan.withStartRow(Bytes.toBytes(startRow), true);
if (reversed) {
scan.withStopRow(Bytes.toBytes(startRow), true);
} else {
scan.withStartRow(Bytes.toBytes(startRow), true);
}
}
if (StrUtil.isNotBlank(stopRow)) {
scan.withStartRow(Bytes.toBytes(stopRow), false);
if (reversed) {
scan.withStartRow(Bytes.toBytes(stopRow), true);
} else {
scan.withStopRow(Bytes.toBytes(stopRow), true);
}
}
if (minTimeStamp != null && maxTimeStamp != null) {
scan.setTimeRange(minTimeStamp, maxTimeStamp);
}
if (pageNo != null) {
PageFilter pageFilter = new PageFilter(pageSize);
if (size != null) {
PageFilter pageFilter = new PageFilter(size);
filters.add(pageFilter);
// 缓存1000条数据
scan.setCaching(1000);
scan.setCacheBlocks(false);
}
FilterList filterList = new FilterList();
for (Filter filter : filters) {

View File

@ -1,9 +1,8 @@
package io.github.dunwu.javadb.hbase.entity;
package io.github.dunwu.javadb.hbase.entity.scan;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.map.MapUtil;
import lombok.Data;
import lombok.experimental.Accessors;
import cn.hutool.core.util.StrUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
@ -13,37 +12,46 @@ import java.util.HashMap;
import java.util.Map;
/**
* HBase
* HBase scan
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-05-19
*/
@Data
@Accessors(chain = true)
public class HBaseMultiFamilyRequest extends BaseFamilyRequest {
public class MultiFamilyScan extends BaseScan {
/**
* ,
*/
private final Map<String, Collection<String>> familyColumns = new HashMap<>();
private Map<String, Collection<String>> familyColumnMap = new HashMap<>();
private String scrollRow;
public HBaseMultiFamilyRequest addFamilyColumn(String family, Collection<String> columns) {
this.familyColumns.put(family, columns);
public Map<String, Collection<String>> getFamilyColumnMap() {
return familyColumnMap;
}
public MultiFamilyScan setFamilyColumnMap(
Map<String, Collection<String>> familyColumnMap) {
this.familyColumnMap = familyColumnMap;
return this;
}
public HBaseMultiFamilyRequest addFamilyColumns(Map<String, Collection<String>> familyColumns) {
if (MapUtil.isNotEmpty(familyColumns)) {
this.familyColumns.putAll(familyColumns);
}
public String getScrollRow() {
return scrollRow;
}
public MultiFamilyScan setScrollRow(String scrollRow) {
this.scrollRow = scrollRow;
return this;
}
@Override
public Scan toScan() throws IOException {
Scan scan = super.toScan();
if (MapUtil.isNotEmpty(familyColumns)) {
for (Map.Entry<String, Collection<String>> entry : familyColumns.entrySet()) {
if (StrUtil.isNotBlank(scrollRow)) {
scan.withStartRow(Bytes.toBytes(scrollRow), false);
}
if (MapUtil.isNotEmpty(familyColumnMap)) {
for (Map.Entry<String, Collection<String>> entry : familyColumnMap.entrySet()) {
String family = entry.getKey();
Collection<String> columns = entry.getValue();
if (CollectionUtil.isNotEmpty(columns)) {

View File

@ -1,6 +1,7 @@
package io.github.dunwu.javadb.hbase.entity;
package io.github.dunwu.javadb.hbase.entity.scan;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import lombok.Data;
import lombok.experimental.Accessors;
import org.apache.hadoop.hbase.client.Scan;
@ -9,23 +10,29 @@ import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
/**
* HBase
* HBase scan
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-05-19
*/
@Data
@Accessors(chain = true)
public class HBaseFamilyRequest extends BaseFamilyRequest {
public class SingleFamilyScan extends BaseScan {
private String family;
private Collection<String> columns = new ArrayList<>();
private String scrollRow;
@Override
public Scan toScan() throws IOException {
Scan scan = super.toScan();
if (StrUtil.isNotBlank(scrollRow)) {
scan.withStartRow(Bytes.toBytes(scrollRow), false);
}
if (CollectionUtil.isNotEmpty(this.getColumns())) {
for (String column : columns) {
scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
@ -34,4 +41,13 @@ public class HBaseFamilyRequest extends BaseFamilyRequest {
return scan;
}
public Map<String, Collection<String>> getFamilyColumnMap() {
if (StrUtil.isNotBlank(family) && CollectionUtil.isNotEmpty(columns)) {
Map<String, Collection<String>> familyColumnMap = new HashMap<>(1);
familyColumnMap.put(family, columns);
return familyColumnMap;
}
return new HashMap<>(0);
}
}

View File

@ -0,0 +1,75 @@
package io.github.dunwu.javadb.hbase;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15
*/
public class HbaseMapperTest {
private static ProductMapper mapper;
static {
HbaseTemplate hbaseTemplate = null;
try {
hbaseTemplate = HbaseFactory.newHbaseTemplate();
HbaseAdmin hbaseAdmin = HbaseFactory.newHbaseAdmin();
mapper = new ProductMapper(hbaseTemplate, hbaseAdmin);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
@DisplayName("batchSave")
public void batchSave() {
Product product1 = new Product("test-key-8", "product8", new BigDecimal(4000.0));
Product product2 = new Product("test-key-9", "product9", new BigDecimal(5000.0));
List<Product> products = CollectionUtil.newArrayList(product1, product2);
mapper.batchSave(products);
}
@Test
@DisplayName("batchGet")
public void batchGet() {
List<Product> list = mapper.pojoListByIds(Arrays.asList("test-key-8", "test-key-9"));
System.out.println(JSONUtil.toJsonStr(list));
}
@Test
@DisplayName("scroll")
public void scroll() {
int size = 1;
String lastId = null;
List<Product> list = mapper.scroll(null, size);
if (CollectionUtil.isNotEmpty(list)) {
Product last = CollectionUtil.getLast(list);
System.out.println("entity: " + JSONUtil.toJsonPrettyStr(last));
lastId = last.getId();
}
while (true) {
List<Product> products = mapper.scroll(lastId, size);
if (CollectionUtil.isEmpty(list)) {
break;
}
Product last = CollectionUtil.getLast(products);
System.out.println("entity: " + JSONUtil.toJsonPrettyStr(last));
lastId = last.getId();
if (StrUtil.isBlank(lastId)) {
break;
}
}
}
}

View File

@ -0,0 +1,47 @@
package io.github.dunwu.javadb.hbase;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Hbase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-13
*/
public class HbaseTemplateDeleteTest {
public static final String TABLE_NAME = "test:test";
private static final HbaseTemplate HBASE_TEMPLATE;
static {
try {
HBASE_TEMPLATE = HbaseFactory.newHbaseTemplate();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
@DisplayName("删除单条记录")
public void testDelete() throws IOException {
String rowkey = "test-key-1";
HBASE_TEMPLATE.delete(TABLE_NAME, rowkey);
}
@Test
@DisplayName("批量删除记录")
public void testBatchDelete() throws IOException, InterruptedException {
List<String> rowkeys = new ArrayList<>();
for (int i = 1; i <= 13; i++) {
rowkeys.add("test-key-" + i);
}
HBASE_TEMPLATE.batchDelete(TABLE_NAME, rowkeys.toArray(new String[0]));
}
}

View File

@ -0,0 +1,124 @@
package io.github.dunwu.javadb.hbase;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import io.github.dunwu.javadb.hbase.entity.ColumnDo;
import io.github.dunwu.javadb.hbase.entity.FamilyDo;
import io.github.dunwu.javadb.hbase.entity.RowDo;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Get
* <p>
* {@link HbaseTemplatePutTest}
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-13
*/
public class HbaseTemplateGetTest {
public static final String TABLE_NAME = "test:test";
private static final HbaseTemplate HBASE_TEMPLATE;
static {
try {
HBASE_TEMPLATE = HbaseFactory.newHbaseTemplate();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
@DisplayName("查询实体")
public void getEntity() throws IOException {
User user = HBASE_TEMPLATE.getEntity(TABLE_NAME, "test-key-3", "f1", User.class);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonStr(user)));
Assertions.assertThat(user).isNotNull();
}
@Test
@DisplayName("查询实体列表")
public void getEntityList() throws IOException {
List<String> rows = Arrays.asList("test-key-3", "test-key-4", "test-key-5");
List<User> list = HBASE_TEMPLATE.getEntityList(TABLE_NAME, rows.toArray(new String[0]), "f1", User.class);
System.out.println(StrUtil.format("查询实体列表: {}", JSONUtil.toJsonStr(list)));
Assertions.assertThat(list).isNotEmpty();
Assertions.assertThat(list.size()).isEqualTo(rows.size());
}
@Test
@DisplayName("查询列")
public void getColumn() throws IOException {
ColumnDo columnDo = HBASE_TEMPLATE.getColumn(TABLE_NAME, "test-key-1", "f1", "key");
System.out.println(StrUtil.format("查询单列: {}", JSONUtil.toJsonStr(columnDo)));
}
@Test
@DisplayName("查询多列")
public void getColumnMap() throws IOException {
Map<String, ColumnDo> columnMap = HBASE_TEMPLATE.getColumnMap(TABLE_NAME, "test-key-3", "f1");
System.out.println(StrUtil.format("查询多列: {}", JSONUtil.toJsonStr(columnMap)));
Assertions.assertThat(columnMap).isNotEmpty();
Map<String, ColumnDo> columnMap2 = HBASE_TEMPLATE.getColumnMap(TABLE_NAME, "test-key-3", "f1", "id", "name");
System.out.println(StrUtil.format("查询多列: {}", JSONUtil.toJsonStr(columnMap2)));
Assertions.assertThat(columnMap2).isNotEmpty();
}
@Test
@DisplayName("查询列族")
public void getFamily() throws IOException {
FamilyDo familyDo = HBASE_TEMPLATE.getFamily(TABLE_NAME, "test-key-7", "f1");
System.out.println(StrUtil.format("查询列族: {}", JSONUtil.toJsonStr(familyDo)));
Assertions.assertThat(familyDo).isNotNull();
Assertions.assertThat(familyDo.getFamily()).isEqualTo("f1");
FamilyDo familyDo2 = HBASE_TEMPLATE.getFamily(TABLE_NAME, "test-key-7", "f2");
System.out.println(StrUtil.format("查询列族: {}", JSONUtil.toJsonStr(familyDo2)));
Assertions.assertThat(familyDo2).isNotNull();
Assertions.assertThat(familyDo2.getFamily()).isEqualTo("f2");
}
@Test
@DisplayName("查询多列族")
public void getFamilyMap() throws IOException {
Map<String, Collection<String>> familyColumnMap = new HashMap<>();
familyColumnMap.put("f1", Collections.singleton("id"));
familyColumnMap.put("f2", Collections.singleton("name"));
Map<String, FamilyDo> familyMap = HBASE_TEMPLATE.getFamilyMap(TABLE_NAME, "test-key-7", familyColumnMap);
System.out.println(StrUtil.format("查询多列族: {}", JSONUtil.toJsonStr(familyMap)));
Assertions.assertThat(familyMap).isNotEmpty();
Assertions.assertThat(familyMap.size()).isEqualTo(familyColumnMap.size());
}
@Test
@DisplayName("查询行")
public void getRow() throws IOException {
RowDo rowDo = HBASE_TEMPLATE.getRow(TABLE_NAME, "test-key-7");
System.out.println(StrUtil.format("查询行: {}", JSONUtil.toJsonStr(rowDo)));
Assertions.assertThat(rowDo).isNotNull();
Assertions.assertThat(rowDo.getRow()).isEqualTo("test-key-7");
}
@Test
@DisplayName("批量查询行记录")
public void getRowMap() throws IOException {
String[] rows = new String[] { "test-key-3", "test-key-4", "test-key-7" };
Map<String, RowDo> rowMap = HBASE_TEMPLATE.getRowMap(TABLE_NAME, rows);
System.out.println(StrUtil.format("批量查询行记录: {}", JSONUtil.toJsonStr(rowMap)));
Assertions.assertThat(rowMap).isNotEmpty();
Assertions.assertThat(rowMap.size()).isEqualTo(rows.length);
}
}

View File

@ -0,0 +1,105 @@
package io.github.dunwu.javadb.hbase;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import org.apache.hadoop.hbase.client.Put;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Hbase Put
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-13
*/
public class HbaseTemplatePutTest {
public static final String TABLE_NAME = "test:test";
private static final HbaseTemplate HBASE_TEMPLATE;
static {
try {
HBASE_TEMPLATE = HbaseFactory.newHbaseTemplate();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
@DisplayName("put 测试 01")
public void put01() throws IOException {
long timestamp = System.currentTimeMillis();
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-0", "f1", "name", "user0");
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-1", timestamp, "f1", "name", "user1");
}
@Test
@DisplayName("put 测试 02")
public void put02() throws IOException {
long timestamp = System.currentTimeMillis();
User user2 = new User(2, "user2");
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-2", "f1", user2);
User user3 = new User(3, "user3");
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-3", timestamp, "f1", user3);
}
@Test
@DisplayName("put 测试 03")
public void put03() throws IOException {
long timestamp = System.currentTimeMillis();
User user4 = new User(4, "user4");
Map<String, Object> map = BeanUtil.beanToMap(user4);
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-4", timestamp, "f1", map);
}
@Test
@DisplayName("put 测试 04")
public void put04() throws IOException {
long timestamp = System.currentTimeMillis();
User user5 = new User(5, "user5");
Product product5 = new Product("test-key-5", "product5", new BigDecimal(4000.0));
Map<String, Map<String, Object>> familyMap = new HashMap<>(2);
Map<String, Object> userMap = BeanUtil.beanToMap(user5);
familyMap.put("f1", userMap);
Map<String, Object> productMap = BeanUtil.beanToMap(product5);
familyMap.put("f2", productMap);
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-5", timestamp, familyMap);
}
@Test
@DisplayName("put 测试 05")
public void put05() throws IOException {
Put put = HbaseTemplate.newPut("test-key-6", null, "f1", "name", "user6");
HBASE_TEMPLATE.put(TABLE_NAME, put);
}
@Test
@DisplayName("put 测试 06")
public void put06() throws IOException, InterruptedException {
long timestamp = System.currentTimeMillis();
User user7 = new User(5, "user7");
Product product7 = new Product("test-key-7", "product5", new BigDecimal(4000.0));
Put put1 = HbaseTemplate.newPut("test-key-7", timestamp, "f1", user7);
Put put2 = HbaseTemplate.newPut("test-key-7", timestamp, "f2", product7);
List<Put> list = Arrays.asList(put1, put2);
HBASE_TEMPLATE.batchPut(TABLE_NAME, list);
}
@Test
@DisplayName("batchPut 测试2")
public void batchPut2() throws IOException, InterruptedException {
Product product1 = new Product("test-key-8", "product8", new BigDecimal(4000.0));
Product product2 = new Product("test-key-9", "product9", new BigDecimal(5000.0));
List<Product> products = CollectionUtil.newArrayList(product1, product2);
HBASE_TEMPLATE.batchPut(TABLE_NAME, "f2", products);
}
}

View File

@ -0,0 +1,183 @@
package io.github.dunwu.javadb.hbase;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import io.github.dunwu.javadb.hbase.entity.PageData;
import io.github.dunwu.javadb.hbase.entity.RowDo;
import io.github.dunwu.javadb.hbase.entity.ScrollData;
import io.github.dunwu.javadb.hbase.entity.scan.MultiFamilyScan;
import io.github.dunwu.javadb.hbase.entity.scan.SingleFamilyScan;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Get
* <p>
* {@link HbaseTemplatePutTest}
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-13
*/
public class HbaseTemplateScanTest {
public static final String TABLE_NAME = "test:test";
private static final HbaseTemplate HBASE_TEMPLATE;
static {
try {
HBASE_TEMPLATE = HbaseFactory.newHbaseTemplate();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
@DisplayName("单列族分页查询")
public void page() throws IOException {
for (int page = 1; page <= 2; page++) {
SingleFamilyScan scan = new SingleFamilyScan();
scan.setFamily("f1")
.setTableName(TABLE_NAME)
.setPage(page)
.setSize(2)
.setReversed(true);
PageData<RowDo> rowDoMap = HBASE_TEMPLATE.page(scan);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonStr(rowDoMap)));
Assertions.assertThat(rowDoMap).isNotNull();
}
}
@Test
@DisplayName("多列族分页查询")
public void page2() throws IOException {
Map<String, Collection<String>> familyColumnMap = new HashMap<>();
familyColumnMap.put("f1", Collections.singleton("id"));
familyColumnMap.put("f2", Collections.singleton("name"));
for (int page = 1; page <= 2; page++) {
MultiFamilyScan scan = new MultiFamilyScan();
scan.setFamilyColumnMap(familyColumnMap)
.setTableName(TABLE_NAME)
.setPage(page)
.setSize(2)
.setReversed(true);
PageData<RowDo> rowDoMap = HBASE_TEMPLATE.page(scan);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonStr(rowDoMap)));
Assertions.assertThat(rowDoMap).isNotNull();
}
}
@Test
@DisplayName("查询实体列表")
public void getEntityPage() throws IOException {
SingleFamilyScan scan = new SingleFamilyScan();
scan.setFamily("f1")
.setTableName(TABLE_NAME)
.setPage(1)
.setSize(2)
.setReversed(true);
PageData<User> entityPage = HBASE_TEMPLATE.getEntityPage(scan, User.class);
System.out.println(StrUtil.format("查询实体列表: {}", JSONUtil.toJsonStr(entityPage)));
Assertions.assertThat(entityPage).isNotNull();
}
@Test
@DisplayName("单列族滚动查询")
public void scroll() throws IOException {
SingleFamilyScan scan = new SingleFamilyScan();
scan.setFamily("f1")
.setTableName(TABLE_NAME)
.setSize(1)
.setStartRow("test-key-1")
.setStopRow("test-key-9")
.setReversed(false);
ScrollData<RowDo> data = HBASE_TEMPLATE.scroll(scan);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(data)));
Assertions.assertThat(data).isNotNull();
scan.setScrollRow(data.getScrollRow());
while (true) {
ScrollData<RowDo> next = HBASE_TEMPLATE.scroll(scan);
if (next == null || CollectionUtil.isEmpty(next.getContent())) {
break;
}
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(next)));
scan.setScrollRow(next.getScrollRow());
}
}
@Test
@DisplayName("多列族滚动查询")
public void scroll2() throws IOException {
List<String> userFields = Stream.of(ReflectUtil.getFields(User.class))
.map(Field::getName).collect(Collectors.toList());
List<String> productFields = Stream.of(ReflectUtil.getFields(Product.class))
.map(Field::getName).collect(Collectors.toList());
Map<String, Collection<String>> familyColumnMap = new HashMap<>();
familyColumnMap.put("f1", userFields);
familyColumnMap.put("f2", productFields);
MultiFamilyScan scan = new MultiFamilyScan();
scan.setFamilyColumnMap(familyColumnMap)
.setTableName(TABLE_NAME)
.setSize(1)
.setStartRow("test-key-1")
.setStopRow("test-key-9")
.setReversed(true);
ScrollData<RowDo> data = HBASE_TEMPLATE.scroll(scan);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(data)));
Assertions.assertThat(data).isNotNull();
scan.setScrollRow(data.getScrollRow());
while (true) {
ScrollData<RowDo> next = HBASE_TEMPLATE.scroll(scan);
if (next == null || CollectionUtil.isEmpty(next.getContent())) {
break;
}
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(next)));
scan.setScrollRow(next.getScrollRow());
}
}
@Test
@DisplayName("滚动查询实体")
public void getEntityScroll() throws IOException {
SingleFamilyScan scan = new SingleFamilyScan();
scan.setFamily("f1")
.setTableName(TABLE_NAME)
.setSize(1)
.setStartRow("test-key-1")
.setStopRow("test-key-9")
.setReversed(false);
ScrollData<User> data = HBASE_TEMPLATE.getEntityScroll(scan, User.class);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(data)));
Assertions.assertThat(data).isNotNull();
scan.setScrollRow(data.getScrollRow());
while (true) {
ScrollData<User> next = HBASE_TEMPLATE.getEntityScroll(scan, User.class);
if (next == null || CollectionUtil.isEmpty(next.getContent())) {
break;
}
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(next)));
scan.setScrollRow(next.getScrollRow());
}
}
}

View File

@ -0,0 +1,37 @@
package io.github.dunwu.javadb.hbase;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.math.BigDecimal;
/**
*
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Product extends BaseHbaseEntity {
private String id;
private String name;
private BigDecimal price;
@Override
public String getId() {
return id;
}
@Override
public String getIdKey() {
return "id";
}
private static final long serialVersionUID = -2596114168690429555L;
}

View File

@ -0,0 +1,28 @@
package io.github.dunwu.javadb.hbase;
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15
*/
public class ProductMapper extends BaseHbaseMapper<Product> {
public ProductMapper(HbaseTemplate hbaseTemplate, HbaseAdmin hbaseAdmin) {
super(hbaseTemplate, hbaseAdmin);
}
@Override
public String getTableName() {
return "test";
}
@Override
public String getFamily() {
return "f1";
}
@Override
public Class<Product> getEntityClass() {
return Product.class;
}
}

View File

@ -0,0 +1,16 @@
package io.github.dunwu.javadb.hbase;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User {
private int id;
private String name;
}