feat: 更新 hbase 示例

master
dunwu 2023-11-23 06:48:46 +08:00
parent 14b594b9c2
commit 81a350aa6d
23 changed files with 1186 additions and 715 deletions

View File

@ -1,104 +0,0 @@
package io.github.dunwu.javadb.hbase;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import org.apache.hadoop.hbase.client.Connection;
import java.util.Collection;
import java.util.List;
/**
* Hbase Mapper
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15
*/
public interface HbaseMapper<T extends BaseHbaseEntity> {
/**
* Hbase
*/
Connection getClient();
/**
*
*/
String getNamespace();
/**
*
*/
String getTableName();
/**
*
*/
boolean existsTable();
/**
*
*/
String getFamily();
/**
*
*/
Class<T> getEntityClass();
/**
* ID
*
* @param id Hbase rowkey
* @return /
*/
T pojoByRowKey(String id);
/**
* ID
*
* @param ids Hbase rowkey
* @return /
*/
List<T> pojoListByRowKeys(Collection<String> ids);
/**
* ID
*
* @param scrollId
* @param size
* @return /
*/
List<T> scroll(String scrollId, int size);
/**
*
*
* @param entity Hbase f
* @return /
*/
T save(T entity);
/**
*
*
* @param list Hbase f
* @return /
*/
boolean batchSave(Collection<T> list);
/**
* ID
*
* @param id Hbase rowkey
* @return /
*/
boolean delete(String id);
/**
* ID
*
* @param ids Hbase rowkey
* @return /
*/
boolean batchDelete(Collection<String> ids);
}

View File

