feat: 更新 hbase 示例

master
dunwu 2023-11-20 06:50:59 +08:00
parent 7e344cd073
commit 14b594b9c2
11 changed files with 278 additions and 63 deletions

View File

@ -35,6 +35,16 @@
<artifactId>hutool-all</artifactId> <artifactId>hutool-all</artifactId>
<version>5.8.18</version> <version>5.8.18</version>
</dependency> </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.13.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>

View File

@ -36,7 +36,7 @@ public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements Hbas
@Override @Override
public String getNamespace() { public String getNamespace() {
return "default"; return "test";
} }
@Override @Override
@ -57,12 +57,12 @@ public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements Hbas
} }
@Override @Override
public T pojoById(String id) { public T pojoByRowKey(String rowKey) {
if (StrUtil.isBlank(id)) { if (StrUtil.isBlank(rowKey)) {
return null; return null;
} }
try { try {
return hbaseTemplate.getEntity(getFullTableName(), id, getFamily(), getEntityClass()); return hbaseTemplate.getEntity(getFullTableName(), rowKey, getFamily(), getEntityClass());
} catch (IOException e) { } catch (IOException e) {
log.error("【Hbase】pojoById 异常", e); log.error("【Hbase】pojoById 异常", e);
return null; return null;
@ -70,12 +70,12 @@ public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements Hbas
} }
@Override @Override
public List<T> pojoListByIds(Collection<String> ids) { public List<T> pojoListByRowKeys(Collection<String> rowKeys) {
if (CollectionUtil.isEmpty(ids)) { if (CollectionUtil.isEmpty(rowKeys)) {
return null; return null;
} }
try { try {
return hbaseTemplate.getEntityList(getFullTableName(), ids.toArray(new String[0]), return hbaseTemplate.getEntityList(getFullTableName(), rowKeys.toArray(new String[0]),
getFamily(), getEntityClass()); getFamily(), getEntityClass());
} catch (IOException e) { } catch (IOException e) {
log.error("【Hbase】getEntityList 异常", e); log.error("【Hbase】getEntityList 异常", e);
@ -84,10 +84,10 @@ public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements Hbas
} }
@Override @Override
public List<T> scroll(String scrollId, int size) { public List<T> scroll(String scrollRowKey, int size) {
try { try {
ScrollData<T> scrollData = ScrollData<T> scrollData =
hbaseTemplate.getEntityScroll(getFullTableName(), getFamily(), scrollId, size, getEntityClass()); hbaseTemplate.getEntityScroll(getFullTableName(), getFamily(), scrollRowKey, size, getEntityClass());
if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) { if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) {
return new ArrayList<>(); return new ArrayList<>();
} }
@ -101,7 +101,8 @@ public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements Hbas
@Override @Override
public T save(T entity) { public T save(T entity) {
try { try {
hbaseTemplate.put(getFullTableName(), entity.getId(), getFamily(), entity); String rowKey = entity.getRowKey();
hbaseTemplate.put(getFullTableName(), rowKey, getFamily(), entity);
return entity; return entity;
} catch (IOException e) { } catch (IOException e) {
log.error("【Hbase】put 异常", e); log.error("【Hbase】put 异常", e);
@ -121,12 +122,12 @@ public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements Hbas
} }
@Override @Override
public boolean deleteById(String id) { public boolean delete(String rowKey) {
if (StrUtil.isBlank(id)) { if (StrUtil.isBlank(rowKey)) {
return true; return true;
} }
try { try {
hbaseTemplate.delete(getFullTableName(), id); hbaseTemplate.delete(getFullTableName(), rowKey);
return true; return true;
} catch (IOException e) { } catch (IOException e) {
log.error("【Hbase】delete 异常", e); log.error("【Hbase】delete 异常", e);
@ -135,12 +136,12 @@ public abstract class BaseHbaseMapper<T extends BaseHbaseEntity> implements Hbas
} }
@Override @Override
public boolean batchDeleteById(Collection<String> ids) { public boolean batchDelete(Collection<String> rowKeys) {
if (CollectionUtil.isEmpty(ids)) { if (CollectionUtil.isEmpty(rowKeys)) {
return true; return true;
} }
try { try {
hbaseTemplate.batchDelete(getFullTableName(), ids.toArray(new String[0])); hbaseTemplate.batchDelete(getFullTableName(), rowKeys.toArray(new String[0]));
return true; return true;
} catch (IOException | InterruptedException e) { } catch (IOException | InterruptedException e) {
log.error("【Hbase】batchDelete 异常", e); log.error("【Hbase】batchDelete 异常", e);

View File

@ -12,6 +12,8 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -32,10 +34,10 @@ public class HbaseAdmin implements Closeable {
protected HbaseAdmin(Configuration configuration) throws IOException { protected HbaseAdmin(Configuration configuration) throws IOException {
this.configuration = configuration; this.configuration = configuration;
// 无需鉴权连接 // 无需鉴权连接
this.connection = ConnectionFactory.createConnection(configuration); // this.connection = ConnectionFactory.createConnection(configuration);
// 鉴权连接 // 鉴权连接
// this.connection = ConnectionFactory.createConnection(configuration, null, this.connection = ConnectionFactory.createConnection(configuration, null,
// new User.SecureHadoopUser(UserGroupInformation.createRemoteUser("test"))); new User.SecureHadoopUser(UserGroupInformation.createRemoteUser("test")));
} }
protected HbaseAdmin(Connection connection) { protected HbaseAdmin(Connection connection) {

View File

@ -23,7 +23,7 @@ public class HbaseFactory {
public static Configuration newHbaseConfiguration() { public static Configuration newHbaseConfiguration() {
Configuration configuration = HBaseConfiguration.create(); Configuration configuration = HBaseConfiguration.create();
configuration.set("hbase.zookeeper.quorum", "10.101.129.74,10.101.129.76,10.101.129.77"); configuration.set("hbase.zookeeper.quorum", "127.0.0.1");
configuration.set("hbase.zookeeper.property.clientPort", "2181"); configuration.set("hbase.zookeeper.property.clientPort", "2181");
configuration.set("hbase.rootdir", "/hbase"); configuration.set("hbase.rootdir", "/hbase");
configuration.set("hbase.meta.replicas.use", "true"); configuration.set("hbase.meta.replicas.use", "true");

View File

@ -50,7 +50,7 @@ public interface HbaseMapper<T extends BaseHbaseEntity> {
* @param id Hbase rowkey * @param id Hbase rowkey
* @return / * @return /
*/ */
T pojoById(String id); T pojoByRowKey(String id);
/** /**
* ID * ID
@ -58,7 +58,7 @@ public interface HbaseMapper<T extends BaseHbaseEntity> {
* @param ids Hbase rowkey * @param ids Hbase rowkey
* @return / * @return /
*/ */
List<T> pojoListByIds(Collection<String> ids); List<T> pojoListByRowKeys(Collection<String> ids);
/** /**
* ID * ID
@ -91,7 +91,7 @@ public interface HbaseMapper<T extends BaseHbaseEntity> {
* @param id Hbase rowkey * @param id Hbase rowkey
* @return / * @return /
*/ */
boolean deleteById(String id); boolean delete(String id);
/** /**
* ID * ID
@ -99,6 +99,6 @@ public interface HbaseMapper<T extends BaseHbaseEntity> {
* @param ids Hbase rowkey * @param ids Hbase rowkey
* @return / * @return /
*/ */
boolean batchDeleteById(Collection<String> ids); boolean batchDelete(Collection<String> ids);
} }

View File

@ -33,7 +33,9 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -65,10 +67,10 @@ public class HbaseTemplate implements Closeable {
protected HbaseTemplate(Configuration configuration) throws IOException { protected HbaseTemplate(Configuration configuration) throws IOException {
this.configuration = configuration; this.configuration = configuration;
// 无需鉴权连接 // 无需鉴权连接
this.connection = ConnectionFactory.createConnection(configuration); // this.connection = ConnectionFactory.createConnection(configuration);
// 鉴权连接 // 鉴权连接
// this.connection = ConnectionFactory.createConnection(configuration, null, this.connection = ConnectionFactory.createConnection(configuration, null,
// new User.SecureHadoopUser(UserGroupInformation.createRemoteUser("test"))); new User.SecureHadoopUser(UserGroupInformation.createRemoteUser("test")));
} }
protected HbaseTemplate(Connection connection) { protected HbaseTemplate(Connection connection) {
@ -198,6 +200,10 @@ public class HbaseTemplate implements Closeable {
put(tableName, put); put(tableName, put);
} }
public <T extends BaseHbaseEntity> void put(String tableName, String family, T entity) throws IOException {
put(tableName, entity.getRowKey(), family, entity);
}
public void batchPut(String tableName, Collection<Put> list) throws IOException, InterruptedException { public void batchPut(String tableName, Collection<Put> list) throws IOException, InterruptedException {
batch(tableName, list); batch(tableName, list);
} }
@ -273,11 +279,15 @@ public class HbaseTemplate implements Closeable {
return put; return put;
} }
private static <T extends BaseHbaseEntity> List<Put> newPutList(String family, Collection<T> list) { private static <T extends BaseHbaseEntity> List<Put> newPutList(String family, Collection<T> list)
throws IOException {
long timestamp = System.currentTimeMillis(); long timestamp = System.currentTimeMillis();
return list.stream() List<Put> puts = new ArrayList<>();
.map(entity -> newPut(entity.getId(), timestamp, family, entity)) for (T entity : list) {
.collect(Collectors.toList()); Put put = newPut(entity.getRowKey(), timestamp, family, entity);
puts.add(put);
}
return puts;
} }
// ===================================================================================== // =====================================================================================
@ -410,8 +420,10 @@ public class HbaseTemplate implements Closeable {
for (Result result : results) { for (Result result : results) {
Map<String, ColumnDo> columnMap = Map<String, ColumnDo> columnMap =
getColumnsFromResult(result, tableName, family, CollectionUtil.newArrayList(columns)); getColumnsFromResult(result, tableName, family, CollectionUtil.newArrayList(columns));
T entity = toEntity(columnMap, clazz); if (MapUtil.isNotEmpty(columnMap)) {
list.add(entity); T entity = toEntity(columnMap, clazz);
list.add(entity);
}
} }
return list; return list;
} }
@ -911,7 +923,9 @@ public class HbaseTemplate implements Closeable {
Map<String, ColumnDo> columnMap = new HashMap<>(columns.size()); Map<String, ColumnDo> columnMap = new HashMap<>(columns.size());
for (String column : columns) { for (String column : columns) {
ColumnDo columnDo = getColumnFromResult(result, tableName, family, column); ColumnDo columnDo = getColumnFromResult(result, tableName, family, column);
columnMap.put(column, columnDo); if (columnDo != null) {
columnMap.put(column, columnDo);
}
} }
return columnMap; return columnMap;
} }

View File

@ -0,0 +1,42 @@
package io.github.dunwu.javadb.hbase.annotation;
import io.github.dunwu.javadb.hbase.constant.RowType;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
*
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-17
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.FIELD, ElementType.ANNOTATION_TYPE })
public @interface RowKeyRule {
/**
*
*/
String value() default "";
/**
* {@link RowType}
*/
RowType type() default RowType.ORIGIN_ID;
/**
* ID type {@link RowType#ORIGIN_ID} {@link RowType#BUCKET}
*/
int length() default 0;
/**
* type {@link RowType#BUCKET}
*/
int bucket() default 0;
}

View File

@ -0,0 +1,43 @@
package io.github.dunwu.javadb.hbase.constant;
import lombok.Getter;
/**
* ID
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-17
*/
@Getter
public enum RowType {
/**
* ID
*/
ORIGIN_ID(1),
/**
* 10 ID
* <p>
* scan 100w
*/
TIMESTAMP(2),
/**
* UUID get
*/
UUID(3),
/**
* ID = bucket(2/3) + timestamp(10) + bizId scan
* <p>
* ID @TableId
*/
BUCKET(4);
private final int key;
RowType(int key) {
this.key = key;
}
}

View File

@ -1,6 +1,16 @@
package io.github.dunwu.javadb.hbase.entity; package io.github.dunwu.javadb.hbase.entity;
import cn.hutool.core.util.HashUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.annotation.JSONField;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.github.dunwu.javadb.hbase.annotation.RowKeyRule;
import java.io.IOException;
import java.io.Serializable; import java.io.Serializable;
import java.lang.reflect.Field;
/** /**
* HBase * HBase
@ -10,15 +20,101 @@ import java.io.Serializable;
*/ */
public abstract class BaseHbaseEntity implements Serializable { public abstract class BaseHbaseEntity implements Serializable {
@JsonIgnore
@JSONField(serialize = false, deserialize = false)
protected String rowKey;
/** /**
* *
*/ */
public abstract String getId(); public String getRowKey() throws IOException {
if (StrUtil.isNotBlank(rowKey)) {
return rowKey;
}
/** String row = null;
* Field[] fields = ReflectUtil.getFields(this.getClass());
*/ for (Field field : fields) {
public abstract String getIdKey(); RowKeyRule rule = field.getAnnotation(RowKeyRule.class);
if (rule != null) {
switch (rule.type()) {
case ORIGIN_ID:
row = getRowKeyForOriginId(this, field, rule);
break;
case TIMESTAMP:
row = getRowKeyForTimestamp();
break;
case UUID:
row = IdUtil.fastSimpleUUID();
break;
case BUCKET:
row = getRowKeyForBucket(this, field, rule);
default:
break;
}
}
}
if (StrUtil.isBlank(row)) {
throw new IOException(StrUtil.format("实体定义错误!未定义 @RowKeyRule",
this.getClass(), BaseHbaseEntity.class.getCanonicalName()));
}
this.rowKey = row;
return row;
}
private static <T extends BaseHbaseEntity> String getRowKeyForOriginId(T entity, Field field, RowKeyRule rowKeyRule)
throws IOException {
String originId;
Object value = ReflectUtil.getFieldValue(entity, field);
if (value instanceof String) {
originId = (String) value;
} else {
originId = String.valueOf(value);
}
if (rowKeyRule.length() == 0) {
throw new IOException(
StrUtil.format("实体定义错误!{} 选择 @RowKey 的 type 为 {},必须指定 length 且不能为 0",
entity.getClass(), rowKeyRule.type()));
}
return StrUtil.padPre(originId, rowKeyRule.length(), "0");
}
private static String getRowKeyForTimestamp() {
String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
return StrUtil.padPre(timestamp, 10, "0");
}
private static <T extends BaseHbaseEntity> String getRowKeyForBucket(T entity, Field field, RowKeyRule rowKeyRule)
throws IOException {
if (rowKeyRule.bucket() == 0) {
throw new IOException(
StrUtil.format("实体定义错误!{} 选择 @RowKey 的 type 为 {},必须指定 bucket 且不能为 0",
entity.getClass(), rowKeyRule.type()));
}
String originId = getRowKeyForOriginId(entity, field, rowKeyRule);
int bucketLength = getBucketIdLength(rowKeyRule.bucket());
String bucketId = String.valueOf(HashUtil.fnvHash(originId) % rowKeyRule.bucket());
String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
return StrUtil.padPre(bucketId, bucketLength, "0")
+ StrUtil.padPre(timestamp, 10, "0")
+ originId;
}
private static int getBucketIdLength(int bucket) {
bucket = bucket - 1;
if (bucket <= 0) {
return 1;
}
int length = 0;
while (bucket > 0) {
length++;
bucket = bucket / 10;
}
return length;
}
private static final long serialVersionUID = 5075127328254616085L; private static final long serialVersionUID = 5075127328254616085L;

View File

@ -3,6 +3,7 @@ package io.github.dunwu.javadb.hbase;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -31,42 +32,53 @@ public class HbaseMapperTest {
} }
@Test @Test
@DisplayName("batchSave") @DisplayName("批量保存、查询、删除测试")
public void batchSave() { public void batchSave() throws IOException {
Product product1 = new Product("test-key-8", "product8", new BigDecimal(4000.0)); Product product1 = new Product("test-key-8", "product8", new BigDecimal(4000.0));
Product product2 = new Product("test-key-9", "product9", new BigDecimal(5000.0)); Product product2 = new Product("test-key-9", "product9", new BigDecimal(5000.0));
List<Product> products = CollectionUtil.newArrayList(product1, product2); List<Product> products = CollectionUtil.newArrayList(product1, product2);
mapper.batchSave(products); mapper.batchSave(products);
}
@Test List<Product> list = mapper.pojoListByRowKeys(Arrays.asList(product1.getRowKey(), product2.getRowKey()));
@DisplayName("batchGet") Assertions.assertThat(list).isNotEmpty();
public void batchGet() { Assertions.assertThat(list.size()).isEqualTo(2);
List<Product> list = mapper.pojoListByIds(Arrays.asList("test-key-8", "test-key-9"));
System.out.println(JSONUtil.toJsonStr(list)); System.out.println(JSONUtil.toJsonStr(list));
mapper.batchDelete(Arrays.asList(product1.getRowKey(), product2.getRowKey()));
List<Product> list2 = mapper.pojoListByRowKeys(Arrays.asList(product1.getRowKey(), product2.getRowKey()));
Assertions.assertThat(list2).isEmpty();
} }
@Test @Test
@DisplayName("scroll") @DisplayName("scroll")
public void scroll() { public void scroll() throws IOException {
Product product1 = new Product("test-key-8", "product8", new BigDecimal(4000.0));
Product product2 = new Product("test-key-9", "product9", new BigDecimal(5000.0));
List<Product> products = CollectionUtil.newArrayList(product1, product2);
mapper.batchSave(products);
int size = 1; int size = 1;
String lastId = null; String lastRowKey = null;
List<Product> list = mapper.scroll(null, size); List<Product> list = mapper.scroll(null, size);
if (CollectionUtil.isNotEmpty(list)) { if (CollectionUtil.isNotEmpty(list)) {
Product last = CollectionUtil.getLast(list); Product last = CollectionUtil.getLast(list);
System.out.println("entity: " + JSONUtil.toJsonPrettyStr(last)); System.out.println("entity: " + JSONUtil.toJsonPrettyStr(last));
lastId = last.getId(); lastRowKey = last.getRowKey();
} }
while (true) { while (true) {
List<Product> products = mapper.scroll(lastId, size); List<Product> tempList = mapper.scroll(lastRowKey, size);
if (CollectionUtil.isEmpty(list)) { if (CollectionUtil.isEmpty(list)) {
break; break;
} }
Product last = CollectionUtil.getLast(products); Product last = CollectionUtil.getLast(tempList);
if (last == null) {
break;
}
System.out.println("entity: " + JSONUtil.toJsonPrettyStr(last)); System.out.println("entity: " + JSONUtil.toJsonPrettyStr(last));
lastId = last.getId(); lastRowKey = last.getRowKey();
if (StrUtil.isBlank(lastId)) { if (StrUtil.isBlank(lastRowKey)) {
break; break;
} }
} }

View File

@ -1,37 +1,32 @@
package io.github.dunwu.javadb.hbase; package io.github.dunwu.javadb.hbase;
import io.github.dunwu.javadb.hbase.annotation.RowKeyRule;
import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity; import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.math.BigDecimal; import java.math.BigDecimal;
/** /**
* *
* *
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a> * @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-15 * @date 2023-11-15
*/ */
@EqualsAndHashCode(callSuper = true)
@Data @Data
@NoArgsConstructor @NoArgsConstructor
@AllArgsConstructor @AllArgsConstructor
public class Product extends BaseHbaseEntity { public class Product extends BaseHbaseEntity {
@RowKeyRule(length = 20)
private String id; private String id;
private String name; private String name;
private BigDecimal price; private BigDecimal price;
@Override
public String getId() {
return id;
}
@Override
public String getIdKey() {
return "id";
}
private static final long serialVersionUID = -2596114168690429555L; private static final long serialVersionUID = -2596114168690429555L;
} }