@ -1,8 +1,8 @@
package io.github.dunwu.javadb.hbase; package io.github.dunwu.javadb.hbase;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.bean.copier.CopyOptions;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.IoUtil; import cn.hutool.core.io.IoUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ArrayUtil;
@ -10,13 +10,14 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity; import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import io.github.dunwu.javadb.hbase.entity.ColumnDo; import io.github.dunwu.javadb.hbase.entity.common.ColumnDo;
import io.github.dunwu.javadb.hbase.entity.FamilyDo; import io.github.dunwu.javadb.hbase.entity.common.FamilyDo;
import io.github.dunwu.javadb.hbase.entity.PageData; import io.github.dunwu.javadb.hbase.entity.common.PageData;
import io.github.dunwu.javadb.hbase.entity.RowDo; import io.github.dunwu.javadb.hbase.entity.common.RowDo;
import io.github.dunwu.javadb.hbase.entity.ScrollData; import io.github.dunwu.javadb.hbase.entity.common.ScrollData;
import io.github.dunwu.javadb.hbase.entity.scan.MultiFamilyScan; import io.github.dunwu.javadb.hbase.entity.scan.MultiFamilyScan;
import io.github.dunwu.javadb.hbase.entity.scan.SingleFamilyScan; import io.github.dunwu.javadb.hbase.entity.scan.SingleFamilyScan;
import io.github.dunwu.javadb.hbase.util.JsonUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
@ -43,8 +44,9 @@ import java.lang.reflect.Field;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.Date;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -221,7 +223,7 @@ public class HbaseTemplate implements Closeable {
if (StrUtil.isBlank(row) || StrUtil.isBlank(family) || StrUtil.isBlank(column) || StrUtil.isBlank(value)) { if (StrUtil.isBlank(row) || StrUtil.isBlank(family) || StrUtil.isBlank(column) || StrUtil.isBlank(value)) {
return null; return null;
} }
Map<String, Object> columnMap = new HashMap<>(1); Map<String, Object> columnMap = new LinkedHashMap<>(1);
columnMap.put(column, value); columnMap.put(column, value);
return newPut(row, timestamp, family, columnMap); return newPut(row, timestamp, family, columnMap);
} }
@ -230,22 +232,16 @@ public class HbaseTemplate implements Closeable {
if (StrUtil.isBlank(row) || StrUtil.isBlank(family) || MapUtil.isEmpty(columnMap)) { if (StrUtil.isBlank(row) || StrUtil.isBlank(family) || MapUtil.isEmpty(columnMap)) {
return null; return null;
} }
Map<String, Map<String, Object>> familyMap = new HashMap<>(1); Map<String, Map<String, Object>> familyMap = new LinkedHashMap<>(1);
familyMap.put(family, columnMap); familyMap.put(family, columnMap);
return newPut(row, timestamp, familyMap); return newPut(row, timestamp, familyMap);
} }
@SuppressWarnings("unchecked")
public static Put newPut(String row, Long timestamp, String family, Object obj) { public static Put newPut(String row, Long timestamp, String family, Object obj) {
if (obj == null) { if (obj == null) {
return null; return null;
} }
Map<String, Object> columnMap; Map<String, Object> columnMap = JsonUtil.toMap(obj);
if (obj instanceof Map) {
columnMap = (Map<String, Object>) obj;
} else {
columnMap = BeanUtil.beanToMap(obj);
}
return newPut(row, timestamp, family, columnMap); return newPut(row, timestamp, family, columnMap);
} }
@ -267,20 +263,26 @@ public class HbaseTemplate implements Closeable {
for (Map.Entry<String, Object> entry : columnMap.entrySet()) { for (Map.Entry<String, Object> entry : columnMap.entrySet()) {
String column = entry.getKey(); String column = entry.getKey();
Object value = entry.getValue(); Object value = entry.getValue();
if (ObjectUtil.isEmpty(value)) { if (ObjectUtil.isEmpty(value)) {
continue; continue;
} }
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp, if (value instanceof String) {
Bytes.toBytes(String.valueOf(value))); put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp,
Bytes.toBytes(value.toString()));
} else if (value instanceof Date) {
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp,
Bytes.toBytes(DateUtil.format((Date) value, DatePattern.NORM_DATETIME_PATTERN)));
} else {
put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column),
timestamp, Bytes.toBytes(JsonUtil.toString(value)));
}
} }
} }
} }
return put; return put;
} }
private static <T extends BaseHbaseEntity> List<Put> newPutList(String family, Collection<T> list) private static <T extends BaseHbaseEntity> List<Put> newPutList(String family, Collection<T> list) {
throws IOException {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
List<Put> puts = new ArrayList<>(); List<Put> puts = new ArrayList<>();
for (T entity : list) { for (T entity : list) {
@ -387,7 +389,10 @@ public class HbaseTemplate implements Closeable {
Map<String, Field> fieldMap = ReflectUtil.getFieldMap(clazz); Map<String, Field> fieldMap = ReflectUtil.getFieldMap(clazz);
String[] columns = fieldMap.keySet().toArray(new String[0]); String[] columns = fieldMap.keySet().toArray(new String[0]);
Map<String, ColumnDo> columnMap = getColumnMap(tableName, row, family, columns); Map<String, ColumnDo> columnMap = getColumnMap(tableName, row, family, columns);
return toEntity(columnMap, clazz); if (MapUtil.isEmpty(columnMap)) {
return null;
}
return toEntity(ColumnDo.toKvMap(columnMap), clazz);
} }
/** /**
@ -402,6 +407,33 @@ public class HbaseTemplate implements Closeable {
*/ */
public <T> List<T> getEntityList(String tableName, String[] rows, String family, Class<T> clazz) public <T> List<T> getEntityList(String tableName, String[] rows, String family, Class<T> clazz)
throws IOException { throws IOException {
Map<String, T> map = getEntityMap(tableName, rows, family, clazz);
if (MapUtil.isEmpty(map)) {
return new ArrayList<>(0);
}
return new ArrayList<>(map.values());
}
/**
* {@link T}
*
* @param tableName
* @param rows
* @param family
* @param clazz
* @param <T>
* @return /
*/
public <T> List<T> getEntityList(String tableName, Collection<String> rows, String family, Class<T> clazz)
throws IOException {
if (CollectionUtil.isEmpty(rows)) {
return new ArrayList<>(0);
}
return getEntityList(tableName, rows.toArray(new String[0]), family, clazz);
}
public <T> Map<String, T> getEntityMap(String tableName, String[] rows, String family, Class<T> clazz)
throws IOException {
if (StrUtil.isBlank(tableName) || ArrayUtil.isEmpty(rows) || StrUtil.isBlank(family) || clazz == null) { if (StrUtil.isBlank(tableName) || ArrayUtil.isEmpty(rows) || StrUtil.isBlank(family) || clazz == null) {
return null; return null;
@ -413,26 +445,27 @@ public class HbaseTemplate implements Closeable {
Result[] results = batchGet(tableName, gets); Result[] results = batchGet(tableName, gets);
if (ArrayUtil.isEmpty(results)) { if (ArrayUtil.isEmpty(results)) {
return new ArrayList<>(); return new LinkedHashMap<>(0);
} }
List<T> list = new ArrayList<>(); Map<String, T> map = new LinkedHashMap<>(results.length);
for (Result result : results) { for (Result result : results) {
Map<String, ColumnDo> columnMap = Map<String, ColumnDo> columnMap =
getColumnsFromResult(result, tableName, family, CollectionUtil.newArrayList(columns)); getColumnsFromResult(result, tableName, family, CollectionUtil.newArrayList(columns));
if (MapUtil.isNotEmpty(columnMap)) { if (MapUtil.isNotEmpty(columnMap)) {
T entity = toEntity(columnMap, clazz); T entity = toEntity(ColumnDo.toKvMap(columnMap), clazz);
list.add(entity); map.put(Bytes.toString(result.getRow()), entity);
} }
} }
return list; return map;
} }
private <T> T toEntity(Map<String, ColumnDo> columnMap, Class<T> clazz) { public <T> Map<String, T> getEntityMap(String tableName, Collection<String> rows, String family, Class<T> clazz)
if (MapUtil.isEmpty(columnMap)) { throws IOException {
return null; if (CollectionUtil.isEmpty(rows)) {
return new LinkedHashMap<>(0);
} }
return BeanUtil.mapToBean(ColumnDo.toKvMap(columnMap), clazz, true, CopyOptions.create().ignoreError()); return getEntityMap(tableName, rows.toArray(new String[0]), family, clazz);
} }
/** /**
@ -492,6 +525,9 @@ public class HbaseTemplate implements Closeable {
*/ */
public FamilyDo getFamily(String tableName, String row, String family) throws IOException { public FamilyDo getFamily(String tableName, String row, String family) throws IOException {
Map<String, ColumnDo> columnMap = getColumnMap(tableName, row, family); Map<String, ColumnDo> columnMap = getColumnMap(tableName, row, family);
if (MapUtil.isEmpty(columnMap)) {
return null;
}
return new FamilyDo(tableName, row, family, columnMap); return new FamilyDo(tableName, row, family, columnMap);
} }
@ -507,13 +543,13 @@ public class HbaseTemplate implements Closeable {
Map<String, Collection<String>> familyColumnMap) throws IOException { Map<String, Collection<String>> familyColumnMap) throws IOException {
if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row)) { if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row)) {
return new HashMap<>(0); return new LinkedHashMap<>(0);
} }
if (MapUtil.isEmpty(familyColumnMap)) { if (MapUtil.isEmpty(familyColumnMap)) {
RowDo rowDo = getRow(tableName, row); RowDo rowDo = getRow(tableName, row);
if (rowDo == null) { if (rowDo == null) {
return new HashMap<>(0); return new LinkedHashMap<>(0);
} }
return rowDo.getFamilyMap(); return rowDo.getFamilyMap();
} }
@ -567,9 +603,9 @@ public class HbaseTemplate implements Closeable {
} }
Result[] results = batchGet(tableName, rows); Result[] results = batchGet(tableName, rows);
if (ArrayUtil.isEmpty(results)) { if (ArrayUtil.isEmpty(results)) {
return new HashMap<>(0); return new LinkedHashMap<>(0);
} }
Map<String, RowDo> map = new HashMap<>(results.length); Map<String, RowDo> map = new LinkedHashMap<>(results.length);
for (Result result : results) { for (Result result : results) {
String row = Bytes.toString(result.getRow()); String row = Bytes.toString(result.getRow());
RowDo rowDo = getRowFromResult(result, tableName); RowDo rowDo = getRowFromResult(result, tableName);
@ -674,7 +710,7 @@ public class HbaseTemplate implements Closeable {
List<T> list = data.getContent().stream().map(rowDo -> { List<T> list = data.getContent().stream().map(rowDo -> {
Map<String, Map<String, String>> familyKvMap = rowDo.getFamilyKvMap(); Map<String, Map<String, String>> familyKvMap = rowDo.getFamilyKvMap();
Map<String, String> columnKvMap = familyKvMap.get(scan.getFamily()); Map<String, String> columnKvMap = familyKvMap.get(scan.getFamily());
return BeanUtil.mapToBean(columnKvMap, clazz, true, CopyOptions.create().ignoreError()); return toEntity(columnKvMap, clazz);
}).collect(Collectors.toList()); }).collect(Collectors.toList());
return new PageData<>(scan.getPage(), scan.getSize(), data.getTotal(), list); return new PageData<>(scan.getPage(), scan.getSize(), data.getTotal(), list);
} }
@ -693,7 +729,7 @@ public class HbaseTemplate implements Closeable {
List<T> list = data.getContent().stream().map(rowDo -> { List<T> list = data.getContent().stream().map(rowDo -> {
Map<String, Map<String, String>> familyKvMap = rowDo.getFamilyKvMap(); Map<String, Map<String, String>> familyKvMap = rowDo.getFamilyKvMap();
Map<String, String> columnKvMap = familyKvMap.get(scan.getFamily()); Map<String, String> columnKvMap = familyKvMap.get(scan.getFamily());
return BeanUtil.mapToBean(columnKvMap, clazz, true, CopyOptions.create().ignoreError()); return toEntity(columnKvMap, clazz);
}).collect(Collectors.toList()); }).collect(Collectors.toList());
return new ScrollData<>(data.getStartRow(), data.getStopRow(), data.getScrollRow(), 0, list); return new ScrollData<>(data.getStartRow(), data.getStopRow(), data.getScrollRow(), 0, list);
} }
@ -712,7 +748,7 @@ public class HbaseTemplate implements Closeable {
private PageData<RowDo> getPageData(String tableName, Integer page, Integer size, Scan scan, private PageData<RowDo> getPageData(String tableName, Integer page, Integer size, Scan scan,
Map<String, Collection<String>> familyColumnMap) throws IOException { Map<String, Collection<String>> familyColumnMap) throws IOException {
Table table = getTable(tableName); Table table = getTable(tableName);
Map<String, RowDo> rowMap = new HashMap<>(size); Map<String, RowDo> rowMap = new LinkedHashMap<>(size);
try { try {
int pageIndex = 1; int pageIndex = 1;
byte[] lastRow = null; byte[] lastRow = null;
@ -728,7 +764,9 @@ public class HbaseTemplate implements Closeable {
Result result = it.next(); Result result = it.next();
if (page == pageIndex) { if (page == pageIndex) {
RowDo rowDo = getRowFromResult(result, tableName, familyColumnMap); RowDo rowDo = getRowFromResult(result, tableName, familyColumnMap);
rowMap.put(rowDo.getRow(), rowDo); if (rowDo != null) {
rowMap.put(rowDo.getRow(), rowDo);
}
} }
lastRow = result.getRow(); lastRow = result.getRow();
count++; count++;
@ -751,12 +789,14 @@ public class HbaseTemplate implements Closeable {
Map<String, Collection<String>> familyColumnMap) throws IOException { Map<String, Collection<String>> familyColumnMap) throws IOException {
Table table = getTable(tableName); Table table = getTable(tableName);
ResultScanner scanner = null; ResultScanner scanner = null;
Map<String, RowDo> rowMap = new HashMap<>(size); Map<String, RowDo> rowMap = new LinkedHashMap<>(size);
try { try {
scanner = table.getScanner(scan); scanner = table.getScanner(scan);
for (Result result : scanner) { for (Result result : scanner) {
RowDo rowDo = getRowFromResult(result, tableName, familyColumnMap); RowDo rowDo = getRowFromResult(result, tableName, familyColumnMap);
rowMap.put(rowDo.getRow(), rowDo); if (rowDo != null) {
rowMap.put(rowDo.getRow(), rowDo);
}
} }
String scrollRow = null; String scrollRow = null;
@ -829,24 +869,25 @@ public class HbaseTemplate implements Closeable {
} }
String row = Bytes.toString(result.getRow()); String row = Bytes.toString(result.getRow());
Map<String, Map<String, ColumnDo>> familyColumnMap = new HashMap<>(result.size()); Map<String, Map<String, ColumnDo>> familyColumnMap = new LinkedHashMap<>(result.size());
for (Cell cell : result.listCells()) { for (Cell cell : result.listCells()) {
String family = Bytes.toString(CellUtil.cloneFamily(cell)); String family = Bytes.toString(CellUtil.cloneFamily(cell));
if (!familyColumnMap.containsKey(family)) { if (!familyColumnMap.containsKey(family)) {
familyColumnMap.put(family, new HashMap<>(0)); familyColumnMap.put(family, new LinkedHashMap<>(0));
} }
String column = Bytes.toString(CellUtil.cloneQualifier(cell)); String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell)); ColumnDo columnDo = getColumnFromResult(result, tableName, family, column);
long timestamp = cell.getTimestamp();
ColumnDo columnDo = new ColumnDo(tableName, row, family, timestamp, column, value);
familyColumnMap.get(family).put(column, columnDo); familyColumnMap.get(family).put(column, columnDo);
} }
Map<String, FamilyDo> familyMap = new HashMap<>(familyColumnMap.size()); Map<String, FamilyDo> familyMap = new LinkedHashMap<>(familyColumnMap.size());
familyColumnMap.forEach((family, columnMap) -> { familyColumnMap.forEach((family, columnMap) -> {
FamilyDo familyDo = new FamilyDo(tableName, row, family, columnMap); FamilyDo familyDo = new FamilyDo(tableName, row, family, columnMap);
familyMap.put(family, familyDo); familyMap.put(family, familyDo);
}); });
if (MapUtil.isEmpty(familyMap)) {
return null;
}
return new RowDo(tableName, row, familyMap); return new RowDo(tableName, row, familyMap);
} }
@ -857,6 +898,9 @@ public class HbaseTemplate implements Closeable {
} }
String row = Bytes.toString(result.getRow()); String row = Bytes.toString(result.getRow());
Map<String, FamilyDo> familyMap = getFamiliesFromResult(result, tableName, familyColumnMap); Map<String, FamilyDo> familyMap = getFamiliesFromResult(result, tableName, familyColumnMap);
if (MapUtil.isEmpty(familyMap)) {
return null;
}
return new RowDo(tableName, row, familyMap); return new RowDo(tableName, row, familyMap);
} }
@ -877,23 +921,24 @@ public class HbaseTemplate implements Closeable {
Map<String, Collection<String>> familyColumnMap) { Map<String, Collection<String>> familyColumnMap) {
if (result == null || StrUtil.isBlank(tableName) || MapUtil.isEmpty(familyColumnMap)) { if (result == null || StrUtil.isBlank(tableName) || MapUtil.isEmpty(familyColumnMap)) {
return new HashMap<>(0); return new LinkedHashMap<>(0);
} }
String row = Bytes.toString(result.getRow()); String row = Bytes.toString(result.getRow());
Map<String, FamilyDo> familyMap = new HashMap<>(familyColumnMap.size()); Map<String, FamilyDo> familyMap = new LinkedHashMap<>(familyColumnMap.size());
familyColumnMap.forEach((family, columns) -> { familyColumnMap.forEach((family, columns) -> {
Map<String, ColumnDo> columnMap; FamilyDo familyDo;
if (CollectionUtil.isNotEmpty(columns)) { if (CollectionUtil.isNotEmpty(columns)) {
columnMap = new HashMap<>(columns.size()); Map<String, ColumnDo> columnMap = new LinkedHashMap<>(columns.size());
for (String column : columns) { for (String column : columns) {
ColumnDo columnDo = getColumnFromResult(result, tableName, family, column); ColumnDo columnDo = getColumnFromResult(result, tableName, family, column);
columnMap.put(column, columnDo); columnMap.put(column, columnDo);
} }
familyDo = new FamilyDo(tableName, row, family, columnMap);
} else { } else {
columnMap = new HashMap<>(0); familyDo = getFamilyFromResult(result, tableName, family);
} }
familyMap.put(family, new FamilyDo(tableName, row, family, columnMap)); familyMap.put(family, familyDo);
}); });
return familyMap; return familyMap;
} }
@ -918,9 +963,12 @@ public class HbaseTemplate implements Closeable {
Collection<String> columns) { Collection<String> columns) {
if (CollectionUtil.isEmpty(columns)) { if (CollectionUtil.isEmpty(columns)) {
RowDo rowDo = getRowFromResult(result, tableName); RowDo rowDo = getRowFromResult(result, tableName);
if (rowDo == null) {
return new LinkedHashMap<>(0);
}
return rowDo.getFamilyMap().get(family).getColumnMap(); return rowDo.getFamilyMap().get(family).getColumnMap();
} }
Map<String, ColumnDo> columnMap = new HashMap<>(columns.size()); Map<String, ColumnDo> columnMap = new LinkedHashMap<>(columns.size());
for (String column : columns) { for (String column : columns) {
ColumnDo columnDo = getColumnFromResult(result, tableName, family, column); ColumnDo columnDo = getColumnFromResult(result, tableName, family, column);
if (columnDo != null) { if (columnDo != null) {
@ -930,4 +978,34 @@ public class HbaseTemplate implements Closeable {
return columnMap; return columnMap;
} }
private static <T> T toEntity(Map<String, String> kvMap, Class<T> clazz) {
if (MapUtil.isEmpty(kvMap)) {
return null;
}
MapUtil.removeNullValue(kvMap);
T obj;
try {
Map<String, Class<?>> typeMap = new LinkedHashMap<>();
Field[] fields = ReflectUtil.getFields(clazz);
for (Field f : fields) {
typeMap.put(f.getName(), f.getType());
}
obj = clazz.newInstance();
for (Map.Entry<String, String> entry : kvMap.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
Class<?> filedType = typeMap.get(key);
if (filedType != null) {
Object fieldObj = JsonUtil.toBean(value, filedType);
ReflectUtil.setFieldValue(obj, key, fieldObj);
}
}
return obj;
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
} }

View File

@ -16,13 +16,13 @@ import java.lang.annotation.Target;
*/ */
@Documented @Documented
@Retention(RetentionPolicy.RUNTIME) @Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.FIELD, ElementType.ANNOTATION_TYPE }) @Target({ ElementType.TYPE, ElementType.ANNOTATION_TYPE })
public @interface RowKeyRule { public @interface RowKeyRule {
/** /**
* *
*/ */
String value() default ""; String pk();
/** /**
* {@link RowType} * {@link RowType}

View File

@ -0,0 +1,111 @@
package io.github.dunwu.javadb.hbase.annotation;
import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import java.lang.reflect.Field;
/**
* {@link RowKeyRule}
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-20
*/
public class RowKeyRuleParser {
/**
*
*/
public static <T extends BaseHbaseEntity> String getRowKey(T entity) throws IllegalArgumentException {
String row = null;
Class<? extends BaseHbaseEntity> clazz = entity.getClass();
RowKeyRule rule = clazz.getAnnotation(RowKeyRule.class);
if (rule == null) {
throw new IllegalArgumentException(StrUtil.format("实体定义错误!未定义 @RowKeyRule", entity.getClass(),
BaseHbaseEntity.class.getCanonicalName()));
}
Field field = ReflectUtil.getField(clazz, rule.pk());
if (field == null) {
throw new IllegalArgumentException(StrUtil.format("实体定义错误!@RowKeyRule 中未指定 value", entity.getClass(),
BaseHbaseEntity.class.getCanonicalName()));
}
switch (rule.type()) {
case ORIGIN_ID:
row = getRowKeyForOriginId(entity, field, rule);
break;
case TIMESTAMP:
row = getRowKeyForTimestamp();
break;
case UUID:
row = IdUtil.fastSimpleUUID();
break;
case BUCKET:
row = getRowKeyForBucket(entity, field, rule);
default:
break;
}
if (StrUtil.isBlank(row)) {
throw new IllegalArgumentException(StrUtil.format("实体定义错误!未定义 @RowKeyRule", entity.getClass(),
BaseHbaseEntity.class.getCanonicalName()));
}
return row;
}
private static <T extends BaseHbaseEntity> String getRowKeyForOriginId(T entity, Field field, RowKeyRule rule)
throws IllegalArgumentException {
String originId;
Object value = ReflectUtil.getFieldValue(entity, field);
if (value instanceof String) {
originId = (String) value;
} else {
originId = String.valueOf(value);
}
if (rule.length() == 0) {
throw new IllegalArgumentException(
StrUtil.format("实体定义错误!{} 选择 @RowKey 的 type 为 {},必须指定 length 且不能为 0",
entity.getClass(), rule.type()));
}
return StrUtil.padPre(originId, rule.length(), "0");
}
private static String getRowKeyForTimestamp() {
String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
return StrUtil.padPre(timestamp, 10, "0");
}
private static <T extends BaseHbaseEntity> String getRowKeyForBucket(T entity, Field field, RowKeyRule rowKeyRule)
throws IllegalArgumentException {
if (rowKeyRule.bucket() == 0) {
throw new IllegalArgumentException(
StrUtil.format("实体定义错误!{} 选择 @RowKey 的 type 为 {},必须指定 bucket 且不能为 0",
entity.getClass(), rowKeyRule.type()));
}
String originId = getRowKeyForOriginId(entity, field, rowKeyRule);
int bucketLength = getBucketIdLength(rowKeyRule.bucket());
String bucketId = String.valueOf(HashUtil.fnvHash(originId) % rowKeyRule.bucket());
return StrUtil.padPre(bucketId, bucketLength, "0") + originId;
}
private static int getBucketIdLength(int bucket) {
bucket = bucket - 1;
if (bucket <= 0) {
return 1;
}
int length = 0;
while (bucket > 0) {
length++;
bucket = bucket / 10;
}
return length;
}
}

View File

@ -1,16 +1,7 @@
package io.github.dunwu.javadb.hbase.entity; package io.github.dunwu.javadb.hbase.entity;
import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.annotation.JSONField;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
import io.github.dunwu.javadb.hbase.annotation.RowKeyRule; import io.github.dunwu.javadb.hbase.annotation.RowKeyRuleParser;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
/** /**
* HBase * HBase
@ -18,104 +9,14 @@ import java.lang.reflect.Field;
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a> * @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15 * @date 2023-11-15
*/ */
public abstract class BaseHbaseEntity implements Serializable { public interface BaseHbaseEntity {
@JsonIgnore
@JSONField(serialize = false, deserialize = false)
protected String rowKey;
/** /**
* *
*/ */
public String getRowKey() throws IOException { @JsonIgnore
if (StrUtil.isNotBlank(rowKey)) { default String getRowKey() {
return rowKey; return RowKeyRuleParser.getRowKey(this);
}
String row = null;
Field[] fields = ReflectUtil.getFields(this.getClass());
for (Field field : fields) {
RowKeyRule rule = field.getAnnotation(RowKeyRule.class);
if (rule != null) {
switch (rule.type()) {
case ORIGIN_ID:
row = getRowKeyForOriginId(this, field, rule);
break;
case TIMESTAMP:
row = getRowKeyForTimestamp();
break;
case UUID:
row = IdUtil.fastSimpleUUID();
break;
case BUCKET:
row = getRowKeyForBucket(this, field, rule);
default:
break;
}
}
}
if (StrUtil.isBlank(row)) {
throw new IOException(StrUtil.format("实体定义错误!未定义 @RowKeyRule",
this.getClass(), BaseHbaseEntity.class.getCanonicalName()));
}
this.rowKey = row;
return row;
} }
private static <T extends BaseHbaseEntity> String getRowKeyForOriginId(T entity, Field field, RowKeyRule rowKeyRule)
throws IOException {
String originId;
Object value = ReflectUtil.getFieldValue(entity, field);
if (value instanceof String) {
originId = (String) value;
} else {
originId = String.valueOf(value);
}
if (rowKeyRule.length() == 0) {
throw new IOException(
StrUtil.format("实体定义错误!{} 选择 @RowKey 的 type 为 {},必须指定 length 且不能为 0",
entity.getClass(), rowKeyRule.type()));
}
return StrUtil.padPre(originId, rowKeyRule.length(), "0");
}
private static String getRowKeyForTimestamp() {
String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
return StrUtil.padPre(timestamp, 10, "0");
}
private static <T extends BaseHbaseEntity> String getRowKeyForBucket(T entity, Field field, RowKeyRule rowKeyRule)
throws IOException {
if (rowKeyRule.bucket() == 0) {
throw new IOException(
StrUtil.format("实体定义错误!{} 选择 @RowKey 的 type 为 {},必须指定 bucket 且不能为 0",
entity.getClass(), rowKeyRule.type()));
}
String originId = getRowKeyForOriginId(entity, field, rowKeyRule);
int bucketLength = getBucketIdLength(rowKeyRule.bucket());
String bucketId = String.valueOf(HashUtil.fnvHash(originId) % rowKeyRule.bucket());
String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
return StrUtil.padPre(bucketId, bucketLength, "0")
+ StrUtil.padPre(timestamp, 10, "0")
+ originId;
}
private static int getBucketIdLength(int bucket) {
bucket = bucket - 1;
if (bucket <= 0) {
return 1;
}
int length = 0;
while (bucket > 0) {
length++;
bucket = bucket / 10;
}
return length;
}
private static final long serialVersionUID = 5075127328254616085L;
} }

View File

@ -1,4 +1,4 @@
package io.github.dunwu.javadb.hbase.entity; package io.github.dunwu.javadb.hbase.entity.common;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;

View File

@ -1,4 +1,4 @@
package io.github.dunwu.javadb.hbase.entity; package io.github.dunwu.javadb.hbase.entity.common;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;

View File

@ -1,4 +1,4 @@
package io.github.dunwu.javadb.hbase.entity; package io.github.dunwu.javadb.hbase.entity.common;
import lombok.Data; import lombok.Data;

View File

@ -1,4 +1,4 @@
package io.github.dunwu.javadb.hbase.entity; package io.github.dunwu.javadb.hbase.entity.common;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;

View File

@ -1,4 +1,4 @@
package io.github.dunwu.javadb.hbase.entity; package io.github.dunwu.javadb.hbase.entity.common;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;

View File

@ -42,12 +42,18 @@ public class SingleFamilyScan extends BaseScan {
} }
public Map<String, Collection<String>> getFamilyColumnMap() { public Map<String, Collection<String>> getFamilyColumnMap() {
if (StrUtil.isNotBlank(family) && CollectionUtil.isNotEmpty(columns)) {
Map<String, Collection<String>> familyColumnMap = new HashMap<>(1); if (StrUtil.isBlank(family)) {
familyColumnMap.put(family, columns); return new HashMap<>(0);
return familyColumnMap;
} }
return new HashMap<>(0);
Map<String, Collection<String>> familyColumnMap = new HashMap<>(1);
if (CollectionUtil.isNotEmpty(columns)) {
familyColumnMap.put(family, columns);
} else {
familyColumnMap.put(family, new ArrayList<>());
}
return familyColumnMap;
} }
} }

View File

@ -1,19 +1,23 @@
package io.github.dunwu.javadb.hbase; package io.github.dunwu.javadb.hbase.mapper;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import io.github.dunwu.javadb.hbase.HbaseTemplate;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity; import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import io.github.dunwu.javadb.hbase.entity.ScrollData; import io.github.dunwu.javadb.hbase.entity.common.ScrollData;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException; import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/** /**
* HBase Mapper * HBase Mapper
@ -27,8 +31,6 @@ public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements Hbas
protected final HbaseTemplate hbaseTemplate; protected final HbaseTemplate hbaseTemplate;
protected final HbaseAdmin hbaseAdmin;
@Override @Override
public Connection getClient() { public Connection getClient() {
return hbaseTemplate.getConnection(); return hbaseTemplate.getConnection();
@ -36,19 +38,7 @@ public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements Hbas
@Override @Override
public String getNamespace() { public String getNamespace() {
return "test"; return "default";
}
@Override
public boolean existsTable() {
byte[] namespace = Bytes.toBytes(getNamespace());
byte[] tableName = Bytes.toBytes(getTableName());
try {
return hbaseAdmin.existsTable(TableName.valueOf(namespace, tableName));
} catch (IOException e) {
log.error("【Hbase】existsTable 异常", e);
return false;
}
} }
@Override @Override
@ -57,34 +47,111 @@ public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements Hbas
} }
@Override @Override
public T pojoByRowKey(String rowKey) { public int deleteById(Serializable id) {
String rowKey = getIdStr(id);
if (StrUtil.isBlank(rowKey)) {
return 0;
}
try {
hbaseTemplate.delete(getFullTableName(), rowKey);
return 1;
} catch (IOException e) {
log.error("【Hbase】deleteById 异常", e);
return 0;
}
}
@Override
public int deleteById(T entity) {
if (entity == null) {
return 0;
}
return deleteById(entity.getRowKey());
}
@Override
public int deleteBatchIds(Collection<? extends Serializable> ids) {
if (CollectionUtil.isEmpty(ids)) {
return 0;
}
List<String> rowKeys = getIdStrList(ids);
try {
hbaseTemplate.batchDelete(getFullTableName(), rowKeys.toArray(new String[0]));
return rowKeys.size();
} catch (IOException | InterruptedException e) {
log.error("【Hbase】deleteBatchIds 异常", e);
return 0;
}
}
@Override
public int save(T entity) {
try {
String rowKey = entity.getRowKey();
hbaseTemplate.put(getFullTableName(), rowKey, getFamily(), entity);
return 1;
} catch (IOException e) {
log.error("【Hbase】updateById 异常", e);
return 0;
}
}
@Override
public int batchSave(Collection<T> list) {
if (CollectionUtil.isEmpty(list)) {
return 0;
}
try {
hbaseTemplate.batchPut(getFullTableName(), getFamily(), list);
return list.size();
} catch (IOException | InterruptedException e) {
log.error("【Hbase】batchSave 异常", e);
return 0;
}
}
@Override
public T getOneById(Serializable id) {
String rowKey = getIdStr(id);
if (StrUtil.isBlank(rowKey)) { if (StrUtil.isBlank(rowKey)) {
return null; return null;
} }
try { try {
return hbaseTemplate.getEntity(getFullTableName(), rowKey, getFamily(), getEntityClass()); return hbaseTemplate.getEntity(getFullTableName(), rowKey, getFamily(), getEntityClass());
} catch (IOException e) { } catch (IOException e) {
log.error("【Hbase】pojoById 异常", e); log.error("【Hbase】getOneById 异常", e);
return null; return null;
} }
} }
@Override @Override
public List<T> pojoListByRowKeys(Collection<String> rowKeys) { public Map<? extends Serializable, T> getMapByIds(Collection<? extends Serializable> ids) {
if (CollectionUtil.isEmpty(rowKeys)) {
return null; if (CollectionUtil.isEmpty(ids)) {
return new LinkedHashMap<>(0);
} }
List<String> rowKeys = getIdStrList(ids);
try { try {
return hbaseTemplate.getEntityList(getFullTableName(), rowKeys.toArray(new String[0]), return hbaseTemplate.getEntityMap(getFullTableName(), rowKeys.toArray(new String[0]), getFamily(),
getFamily(), getEntityClass()); getEntityClass());
} catch (IOException e) { } catch (IOException e) {
log.error("【Hbase】getEntityList 异常", e); log.error("【Hbase】getMapByIds 异常", e);
return new ArrayList<>(); return new LinkedHashMap<>(0);
} }
} }
@Override @Override
public List<T> scroll(String scrollRowKey, int size) { public List<T> scroll(Serializable scrollId, int size) {
String scrollRowKey = getIdStr(scrollId);
try { try {
ScrollData<T> scrollData = ScrollData<T> scrollData =
hbaseTemplate.getEntityScroll(getFullTableName(), getFamily(), scrollRowKey, size, getEntityClass()); hbaseTemplate.getEntityScroll(getFullTableName(), getFamily(), scrollRowKey, size, getEntityClass());
@ -98,59 +165,30 @@ public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements Hbas
} }
} }
@Override
public T save(T entity) {
try {
String rowKey = entity.getRowKey();
hbaseTemplate.put(getFullTableName(), rowKey, getFamily(), entity);
return entity;
} catch (IOException e) {
log.error("【Hbase】put 异常", e);
return null;
}
}
@Override
public boolean batchSave(Collection<T> list) {
try {
hbaseTemplate.batchPut(getFullTableName(), getFamily(), list);
return true;
} catch (IOException | InterruptedException e) {
log.error("【Hbase】batchPut 异常", e);
return false;
}
}
@Override
public boolean delete(String rowKey) {
if (StrUtil.isBlank(rowKey)) {
return true;
}
try {
hbaseTemplate.delete(getFullTableName(), rowKey);
return true;
} catch (IOException e) {
log.error("【Hbase】delete 异常", e);
return false;
}
}
@Override
public boolean batchDelete(Collection<String> rowKeys) {
if (CollectionUtil.isEmpty(rowKeys)) {
return true;
}
try {
hbaseTemplate.batchDelete(getFullTableName(), rowKeys.toArray(new String[0]));
return true;
} catch (IOException | InterruptedException e) {
log.error("【Hbase】batchDelete 异常", e);
return false;
}
}
protected String getFullTableName() { protected String getFullTableName() {
return StrUtil.format("{}:{}", getNamespace(), getTableName()); return StrUtil.format("{}:{}", getNamespace(), getTableName());
} }
protected String getIdStr(Serializable id) {
if (id == null) {
return null;
}
String rowKey;
if (id instanceof String) {
rowKey = (String) id;
} else {
rowKey = String.valueOf(id);
}
return rowKey;
}
protected List<String> getIdStrList(Collection<? extends Serializable> ids) {
if (CollectionUtil.isEmpty(ids)) {
return new ArrayList<>(0);
}
return ids.stream().map(this::getIdStr).filter(Objects::nonNull).collect(Collectors.toList());
}
} }

View File

@ -0,0 +1,78 @@
package io.github.dunwu.javadb.hbase.mapper;
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
/**
* Mapper
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-22
*/
public interface CommonMapper<T> {
/**
*
*
* @param entity
*/
int insert(T entity);
/**
*
*
* @param list
*/
int insertBatch(Collection<T> list);
/**
* ID
*
* @param id ID
*/
int deleteById(Serializable id);
/**
* (ID)
*
* @param entity
*/
int deleteById(T entity);
/**
* ID
*
* @param idList ID( null empty)
*/
int deleteBatchIds(Collection<? extends Serializable> idList);
/**
* ID
*
* @param entity
*/
int updateById(T entity);
/**
*
*
* @param list
*/
int updateBatchById(Collection<T> list);
/**
* ID
*
* @param id ID
*/
T getOneById(Serializable id);
/**
* ID
*
* @param idList ID( null empty)
*/
List<T> getListByIds(Collection<? extends Serializable> idList);
}

View File

@ -0,0 +1,106 @@
package io.github.dunwu.javadb.hbase.mapper;
import cn.hutool.core.map.MapUtil;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import org.apache.hadoop.hbase.client.Connection;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* Hbase Mapper
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15
*/
public interface HbaseMapper<T extends BaseHbaseEntity> extends CommonMapper<T> {
/**
* Hbase
*/
Connection getClient();
/**
*
*/
String getNamespace();
/**
*
*/
String getTableName();
/**
*
*/
String getFamily();
/**
*
*/
Class<T> getEntityClass();
@Override
default int insert(T entity) {
return save(entity);
}
@Override
default int updateById(T entity) {
return save(entity);
}
/**
*
*
* @param entity
*/
int save(T entity);
@Override
default int insertBatch(Collection<T> list) {
return batchSave(list);
}
@Override
default int updateBatchById(Collection<T> list) {
return batchSave(list);
}
/**
*
*
* @param list
*/
int batchSave(Collection<T> list);
@Override
default List<T> getListByIds(Collection<? extends Serializable> ids) {
Map<? extends Serializable, T> map = getMapByIds(ids);
if (MapUtil.isEmpty(map)) {
return new ArrayList<>();
}
return new ArrayList<>(map.values());
}
/**
* ID Map
*
* @param ids Hbase rowkey
* @return /
*/
Map<? extends Serializable, T> getMapByIds(Collection<? extends Serializable> ids);
/**
* ID
*
* @param scrollId
* @param size
* @return /
*/
List<T> scroll(Serializable scrollId, int size);
}

View File

@ -0,0 +1,144 @@
package io.github.dunwu.javadb.hbase.util;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
public class JsonUtil {
private JsonUtil() { }
private static final ObjectMapper OBJECT_MAPPER;
private static final TypeFactory TYPE_FACTORY;
static {
OBJECT_MAPPER = new ObjectMapper();
OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
TYPE_FACTORY = OBJECT_MAPPER.getTypeFactory();
}
public static ObjectMapper getInstance() {
return OBJECT_MAPPER;
}
/**
*
*/
@SuppressWarnings("unchecked")
public static <T> T toBean(String json, Class<T> clazz) {
if (StrUtil.isBlank(json)) {
return null;
}
if (clazz == String.class) {
return (T) json;
}
try {
return OBJECT_MAPPER.readValue(json, clazz);
} catch (IOException e) {
log.error("反序列化失败json: {}, msg: {}", json, e.getMessage());
}
return null;
}
/**
*
*/
public static <T> T toBean(String json, TypeReference<T> typeReference) {
if (StrUtil.isBlank(json)) {
return null;
}
try {
return (T) OBJECT_MAPPER.readValue(json, typeReference);
} catch (Exception e) {
log.error("反序列化失败json: {}, msg: {}", json, e.getMessage());
}
return null;
}
public static <T, K, V> T toBean(Map<K, V> map, Class<T> clazz) {
return OBJECT_MAPPER.convertValue(toString(map), clazz);
}
public static String toString(Object obj) {
if (obj == null) {
return null;
}
if (obj instanceof String) {
return (String) obj;
}
try {
return OBJECT_MAPPER.writeValueAsString(obj);
} catch (JsonProcessingException e) {
log.error("序列化失败obj: {}, msg: {}", obj, e.getMessage());
}
return null;
}
public static <K, V> Map<K, V> toMap(String json) {
if (StrUtil.isBlank(json)) {
return new HashMap<>(0);
}
try {
return OBJECT_MAPPER.readValue(json, new TypeReference<Map<K, V>>() { });
} catch (Exception e) {
log.error("反序列化失败json: {}, msg: {}", json, e.getMessage());
}
return Collections.emptyMap();
}
public static <K, V> Map<K, V> toMap(Object obj) {
if (obj == null) {
return null;
}
try {
return OBJECT_MAPPER.readValue(toString(obj), new TypeReference<Map<K, V>>() { });
} catch (IOException e) {
log.error("反序列化失败json: {}, msg: {}", toString(obj), e.getMessage());
}
return null;
}
public static <T> List<T> toList(String json, Class<T> clazz) {
if (StrUtil.isBlank(json)) {
return null;
}
JavaType javaType = TYPE_FACTORY.constructParametricType(List.class, clazz);
try {
return OBJECT_MAPPER.readValue(json, javaType);
} catch (IOException e) {
log.error("反序列化失败json: {}, msg: {}", json, e.getMessage());
}
return null;
}
public static <T> List<T> toList(String json, TypeReference<T> typeReference) {
if (StrUtil.isBlank(json)) {
return null;
}
JavaType elementType = TYPE_FACTORY.constructType(typeReference);
JavaType javaType = TYPE_FACTORY.constructParametricType(List.class, elementType);
try {
return OBJECT_MAPPER.readValue(json, javaType);
} catch (IOException e) {
log.error("反序列化失败json: {}, msg: {}", json, e.getMessage());
}
return null;
}
}

View File

@ -1,7 +1,6 @@
package io.github.dunwu.javadb.hbase; package io.github.dunwu.javadb.hbase;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.DisplayName;
@ -9,8 +8,11 @@ import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.Arrays; import java.util.Collections;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a> * @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
@ -18,70 +20,57 @@ import java.util.List;
*/ */
public class HbaseMapperTest { public class HbaseMapperTest {
private static ProductMapper mapper; private static final OrderMapper mapper;
static { static {
HbaseTemplate hbaseTemplate = null; HbaseTemplate hbaseTemplate = null;
try { try {
hbaseTemplate = HbaseFactory.newHbaseTemplate(); hbaseTemplate = HbaseFactory.newHbaseTemplate();
HbaseAdmin hbaseAdmin = HbaseFactory.newHbaseAdmin(); mapper = new OrderMapper(hbaseTemplate);
mapper = new ProductMapper(hbaseTemplate, hbaseAdmin);
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@Test @Test
@DisplayName("批量保存、查询、删除测试") @DisplayName("批量保存、查询、删除 BaseHbaseEntity 实体")
public void batchSave() throws IOException { public void batchSave() {
Product product1 = new Product("test-key-8", "product8", new BigDecimal(4000.0));
Product product2 = new Product("test-key-9", "product9", new BigDecimal(5000.0));
List<Product> products = CollectionUtil.newArrayList(product1, product2);
mapper.batchSave(products);
List<Product> list = mapper.pojoListByRowKeys(Arrays.asList(product1.getRowKey(), product2.getRowKey())); Date now = new Date();
Product product1 = new Product("1", "product1", new BigDecimal(4000.0));
Product product2 = new Product("2", "product2", new BigDecimal(5000.0));
List<Product> products = CollectionUtil.newArrayList(product1, product2);
User user1 = new User(1, "user1");
Map<String, String> tags = new LinkedHashMap<>();
tags.put("type", "tool");
tags.put("color", "red");
Order originOrder = Order.builder()
.id("1")
.user(user1)
.products(products)
.desc("测试订单")
.date(now)
.tags(tags)
.build();
mapper.batchSave(Collections.singleton(originOrder));
List<Order> list = mapper.getListByIds(Collections.singleton(originOrder.getRowKey()));
Assertions.assertThat(list).isNotEmpty(); Assertions.assertThat(list).isNotEmpty();
Assertions.assertThat(list.size()).isEqualTo(2); Order order = list.get(0);
Assertions.assertThat(order).isNotNull();
Assertions.assertThat(order.getDate()).isNotNull().isEqualTo(now);
Assertions.assertThat(order.getTags()).isNotNull().isEqualTo(tags);
Assertions.assertThat(order.getUser()).isNotNull().isEqualTo(user1);
Assertions.assertThat(order.getProducts()).isNotEmpty();
Assertions.assertThat(list).isNotEmpty();
Assertions.assertThat(list.size()).isEqualTo(1);
System.out.println(JSONUtil.toJsonStr(list)); System.out.println(JSONUtil.toJsonStr(list));
mapper.batchDelete(Arrays.asList(product1.getRowKey(), product2.getRowKey())); mapper.deleteBatchIds(Collections.singletonList(originOrder.getRowKey()));
List<Product> list2 = mapper.pojoListByRowKeys(Arrays.asList(product1.getRowKey(), product2.getRowKey())); List<Order> list2 = mapper.getListByIds(Collections.singletonList(originOrder.getRowKey()));
Assertions.assertThat(list2).isEmpty(); Assertions.assertThat(list2).isEmpty();
} }
@Test
@DisplayName("scroll")
public void scroll() throws IOException {
Product product1 = new Product("test-key-8", "product8", new BigDecimal(4000.0));
Product product2 = new Product("test-key-9", "product9", new BigDecimal(5000.0));
List<Product> products = CollectionUtil.newArrayList(product1, product2);
mapper.batchSave(products);
int size = 1;
String lastRowKey = null;
List<Product> list = mapper.scroll(null, size);
if (CollectionUtil.isNotEmpty(list)) {
Product last = CollectionUtil.getLast(list);
System.out.println("entity: " + JSONUtil.toJsonPrettyStr(last));
lastRowKey = last.getRowKey();
}
while (true) {
List<Product> tempList = mapper.scroll(lastRowKey, size);
if (CollectionUtil.isEmpty(list)) {
break;
}
Product last = CollectionUtil.getLast(tempList);
if (last == null) {
break;
}
System.out.println("entity: " + JSONUtil.toJsonPrettyStr(last));
lastRowKey = last.getRowKey();
if (StrUtil.isBlank(lastRowKey)) {
break;
}
}
}
} }

View File

@ -1,47 +0,0 @@
package io.github.dunwu.javadb.hbase;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Hbase
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-13
*/
public class HbaseTemplateDeleteTest {
public static final String TABLE_NAME = "test:test";
private static final HbaseTemplate HBASE_TEMPLATE;
static {
try {
HBASE_TEMPLATE = HbaseFactory.newHbaseTemplate();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
@DisplayName("删除单条记录")
public void testDelete() throws IOException {
String rowkey = "test-key-1";
HBASE_TEMPLATE.delete(TABLE_NAME, rowkey);
}
@Test
@DisplayName("批量删除记录")
public void testBatchDelete() throws IOException, InterruptedException {
List<String> rowkeys = new ArrayList<>();
for (int i = 1; i <= 13; i++) {
rowkeys.add("test-key-" + i);
}
HBASE_TEMPLATE.batchDelete(TABLE_NAME, rowkeys.toArray(new String[0]));
}
}

View File

@ -1,26 +1,28 @@
package io.github.dunwu.javadb.hbase; package io.github.dunwu.javadb.hbase;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.core.util.ObjectUtil;
import io.github.dunwu.javadb.hbase.entity.ColumnDo; import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import io.github.dunwu.javadb.hbase.entity.FamilyDo; import io.github.dunwu.javadb.hbase.entity.common.ColumnDo;
import io.github.dunwu.javadb.hbase.entity.RowDo; import io.github.dunwu.javadb.hbase.entity.common.FamilyDo;
import io.github.dunwu.javadb.hbase.entity.common.RowDo;
import io.github.dunwu.javadb.hbase.util.JsonUtil;
import org.apache.hadoop.hbase.client.Put;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.math.BigDecimal;
import java.util.Collection; import java.util.Date;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
/** /**
* Get * Hbase Get
* <p>
* {@link HbaseTemplatePutTest}
* *
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a> * @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-13 * @date 2023-11-13
@ -40,85 +42,267 @@ public class HbaseTemplateGetTest {
} }
@Test @Test
@DisplayName("查询实体") @DisplayName("put、get 单列数据")
public void getEntity() throws IOException { public void test00() throws IOException {
User user = HBASE_TEMPLATE.getEntity(TABLE_NAME, "test-key-3", "f1", User.class); long timestamp = System.currentTimeMillis();
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonStr(user))); HBASE_TEMPLATE.put(TABLE_NAME, "test-key-0", "f1", "name", "user0");
Assertions.assertThat(user).isNotNull(); ColumnDo columnDo = HBASE_TEMPLATE.getColumn(TABLE_NAME, "test-key-0", "f1", "name");
Assertions.assertThat(columnDo).isNotNull();
Assertions.assertThat(columnDo.getColumn()).isEqualTo("name");
Assertions.assertThat(columnDo.getValue()).isEqualTo("user0");
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-0", timestamp, "f2", "姓名", "张三");
ColumnDo columnDo2 = HBASE_TEMPLATE.getColumn(TABLE_NAME, "test-key-0", "f2", "姓名");
Assertions.assertThat(columnDo2).isNotNull();
Assertions.assertThat(columnDo2.getColumn()).isEqualTo("姓名");
Assertions.assertThat(columnDo2.getValue()).isEqualTo("张三");
Assertions.assertThat(columnDo2.getTimestamp()).isEqualTo(timestamp);
HBASE_TEMPLATE.delete(TABLE_NAME, "test-key-0");
columnDo = HBASE_TEMPLATE.getColumn(TABLE_NAME, "test-key-0", "f1", "name");
Assertions.assertThat(columnDo).isNull();
columnDo2 = HBASE_TEMPLATE.getColumn(TABLE_NAME, "test-key-0", "f2", "姓名");
Assertions.assertThat(columnDo2).isNull();
} }
@Test @Test
@DisplayName("查询实体列表") @DisplayName("put、get 多列数据")
public void getEntityList() throws IOException { public void test01() throws IOException {
List<String> rows = Arrays.asList("test-key-3", "test-key-4", "test-key-5");
List<User> list = HBASE_TEMPLATE.getEntityList(TABLE_NAME, rows.toArray(new String[0]), "f1", User.class); String row = "test-key-1";
System.out.println(StrUtil.format("查询实体列表: {}", JSONUtil.toJsonStr(list))); long timestamp = System.currentTimeMillis();
Map<String, Object> map1 = new HashMap<>(2);
map1.put("id", 1);
map1.put("name", "zhangsan");
Map<String, Object> map2 = new HashMap<>(2);
map2.put("编号", 1);
map2.put("姓名", "张三");
HBASE_TEMPLATE.put(TABLE_NAME, row, timestamp, "f1", map1);
HBASE_TEMPLATE.put(TABLE_NAME, row, timestamp, "f2", map2);
Map<String, ColumnDo> f1ColumnMap = HBASE_TEMPLATE.getColumnMap(TABLE_NAME, row, "f1", "id", "name");
Assertions.assertThat(f1ColumnMap).isNotEmpty();
Assertions.assertThat(f1ColumnMap.get("id")).isNotNull();
Assertions.assertThat(f1ColumnMap.get("id").getValue()).isEqualTo(String.valueOf(1));
Assertions.assertThat(f1ColumnMap.get("name")).isNotNull();
Assertions.assertThat(f1ColumnMap.get("name").getValue()).isEqualTo("zhangsan");
Map<String, ColumnDo> f2ColumnMap = HBASE_TEMPLATE.getColumnMap(TABLE_NAME, row, "f2", "编号", "姓名");
Assertions.assertThat(f2ColumnMap).isNotEmpty();
Assertions.assertThat(f2ColumnMap.get("编号")).isNotNull();
Assertions.assertThat(f2ColumnMap.get("编号").getValue()).isEqualTo(String.valueOf(1));
Assertions.assertThat(f2ColumnMap.get("姓名")).isNotNull();
Assertions.assertThat(f2ColumnMap.get("姓名").getValue()).isEqualTo("张三");
HBASE_TEMPLATE.delete(TABLE_NAME, row);
f1ColumnMap = HBASE_TEMPLATE.getColumnMap(TABLE_NAME, row, "f1", "id", "name");
Assertions.assertThat(f1ColumnMap).isEmpty();
f2ColumnMap = HBASE_TEMPLATE.getColumnMap(TABLE_NAME, row, "f2", "编号", "姓名");
Assertions.assertThat(f2ColumnMap).isEmpty();
}
@Test
@DisplayName("put、get 列族数据")
public void test02() throws IOException {
String row = "test-key-2";
long timestamp = System.currentTimeMillis();
Map<String, Object> map1 = new HashMap<>(2);
map1.put("id", 1);
map1.put("name", "zhangsan");
Map<String, Object> map2 = new HashMap<>(2);
map2.put("编号", 1);
map2.put("姓名", "张三");
HBASE_TEMPLATE.put(TABLE_NAME, row, timestamp, "f1", map1);
HBASE_TEMPLATE.put(TABLE_NAME, row, timestamp, "f2", map2);
FamilyDo f1 = HBASE_TEMPLATE.getFamily(TABLE_NAME, row, "f1");
Assertions.assertThat(f1).isNotNull();
Assertions.assertThat(f1.getColumnMap().get("id")).isNotNull();
Assertions.assertThat(f1.getColumnMap().get("id").getValue()).isEqualTo(String.valueOf(1));
Assertions.assertThat(f1.getColumnMap().get("name")).isNotNull();
Assertions.assertThat(f1.getColumnMap().get("name").getValue()).isEqualTo("zhangsan");
FamilyDo f2 = HBASE_TEMPLATE.getFamily(TABLE_NAME, row, "f2");
Assertions.assertThat(f2).isNotNull();
Assertions.assertThat(f2.getColumnMap().get("编号")).isNotNull();
Assertions.assertThat(f2.getColumnMap().get("编号").getValue()).isEqualTo(String.valueOf(1));
Assertions.assertThat(f2.getColumnMap().get("姓名")).isNotNull();
Assertions.assertThat(f2.getColumnMap().get("姓名").getValue()).isEqualTo("张三");
HBASE_TEMPLATE.delete(TABLE_NAME, row);
f1 = HBASE_TEMPLATE.getFamily(TABLE_NAME, row, "f1");
Assertions.assertThat(f1).isNull();
f2 = HBASE_TEMPLATE.getFamily(TABLE_NAME, row, "f2");
Assertions.assertThat(f2).isNull();
}
@Test
@DisplayName("put、get 单行数据")
public void test03() throws IOException {
String row = "test-key-3";
long timestamp = System.currentTimeMillis();
Map<String, Object> map1 = new HashMap<>(2);
map1.put("id", 1);
map1.put("name", "zhangsan");
Map<String, Object> map2 = new HashMap<>(2);
map2.put("编号", 1);
map2.put("姓名", "张三");
Map<String, Map<String, Object>> familyMap = new HashMap<>(2);
familyMap.put("f1", map1);
familyMap.put("f2", map2);
HBASE_TEMPLATE.put(TABLE_NAME, row, timestamp, familyMap);
RowDo rowDo = HBASE_TEMPLATE.getRow(TABLE_NAME, row);
Assertions.assertThat(rowDo).isNotNull();
FamilyDo f1 = rowDo.getFamilyMap().get("f1");
Assertions.assertThat(f1).isNotNull();
Assertions.assertThat(f1.getColumnMap()).isNotEmpty();
Assertions.assertThat(f1.getColumnMap().get("id")).isNotNull();
Assertions.assertThat(f1.getColumnMap().get("id").getValue()).isEqualTo(String.valueOf(1));
Assertions.assertThat(f1.getColumnMap().get("name")).isNotNull();
Assertions.assertThat(f1.getColumnMap().get("name").getValue()).isEqualTo("zhangsan");
FamilyDo f2 = rowDo.getFamilyMap().get("f2");
Assertions.assertThat(f2).isNotNull();
Assertions.assertThat(f2.getColumnMap()).isNotEmpty();
Assertions.assertThat(f2.getColumnMap().get("编号")).isNotNull();
Assertions.assertThat(f2.getColumnMap().get("编号").getValue()).isEqualTo(String.valueOf(1));
Assertions.assertThat(f2.getColumnMap().get("姓名")).isNotNull();
Assertions.assertThat(f2.getColumnMap().get("姓名").getValue()).isEqualTo("张三");
HBASE_TEMPLATE.delete(TABLE_NAME, row);
rowDo = HBASE_TEMPLATE.getRow(TABLE_NAME, row);
Assertions.assertThat(rowDo).isNull();
}
@Test
@DisplayName("put get 多行数据")
public void test04() throws IOException, InterruptedException {
long timestamp = System.currentTimeMillis();
Map<String, Object> columnMap1 = new HashMap<>(2);
columnMap1.put("id", 1);
columnMap1.put("name", "zhangsan");
Put put = HbaseTemplate.newPut("test-key-1", timestamp, "f1", columnMap1);
Map<String, Object> columnMap2 = new HashMap<>(2);
columnMap2.put("id", 2);
columnMap2.put("name", "lisi");
Put put2 = HbaseTemplate.newPut("test-key-2", timestamp, "f1", columnMap2);
List<Put> puts = CollectionUtil.newArrayList(put, put2);
HBASE_TEMPLATE.batchPut(TABLE_NAME, puts);
Map<String, RowDo> rowMap = HBASE_TEMPLATE.getRowMap(TABLE_NAME, "test-key-1", "test-key-2");
RowDo rowDo1 = rowMap.get("test-key-1");
Assertions.assertThat(rowDo1).isNotNull();
FamilyDo f1 = rowDo1.getFamilyMap().get("f1");
Assertions.assertThat(f1).isNotNull();
Assertions.assertThat(f1.getColumnMap()).isNotEmpty();
Assertions.assertThat(f1.getColumnMap().get("id")).isNotNull();
Assertions.assertThat(f1.getColumnMap().get("id").getValue()).isEqualTo(String.valueOf(1));
Assertions.assertThat(f1.getColumnMap().get("name")).isNotNull();
Assertions.assertThat(f1.getColumnMap().get("name").getValue()).isEqualTo("zhangsan");
RowDo rowDo2 = rowMap.get("test-key-2");
FamilyDo f2 = rowDo2.getFamilyMap().get("f1");
Assertions.assertThat(f2).isNotNull();
Assertions.assertThat(f2.getColumnMap()).isNotEmpty();
Assertions.assertThat(f2.getColumnMap().get("id")).isNotNull();
Assertions.assertThat(f2.getColumnMap().get("id").getValue()).isEqualTo(String.valueOf(2));
Assertions.assertThat(f2.getColumnMap().get("name")).isNotNull();
Assertions.assertThat(f2.getColumnMap().get("name").getValue()).isEqualTo("lisi");
HBASE_TEMPLATE.batchDelete(TABLE_NAME, "test-key-1", "test-key-2");
rowDo1 = HBASE_TEMPLATE.getRow(TABLE_NAME, "test-key-1");
Assertions.assertThat(rowDo1).isNull();
rowDo2 = HBASE_TEMPLATE.getRow(TABLE_NAME, "test-key-2");
Assertions.assertThat(rowDo2).isNull();
}
@Test
@DisplayName("put get 简单 Java 实体数据")
public void test05() throws IOException, InterruptedException {
User originUser1 = new User(1, "user1");
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-1", "f1", originUser1);
User user1 = HBASE_TEMPLATE.getEntity(TABLE_NAME, "test-key-1", "f1", User.class);
Assertions.assertThat(user1).isNotNull();
Assertions.assertThat(ObjectUtil.equals(originUser1, user1)).isTrue();
HBASE_TEMPLATE.batchDelete(TABLE_NAME, "test-key-1", "test-key-2");
user1 = HBASE_TEMPLATE.getEntity(TABLE_NAME, "test-key-1", "f1", User.class);
Assertions.assertThat(user1).isNull();
}
@Test
@DisplayName("put get 实现 BaseHbaseEntity 的简单 Java 实体数据")
public void test06() throws IOException, InterruptedException {
Product product1 = new Product("1", "product1", new BigDecimal(4000.0));
Product product2 = new Product("2", "product2", new BigDecimal(5000.0));
List<Product> products = CollectionUtil.newArrayList(product1, product2);
HBASE_TEMPLATE.batchPut(TABLE_NAME, "f1", products);
List<String> rows = products.stream().map(BaseHbaseEntity::getRowKey).collect(Collectors.toList());
List<Product> list = HBASE_TEMPLATE.getEntityList(TABLE_NAME, rows, "f1", Product.class);
Assertions.assertThat(list).isNotEmpty(); Assertions.assertThat(list).isNotEmpty();
Assertions.assertThat(list.size()).isEqualTo(rows.size()); Assertions.assertThat(list.size()).isEqualTo(rows.size());
HBASE_TEMPLATE.batchDelete(TABLE_NAME, rows.toArray(new String[0]));
product1 = HBASE_TEMPLATE.getEntity(TABLE_NAME, "test-key-1", "f1", Product.class);
Assertions.assertThat(product1).isNull();
product2 = HBASE_TEMPLATE.getEntity(TABLE_NAME, "test-key-2", "f1", Product.class);
Assertions.assertThat(product2).isNull();
list = HBASE_TEMPLATE.getEntityList(TABLE_NAME, rows, "f1", Product.class);
Assertions.assertThat(list).isEmpty();
} }
@Test @Test
@DisplayName("查询列") @DisplayName("put get 实现 BaseHbaseEntity 的复杂 Java 实体数据")
public void getColumn() throws IOException { public void test07() throws IOException {
ColumnDo columnDo = HBASE_TEMPLATE.getColumn(TABLE_NAME, "test-key-1", "f1", "key");
System.out.println(StrUtil.format("查询单列: {}", JSONUtil.toJsonStr(columnDo)));
}
@Test Date now = new Date();
@DisplayName("查询多列") Product product1 = new Product("1", "product1", new BigDecimal(4000.0));
public void getColumnMap() throws IOException { Product product2 = new Product("2", "product2", new BigDecimal(5000.0));
Map<String, ColumnDo> columnMap = HBASE_TEMPLATE.getColumnMap(TABLE_NAME, "test-key-3", "f1"); List<Product> products = CollectionUtil.newArrayList(product1, product2);
System.out.println(StrUtil.format("查询多列: {}", JSONUtil.toJsonStr(columnMap))); User user1 = new User(1, "user1");
Assertions.assertThat(columnMap).isNotEmpty(); Map<String, String> tags = new LinkedHashMap<>();
tags.put("type", "tool");
tags.put("color", "red");
Map<String, ColumnDo> columnMap2 = HBASE_TEMPLATE.getColumnMap(TABLE_NAME, "test-key-3", "f1", "id", "name"); Order originOrder = Order.builder()
System.out.println(StrUtil.format("查询多列: {}", JSONUtil.toJsonStr(columnMap2))); .id("1")
Assertions.assertThat(columnMap2).isNotEmpty(); .user(user1)
} .products(products)
.desc("测试订单")
.date(now)
.tags(tags)
.build();
@Test HBASE_TEMPLATE.put(TABLE_NAME, "f1", originOrder);
@DisplayName("查询列族")
public void getFamily() throws IOException {
FamilyDo familyDo = HBASE_TEMPLATE.getFamily(TABLE_NAME, "test-key-7", "f1");
System.out.println(StrUtil.format("查询列族: {}", JSONUtil.toJsonStr(familyDo)));
Assertions.assertThat(familyDo).isNotNull();
Assertions.assertThat(familyDo.getFamily()).isEqualTo("f1");
FamilyDo familyDo2 = HBASE_TEMPLATE.getFamily(TABLE_NAME, "test-key-7", "f2"); Order order = HBASE_TEMPLATE.getEntity(TABLE_NAME, originOrder.getRowKey(), "f1", Order.class);
System.out.println(StrUtil.format("查询列族: {}", JSONUtil.toJsonStr(familyDo2))); Assertions.assertThat(order).isNotNull();
Assertions.assertThat(familyDo2).isNotNull(); Assertions.assertThat(order.getDate()).isNotNull().isEqualTo(now);
Assertions.assertThat(familyDo2.getFamily()).isEqualTo("f2"); Assertions.assertThat(order.getTags()).isNotNull().isEqualTo(tags);
} Assertions.assertThat(order.getUser()).isNotNull().isEqualTo(user1);
Assertions.assertThat(order.getProducts()).isNotEmpty();
@Test System.out.println("order: " + JsonUtil.toString(order));
@DisplayName("查询多列族")
public void getFamilyMap() throws IOException {
Map<String, Collection<String>> familyColumnMap = new HashMap<>();
familyColumnMap.put("f1", Collections.singleton("id"));
familyColumnMap.put("f2", Collections.singleton("name"));
Map<String, FamilyDo> familyMap = HBASE_TEMPLATE.getFamilyMap(TABLE_NAME, "test-key-7", familyColumnMap);
System.out.println(StrUtil.format("查询多列族: {}", JSONUtil.toJsonStr(familyMap)));
Assertions.assertThat(familyMap).isNotEmpty();
Assertions.assertThat(familyMap.size()).isEqualTo(familyColumnMap.size());
}
@Test HBASE_TEMPLATE.delete(TABLE_NAME, originOrder.getRowKey());
@DisplayName("查询行") order = HBASE_TEMPLATE.getEntity(TABLE_NAME, order.getRowKey(), "f1", Order.class);
public void getRow() throws IOException { Assertions.assertThat(order).isNull();
RowDo rowDo = HBASE_TEMPLATE.getRow(TABLE_NAME, "test-key-7");
System.out.println(StrUtil.format("查询行: {}", JSONUtil.toJsonStr(rowDo)));
Assertions.assertThat(rowDo).isNotNull();
Assertions.assertThat(rowDo.getRow()).isEqualTo("test-key-7");
}
@Test
@DisplayName("批量查询行记录")
public void getRowMap() throws IOException {
String[] rows = new String[] { "test-key-3", "test-key-4", "test-key-7" };
Map<String, RowDo> rowMap = HBASE_TEMPLATE.getRowMap(TABLE_NAME, rows);
System.out.println(StrUtil.format("批量查询行记录: {}", JSONUtil.toJsonStr(rowMap)));
Assertions.assertThat(rowMap).isNotEmpty();
Assertions.assertThat(rowMap.size()).isEqualTo(rows.length);
} }
} }

View File

@ -1,105 +0,0 @@
package io.github.dunwu.javadb.hbase;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollectionUtil;
import org.apache.hadoop.hbase.client.Put;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Hbase Put
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-13
*/
public class HbaseTemplatePutTest {
public static final String TABLE_NAME = "test:test";
private static final HbaseTemplate HBASE_TEMPLATE;
static {
try {
HBASE_TEMPLATE = HbaseFactory.newHbaseTemplate();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
@DisplayName("put 测试 01")
public void put01() throws IOException {
long timestamp = System.currentTimeMillis();
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-0", "f1", "name", "user0");
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-1", timestamp, "f1", "name", "user1");
}
@Test
@DisplayName("put 测试 02")
public void put02() throws IOException {
long timestamp = System.currentTimeMillis();
User user2 = new User(2, "user2");
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-2", "f1", user2);
User user3 = new User(3, "user3");
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-3", timestamp, "f1", user3);
}
@Test
@DisplayName("put 测试 03")
public void put03() throws IOException {
long timestamp = System.currentTimeMillis();
User user4 = new User(4, "user4");
Map<String, Object> map = BeanUtil.beanToMap(user4);
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-4", timestamp, "f1", map);
}
@Test
@DisplayName("put 测试 04")
public void put04() throws IOException {
long timestamp = System.currentTimeMillis();
User user5 = new User(5, "user5");
Product product5 = new Product("test-key-5", "product5", new BigDecimal(4000.0));
Map<String, Map<String, Object>> familyMap = new HashMap<>(2);
Map<String, Object> userMap = BeanUtil.beanToMap(user5);
familyMap.put("f1", userMap);
Map<String, Object> productMap = BeanUtil.beanToMap(product5);
familyMap.put("f2", productMap);
HBASE_TEMPLATE.put(TABLE_NAME, "test-key-5", timestamp, familyMap);
}
@Test
@DisplayName("put 测试 05")
public void put05() throws IOException {
Put put = HbaseTemplate.newPut("test-key-6", null, "f1", "name", "user6");
HBASE_TEMPLATE.put(TABLE_NAME, put);
}
@Test
@DisplayName("put 测试 06")
public void put06() throws IOException, InterruptedException {
long timestamp = System.currentTimeMillis();
User user7 = new User(5, "user7");
Product product7 = new Product("test-key-7", "product5", new BigDecimal(4000.0));
Put put1 = HbaseTemplate.newPut("test-key-7", timestamp, "f1", user7);
Put put2 = HbaseTemplate.newPut("test-key-7", timestamp, "f2", product7);
List<Put> list = Arrays.asList(put1, put2);
HBASE_TEMPLATE.batchPut(TABLE_NAME, list);
}
@Test
@DisplayName("batchPut 测试2")
public void batchPut2() throws IOException, InterruptedException {
Product product1 = new Product("test-key-8", "product8", new BigDecimal(4000.0));
Product product2 = new Product("test-key-9", "product9", new BigDecimal(5000.0));
List<Product> products = CollectionUtil.newArrayList(product1, product2);
HBASE_TEMPLATE.batchPut(TABLE_NAME, "f2", products);
}
}

View File

@ -1,32 +1,31 @@
package io.github.dunwu.javadb.hbase; package io.github.dunwu.javadb.hbase;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import io.github.dunwu.javadb.hbase.entity.PageData; import io.github.dunwu.javadb.hbase.entity.common.PageData;
import io.github.dunwu.javadb.hbase.entity.RowDo; import io.github.dunwu.javadb.hbase.entity.common.RowDo;
import io.github.dunwu.javadb.hbase.entity.ScrollData; import io.github.dunwu.javadb.hbase.entity.common.ScrollData;
import io.github.dunwu.javadb.hbase.entity.scan.MultiFamilyScan; import io.github.dunwu.javadb.hbase.entity.scan.MultiFamilyScan;
import io.github.dunwu.javadb.hbase.entity.scan.SingleFamilyScan; import io.github.dunwu.javadb.hbase.entity.scan.SingleFamilyScan;
import org.apache.hadoop.hbase.client.Put;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field; import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* Get * Get
* <p> * <p>
* {@link HbaseTemplatePutTest} * {@link HbaseTemplateGetTest}
* *
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a> * @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-13 * @date 2023-11-13
@ -45,103 +44,136 @@ public class HbaseTemplateScanTest {
} }
} }
@Test
@DisplayName("批量初始化")
public void init() throws IOException, InterruptedException {
List<Product> products = new ArrayList<>();
List<Put> userPuts = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
Product product = new Product(String.valueOf(i), "product" + i,
new BigDecimal(RandomUtil.randomDouble(9999.0)));
products.add(product);
User user = new User(i, "user" + i);
Put put = HbaseTemplate.newPut(product.getRowKey(), null, "f2", user);
userPuts.add(put);
}
HBASE_TEMPLATE.batchPut(TABLE_NAME, "f1", products);
HBASE_TEMPLATE.batchPut(TABLE_NAME, userPuts);
}
@Test @Test
@DisplayName("单列族分页查询") @DisplayName("单列族分页查询")
public void page() throws IOException { public void test01() throws IOException {
for (int page = 1; page <= 2; page++) { SingleFamilyScan scan = new SingleFamilyScan();
SingleFamilyScan scan = new SingleFamilyScan(); scan.setFamily("f1")
scan.setFamily("f1") .setTableName(TABLE_NAME)
.setTableName(TABLE_NAME) .setPage(1)
.setPage(page) .setSize(10)
.setSize(2) .setReversed(true);
.setReversed(true); PageData<RowDo> firstPage = HBASE_TEMPLATE.page(scan);
PageData<RowDo> rowDoMap = HBASE_TEMPLATE.page(scan); System.out.println(StrUtil.format("第 {} 页数据: {}", 1, JSONUtil.toJsonStr(firstPage)));
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonStr(rowDoMap)));
Assertions.assertThat(rowDoMap).isNotNull(); int totalPages = firstPage.getTotalPages();
for (int page = 2; page <= totalPages; page++) {
scan.setPage(page);
PageData<RowDo> nextPage = HBASE_TEMPLATE.page(scan);
System.out.println(StrUtil.format("第 {} 页数据: {}", page, JSONUtil.toJsonStr(nextPage)));
Assertions.assertThat(nextPage).isNotNull();
} }
} }
@Test @Test
@DisplayName("多列族分页查询") @DisplayName("多列族分页查询")
public void page2() throws IOException { public void test02() throws IOException {
Map<String, Collection<String>> familyColumnMap = new HashMap<>(); Map<String, Collection<String>> familyColumnMap = new HashMap<>();
familyColumnMap.put("f1", Collections.singleton("id")); familyColumnMap.put("f1", CollectionUtil.newArrayList("id", "name", "price"));
familyColumnMap.put("f2", Collections.singleton("name")); familyColumnMap.put("f2", CollectionUtil.newArrayList("id", "name"));
for (int page = 1; page <= 2; page++) {
MultiFamilyScan scan = new MultiFamilyScan(); MultiFamilyScan scan = new MultiFamilyScan();
scan.setFamilyColumnMap(familyColumnMap) scan.setFamilyColumnMap(familyColumnMap)
.setTableName(TABLE_NAME) .setTableName(TABLE_NAME)
.setPage(page) .setPage(1)
.setSize(2) .setSize(10)
.setReversed(true); .setReversed(true);
PageData<RowDo> rowDoMap = HBASE_TEMPLATE.page(scan); PageData<RowDo> firstPage = HBASE_TEMPLATE.page(scan);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonStr(rowDoMap))); System.out.println(StrUtil.format("第 {} 页数据: {}", 1, JSONUtil.toJsonStr(firstPage)));
Assertions.assertThat(rowDoMap).isNotNull();
int totalPages = firstPage.getTotalPages();
for (int page = 1; page <= totalPages; page++) {
scan.setPage(page);
PageData<RowDo> nextPage = HBASE_TEMPLATE.page(scan);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonStr(nextPage)));
Assertions.assertThat(nextPage).isNotNull();
} }
} }
@Test @Test
@DisplayName("查询实体列表") @DisplayName("实体分页查询")
public void getEntityPage() throws IOException { public void test03() throws IOException {
SingleFamilyScan scan = new SingleFamilyScan(); SingleFamilyScan scan = new SingleFamilyScan();
scan.setFamily("f1") scan.setFamily("f2")
.setTableName(TABLE_NAME) .setTableName(TABLE_NAME)
.setPage(1) .setPage(1)
.setSize(2) .setSize(10)
.setReversed(true); .setReversed(true);
PageData<User> entityPage = HBASE_TEMPLATE.getEntityPage(scan, User.class); PageData<User> firstPage = HBASE_TEMPLATE.getEntityPage(scan, User.class);
System.out.println(StrUtil.format("查询实体列表: {}", JSONUtil.toJsonStr(entityPage))); System.out.println(StrUtil.format("第 {} 页数据: {}", 1, JSONUtil.toJsonStr(firstPage)));
Assertions.assertThat(entityPage).isNotNull();
int totalPages = firstPage.getTotalPages();
for (int page = 2; page <= totalPages; page++) {
scan.setPage(page);
PageData<User> nextPage = HBASE_TEMPLATE.getEntityPage(scan, User.class);
System.out.println(StrUtil.format("第 {} 页数据: {}", page, JSONUtil.toJsonStr(nextPage)));
Assertions.assertThat(nextPage).isNotNull();
}
} }
@Test @Test
@DisplayName("单列族滚动查询") @DisplayName("单列族滚动查询")
public void scroll() throws IOException { public void test04() throws IOException {
SingleFamilyScan scan = new SingleFamilyScan(); SingleFamilyScan scan = new SingleFamilyScan();
scan.setFamily("f1") scan.setFamily("f1")
.setTableName(TABLE_NAME) .setTableName(TABLE_NAME)
.setSize(1) .setSize(10)
.setStartRow("test-key-1")
.setStopRow("test-key-9")
.setReversed(false); .setReversed(false);
ScrollData<RowDo> data = HBASE_TEMPLATE.scroll(scan);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(data))); int page = 1;
Assertions.assertThat(data).isNotNull(); ScrollData<RowDo> first = HBASE_TEMPLATE.scroll(scan);
scan.setScrollRow(data.getScrollRow()); System.out.println(StrUtil.format("第 {} 页数据: {}", page, JSONUtil.toJsonPrettyStr(first)));
Assertions.assertThat(first).isNotNull();
scan.setScrollRow(first.getScrollRow());
while (true) { while (true) {
page++;
ScrollData<RowDo> next = HBASE_TEMPLATE.scroll(scan); ScrollData<RowDo> next = HBASE_TEMPLATE.scroll(scan);
if (next == null || CollectionUtil.isEmpty(next.getContent())) { if (next == null || CollectionUtil.isEmpty(next.getContent())) {
break; break;
} }
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(next))); System.out.println(StrUtil.format("第 {} 页数据: {}", page, JSONUtil.toJsonPrettyStr(first)));
scan.setScrollRow(next.getScrollRow()); scan.setScrollRow(next.getScrollRow());
} }
} }
@Test @Test
@DisplayName("多列族滚动查询") @DisplayName("多列族滚动查询")
public void scroll2() throws IOException { public void test05() throws IOException {
List<String> userFields = Stream.of(ReflectUtil.getFields(User.class))
.map(Field::getName).collect(Collectors.toList());
List<String> productFields = Stream.of(ReflectUtil.getFields(Product.class))
.map(Field::getName).collect(Collectors.toList());
Map<String, Collection<String>> familyColumnMap = new HashMap<>(); Map<String, Collection<String>> familyColumnMap = new HashMap<>();
familyColumnMap.put("f1", userFields); familyColumnMap.put("f1", CollectionUtil.newArrayList("id", "name", "price"));
familyColumnMap.put("f2", productFields); familyColumnMap.put("f2", CollectionUtil.newArrayList("id", "name"));
MultiFamilyScan scan = new MultiFamilyScan(); MultiFamilyScan scan = new MultiFamilyScan();
scan.setFamilyColumnMap(familyColumnMap) scan.setFamilyColumnMap(familyColumnMap)
.setTableName(TABLE_NAME) .setTableName(TABLE_NAME)
.setSize(1) .setSize(10)
.setStartRow("test-key-1")
.setStopRow("test-key-9")
.setReversed(true); .setReversed(true);
ScrollData<RowDo> data = HBASE_TEMPLATE.scroll(scan);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(data))); ScrollData<RowDo> first = HBASE_TEMPLATE.scroll(scan);
Assertions.assertThat(data).isNotNull(); System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(first)));
scan.setScrollRow(data.getScrollRow()); Assertions.assertThat(first).isNotNull();
scan.setScrollRow(first.getScrollRow());
while (true) { while (true) {
ScrollData<RowDo> next = HBASE_TEMPLATE.scroll(scan); ScrollData<RowDo> next = HBASE_TEMPLATE.scroll(scan);
@ -155,23 +187,21 @@ public class HbaseTemplateScanTest {
@Test @Test
@DisplayName("滚动查询实体") @DisplayName("滚动查询实体")
public void getEntityScroll() throws IOException { public void test06() throws IOException {
SingleFamilyScan scan = new SingleFamilyScan(); SingleFamilyScan scan = new SingleFamilyScan();
scan.setFamily("f1") scan.setFamily("f1")
.setTableName(TABLE_NAME) .setTableName(TABLE_NAME)
.setSize(1) .setSize(10)
.setStartRow("test-key-1")
.setStopRow("test-key-9")
.setReversed(false); .setReversed(false);
ScrollData<User> data = HBASE_TEMPLATE.getEntityScroll(scan, User.class); ScrollData<Product> first = HBASE_TEMPLATE.getEntityScroll(scan, Product.class);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(data))); System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(first)));
Assertions.assertThat(data).isNotNull(); Assertions.assertThat(first).isNotNull();
scan.setScrollRow(data.getScrollRow()); scan.setScrollRow(first.getScrollRow());
while (true) { while (true) {
ScrollData<User> next = HBASE_TEMPLATE.getEntityScroll(scan, User.class); ScrollData<Product> next = HBASE_TEMPLATE.getEntityScroll(scan, Product.class);
if (next == null || CollectionUtil.isEmpty(next.getContent())) { if (next == null || CollectionUtil.isEmpty(next.getContent())) {
break; break;
} }
@ -180,4 +210,33 @@ public class HbaseTemplateScanTest {
} }
} }
@Test
@DisplayName("滚动删除全部记录")
public void clear() throws IOException, InterruptedException {
SingleFamilyScan scan = new SingleFamilyScan();
scan.setFamily("f1")
.setTableName(TABLE_NAME)
.setSize(100)
.setReversed(false);
ScrollData<RowDo> first = HBASE_TEMPLATE.scroll(scan);
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(first)));
Assertions.assertThat(first).isNotNull();
scan.setScrollRow(first.getScrollRow());
HBASE_TEMPLATE.batchDelete(TABLE_NAME,
first.getContent().stream().map(RowDo::getRow).distinct().toArray(String[]::new));
while (true) {
ScrollData<RowDo> next = HBASE_TEMPLATE.scroll(scan);
if (next == null || CollectionUtil.isEmpty(next.getContent())) {
break;
}
System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(next)));
scan.setScrollRow(next.getScrollRow());
HBASE_TEMPLATE.batchDelete(TABLE_NAME,
next.getContent().stream().map(RowDo::getRow).distinct().toArray(String[]::new));
}
}
} }

View File

@ -0,0 +1,34 @@
package io.github.dunwu.javadb.hbase;
import io.github.dunwu.javadb.hbase.annotation.RowKeyRule;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
* Java
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-20
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@RowKeyRule(pk = "id", length = 20)
public class Order implements BaseHbaseEntity {
private String id;
private User user;
private List<Product> products;
private String desc;
private Date date;
private Map<String, String> tags;
}

View File

@ -1,13 +1,15 @@
package io.github.dunwu.javadb.hbase; package io.github.dunwu.javadb.hbase;
import io.github.dunwu.javadb.hbase.mapper.BaseHbaseMapper;
/** /**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a> * @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15 * @date 2023-11-15
*/ */
public class ProductMapper extends BaseHbaseMapper<Product> { public class OrderMapper extends BaseHbaseMapper<Order> {
public ProductMapper(HbaseTemplate hbaseTemplate, HbaseAdmin hbaseAdmin) { public OrderMapper(HbaseTemplate hbaseTemplate) {
super(hbaseTemplate, hbaseAdmin); super(hbaseTemplate);
} }
@Override @Override
@ -21,8 +23,8 @@ public class ProductMapper extends BaseHbaseMapper<Product> {
} }
@Override @Override
public Class<Product> getEntityClass() { public Class<Order> getEntityClass() {
return Product.class; return Order.class;
} }
} }

View File

@ -4,25 +4,22 @@ import io.github.dunwu.javadb.hbase.annotation.RowKeyRule;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity; import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.math.BigDecimal; import java.math.BigDecimal;
/** /**
* *
* *
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a> * @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15 * @date 2023-11-15
*/ */
@EqualsAndHashCode(callSuper = true)
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class Product extends BaseHbaseEntity { @RowKeyRule(pk = "id", length = 10)
public class Product implements BaseHbaseEntity {
@RowKeyRule(length = 20)
private String id; private String id;
private String name; private String name;
private BigDecimal price; private BigDecimal price;