diff --git a/codes/javadb/hbase/pom.xml b/codes/javadb/hbase/pom.xml index 4c4776d..4868778 100644 --- a/codes/javadb/hbase/pom.xml +++ b/codes/javadb/hbase/pom.xml @@ -23,10 +23,11 @@ org.apache.hbase hbase-client + ${hbase.version} org.apache.hadoop - hadoop-hdfs + hadoop-auth 2.10.2 @@ -39,37 +40,23 @@ lombok 1.18.22 - - - junit - junit + org.springframework + spring-context-support + 5.1.10.RELEASE + + + commons-logging + commons-logging + + + + + + org.springframework.boot + spring-boot-starter-test + 2.6.3 + test - - - - - - org.apache.hbase - hbase-client - ${hbase.version} - - - - - - - - - - - junit - junit - ${junit.version} - test - - - - diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/BaseHbaseMapper.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/BaseHbaseMapper.java new file mode 100644 index 0000000..d04a2ae --- /dev/null +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/BaseHbaseMapper.java @@ -0,0 +1,155 @@ +package io.github.dunwu.javadb.hbase; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.StrUtil; +import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity; +import io.github.dunwu.javadb.hbase.entity.ScrollData; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * HBase Mapper 基础类 + * + * @author Zhang Peng + * @date 2023-11-15 + */ +@Slf4j +@RequiredArgsConstructor +public abstract class BaseHbaseMapper implements HbaseMapper { + + protected final HbaseTemplate hbaseTemplate; + + protected final HbaseAdmin hbaseAdmin; + + @Override + public Connection getClient() { + return hbaseTemplate.getConnection(); + } + + @Override + public String getNamespace() { + return "default"; + } + + @Override + public boolean existsTable() { + byte[] namespace = Bytes.toBytes(getNamespace()); + byte[] tableName = Bytes.toBytes(getTableName()); + try { + return hbaseAdmin.existsTable(TableName.valueOf(namespace, tableName)); + } catch (IOException e) { + log.error("【Hbase】existsTable 异常", e); + return false; + } + } + + @Override + public String getFamily() { + return "f"; + } + + @Override + public T pojoById(String id) { + if (StrUtil.isBlank(id)) { + return null; + } + try { + return hbaseTemplate.getEntity(getFullTableName(), id, getFamily(), getEntityClass()); + } catch (IOException e) { + log.error("【Hbase】pojoById 异常", e); + return null; + } + } + + @Override + public List pojoListByIds(Collection ids) { + if (CollectionUtil.isEmpty(ids)) { + return null; + } + try { + return hbaseTemplate.getEntityList(getFullTableName(), ids.toArray(new String[0]), + getFamily(), getEntityClass()); + } catch (IOException e) { + log.error("【Hbase】getEntityList 异常", e); + return new ArrayList<>(); + } + } + + @Override + public List scroll(String scrollId, int size) { + try { + ScrollData scrollData = + hbaseTemplate.getEntityScroll(getFullTableName(), getFamily(), scrollId, size, getEntityClass()); + if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) { + return new ArrayList<>(); + } + return new ArrayList<>(scrollData.getContent()); + } catch (IOException e) { + log.error("【Hbase】getEntityScroll 异常", e); + return new ArrayList<>(); + } + } + + @Override + public T save(T entity) { + try { + hbaseTemplate.put(getFullTableName(), entity.getId(), getFamily(), entity); + return entity; + } catch (IOException e) { + log.error("【Hbase】put 异常", e); + return null; + } + } + + @Override + public boolean batchSave(Collection list) { + try { + hbaseTemplate.batchPut(getFullTableName(), getFamily(), list); + return true; + } catch (IOException | InterruptedException e) { + log.error("【Hbase】batchPut 异常", e); + return false; + } + } + + @Override + public boolean deleteById(String id) { + if (StrUtil.isBlank(id)) { + return true; + } + try { + hbaseTemplate.delete(getFullTableName(), id); + return true; + } catch (IOException e) { + log.error("【Hbase】delete 异常", e); + return false; + } + } + + @Override + public boolean batchDeleteById(Collection ids) { + if (CollectionUtil.isEmpty(ids)) { + return true; + } + try { + hbaseTemplate.batchDelete(getFullTableName(), ids.toArray(new String[0])); + return true; + } catch (IOException | InterruptedException e) { + log.error("【Hbase】batchDelete 异常", e); + return false; + } + } + + protected String getFullTableName() { + return StrUtil.format("{}:{}", getNamespace(), getTableName()); + } + +} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HBaseDemo.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HBaseDemo.java deleted file mode 100644 index e9e029a..0000000 --- a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HBaseDemo.java +++ /dev/null @@ -1,41 +0,0 @@ -package io.github.dunwu.javadb.hbase; - -import lombok.extern.slf4j.Slf4j; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.TableName; - -@Slf4j -public class HBaseDemo { - - public static void main(String[] args) throws Exception { - - // 请改为配置的方式 - // String zkHosts = "192.168.31.127"; - String zkHosts = "192.168.31.255"; - // 请改为配置的方式 - String zkPort = "2181"; - // 请改为配置的方式 - String namespace = "test"; - String tablename = "test"; - Configuration conf = HBaseConfiguration.create(); - conf.set("hbase.zookeeper.quorum", zkHosts); - conf.set("hbase.zookeeper.port", zkPort); - - // 创建命名空间和表 - TableName tableName = TableName.valueOf(namespace, tablename); - HBaseAdminHelper hbaseAdminHelper = HBaseAdminHelper.newInstance(conf); - hbaseAdminHelper.enableTable(tableName); - // hbaseAdminHelper.createNamespace(namespace); - // hbaseAdminHelper.createTable(tableName, "c1"); - // - // String rowKey = IdUtil.fastSimpleUUID(); - // HBaseHelper hbaseHelper = HBaseHelper.newInstance(hbaseAdminHelper.getConnection()); - // hbaseHelper.put(tableName, rowKey, "c1", "name", "jack"); - // String value = hbaseHelper.get(tableName, rowKey, "c1", "name"); - // System.out.println("value = " + value); - - hbaseAdminHelper.close(); - } - -} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HBaseHelper.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HBaseHelper.java deleted file mode 100644 index 0193694..0000000 --- a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HBaseHelper.java +++ /dev/null @@ -1,920 +0,0 @@ -package io.github.dunwu.javadb.hbase; - -import cn.hutool.core.bean.BeanUtil; -import cn.hutool.core.bean.copier.CopyOptions; -import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.io.IoUtil; -import cn.hutool.core.map.MapUtil; -import cn.hutool.core.util.ArrayUtil; -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.ReflectUtil; -import cn.hutool.core.util.StrUtil; -import io.github.dunwu.javadb.hbase.entity.HBaseFamilyRequest; -import io.github.dunwu.javadb.hbase.entity.HBaseMultiFamilyRequest; -import io.github.dunwu.javadb.hbase.entity.HBaseRowData; -import io.github.dunwu.javadb.hbase.entity.PageData; -import lombok.extern.slf4j.Slf4j; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.filter.FilterList; -import org.apache.hadoop.hbase.util.Bytes; - -import java.io.Closeable; -import java.io.IOException; -import java.lang.reflect.Field; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * HBase CRUD 工具类 - * - * @author Zhang Peng - * @date 2023-03-27 - */ -@Slf4j -public class HBaseHelper implements Closeable { - - private final Connection connection; - private final Configuration configuration; - - protected HBaseHelper(Configuration configuration) throws IOException { - this.configuration = configuration; - this.connection = ConnectionFactory.createConnection(configuration); - } - - public static synchronized HBaseHelper newInstance(Configuration configuration) throws IOException { - if (configuration == null) { - throw new IllegalArgumentException("configuration can not be null!"); - } - return new HBaseHelper(configuration); - } - - /** - * 关闭内部持有的 HBase Connection 实例 - */ - @Override - public synchronized void close() { - if (null == connection || connection.isClosed()) { - return; - } - IoUtil.close(connection); - } - - /** - * 获取 HBase 连接实例 - * - * @return / - */ - public Connection getConnection() { - if (null == connection) { - throw new RuntimeException("HBase connection init failed..."); - } - return connection; - } - - /** - * 获取 HBase 配置 - * - * @return / - */ - public Configuration getConfiguration() { - return configuration; - } - - /** - * 获取 {@link Table} 实例 - * - * @param tableName 表名 - * @return / - */ - public Table getTable(String tableName) throws Exception { - return getTable(TableName.valueOf(tableName)); - } - - /** - * 获取 {@link Table} 实例 - * - * @param tableName 表名 - * @return / - */ - - public synchronized Table getTable(TableName tableName) throws Exception { - return connection.getTable(tableName); - } - - public void put(TableName tableName, String row, String family, String column, String value) throws Exception { - put(tableName, row, family, null, column, value); - } - - public void put(TableName tableName, String row, String family, Long timestamp, String column, String value) - throws Exception { - Table table = getTable(tableName); - try { - Put put = new Put(Bytes.toBytes(row)); - if (timestamp != null) { - put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp, Bytes.toBytes(value)); - } else { - put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value)); - } - table.put(put); - } finally { - recycle(table); - } - } - - public void put(TableName tableName, String row, String family, Object obj) throws Exception { - put(tableName, row, family, null, obj); - } - - @SuppressWarnings("unchecked") - public void put(TableName tableName, String row, String family, Long timestamp, Object obj) throws Exception { - Map map; - if (obj instanceof Map) { - map = (Map) obj; - } else { - map = BeanUtil.beanToMap(obj); - } - put(tableName, row, family, timestamp, map); - } - - public void put(TableName tableName, String row, String family, Map columnMap) throws Exception { - put(tableName, row, family, null, columnMap); - } - - public void put(TableName tableName, String row, String family, Long timestamp, Map columnMap) - throws Exception { - Put put = new Put(Bytes.toBytes(row)); - columnMap.forEach((column, value) -> { - if (value != null) { - if (timestamp != null) { - put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp, - Bytes.toBytes(String.valueOf(value))); - } else { - put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(String.valueOf(value))); - } - } - }); - Table table = getTable(tableName); - try { - table.put(put); - } finally { - recycle(table); - } - } - - public void put(TableName tableName, String row, Long timestamp, Map> familyMap) - throws Exception { - Put put = new Put(Bytes.toBytes(row)); - for (Map.Entry> e : familyMap.entrySet()) { - String family = e.getKey(); - Map columnMap = e.getValue(); - if (MapUtil.isNotEmpty(columnMap)) { - for (Map.Entry entry : columnMap.entrySet()) { - String column = entry.getKey(); - Object value = entry.getValue(); - - if (ObjectUtil.isEmpty(value)) { - continue; - } - - if (timestamp != null) { - put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp, - Bytes.toBytes(String.valueOf(value))); - } else { - put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), - Bytes.toBytes(String.valueOf(value))); - } - } - } - } - Table table = getTable(tableName); - try { - table.put(put); - } finally { - recycle(table); - } - } - - public void deleteRow(TableName tableName, String row) throws Exception { - Delete delete = new Delete(Bytes.toBytes(row)); - Table table = getTable(tableName); - try { - table.delete(delete); - } finally { - recycle(table); - } - } - - public long incrementColumnValue(TableName tableName, String row, String family, String column, long amount) - throws Exception { - return incrementColumnValue(tableName, row, family, column, amount, Durability.SYNC_WAL); - } - - public long incrementColumnValue(TableName tableName, String row, String family, String column, long amount, - Durability durability) throws Exception { - Table table = getTable(tableName); - try { - return table.incrementColumnValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(column), amount, - durability); - } finally { - recycle(table); - } - } - - public void dump(TableName tableName, String[] rows, String[] families, String[] columns) throws Exception { - - List gets = new ArrayList<>(); - for (String row : rows) { - Get get = new Get(Bytes.toBytes(row)); - if (families != null) { - for (String family : families) { - for (String column : columns) { - get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column)); - } - } - } - gets.add(get); - } - - Table table = getTable(tableName); - try { - Result[] results = table.get(gets); - for (Result result : results) { - for (Cell cell : result.rawCells()) { - System.out.println( - "Cell: " + cell + ", Value: " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), - cell.getValueLength())); - } - } - } finally { - recycle(table); - } - } - - public void dump(TableName tableName) throws Exception { - Table table = getTable(tableName); - try { - ResultScanner scanner = table.getScanner(new Scan()); - for (Result result : scanner) { - dumpResult(result); - } - scanner.close(); - } finally { - recycle(table); - } - } - - /** - * 指定行、列族、列,返回相应单元中的值 - * - * @param tableName 表名 - * @param row 指定行 - * @param family 列族 - * @param column 列 - * @return / - */ - public String getColumn(TableName tableName, String row, String family, String column) throws Exception { - Get get = new Get(Bytes.toBytes(row)); - Table table = getTable(tableName); - try { - Result result = table.get(get); - return Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column))); - } finally { - recycle(table); - } - } - - /** - * 返回指定行、列族,列的数据,以实体 {@link T} 形式返回数据 - * - * @param tableName 表名 - * @param row 指定行 - * @param family 列族 - * @param clazz 返回实体类型 - * @param 实体类型 - * @return / - */ - public T getFamilyMap(TableName tableName, String row, String family, Class clazz) throws Exception { - Map fieldMap = ReflectUtil.getFieldMap(clazz); - Set columns = fieldMap.keySet(); - Map map = getFamilyMap(tableName, row, family, columns); - if (MapUtil.isEmpty(map)) { - return null; - } - return BeanUtil.mapToBean(map, clazz, true, CopyOptions.create().ignoreError()); - } - - /** - * 返回指定行、列族,列的数据,并以 {@link Map} 形式返回 - * - * @param tableName 表名 - * @param row 指定行 - * @param family 列族 - * @param columns 指定列 - * @return / - */ - public Map getFamilyMap(TableName tableName, String row, String family, - Collection columns) throws Exception { - - if (CollectionUtil.isEmpty(columns)) { - return getFamilyMap(tableName, row, family); - } - - List gets = new ArrayList<>(); - Get get = new Get(Bytes.toBytes(row)); - for (String column : columns) { - get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column)); - } - gets.add(get); - - Table table = getTable(tableName); - try { - Result[] results = table.get(gets); - Map map = new HashMap<>(columns.size()); - for (Result result : results) { - for (String column : columns) { - String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column))); - map.put(column, value); - } - } - return map; - } finally { - recycle(table); - } - } - - /** - * 返回指定行、列族的所有列数据,并以 {@link Map} 形式返回 - * - * @param tableName 表名 - * @param row 指定行 - * @param family 列族 - * @return / - */ - public Map getFamilyMap(TableName tableName, String row, String family) throws Exception { - List gets = new ArrayList<>(); - Get get = new Get(Bytes.toBytes(row)); - gets.add(get); - Table table = getTable(tableName); - try { - Result[] results = table.get(gets); - Map> familyColumnMap = getAllFamilyMap(results, row); - return familyColumnMap.get(family); - } finally { - recycle(table); - } - } - - /** - * 指定多个row进行批量查询 - * - * @param tableName - * @param rows - * @param family - * @param columns - * @return - * @throws Exception - */ - public Map> getFamilyMapInRows(TableName tableName, List rows, String family, - Collection columns) throws Exception { - - List gets = new ArrayList<>(); - for (String row : rows) { - Get get = new Get(Bytes.toBytes(row)); - for (String column : columns) { - get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column)); - } - gets.add(get); - } - Table table = getTable(tableName); - try { - Result[] results = table.get(gets); - Map> resultMap = new LinkedHashMap<>(gets.size()); - for (Result result : results) { - Map map = new HashMap<>(columns.size()); - for (String column : columns) { - String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column))); - map.put(column, value); - } - resultMap.put(Bytes.toString(result.getRow()), map); - } - return resultMap; - } finally { - recycle(table); - } - } - - /** - * 返回指定行、列族,列的数据,并以 {@link Map} 形式返回 - * - * @param tableName 表名 - * @param row 指定行 - * @param familyColumns <列族, 要查询的列> - * @return / - */ - public Map> getMultiFamilyMap(TableName tableName, String row, - Map> familyColumns) throws Exception { - - if (MapUtil.isEmpty(familyColumns)) { - return getMultiFamilyMap(tableName, row); - } - - List gets = new ArrayList<>(); - Get get = new Get(Bytes.toBytes(row)); - for (Map.Entry> entry : familyColumns.entrySet()) { - String family = entry.getKey(); - Collection columns = entry.getValue(); - if (CollectionUtil.isNotEmpty(columns)) { - for (String column : columns) { - get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column)); - } - } - } - gets.add(get); - - Table table = getTable(tableName); - try { - Result[] results = table.get(gets); - Map> map = new HashMap<>(familyColumns.size()); - for (Result result : results) { - if (result == null || result.isEmpty()) { - continue; - } - - familyColumns.forEach((family, columns) -> { - Map kvMap = new HashMap<>(); - if (CollectionUtil.isNotEmpty(columns)) { - for (String column : columns) { - String value = - Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column))); - kvMap.put(column, value); - } - } - map.put(family, kvMap); - }); - } - return map; - } finally { - recycle(table); - } - } - - /** - * 返回指定行所有列族的数据,并以 {@link Map} 形式返回 - * - * @param tableName 表名 - * @param row 指定行 - * @return / - */ - public Map> getMultiFamilyMap(TableName tableName, String row) throws Exception { - - List gets = new ArrayList<>(); - Get get = new Get(Bytes.toBytes(row)); - gets.add(get); - - Table table = getTable(tableName); - try { - Result[] results = table.get(gets); - return getAllFamilyMap(results, row); - } finally { - recycle(table); - } - } - - private Map> getAllFamilyMap(Result[] results, String row) { - Map>> rowFamilyColumnMap = getAllFamilyMapInRows(results); - if (MapUtil.isEmpty(rowFamilyColumnMap)) { - return new HashMap<>(0); - } - return rowFamilyColumnMap.get(row); - } - - public HBaseRowData getRowData(TableName tableName, String row, Map> familyColumns) - throws Exception { - Map> familyColumnMap = getMultiFamilyMap(tableName, row, familyColumns); - return HBaseRowData.buildByMap(row, null, familyColumnMap); - } - - /** - * 指定起止行、列族、多个列、{@link Filter},进行范围查询 - * - * @param tableName 表 - * @param startRow 起始行 - * @param stopRow 结束行 - * @param stopRow rowInclude 控制范围闭合条件[startRowInclude, endRowInclude],默认左闭右开 - * @param family 列族 - * @param columns 将要返回的列(未指定的列不会返回) - * @param operator filter执行条件 - * @param filters {@link Filter} 实体 - * @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值 - */ - public Map> scanFamilyMap(TableName tableName, String startRow, String stopRow, - boolean[] rowInclude, String family, Collection columns, FilterList.Operator operator, - Filter... filters) throws Exception { - Scan scan = new Scan(); - fillColumnsToScan(family, columns, scan); - boolean startRowInclude = true, endRowInclude = false; - if (null != rowInclude || rowInclude.length == 2) { - startRowInclude = rowInclude[0]; - endRowInclude = rowInclude[1]; - } - scan.withStartRow(Bytes.toBytes(startRow), startRowInclude); - scan.withStopRow(Bytes.toBytes(stopRow), endRowInclude); - if (ArrayUtil.isNotEmpty(filters)) { - FilterList filterList = new FilterList(filters); - scan.setFilter(filterList); - } - return scanFamilyMap(tableName, scan, family, columns); - } - - /** - * 指定列族、多个列,进行全表范围查询 - * - * @param tableName 表名 - * @param family 列族 - * @param columns 将要返回的列(未指定的列不会返回) - * @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值 - */ - public Map> scanFamilyMap(TableName tableName, String family, - Collection columns) - throws Exception { - HBaseFamilyRequest request = new HBaseFamilyRequest(); - request.setFamily(family) - .setColumns(columns) - .setTableName(tableName.getNameAsString()) - .setReversed(true); - return scanFamilyMap(request); - } - - /** - * 指定起止行、列族、多个列,进行范围查询 - * - * @param startRow 起始行 - * @param stopRow 结束行 - * @param tableName 表名 - * @param family 列族 - * @param columns 将要返回的列(未指定的列不会返回) - * @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值 - */ - public Map> scanFamilyMap(TableName tableName, String startRow, String stopRow, - String family, Collection columns) throws Exception { - HBaseFamilyRequest request = new HBaseFamilyRequest(); - request.setFamily(family) - .setColumns(columns) - .setTableName(tableName.getNameAsString()) - .setStartRow(startRow) - .setStopRow(stopRow) - .setReversed(true); - return scanFamilyMap(request); - } - - /** - * 指定列族、多个列、{@link Filter},进行全表范围查询 - * - * @param tableName 表名 - * @param family 列族 - * @param columns 将要返回的列(未指定的列不会返回) - * @param filter {@link Filter} 实体 - * @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值 - */ - public Map> scanFamilyMap(TableName tableName, String family, - Collection columns, - Filter filter) throws Exception { - HBaseFamilyRequest request = new HBaseFamilyRequest(); - request.setFamily(family) - .setColumns(columns) - .setTableName(tableName.getNameAsString()) - .setReversed(true) - .addFilter(filter); - return scanFamilyMap(request); - } - - /** - * 指定起止行、列族、多个列、{@link Filter},进行范围查询 - * - * @param startRow 起始行 - * @param stopRow 结束行 - * @param tableName 表名 - * @param family 列族 - * @param columns 将要返回的列(未指定的列不会返回) - * @param filter {@link Filter} 实体 - * @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值 - */ - public Map> scanFamilyMap(TableName tableName, String startRow, String stopRow, - String family, Collection columns, Filter filter) throws Exception { - HBaseFamilyRequest request = new HBaseFamilyRequest(); - request.setFamily(family) - .setColumns(columns) - .setTableName(tableName.getNameAsString()) - .setStartRow(startRow) - .setStopRow(stopRow) - .setReversed(true) - .addFilter(filter); - return scanFamilyMap(request); - } - - /** - * 指定起止写入时间、列族、多个列、{@link Filter},进行范围查询 - *

- * 注:根据时间范围查询时,会强制按时序倒序排列 - * - * @param tableName 表名 - * @param family 列族 - * @param columns 将要返回的列(未指定的列不会返回) - * @param minStamp 起始写入时间 - * @param maxStamp 结束写入时间 - * @param filter {@link Filter} 实体 - * @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值 - */ - public Map> scanFamilyMap(TableName tableName, String family, - Collection columns, - long minStamp, long maxStamp, Filter filter) throws Exception { - HBaseFamilyRequest request = new HBaseFamilyRequest(); - request.setFamily(family) - .setColumns(columns) - .setTableName(tableName.getNameAsString()) - .setMinTimeStamp(minStamp) - .setMaxTimeStamp(maxStamp) - .setReversed(true) - .addFilter(filter); - return scanFamilyMap(request); - } - - /** - * 返回匹配请求条件的数据、{@link Filter},进行范围查询 - *

- * 注:根据时间范围查询时,会强制按时序倒序排列 - * - * @param request {@link HBaseFamilyRequest} 请求条件 - * @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值 - */ - public Map> scanFamilyMap(HBaseFamilyRequest request) throws Exception { - return scanFamilyMap(TableName.valueOf(request.getTableName()), - request.toScan(), request.getFamily(), request.getColumns()); - } - - /** - * 指定列族、多个列、{@link Scan},进行范围查询 - * - * @param tableName 表名 - * @param scan {@link Scan} 实体 - * @param family 列族 - * @param columns 将要返回的列(未指定的列不会返回) - * @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列,value 是列值 - */ - public Map> scanFamilyMap(TableName tableName, Scan scan, - String family, Collection columns) throws Exception { - - Table table = getTable(tableName); - ResultScanner scanner = null; - try { - scanner = table.getScanner(scan); - Map> map = new LinkedHashMap<>(); - for (Result result : scanner) { - Map columnMap = new HashMap<>(columns.size()); - for (String column : columns) { - String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column))); - columnMap.put(column, value); - } - map.put(Bytes.toString(result.getRow()), columnMap); - } - return map; - } finally { - IoUtil.close(scanner); - recycle(table); - } - } - - private static void fillColumnsToScan(String family, Collection columns, Scan scan) { - if (StrUtil.isNotBlank(family) && CollectionUtil.isNotEmpty(columns)) { - for (String column : columns) { - scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(column)); - } - } - } - - /** - * 指定多个列族,每个列族包含的列、{@link Scan},进行范围查询 - * - * @param tableName 表名 - * @param scan {@link Scan} 实体 - * @param familyColumns 列族, 列族所包含的列 - * @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列族;三级 Map 的 key 是列,value 是列值 - */ - public Map>> scanMultiFamilyMap(TableName tableName, Scan scan, - Map> familyColumns) throws Exception { - - if (MapUtil.isEmpty(familyColumns)) { - return scanMultiFamilyMap(tableName, scan); - } - - Table table = getTable(tableName); - ResultScanner scanner = null; - try { - scanner = table.getScanner(scan); - Map>> familyKvDataMap = new LinkedHashMap<>(); - for (Result result : scanner) { - Map> familyMap = new HashMap<>(); - familyColumns.forEach((family, columns) -> { - Map columnMap = new HashMap<>(); - if (CollectionUtil.isNotEmpty(columns)) { - for (String column : columns) { - String value = Bytes.toString(result.getValue(Bytes.toBytes(family), - Bytes.toBytes(column))); - columnMap.put(column, value); - } - } - familyMap.put(family, columnMap); - }); - familyKvDataMap.put(Bytes.toString(result.getRow()), familyMap); - } - return familyKvDataMap; - } finally { - IoUtil.close(scanner); - recycle(table); - } - } - - /** - * 返回匹配 {@link Scan} 的所有列族的数据 - * - * @param tableName 表名 - * @param scan {@link Scan} 实体 - * @return 一级 Map 的 key 是 Row Key;二级 Map 的 key 是列族;三级 Map 的 key 是列,value 是列值 - */ - public Map>> scanMultiFamilyMap(TableName tableName, Scan scan) - throws Exception { - Table table = getTable(tableName); - ResultScanner scanner = null; - try { - scanner = table.getScanner(scan); - Result[] results = ArrayUtil.toArray(scanner, Result.class); - return getAllFamilyMapInRows(results); - } finally { - IoUtil.close(scanner); - recycle(table); - } - } - - public Map>> scanMultiFamilyMap(HBaseMultiFamilyRequest request) - throws Exception { - return scanMultiFamilyMap(TableName.valueOf(request.getTableName()), request.toScan(), - request.getFamilyColumns()); - } - - private Map>> getAllFamilyMapInRows(Result[] results) { - Map>> rowFamilyColumnMap = new HashMap<>(); - for (Result result : results) { - if (result == null || result.isEmpty()) { - continue; - } - Map> familyColumnMap = new HashMap<>(); - for (Cell cell : result.listCells()) { - String family = Bytes.toString(CellUtil.cloneFamily(cell)); - if (!familyColumnMap.containsKey(family)) { - familyColumnMap.put(family, new HashMap<>()); - } - String column = Bytes.toString(CellUtil.cloneQualifier(cell)); - String value = Bytes.toString(CellUtil.cloneValue(cell)); - familyColumnMap.get(family).put(column, value); - } - rowFamilyColumnMap.put(Bytes.toString(result.getRow()), familyColumnMap); - } - return rowFamilyColumnMap; - } - - /** - * 扫描(scan)一个列族的数据,并返回列表记录 - * - * @param request 单列族请求 - * @return / - */ - public List listRowData(HBaseFamilyRequest request) throws Exception { - Map> rowColumnMap = scanFamilyMap(request); - return HBaseRowData.toRowList(request.getFamily(), rowColumnMap); - } - - /** - * 扫描(scan)多个列族的数据,并返回列表记录 - * - * @param request 多列族请求 - * @return / - */ - public List listRowData(HBaseMultiFamilyRequest request) throws Exception { - Map>> map = scanMultiFamilyMap(request); - return HBaseRowData.toRowList(map); - } - - public PageData pageRowData(HBaseMultiFamilyRequest request) throws Exception { - return pageRowData(TableName.valueOf(request.getTableName()), request.getFamilyColumns(), - request.getPageNo(), request.getPageSize(), request.toScan()); - } - - public PageData pageRowData(TableName tableName, - Map> familyColumns, Integer pageNo, Integer pageSize, Scan scan) throws Exception { - - Table table = getTable(tableName); - Map>> rowMap = new HashMap<>(); - - int page = 1; - byte[] lastRow = null; - long total = 0L; - while (true) { - if (lastRow != null) { - scan.withStartRow(lastRow, false); - } - ResultScanner rs = table.getScanner(scan); - Iterator it = rs.iterator(); - int count = 0; - while (it.hasNext()) { - Result result = it.next(); - if (pageNo == page) { - fillRowMap(result, familyColumns, rowMap); - } - lastRow = result.getRow(); - count++; - } - - page++; - rs.close(); - total += count; - if (count == 0) { - break; - } - } - recycle(table); - List content = HBaseRowData.toRowList(rowMap); - return new PageData<>(pageNo, pageSize, total, content); - } - - private void fillRowMap(Result result, Map> familyColumns, - Map>> rowMap) { - - String row = Bytes.toString(result.getRow()); - if (row == null) { - return; - } - - Map> familyMap; - if (MapUtil.isEmpty(familyColumns)) { - familyMap = new HashMap<>(); - for (Cell cell : result.listCells()) { - String family = Bytes.toString(CellUtil.cloneFamily(cell)); - if (!familyMap.containsKey(family)) { - familyMap.put(family, new HashMap<>()); - } - String column = Bytes.toString(CellUtil.cloneQualifier(cell)); - String value = Bytes.toString(CellUtil.cloneValue(cell)); - familyMap.get(family).put(column, value); - } - } else { - familyMap = new HashMap<>(familyColumns.size()); - familyColumns.forEach((family, columns) -> { - if (CollectionUtil.isNotEmpty(columns)) { - Map columnMap = new HashMap<>(columns.size()); - for (String column : columns) { - String value = Bytes.toString(result.getValue(Bytes.toBytes(family), Bytes.toBytes(column))); - columnMap.put(column, value); - } - familyMap.put(family, columnMap); - } - }); - } - rowMap.put(row, familyMap); - } - - public void dumpResult(Result result) { - for (Cell cell : result.rawCells()) { - String msg = StrUtil.format("Cell: {}, Value: {}", cell, - Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); - System.out.println(msg); - } - } - - private void recycle(Table table) { - if (null == table) { - return; - } - IoUtil.close(table); - } - -} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HBaseAdminHelper.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseAdmin.java similarity index 68% rename from codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HBaseAdminHelper.java rename to codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseAdmin.java index 9e990df..5404827 100644 --- a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HBaseAdminHelper.java +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseAdmin.java @@ -24,33 +24,37 @@ import java.util.List; * @author Zhang Peng * @date 2023-03-27 */ -public class HBaseAdminHelper implements Closeable { +public class HbaseAdmin implements Closeable { private final Connection connection; private final Configuration configuration; - protected HBaseAdminHelper(Configuration configuration) throws IOException { + protected HbaseAdmin(Configuration configuration) throws IOException { this.configuration = configuration; + // 无需鉴权连接 this.connection = ConnectionFactory.createConnection(configuration); + // 鉴权连接 + // this.connection = ConnectionFactory.createConnection(configuration, null, + // new User.SecureHadoopUser(UserGroupInformation.createRemoteUser("test"))); } - protected HBaseAdminHelper(Connection connection) { + protected HbaseAdmin(Connection connection) { this.configuration = connection.getConfiguration(); this.connection = connection; } - public synchronized static HBaseAdminHelper newInstance(Configuration configuration) throws IOException { + public synchronized static HbaseAdmin newInstance(Configuration configuration) throws IOException { if (configuration == null) { throw new IllegalArgumentException("configuration can not be null!"); } - return new HBaseAdminHelper(configuration); + return new HbaseAdmin(configuration); } - public synchronized static HBaseAdminHelper newInstance(Connection connection) throws IOException { + public synchronized static HbaseAdmin newInstance(Connection connection) throws IOException { if (connection == null) { throw new IllegalArgumentException("connection can not be null!"); } - return new HBaseAdminHelper(connection); + return new HbaseAdmin(connection); } /** @@ -91,10 +95,14 @@ public class HBaseAdminHelper implements Closeable { * @param namespace 命名空间 */ public void createNamespace(String namespace) throws IOException { - NamespaceDescriptor nd = NamespaceDescriptor.create(namespace).build(); - Admin admin = getAdmin(); - admin.createNamespace(nd); - admin.close(); + Admin admin = null; + try { + admin = getAdmin(); + NamespaceDescriptor nd = NamespaceDescriptor.create(namespace).build(); + admin.createNamespace(nd); + } finally { + recycle(admin); + } } /** @@ -113,16 +121,33 @@ public class HBaseAdminHelper implements Closeable { * @param force 是否强制删除 */ public void dropNamespace(String namespace, boolean force) throws IOException { - Admin admin = getAdmin(); - if (force) { - TableName[] tableNames = getAdmin().listTableNamesByNamespace(namespace); - for (TableName name : tableNames) { - admin.disableTable(name); - admin.deleteTable(name); + Admin admin = null; + try { + admin = getAdmin(); + if (force) { + TableName[] tableNames = admin.listTableNamesByNamespace(namespace); + for (TableName name : tableNames) { + admin.disableTable(name); + admin.deleteTable(name); + } } + admin.deleteNamespace(namespace); + } finally { + recycle(admin); + } + } + + /** + * 获取所有命名空间 + */ + public String[] listNamespaces() throws IOException { + Admin admin = null; + try { + admin = getAdmin(); + return admin.listNamespaces(); + } finally { + recycle(admin); } - admin.deleteNamespace(namespace); - admin.close(); } /** @@ -213,7 +238,33 @@ public class HBaseAdminHelper implements Closeable { } /** - * 获取 {@link Table} 实例 + * 获取所有表 + */ + public TableName[] listTableNames() throws IOException { + Admin admin = null; + try { + admin = getAdmin(); + return admin.listTableNames(); + } finally { + recycle(admin); + } + } + + /** + * 获取指定命名空间下的所有表 + */ + public TableName[] listTableNamesByNamespace(String namespace) throws IOException { + Admin admin = null; + try { + admin = getAdmin(); + return admin.listTableNamesByNamespace(namespace); + } finally { + recycle(admin); + } + } + + /** + * 获取 {@link org.apache.hadoop.hbase.client.Table} 实例 * * @param tableName 表名 * @return / @@ -223,7 +274,7 @@ public class HBaseAdminHelper implements Closeable { } /** - * 获取 {@link Admin} 实例 + * 获取 {@link org.apache.hadoop.hbase.client.Admin} 实例 * * @return / */ @@ -231,4 +282,11 @@ public class HBaseAdminHelper implements Closeable { return getConnection().getAdmin(); } + private void recycle(Admin admin) { + if (null == admin) { + return; + } + IoUtil.close(admin); + } + } diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseFactory.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseFactory.java new file mode 100644 index 0000000..4708601 --- /dev/null +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseFactory.java @@ -0,0 +1,35 @@ +package io.github.dunwu.javadb.hbase; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; + +import java.io.IOException; + +/** + * HBase 工具实例化工厂 + * + * @author Zhang Peng + * @date 2023-07-05 + */ +public class HbaseFactory { + + public static HbaseTemplate newHbaseTemplate() throws IOException { + return HbaseTemplate.newInstance(newHbaseConfiguration()); + } + + public static HbaseAdmin newHbaseAdmin() throws IOException { + return HbaseAdmin.newInstance(newHbaseConfiguration()); + } + + public static Configuration newHbaseConfiguration() { + Configuration configuration = HBaseConfiguration.create(); + configuration.set("hbase.zookeeper.quorum", "10.101.129.74,10.101.129.76,10.101.129.77"); + configuration.set("hbase.zookeeper.property.clientPort", "2181"); + configuration.set("hbase.rootdir", "/hbase"); + configuration.set("hbase.meta.replicas.use", "true"); + configuration.set("hbase.client.retries.number", "5"); + configuration.set("hbase.rpc.timeout", "600000"); + return configuration; + } + +} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseMapper.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseMapper.java new file mode 100644 index 0000000..04fa19e --- /dev/null +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseMapper.java @@ -0,0 +1,104 @@ +package io.github.dunwu.javadb.hbase; + +import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity; +import org.apache.hadoop.hbase.client.Connection; + +import java.util.Collection; +import java.util.List; + +/** + * Hbase Mapper + * + * @author Zhang Peng + * @date 2023-11-15 + */ +public interface HbaseMapper { + + /** + * 获取 Hbase 官方客户端实体 + */ + Connection getClient(); + + /** + * 获取命名空间 + */ + String getNamespace(); + + /** + * 获取表名 + */ + String getTableName(); + + /** + * 判断表是否存在 + */ + boolean existsTable(); + + /** + * 获取列族 + */ + String getFamily(); + + /** + * 获取实体类型 + */ + Class getEntityClass(); + + /** + * 根据 ID 查数据 + * + * @param id 即 Hbase rowkey + * @return / + */ + T pojoById(String id); + + /** + * 根据 ID 列表批量查数据 + * + * @param ids 即 Hbase rowkey + * @return / + */ + List pojoListByIds(Collection ids); + + /** + * 根据 ID 滚动分页查询 + * + * @param scrollId 为空值时,默认查第一页 + * @param size 每页记录数 + * @return / + */ + List scroll(String scrollId, int size); + + /** + * 保存实体 + * + * @param entity 实体(存于默认的 Hbase 列族 f) + * @return / + */ + T save(T entity); + + /** + * 保存实体列表,每条记录将作为一行保存 + * + * @param list 实体列表(存于默认的 Hbase 列族 f) + * @return / + */ + boolean batchSave(Collection list); + + /** + * 根据 ID 删除记录 + * + * @param id 即 Hbase rowkey + * @return / + */ + boolean deleteById(String id); + + /** + * 根据 ID 列表批量删除记录 + * + * @param ids 即 Hbase rowkey + * @return / + */ + boolean batchDeleteById(Collection ids); + +} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseTemplate.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseTemplate.java new file mode 100644 index 0000000..82271e9 --- /dev/null +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/HbaseTemplate.java @@ -0,0 +1,919 @@ +package io.github.dunwu.javadb.hbase; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.bean.copier.CopyOptions; +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.io.IoUtil; +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.ReflectUtil; +import cn.hutool.core.util.StrUtil; +import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity; +import io.github.dunwu.javadb.hbase.entity.ColumnDo; +import io.github.dunwu.javadb.hbase.entity.FamilyDo; +import io.github.dunwu.javadb.hbase.entity.PageData; +import io.github.dunwu.javadb.hbase.entity.RowDo; +import io.github.dunwu.javadb.hbase.entity.ScrollData; +import io.github.dunwu.javadb.hbase.entity.scan.MultiFamilyScan; +import io.github.dunwu.javadb.hbase.entity.scan.SingleFamilyScan; +import lombok.extern.slf4j.Slf4j; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.util.Bytes; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * HBase 客户端封装工具类 + * + * @author Zhang Peng + * @date 2023-03-27 + */ +@Slf4j +public class HbaseTemplate implements Closeable { + + private final Connection connection; + + private final Configuration configuration; + + protected HbaseTemplate(Configuration configuration) throws IOException { + this.configuration = configuration; + // 无需鉴权连接 + this.connection = ConnectionFactory.createConnection(configuration); + // 鉴权连接 + // this.connection = ConnectionFactory.createConnection(configuration, null, + // new User.SecureHadoopUser(UserGroupInformation.createRemoteUser("test"))); + } + + protected HbaseTemplate(Connection connection) { + this.configuration = connection.getConfiguration(); + this.connection = connection; + } + + public static synchronized HbaseTemplate newInstance(Configuration configuration) throws IOException { + if (configuration == null) { + throw new IllegalArgumentException("configuration can not be null!"); + } + return new HbaseTemplate(configuration); + } + + public synchronized static HbaseTemplate newInstance(Connection connection) { + if (connection == null) { + throw new IllegalArgumentException("connection can not be null!"); + } + return new HbaseTemplate(connection); + } + + /** + * 关闭内部持有的 HBase Connection 实例 + */ + @Override + public synchronized void close() { + if (null == connection || connection.isClosed()) { + return; + } + IoUtil.close(connection); + } + + /** + * 获取 HBase 连接实例 + * + * @return / + */ + public Connection getConnection() { + if (null == connection) { + throw new RuntimeException("HBase connection init failed..."); + } + return connection; + } + + /** + * 获取 HBase 配置 + * + * @return / + */ + public Configuration getConfiguration() { + return configuration; + } + + /** + * 获取 {@link org.apache.hadoop.hbase.client.Table} 实例 + * + * @param tableName 表名 + * @return / + */ + public Table getTable(String tableName) throws IOException { + return getTable(TableName.valueOf(tableName)); + } + + /** + * 获取 {@link org.apache.hadoop.hbase.client.Table} 实例 + * + * @param tableName 表名 + * @return / + */ + + public synchronized Table getTable(TableName tableName) throws IOException { + return connection.getTable(tableName); + } + + // ===================================================================================== + // put 操作封装 + // ===================================================================================== + + public void put(String tableName, Put put) throws IOException { + if (StrUtil.isBlank(tableName) || put == null) { + return; + } + Table table = getTable(tableName); + try { + table.put(put); + } finally { + recycle(table); + } + } + + public void put(String tableName, String row, String family, String column, String value) + throws IOException { + Put put = newPut(row, null, family, column, value); + put(tableName, put); + } + + public void put(String tableName, String row, Long timestamp, String family, String column, String value) + throws IOException { + Put put = newPut(row, timestamp, family, column, value); + put(tableName, put); + } + + public void put(String tableName, String row, String family, Object obj) throws IOException { + put(tableName, row, null, family, obj); + } + + public void put(String tableName, String row, Long timestamp, String family, Object obj) throws IOException { + Put put = newPut(row, timestamp, family, obj); + put(tableName, put); + } + + public void put(String tableName, String row, String family, Map columnMap) + throws IOException { + Put put = newPut(row, null, family, columnMap); + put(tableName, put); + } + + public void put(String tableName, String row, Long timestamp, String family, Map columnMap) + throws IOException { + Put put = newPut(row, timestamp, family, columnMap); + put(tableName, put); + } + + public void put(String tableName, String row, Long timestamp, Map> familyMap) + throws IOException { + Put put = newPut(row, timestamp, familyMap); + put(tableName, put); + } + + public void batchPut(String tableName, Collection list) throws IOException, InterruptedException { + batch(tableName, list); + } + + public void batchPut(String tableName, String family, Collection list) + throws IOException, InterruptedException { + if (StrUtil.isBlank(tableName) || StrUtil.isBlank(family) || CollectionUtil.isEmpty(list)) { + return; + } + List puts = newPutList(family, list); + batchPut(tableName, puts); + } + + public static Put newPut(String row, Long timestamp, String family, String column, String value) { + if (StrUtil.isBlank(row) || StrUtil.isBlank(family) || StrUtil.isBlank(column) || StrUtil.isBlank(value)) { + return null; + } + Map columnMap = new HashMap<>(1); + columnMap.put(column, value); + return newPut(row, timestamp, family, columnMap); + } + + public static Put newPut(String row, Long timestamp, String family, Map columnMap) { + if (StrUtil.isBlank(row) || StrUtil.isBlank(family) || MapUtil.isEmpty(columnMap)) { + return null; + } + Map> familyMap = new HashMap<>(1); + familyMap.put(family, columnMap); + return newPut(row, timestamp, familyMap); + } + + @SuppressWarnings("unchecked") + public static Put newPut(String row, Long timestamp, String family, Object obj) { + if (obj == null) { + return null; + } + Map columnMap; + if (obj instanceof Map) { + columnMap = (Map) obj; + } else { + columnMap = BeanUtil.beanToMap(obj); + } + return newPut(row, timestamp, family, columnMap); + } + + public static Put newPut(String row, Long timestamp, Map> familyMap) { + + if (StrUtil.isBlank(row) || MapUtil.isEmpty(familyMap)) { + return null; + } + + if (timestamp == null) { + timestamp = System.currentTimeMillis(); + } + + Put put = new Put(Bytes.toBytes(row)); + for (Map.Entry> e : familyMap.entrySet()) { + String family = e.getKey(); + Map columnMap = e.getValue(); + if (MapUtil.isNotEmpty(columnMap)) { + for (Map.Entry entry : columnMap.entrySet()) { + String column = entry.getKey(); + Object value = entry.getValue(); + + if (ObjectUtil.isEmpty(value)) { + continue; + } + put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), timestamp, + Bytes.toBytes(String.valueOf(value))); + } + } + } + return put; + } + + private static List newPutList(String family, Collection list) { + long timestamp = System.currentTimeMillis(); + return list.stream() + .map(entity -> newPut(entity.getId(), timestamp, family, entity)) + .collect(Collectors.toList()); + } + + // ===================================================================================== + // delete 操作封装 + // ===================================================================================== + + public void delete(String tableName, Delete delete) throws IOException { + if (StrUtil.isBlank(tableName) || delete == null) { + return; + } + Table table = getTable(tableName); + try { + table.delete(delete); + } finally { + recycle(table); + } + } + + public void delete(String tableName, String row) throws IOException { + Delete delete = new Delete(Bytes.toBytes(row)); + delete(tableName, delete); + } + + public void batchDelete(String tableName, String... rows) throws IOException, InterruptedException { + if (ArrayUtil.isEmpty(rows)) { + return; + } + List deletes = Stream.of(rows) + .map(row -> new Delete(Bytes.toBytes(row))) + .distinct().collect(Collectors.toList()); + batchDelete(tableName, deletes); + } + + public void batchDelete(String tableName, List deletes) throws IOException, InterruptedException { + batch(tableName, deletes); + } + + // ===================================================================================== + // get 操作封装 + // ===================================================================================== + + public Result get(String tableName, String row) throws IOException { + if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row)) { + return null; + } + Get get = newGet(row); + return get(tableName, get); + } + + public Result get(String tableName, Get get) throws IOException { + if (StrUtil.isBlank(tableName) || get == null) { + return null; + } + Table table = getTable(tableName); + try { + return table.get(get); + } finally { + recycle(table); + } + } + + public Result[] batchGet(String tableName, String[] rows) throws IOException { + if (StrUtil.isBlank(tableName) || ArrayUtil.isEmpty(rows)) { + return null; + } + List gets = newGetList(rows); + return batchGet(tableName, gets); + } + + public Result[] batchGet(String tableName, List gets) throws IOException { + if (StrUtil.isBlank(tableName) || CollectionUtil.isEmpty(gets)) { + return null; + } + Table table = getTable(tableName); + try { + return table.get(gets); + } finally { + recycle(table); + } + } + + /** + * 指定行、列族,以实体 {@link T} 形式返回数据 + * + * @param tableName 表名 + * @param row 指定行 + * @param family 列族 + * @param clazz 返回实体类型 + * @param 实体类型 + * @return / + */ + public T getEntity(String tableName, String row, String family, Class clazz) throws IOException { + + if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row) || StrUtil.isBlank(family) || clazz == null) { + return null; + } + Map fieldMap = ReflectUtil.getFieldMap(clazz); + String[] columns = fieldMap.keySet().toArray(new String[0]); + Map columnMap = getColumnMap(tableName, row, family, columns); + return toEntity(columnMap, clazz); + } + + /** + * 指定多行、列族,以实体 {@link T} 列表形式返回数据 + * + * @param tableName 表名 + * @param rows 指定多行 + * @param family 列族 + * @param clazz 返回实体类型 + * @param 实体类型 + * @return / + */ + public List getEntityList(String tableName, String[] rows, String family, Class clazz) + throws IOException { + + if (StrUtil.isBlank(tableName) || ArrayUtil.isEmpty(rows) || StrUtil.isBlank(family) || clazz == null) { + return null; + } + + Map fieldMap = ReflectUtil.getFieldMap(clazz); + String[] columns = fieldMap.keySet().toArray(new String[0]); + List gets = newGetList(rows, family, columns); + + Result[] results = batchGet(tableName, gets); + if (ArrayUtil.isEmpty(results)) { + return new ArrayList<>(); + } + + List list = new ArrayList<>(); + for (Result result : results) { + Map columnMap = + getColumnsFromResult(result, tableName, family, CollectionUtil.newArrayList(columns)); + T entity = toEntity(columnMap, clazz); + list.add(entity); + } + return list; + } + + private T toEntity(Map columnMap, Class clazz) { + if (MapUtil.isEmpty(columnMap)) { + return null; + } + return BeanUtil.mapToBean(ColumnDo.toKvMap(columnMap), clazz, true, CopyOptions.create().ignoreError()); + } + + /** + * 查询列信息 + * + * @param tableName 表名 + * @param row 指定行 + * @param family 列族 + * @param column 列 + * @return / + */ + public ColumnDo getColumn(String tableName, String row, String family, String column) throws IOException { + + if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row) || StrUtil.isBlank(family) || StrUtil.isBlank(column)) { + return null; + } + + Result result = get(tableName, row); + if (result == null) { + return null; + } + + return getColumnFromResult(result, tableName, family, column); + } + + /** + * 查询多列信息 + * + * @param tableName 表名 + * @param row 指定行 + * @param family 列族 + * @param columns 指定列 + * @return / + */ + public Map getColumnMap(String tableName, String row, String family, String... columns) + throws IOException { + + if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row) || StrUtil.isBlank(family)) { + return null; + } + + Get get = newGet(row, family, columns); + Result result = get(tableName, get); + if (result == null) { + return null; + } + return getColumnsFromResult(result, tableName, family, Arrays.asList(columns)); + } + + /** + * 查询列族信息 + * + * @param tableName 表名 + * @param row 指定行 + * @param family 指定列族 + * @return / + */ + public FamilyDo getFamily(String tableName, String row, String family) throws IOException { + Map columnMap = getColumnMap(tableName, row, family); + return new FamilyDo(tableName, row, family, columnMap); + } + + /** + * 查询多列族信息 + * + * @param tableName 表名 + * @param row 指定行 + * @param familyColumnMap <列族, 要查询的列> + * @return / + */ + public Map getFamilyMap(String tableName, String row, + Map> familyColumnMap) throws IOException { + + if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row)) { + return new HashMap<>(0); + } + + if (MapUtil.isEmpty(familyColumnMap)) { + RowDo rowDo = getRow(tableName, row); + if (rowDo == null) { + return new HashMap<>(0); + } + return rowDo.getFamilyMap(); + } + + Get get = newGet(row); + for (Map.Entry> entry : familyColumnMap.entrySet()) { + String family = entry.getKey(); + Collection columns = entry.getValue(); + if (CollectionUtil.isNotEmpty(columns)) { + for (String column : columns) { + get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column)); + } + } + } + Result result = get(tableName, get); + if (result == null) { + return null; + } + + return getFamiliesFromResult(result, tableName, familyColumnMap); + } + + /** + * 查询行信息 + * + * @param tableName 表名 + * @param row 指定行 + * @return / + */ + public RowDo getRow(String tableName, String row) throws IOException { + if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row)) { + return null; + } + Result result = get(tableName, row); + if (result == null) { + return null; + } + return getRowFromResult(result, tableName); + } + + /** + * 查询多行信息 + * + * @param tableName 表名 + * @param rows 指定多行 + * @return / + */ + public Map getRowMap(String tableName, String... rows) throws IOException { + if (StrUtil.isBlank(tableName) || ArrayUtil.isEmpty(rows)) { + return null; + } + Result[] results = batchGet(tableName, rows); + if (ArrayUtil.isEmpty(results)) { + return new HashMap<>(0); + } + Map map = new HashMap<>(results.length); + for (Result result : results) { + String row = Bytes.toString(result.getRow()); + RowDo rowDo = getRowFromResult(result, tableName); + map.put(row, rowDo); + } + return map; + } + + private static Get newGet(String row) { + return new Get(Bytes.toBytes(row)); + } + + private static Get newGet(String row, String family, String... columns) { + Get get = newGet(row); + get.addFamily(Bytes.toBytes(family)); + if (ArrayUtil.isNotEmpty(columns)) { + for (String column : columns) { + get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column)); + } + } + return get; + } + + private static List newGetList(String[] rows) { + if (ArrayUtil.isEmpty(rows)) { + return new ArrayList<>(); + } + return Stream.of(rows).map(HbaseTemplate::newGet).collect(Collectors.toList()); + } + + private static List newGetList(String[] rows, String family, String[] columns) { + if (ArrayUtil.isEmpty(rows)) { + return new ArrayList<>(); + } + return Stream.of(rows).map(row -> newGet(row, family, columns)).collect(Collectors.toList()); + } + + // ===================================================================================== + // scan 操作封装 + // ===================================================================================== + + /** + * 返回匹配 {@link org.apache.hadoop.hbase.client.Scan} 的所有列族的数据 + * + * @param tableName 表名 + * @param scan {@link org.apache.hadoop.hbase.client.Scan} 实体 + * @return / + */ + public Result[] scan(String tableName, Scan scan) throws IOException { + Table table = getTable(tableName); + ResultScanner scanner = null; + try { + scanner = table.getScanner(scan); + return ArrayUtil.toArray(scanner, Result.class); + } finally { + IoUtil.close(scanner); + recycle(table); + } + } + + public PageData page(SingleFamilyScan scan) throws IOException { + if (scan == null) { + return null; + } + return getPageData(scan.getTableName(), scan.getPage(), scan.getSize(), scan.toScan(), + scan.getFamilyColumnMap()); + } + + public PageData page(MultiFamilyScan scan) throws IOException { + if (scan == null) { + return null; + } + return getPageData(scan.getTableName(), scan.getPage(), scan.getSize(), scan.toScan(), + scan.getFamilyColumnMap()); + } + + public ScrollData scroll(SingleFamilyScan scan) throws IOException { + if (scan == null) { + return null; + } + return getScrollData(scan.getTableName(), scan.getSize(), scan.toScan(), scan.getFamilyColumnMap()); + } + + public ScrollData scroll(MultiFamilyScan scan) throws IOException { + if (scan == null) { + return null; + } + return getScrollData(scan.getTableName(), scan.getSize(), scan.toScan(), scan.getFamilyColumnMap()); + } + + public PageData getEntityPage(SingleFamilyScan scan, Class clazz) throws IOException { + + Map fieldMap = ReflectUtil.getFieldMap(clazz); + Set columns = fieldMap.keySet(); + scan.setColumns(columns); + + PageData data = page(scan); + if (data == null || CollectionUtil.isEmpty(data.getContent())) { + return new PageData<>(scan.getPage(), scan.getSize(), 0L, new ArrayList<>()); + } + + List list = data.getContent().stream().map(rowDo -> { + Map> familyKvMap = rowDo.getFamilyKvMap(); + Map columnKvMap = familyKvMap.get(scan.getFamily()); + return BeanUtil.mapToBean(columnKvMap, clazz, true, CopyOptions.create().ignoreError()); + }).collect(Collectors.toList()); + return new PageData<>(scan.getPage(), scan.getSize(), data.getTotal(), list); + } + + public ScrollData getEntityScroll(SingleFamilyScan scan, Class clazz) throws IOException { + + Map fieldMap = ReflectUtil.getFieldMap(clazz); + Set columns = fieldMap.keySet(); + scan.setColumns(columns); + + ScrollData data = scroll(scan); + if (data == null || CollectionUtil.isEmpty(data.getContent())) { + return new ScrollData<>(scan.getStartRow(), scan.getStopRow(), null, 0, new ArrayList<>()); + } + + List list = data.getContent().stream().map(rowDo -> { + Map> familyKvMap = rowDo.getFamilyKvMap(); + Map columnKvMap = familyKvMap.get(scan.getFamily()); + return BeanUtil.mapToBean(columnKvMap, clazz, true, CopyOptions.create().ignoreError()); + }).collect(Collectors.toList()); + return new ScrollData<>(data.getStartRow(), data.getStopRow(), data.getScrollRow(), 0, list); + } + + public ScrollData getEntityScroll(String tableName, String family, String scrollRow, int size, + Class clazz) throws IOException { + SingleFamilyScan scan = new SingleFamilyScan(); + scan.setFamily(family) + .setScrollRow(scrollRow) + .setTableName(tableName) + .setSize(size) + .setReversed(false); + return getEntityScroll(scan, clazz); + } + + private PageData getPageData(String tableName, Integer page, Integer size, Scan scan, + Map> familyColumnMap) throws IOException { + Table table = getTable(tableName); + Map rowMap = new HashMap<>(size); + try { + int pageIndex = 1; + byte[] lastRow = null; + long total = 0L; + while (true) { + if (lastRow != null) { + scan.withStartRow(lastRow, false); + } + ResultScanner rs = table.getScanner(scan); + Iterator it = rs.iterator(); + int count = 0; + while (it.hasNext()) { + Result result = it.next(); + if (page == pageIndex) { + RowDo rowDo = getRowFromResult(result, tableName, familyColumnMap); + rowMap.put(rowDo.getRow(), rowDo); + } + lastRow = result.getRow(); + count++; + } + + pageIndex++; + rs.close(); + total += count; + if (count == 0) { + break; + } + } + return new PageData<>(page, size, total, rowMap.values()); + } finally { + recycle(table); + } + } + + private ScrollData getScrollData(String tableName, int size, Scan scan, + Map> familyColumnMap) throws IOException { + Table table = getTable(tableName); + ResultScanner scanner = null; + Map rowMap = new HashMap<>(size); + try { + scanner = table.getScanner(scan); + for (Result result : scanner) { + RowDo rowDo = getRowFromResult(result, tableName, familyColumnMap); + rowMap.put(rowDo.getRow(), rowDo); + } + + String scrollRow = null; + if (MapUtil.isNotEmpty(rowMap)) { + List rows = rowMap.values().stream() + .map(RowDo::getRow) + .collect(Collectors.toList()); + if (scan.isReversed()) { + scrollRow = CollectionUtil.min(rows); + } else { + scrollRow = CollectionUtil.max(rows); + } + } + return new ScrollData<>(Bytes.toString(scan.getStartRow()), Bytes.toString(scan.getStopRow()), + scrollRow, size, rowMap.values()); + } finally { + IoUtil.close(scanner); + recycle(table); + } + } + + // ===================================================================================== + // 其他操作封装 + // ===================================================================================== + + public long incrementColumnValue(String tableName, String row, String family, String column, long amount) + throws IOException { + return incrementColumnValue(tableName, row, family, column, amount, Durability.SYNC_WAL); + } + + public long incrementColumnValue(String tableName, String row, String family, String column, long amount, + Durability durability) throws IOException { + if (StrUtil.isBlank(tableName) || StrUtil.isBlank(row) || StrUtil.isBlank(family) || StrUtil.isBlank(column)) { + return -1L; + } + Table table = getTable(tableName); + try { + return table.incrementColumnValue(Bytes.toBytes(row), Bytes.toBytes(family), Bytes.toBytes(column), amount, + durability); + } finally { + recycle(table); + } + } + + private void batch(String tableName, Collection list) + throws IOException, InterruptedException { + if (StrUtil.isBlank(tableName) || CollectionUtil.isEmpty(list)) { + return; + } + Object[] results = new Object[list.size()]; + Table table = getTable(tableName); + try { + table.batch(new ArrayList<>(list), results); + } finally { + recycle(table); + } + } + + private void recycle(Table table) { + if (null == table) { + return; + } + IoUtil.close(table); + } + + private static RowDo getRowFromResult(Result result, String tableName) { + + if (result == null || result.isEmpty()) { + return null; + } + + String row = Bytes.toString(result.getRow()); + Map> familyColumnMap = new HashMap<>(result.size()); + for (Cell cell : result.listCells()) { + String family = Bytes.toString(CellUtil.cloneFamily(cell)); + if (!familyColumnMap.containsKey(family)) { + familyColumnMap.put(family, new HashMap<>(0)); + } + String column = Bytes.toString(CellUtil.cloneQualifier(cell)); + String value = Bytes.toString(CellUtil.cloneValue(cell)); + long timestamp = cell.getTimestamp(); + ColumnDo columnDo = new ColumnDo(tableName, row, family, timestamp, column, value); + familyColumnMap.get(family).put(column, columnDo); + } + + Map familyMap = new HashMap<>(familyColumnMap.size()); + familyColumnMap.forEach((family, columnMap) -> { + FamilyDo familyDo = new FamilyDo(tableName, row, family, columnMap); + familyMap.put(family, familyDo); + }); + return new RowDo(tableName, row, familyMap); + } + + private static RowDo getRowFromResult(Result result, String tableName, + Map> familyColumnMap) { + if (MapUtil.isEmpty(familyColumnMap)) { + return getRowFromResult(result, tableName); + } + String row = Bytes.toString(result.getRow()); + Map familyMap = getFamiliesFromResult(result, tableName, familyColumnMap); + return new RowDo(tableName, row, familyMap); + } + + private static FamilyDo getFamilyFromResult(Result result, String tableName, String family) { + + if (result == null || result.isEmpty()) { + return null; + } + + RowDo rowDo = getRowFromResult(result, tableName); + if (rowDo == null || MapUtil.isEmpty(rowDo.getFamilyMap())) { + return null; + } + return rowDo.getFamilyMap().get(family); + } + + private static Map getFamiliesFromResult(Result result, String tableName, + Map> familyColumnMap) { + + if (result == null || StrUtil.isBlank(tableName) || MapUtil.isEmpty(familyColumnMap)) { + return new HashMap<>(0); + } + + String row = Bytes.toString(result.getRow()); + Map familyMap = new HashMap<>(familyColumnMap.size()); + familyColumnMap.forEach((family, columns) -> { + Map columnMap; + if (CollectionUtil.isNotEmpty(columns)) { + columnMap = new HashMap<>(columns.size()); + for (String column : columns) { + ColumnDo columnDo = getColumnFromResult(result, tableName, family, column); + columnMap.put(column, columnDo); + } + } else { + columnMap = new HashMap<>(0); + } + familyMap.put(family, new FamilyDo(tableName, row, family, columnMap)); + }); + return familyMap; + } + + private static ColumnDo getColumnFromResult(Result result, String tableName, String family, String column) { + + if (result == null || StrUtil.isBlank(tableName) || StrUtil.isBlank(family) || StrUtil.isBlank(column)) { + return null; + } + + Cell cell = result.getColumnLatestCell(Bytes.toBytes(family), Bytes.toBytes(column)); + if (cell == null) { + return null; + } + String row = Bytes.toString(result.getRow()); + String value = Bytes.toString(CellUtil.cloneValue(cell)); + long timestamp = cell.getTimestamp(); + return new ColumnDo(tableName, row, family, timestamp, column, value); + } + + private static Map getColumnsFromResult(Result result, String tableName, String family, + Collection columns) { + if (CollectionUtil.isEmpty(columns)) { + RowDo rowDo = getRowFromResult(result, tableName); + return rowDo.getFamilyMap().get(family).getColumnMap(); + } + Map columnMap = new HashMap<>(columns.size()); + for (String column : columns) { + ColumnDo columnDo = getColumnFromResult(result, tableName, family, column); + columnMap.put(column, columnDo); + } + return columnMap; + } + +} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/config/EnableHbase.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/config/EnableHbase.java new file mode 100644 index 0000000..466cc9c --- /dev/null +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/config/EnableHbase.java @@ -0,0 +1,26 @@ +package io.github.dunwu.javadb.hbase.config; + +import org.springframework.context.annotation.EnableAspectJAutoProxy; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * 启动 HBase 配置注解 + * + * @author Zhang Peng + * @date 2023-06-30 + */ +@Target({ ElementType.TYPE }) +@Retention(RetentionPolicy.RUNTIME) +@EnableAspectJAutoProxy( + proxyTargetClass = false +) +@Import({ HbaseConfiguration.class }) +@Documented +public @interface EnableHbase { +} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/config/HbaseConfiguration.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/config/HbaseConfiguration.java new file mode 100644 index 0000000..1b5cb0d --- /dev/null +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/config/HbaseConfiguration.java @@ -0,0 +1,29 @@ +package io.github.dunwu.javadb.hbase.config; + +import io.github.dunwu.javadb.hbase.HbaseAdmin; +import io.github.dunwu.javadb.hbase.HbaseFactory; +import io.github.dunwu.javadb.hbase.HbaseTemplate; +import org.springframework.context.annotation.Bean; + +import java.io.IOException; + +/** + * HBase 启动配置 + * + * @author Zhang Peng + * @date 2023-07-04 + */ +@org.springframework.context.annotation.Configuration +public class HbaseConfiguration { + + @Bean("hbaseTemplate") + public HbaseTemplate hbaseTemplate() throws IOException { + return HbaseFactory.newHbaseTemplate(); + } + + @Bean("hbaseAdmin") + public HbaseAdmin hbaseAdmin() throws IOException { + return HbaseFactory.newHbaseAdmin(); + } + +} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/BaseHbaseEntity.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/BaseHbaseEntity.java new file mode 100644 index 0000000..1fec54a --- /dev/null +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/BaseHbaseEntity.java @@ -0,0 +1,25 @@ +package io.github.dunwu.javadb.hbase.entity; + +import java.io.Serializable; + +/** + * HBase 基础实体 + * + * @author Zhang Peng + * @date 2023-11-15 + */ +public abstract class BaseHbaseEntity implements Serializable { + + /** + * 获取主键 + */ + public abstract String getId(); + + /** + * 获取主键字段名 + */ + public abstract String getIdKey(); + + private static final long serialVersionUID = 5075127328254616085L; + +} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/ColumnDo.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/ColumnDo.java new file mode 100644 index 0000000..a2ba66a --- /dev/null +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/ColumnDo.java @@ -0,0 +1,82 @@ +package io.github.dunwu.javadb.hbase.entity; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.StrUtil; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * HBase 列实体 + * + * @author Zhang Peng + * @date 2023-05-19 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ColumnDo { + + /** 表名 */ + private String tableName; + /** 行 */ + private String row; + /** 列族 */ + private String family; + /** 时间戳 */ + private Long timestamp; + /** 列 */ + private String column; + /** 列值 */ + private String value; + + public boolean check() { + return check(this); + } + + public static boolean check(ColumnDo columnDo) { + return columnDo != null + && StrUtil.isNotBlank(columnDo.getTableName()) + && StrUtil.isNotBlank(columnDo.getRow()) + && StrUtil.isNotBlank(columnDo.getFamily()) + && StrUtil.isNotEmpty(columnDo.getColumn()); + } + + public static Map toColumnMap(String tableName, String row, String family, + Map columnValueMap) { + if (MapUtil.isEmpty(columnValueMap)) { + return new HashMap<>(0); + } + Map map = new HashMap<>(columnValueMap.size()); + columnValueMap.forEach((column, value) -> { + ColumnDo columnDo = new ColumnDo(tableName, row, family, null, column, value); + if (columnDo.check()) { + map.put(column, columnDo); + } + }); + return map; + } + + public static Map toKvMap(Map columnMap) { + if (MapUtil.isEmpty(columnMap)) { + return new HashMap<>(0); + } + Collection columns = columnMap.values().stream() + .filter(Objects::nonNull) + .collect(Collectors.toList()); + Map map = new HashMap<>(columns.size()); + for (ColumnDo columnDo : columns) { + if (columnDo.check()) { + map.put(columnDo.getColumn(), columnDo.getValue()); + } + } + return map; + } + +} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/FamilyDo.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/FamilyDo.java new file mode 100644 index 0000000..61d89c7 --- /dev/null +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/FamilyDo.java @@ -0,0 +1,73 @@ +package io.github.dunwu.javadb.hbase.entity; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.StrUtil; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.HashMap; +import java.util.Map; + +/** + * HBase 列族实体 + * + * @author Zhang Peng + * @date 2023-05-19 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class FamilyDo { + + /** 表名 */ + private String tableName; + /** 行 */ + private String row; + /** 列族 */ + private String family; + /** 列 Map(key 为 column;value 为列详细信息) */ + private Map columnMap; + + public boolean check() { + return check(this); + } + + public Map getColumnKvMap() { + return FamilyDo.getColumnKvMap(this); + } + + public static Map getColumnKvMap(FamilyDo familyDo) { + if (familyDo == null || MapUtil.isEmpty(familyDo.getColumnMap())) { + return new HashMap<>(0); + } + return ColumnDo.toKvMap(familyDo.getColumnMap()); + } + + public static boolean check(FamilyDo familyDo) { + return familyDo != null + && StrUtil.isNotBlank(familyDo.getTableName()) + && StrUtil.isNotBlank(familyDo.getRow()) + && StrUtil.isNotBlank(familyDo.getFamily()) + && MapUtil.isNotEmpty(familyDo.getColumnMap()); + } + + public static Map toFamilyMap(String tableName, String row, + Map> familyColumnValueMap) { + if (MapUtil.isEmpty(familyColumnValueMap)) { + return new HashMap<>(0); + } + + Map familyMap = new HashMap<>(familyColumnValueMap.size()); + familyColumnValueMap.forEach((family, columnMap) -> { + familyMap.put(family, toFamily(tableName, row, family, columnMap)); + }); + return familyMap; + } + + public static FamilyDo toFamily(String tableName, String row, String family, Map columnValueMap) { + Map columnMap = ColumnDo.toColumnMap(tableName, row, family, columnValueMap); + return new FamilyDo(tableName, row, family, columnMap); + } + +} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseFamilyData.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseFamilyData.java deleted file mode 100644 index dc5ddca..0000000 --- a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseFamilyData.java +++ /dev/null @@ -1,37 +0,0 @@ -package io.github.dunwu.javadb.hbase.entity; - -import cn.hutool.core.map.MapUtil; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.HashMap; -import java.util.Map; - -/** - * HBase 列族数据结构 - * - * @author Zhang Peng - * @date 2023-05-19 - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -public class HBaseFamilyData { - - private String family; - private Map columnMap; - - public static Map toFamilyMap(Map> map) { - if (MapUtil.isEmpty(map)) { - return new HashMap<>(0); - } - - Map familyMap = new HashMap<>(map.size()); - map.forEach((family, columnMap) -> { - familyMap.put(family, new HBaseFamilyData(family, columnMap)); - }); - return familyMap; - } - -} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseRowData.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseRowData.java deleted file mode 100644 index 6d9bb83..0000000 --- a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseRowData.java +++ /dev/null @@ -1,98 +0,0 @@ -package io.github.dunwu.javadb.hbase.entity; - -import cn.hutool.core.map.MapUtil; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * HBase 行数据结构 - * - * @author Zhang Peng - * @date 2023-05-19 - */ -@Data -@NoArgsConstructor -@AllArgsConstructor -public class HBaseRowData { - - private String row; - private Long timestamp; - private Map familyMap = new HashMap<>(); - - public Map> toMap() { - return toMap(this); - } - - public static HBaseRowData build(String row, Long timestamp, Map familyMap) { - return new HBaseRowData(row, timestamp, familyMap); - } - - public static HBaseRowData buildByMap(String row, Long timestamp, Map> map) { - return new HBaseRowData(row, timestamp, HBaseFamilyData.toFamilyMap(map)); - } - - public static Map> toMap(HBaseRowData data) { - - if (data == null || MapUtil.isEmpty(data.getFamilyMap())) { - return new HashMap<>(0); - } - - Map> map = new HashMap<>(data.getFamilyMap().size()); - data.getFamilyMap().forEach((family, familyData) -> { - map.put(family, familyData.getColumnMap()); - }); - return map; - } - - public static Map toRowMap(Map>> rowMultiFamilyMap) { - if (MapUtil.isEmpty(rowMultiFamilyMap)) { - return new HashMap<>(0); - } - - Map rowDataMap = new HashMap<>(rowMultiFamilyMap.size()); - rowMultiFamilyMap.forEach((row, familyDataMap) -> { - Map familyMap = HBaseFamilyData.toFamilyMap(familyDataMap); - rowDataMap.put(row, new HBaseRowData(row, null, familyMap)); - }); - return rowDataMap; - } - - public static List toRowList(Map>> rowMultiFamilyMap) { - Map rowMap = toRowMap(rowMultiFamilyMap); - if (MapUtil.isEmpty(rowMap)) { - return new ArrayList<>(); - } - return new ArrayList<>(rowMap.values()); - } - - public static Map toRowMap(String family, Map> rowColumnMap) { - if (MapUtil.isEmpty(rowColumnMap)) { - return new HashMap<>(0); - } - - Map rowDataMap = new HashMap<>(rowColumnMap.size()); - - rowColumnMap.forEach((row, columnMap) -> { - HBaseFamilyData familyData = new HBaseFamilyData(family, columnMap); - Map familyMap = new HashMap<>(); - familyMap.put(family, familyData); - rowDataMap.put(row, new HBaseRowData(row, null, familyMap)); - }); - return rowDataMap; - } - - public static List toRowList(String family, Map> rowColumnMap) { - Map rowMap = toRowMap(family, rowColumnMap); - if (MapUtil.isEmpty(rowMap)) { - return new ArrayList<>(); - } - return new ArrayList<>(rowMap.values()); - } - -} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/PageData.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/PageData.java index 4d3f433..18c755a 100644 --- a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/PageData.java +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/PageData.java @@ -4,10 +4,16 @@ import lombok.Data; import java.util.Collection; +/** + * HBase 分页数据实体 + * + * @author Zhang Peng + * @date 2023-05-19 + */ @Data public class PageData { - private Integer number; + private Integer page; private Integer size; private Long total; private Integer totalPages; @@ -15,8 +21,8 @@ public class PageData { public PageData() { } - public PageData(Integer number, Integer size, Long total, Collection content) { - this.number = number; + public PageData(Integer page, Integer size, Long total, Collection content) { + this.page = page; this.size = size; this.total = total; this.content = content; diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/RowDo.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/RowDo.java new file mode 100644 index 0000000..5432aca --- /dev/null +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/RowDo.java @@ -0,0 +1,87 @@ +package io.github.dunwu.javadb.hbase.entity; + +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.StrUtil; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * HBase 行实体 + * + * @author Zhang Peng + * @date 2023-05-19 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class RowDo { + + /** 表名 */ + private String tableName; + /** 行 */ + private String row; + /** 列族 Map(key 为 family;value 为列族详细信息) */ + private Map familyMap; + + public boolean check() { + return check(this); + } + + public Map> getFamilyKvMap() { + return RowDo.getFamilyKvMap(this); + } + + public static boolean check(RowDo rowDo) { + return rowDo != null + && StrUtil.isNotBlank(rowDo.getTableName()) + && StrUtil.isNotBlank(rowDo.getRow()) + && MapUtil.isNotEmpty(rowDo.getFamilyMap()); + } + + public static Map> getFamilyKvMap(RowDo rowDo) { + if (rowDo == null || MapUtil.isEmpty(rowDo.getFamilyMap())) { + return new HashMap<>(0); + } + Map> kvMap = new HashMap<>(rowDo.getFamilyMap().size()); + rowDo.getFamilyMap().forEach((family, familyDo) -> { + kvMap.put(family, familyDo.getColumnKvMap()); + }); + return kvMap; + } + + public static Map toRowMap(String tableName, Map>> map) { + if (MapUtil.isEmpty(map)) { + return new HashMap<>(0); + } + + Map rowMap = new HashMap<>(map.size()); + map.forEach((row, familyMap) -> { + RowDo rowDo = new RowDo(tableName, row, FamilyDo.toFamilyMap(tableName, row, familyMap)); + rowMap.put(row, rowDo); + }); + return rowMap; + } + + public static List toRowList(String tableName, Map>> map) { + Map rowMap = toRowMap(tableName, map); + if (MapUtil.isEmpty(rowMap)) { + return new ArrayList<>(); + } + return new ArrayList<>(rowMap.values()); + } + + public static RowDo toRow(String tableName, String row, Map> familyColumnMap) { + if (MapUtil.isEmpty(familyColumnMap)) { + return null; + } + Map familyMap = FamilyDo.toFamilyMap(tableName, row, familyColumnMap); + return new RowDo(tableName, row, familyMap); + } + +} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/ScrollData.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/ScrollData.java new file mode 100644 index 0000000..1465027 --- /dev/null +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/ScrollData.java @@ -0,0 +1,26 @@ +package io.github.dunwu.javadb.hbase.entity; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.Collection; + +/** + * Hbase 滚动数据实体 + * + * @author Zhang Peng + * @date 2023-11-16 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class ScrollData { + + private String startRow; + private String stopRow; + private String scrollRow; + private Integer size; + private Collection content; + +} diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/BaseFamilyRequest.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/scan/BaseScan.java similarity index 59% rename from codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/BaseFamilyRequest.java rename to codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/scan/BaseScan.java index c3b6f1a..667b99c 100644 --- a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/BaseFamilyRequest.java +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/scan/BaseScan.java @@ -1,4 +1,4 @@ -package io.github.dunwu.javadb.hbase.entity; +package io.github.dunwu.javadb.hbase.entity.scan; import cn.hutool.core.util.StrUtil; import lombok.Data; @@ -14,50 +14,32 @@ import java.util.ArrayList; import java.util.List; /** - * HBase 封装请求参数 + * HBase 基本 scan 封装请求参数 * * @author Zhang Peng * @date 2023-05-19 */ @Data @Accessors(chain = true) -public class BaseFamilyRequest { +public class BaseScan { - /** - * 表名 - */ + /** 表名 */ protected String tableName; - /** - * 起始 row - */ + /** 起始 row */ protected String startRow; - /** - * 结束 row - */ + /** 结束 row */ protected String stopRow; - /** - * 起始时间 - */ + /** 起始时间 */ protected Long minTimeStamp; - /** - * 结束时间 - */ + /** 结束时间 */ protected Long maxTimeStamp; - /** - * 是否降序 - */ + /** 是否降序(true: 降序;false:正序) */ protected boolean reversed = false; - /** - * 页号 - */ - protected Integer pageNo; - /** - * 默认的每页记录数大小(pageNo!=null时才使用) - */ - protected Integer pageSize = 10; - /** - * 过滤器列表 - */ + /** 页号 */ + protected Integer page; + /** 每页记录数大小 */ + protected Integer size = 100; + /** 过滤器列表 */ protected List filters = new ArrayList<>(); public void addFilter(Filter filter) { @@ -66,22 +48,31 @@ public class BaseFamilyRequest { public Scan toScan() throws IOException { Scan scan = new Scan(); + + // 缓存1000条数据 + scan.setCaching(1000); + scan.setCacheBlocks(false); scan.setReversed(reversed); if (StrUtil.isNotBlank(startRow)) { - scan.withStartRow(Bytes.toBytes(startRow), true); + if (reversed) { + scan.withStopRow(Bytes.toBytes(startRow), true); + } else { + scan.withStartRow(Bytes.toBytes(startRow), true); + } } if (StrUtil.isNotBlank(stopRow)) { - scan.withStartRow(Bytes.toBytes(stopRow), false); + if (reversed) { + scan.withStartRow(Bytes.toBytes(stopRow), true); + } else { + scan.withStopRow(Bytes.toBytes(stopRow), true); + } } if (minTimeStamp != null && maxTimeStamp != null) { scan.setTimeRange(minTimeStamp, maxTimeStamp); } - if (pageNo != null) { - PageFilter pageFilter = new PageFilter(pageSize); + if (size != null) { + PageFilter pageFilter = new PageFilter(size); filters.add(pageFilter); - // 缓存1000条数据 - scan.setCaching(1000); - scan.setCacheBlocks(false); } FilterList filterList = new FilterList(); for (Filter filter : filters) { diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseMultiFamilyRequest.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/scan/MultiFamilyScan.java similarity index 54% rename from codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseMultiFamilyRequest.java rename to codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/scan/MultiFamilyScan.java index 06e9ae5..69c5899 100644 --- a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseMultiFamilyRequest.java +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/scan/MultiFamilyScan.java @@ -1,9 +1,8 @@ -package io.github.dunwu.javadb.hbase.entity; +package io.github.dunwu.javadb.hbase.entity.scan; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.map.MapUtil; -import lombok.Data; -import lombok.experimental.Accessors; +import cn.hutool.core.util.StrUtil; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; @@ -13,37 +12,46 @@ import java.util.HashMap; import java.util.Map; /** - * HBase 封装请求参数 + * HBase 多列族 scan 封装请求参数 * * @author Zhang Peng * @date 2023-05-19 */ -@Data -@Accessors(chain = true) -public class HBaseMultiFamilyRequest extends BaseFamilyRequest { +public class MultiFamilyScan extends BaseScan { /** * 列族, 列族所包含的列(不可为空) */ - private final Map> familyColumns = new HashMap<>(); + private Map> familyColumnMap = new HashMap<>(); + private String scrollRow; - public HBaseMultiFamilyRequest addFamilyColumn(String family, Collection columns) { - this.familyColumns.put(family, columns); + public Map> getFamilyColumnMap() { + return familyColumnMap; + } + + public MultiFamilyScan setFamilyColumnMap( + Map> familyColumnMap) { + this.familyColumnMap = familyColumnMap; return this; } - public HBaseMultiFamilyRequest addFamilyColumns(Map> familyColumns) { - if (MapUtil.isNotEmpty(familyColumns)) { - this.familyColumns.putAll(familyColumns); - } + public String getScrollRow() { + return scrollRow; + } + + public MultiFamilyScan setScrollRow(String scrollRow) { + this.scrollRow = scrollRow; return this; } @Override public Scan toScan() throws IOException { Scan scan = super.toScan(); - if (MapUtil.isNotEmpty(familyColumns)) { - for (Map.Entry> entry : familyColumns.entrySet()) { + if (StrUtil.isNotBlank(scrollRow)) { + scan.withStartRow(Bytes.toBytes(scrollRow), false); + } + if (MapUtil.isNotEmpty(familyColumnMap)) { + for (Map.Entry> entry : familyColumnMap.entrySet()) { String family = entry.getKey(); Collection columns = entry.getValue(); if (CollectionUtil.isNotEmpty(columns)) { diff --git a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseFamilyRequest.java b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/scan/SingleFamilyScan.java similarity index 52% rename from codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseFamilyRequest.java rename to codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/scan/SingleFamilyScan.java index 984b611..9858265 100644 --- a/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/HBaseFamilyRequest.java +++ b/codes/javadb/hbase/src/main/java/io/github/dunwu/javadb/hbase/entity/scan/SingleFamilyScan.java @@ -1,6 +1,7 @@ -package io.github.dunwu.javadb.hbase.entity; +package io.github.dunwu.javadb.hbase.entity.scan; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.StrUtil; import lombok.Data; import lombok.experimental.Accessors; import org.apache.hadoop.hbase.client.Scan; @@ -9,23 +10,29 @@ import org.apache.hadoop.hbase.util.Bytes; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; /** - * HBase 封装请求参数 + * HBase 单列族 scan 封装请求参数 * * @author Zhang Peng * @date 2023-05-19 */ @Data @Accessors(chain = true) -public class HBaseFamilyRequest extends BaseFamilyRequest { +public class SingleFamilyScan extends BaseScan { private String family; private Collection columns = new ArrayList<>(); + private String scrollRow; @Override public Scan toScan() throws IOException { Scan scan = super.toScan(); + if (StrUtil.isNotBlank(scrollRow)) { + scan.withStartRow(Bytes.toBytes(scrollRow), false); + } if (CollectionUtil.isNotEmpty(this.getColumns())) { for (String column : columns) { scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(column)); @@ -34,4 +41,13 @@ public class HBaseFamilyRequest extends BaseFamilyRequest { return scan; } + public Map> getFamilyColumnMap() { + if (StrUtil.isNotBlank(family) && CollectionUtil.isNotEmpty(columns)) { + Map> familyColumnMap = new HashMap<>(1); + familyColumnMap.put(family, columns); + return familyColumnMap; + } + return new HashMap<>(0); + } + } diff --git a/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseMapperTest.java b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseMapperTest.java new file mode 100644 index 0000000..4ea069f --- /dev/null +++ b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseMapperTest.java @@ -0,0 +1,75 @@ +package io.github.dunwu.javadb.hbase; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; + +/** + * @author Zhang Peng + * @date 2023-11-15 + */ +public class HbaseMapperTest { + + private static ProductMapper mapper; + + static { + HbaseTemplate hbaseTemplate = null; + try { + hbaseTemplate = HbaseFactory.newHbaseTemplate(); + HbaseAdmin hbaseAdmin = HbaseFactory.newHbaseAdmin(); + mapper = new ProductMapper(hbaseTemplate, hbaseAdmin); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + @DisplayName("batchSave") + public void batchSave() { + Product product1 = new Product("test-key-8", "product8", new BigDecimal(4000.0)); + Product product2 = new Product("test-key-9", "product9", new BigDecimal(5000.0)); + List products = CollectionUtil.newArrayList(product1, product2); + mapper.batchSave(products); + } + + @Test + @DisplayName("batchGet") + public void batchGet() { + List list = mapper.pojoListByIds(Arrays.asList("test-key-8", "test-key-9")); + System.out.println(JSONUtil.toJsonStr(list)); + } + + @Test + @DisplayName("scroll") + public void scroll() { + int size = 1; + String lastId = null; + List list = mapper.scroll(null, size); + if (CollectionUtil.isNotEmpty(list)) { + Product last = CollectionUtil.getLast(list); + System.out.println("entity: " + JSONUtil.toJsonPrettyStr(last)); + lastId = last.getId(); + } + + while (true) { + List products = mapper.scroll(lastId, size); + if (CollectionUtil.isEmpty(list)) { + break; + } + Product last = CollectionUtil.getLast(products); + System.out.println("entity: " + JSONUtil.toJsonPrettyStr(last)); + lastId = last.getId(); + if (StrUtil.isBlank(lastId)) { + break; + } + } + } + +} diff --git a/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplateDeleteTest.java b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplateDeleteTest.java new file mode 100644 index 0000000..cffb5b1 --- /dev/null +++ b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplateDeleteTest.java @@ -0,0 +1,47 @@ +package io.github.dunwu.javadb.hbase; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Hbase 删除测试 + * + * @author Zhang Peng + * @date 2023-11-13 + */ +public class HbaseTemplateDeleteTest { + + public static final String TABLE_NAME = "test:test"; + + private static final HbaseTemplate HBASE_TEMPLATE; + + static { + try { + HBASE_TEMPLATE = HbaseFactory.newHbaseTemplate(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + @DisplayName("删除单条记录") + public void testDelete() throws IOException { + String rowkey = "test-key-1"; + HBASE_TEMPLATE.delete(TABLE_NAME, rowkey); + } + + @Test + @DisplayName("批量删除记录") + public void testBatchDelete() throws IOException, InterruptedException { + List rowkeys = new ArrayList<>(); + for (int i = 1; i <= 13; i++) { + rowkeys.add("test-key-" + i); + } + HBASE_TEMPLATE.batchDelete(TABLE_NAME, rowkeys.toArray(new String[0])); + } + +} diff --git a/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplateGetTest.java b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplateGetTest.java new file mode 100644 index 0000000..3dc250f --- /dev/null +++ b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplateGetTest.java @@ -0,0 +1,124 @@ +package io.github.dunwu.javadb.hbase; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import io.github.dunwu.javadb.hbase.entity.ColumnDo; +import io.github.dunwu.javadb.hbase.entity.FamilyDo; +import io.github.dunwu.javadb.hbase.entity.RowDo; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Get 测试集 + *

+ * 测试前,先完整执行 {@link HbaseTemplatePutTest} + * + * @author Zhang Peng + * @date 2023-11-13 + */ +public class HbaseTemplateGetTest { + + public static final String TABLE_NAME = "test:test"; + + private static final HbaseTemplate HBASE_TEMPLATE; + + static { + try { + HBASE_TEMPLATE = HbaseFactory.newHbaseTemplate(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + @DisplayName("查询实体") + public void getEntity() throws IOException { + User user = HBASE_TEMPLATE.getEntity(TABLE_NAME, "test-key-3", "f1", User.class); + System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonStr(user))); + Assertions.assertThat(user).isNotNull(); + } + + @Test + @DisplayName("查询实体列表") + public void getEntityList() throws IOException { + List rows = Arrays.asList("test-key-3", "test-key-4", "test-key-5"); + List list = HBASE_TEMPLATE.getEntityList(TABLE_NAME, rows.toArray(new String[0]), "f1", User.class); + System.out.println(StrUtil.format("查询实体列表: {}", JSONUtil.toJsonStr(list))); + Assertions.assertThat(list).isNotEmpty(); + Assertions.assertThat(list.size()).isEqualTo(rows.size()); + } + + @Test + @DisplayName("查询列") + public void getColumn() throws IOException { + ColumnDo columnDo = HBASE_TEMPLATE.getColumn(TABLE_NAME, "test-key-1", "f1", "key"); + System.out.println(StrUtil.format("查询单列: {}", JSONUtil.toJsonStr(columnDo))); + } + + @Test + @DisplayName("查询多列") + public void getColumnMap() throws IOException { + Map columnMap = HBASE_TEMPLATE.getColumnMap(TABLE_NAME, "test-key-3", "f1"); + System.out.println(StrUtil.format("查询多列: {}", JSONUtil.toJsonStr(columnMap))); + Assertions.assertThat(columnMap).isNotEmpty(); + + Map columnMap2 = HBASE_TEMPLATE.getColumnMap(TABLE_NAME, "test-key-3", "f1", "id", "name"); + System.out.println(StrUtil.format("查询多列: {}", JSONUtil.toJsonStr(columnMap2))); + Assertions.assertThat(columnMap2).isNotEmpty(); + } + + @Test + @DisplayName("查询列族") + public void getFamily() throws IOException { + FamilyDo familyDo = HBASE_TEMPLATE.getFamily(TABLE_NAME, "test-key-7", "f1"); + System.out.println(StrUtil.format("查询列族: {}", JSONUtil.toJsonStr(familyDo))); + Assertions.assertThat(familyDo).isNotNull(); + Assertions.assertThat(familyDo.getFamily()).isEqualTo("f1"); + + FamilyDo familyDo2 = HBASE_TEMPLATE.getFamily(TABLE_NAME, "test-key-7", "f2"); + System.out.println(StrUtil.format("查询列族: {}", JSONUtil.toJsonStr(familyDo2))); + Assertions.assertThat(familyDo2).isNotNull(); + Assertions.assertThat(familyDo2.getFamily()).isEqualTo("f2"); + } + + @Test + @DisplayName("查询多列族") + public void getFamilyMap() throws IOException { + Map> familyColumnMap = new HashMap<>(); + familyColumnMap.put("f1", Collections.singleton("id")); + familyColumnMap.put("f2", Collections.singleton("name")); + Map familyMap = HBASE_TEMPLATE.getFamilyMap(TABLE_NAME, "test-key-7", familyColumnMap); + System.out.println(StrUtil.format("查询多列族: {}", JSONUtil.toJsonStr(familyMap))); + Assertions.assertThat(familyMap).isNotEmpty(); + Assertions.assertThat(familyMap.size()).isEqualTo(familyColumnMap.size()); + } + + @Test + @DisplayName("查询行") + public void getRow() throws IOException { + RowDo rowDo = HBASE_TEMPLATE.getRow(TABLE_NAME, "test-key-7"); + System.out.println(StrUtil.format("查询行: {}", JSONUtil.toJsonStr(rowDo))); + Assertions.assertThat(rowDo).isNotNull(); + Assertions.assertThat(rowDo.getRow()).isEqualTo("test-key-7"); + } + + @Test + @DisplayName("批量查询行记录") + public void getRowMap() throws IOException { + String[] rows = new String[] { "test-key-3", "test-key-4", "test-key-7" }; + Map rowMap = HBASE_TEMPLATE.getRowMap(TABLE_NAME, rows); + System.out.println(StrUtil.format("批量查询行记录: {}", JSONUtil.toJsonStr(rowMap))); + Assertions.assertThat(rowMap).isNotEmpty(); + Assertions.assertThat(rowMap.size()).isEqualTo(rows.length); + } + +} diff --git a/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplatePutTest.java b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplatePutTest.java new file mode 100644 index 0000000..f759303 --- /dev/null +++ b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplatePutTest.java @@ -0,0 +1,105 @@ +package io.github.dunwu.javadb.hbase; + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.collection.CollectionUtil; +import org.apache.hadoop.hbase.client.Put; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Hbase Put 测试 + * + * @author Zhang Peng + * @date 2023-11-13 + */ +public class HbaseTemplatePutTest { + + public static final String TABLE_NAME = "test:test"; + + private static final HbaseTemplate HBASE_TEMPLATE; + + static { + try { + HBASE_TEMPLATE = HbaseFactory.newHbaseTemplate(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + @DisplayName("put 测试 01") + public void put01() throws IOException { + long timestamp = System.currentTimeMillis(); + HBASE_TEMPLATE.put(TABLE_NAME, "test-key-0", "f1", "name", "user0"); + HBASE_TEMPLATE.put(TABLE_NAME, "test-key-1", timestamp, "f1", "name", "user1"); + } + + @Test + @DisplayName("put 测试 02") + public void put02() throws IOException { + long timestamp = System.currentTimeMillis(); + User user2 = new User(2, "user2"); + HBASE_TEMPLATE.put(TABLE_NAME, "test-key-2", "f1", user2); + User user3 = new User(3, "user3"); + HBASE_TEMPLATE.put(TABLE_NAME, "test-key-3", timestamp, "f1", user3); + } + + @Test + @DisplayName("put 测试 03") + public void put03() throws IOException { + long timestamp = System.currentTimeMillis(); + User user4 = new User(4, "user4"); + Map map = BeanUtil.beanToMap(user4); + HBASE_TEMPLATE.put(TABLE_NAME, "test-key-4", timestamp, "f1", map); + } + + @Test + @DisplayName("put 测试 04") + public void put04() throws IOException { + long timestamp = System.currentTimeMillis(); + User user5 = new User(5, "user5"); + Product product5 = new Product("test-key-5", "product5", new BigDecimal(4000.0)); + Map> familyMap = new HashMap<>(2); + Map userMap = BeanUtil.beanToMap(user5); + familyMap.put("f1", userMap); + Map productMap = BeanUtil.beanToMap(product5); + familyMap.put("f2", productMap); + HBASE_TEMPLATE.put(TABLE_NAME, "test-key-5", timestamp, familyMap); + } + + @Test + @DisplayName("put 测试 05") + public void put05() throws IOException { + Put put = HbaseTemplate.newPut("test-key-6", null, "f1", "name", "user6"); + HBASE_TEMPLATE.put(TABLE_NAME, put); + } + + @Test + @DisplayName("put 测试 06") + public void put06() throws IOException, InterruptedException { + long timestamp = System.currentTimeMillis(); + User user7 = new User(5, "user7"); + Product product7 = new Product("test-key-7", "product5", new BigDecimal(4000.0)); + Put put1 = HbaseTemplate.newPut("test-key-7", timestamp, "f1", user7); + Put put2 = HbaseTemplate.newPut("test-key-7", timestamp, "f2", product7); + List list = Arrays.asList(put1, put2); + HBASE_TEMPLATE.batchPut(TABLE_NAME, list); + } + + @Test + @DisplayName("batchPut 测试2") + public void batchPut2() throws IOException, InterruptedException { + Product product1 = new Product("test-key-8", "product8", new BigDecimal(4000.0)); + Product product2 = new Product("test-key-9", "product9", new BigDecimal(5000.0)); + List products = CollectionUtil.newArrayList(product1, product2); + HBASE_TEMPLATE.batchPut(TABLE_NAME, "f2", products); + } + +} diff --git a/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplateScanTest.java b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplateScanTest.java new file mode 100644 index 0000000..e916b15 --- /dev/null +++ b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/HbaseTemplateScanTest.java @@ -0,0 +1,183 @@ +package io.github.dunwu.javadb.hbase; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.ReflectUtil; +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import io.github.dunwu.javadb.hbase.entity.PageData; +import io.github.dunwu.javadb.hbase.entity.RowDo; +import io.github.dunwu.javadb.hbase.entity.ScrollData; +import io.github.dunwu.javadb.hbase.entity.scan.MultiFamilyScan; +import io.github.dunwu.javadb.hbase.entity.scan.SingleFamilyScan; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Get 测试集 + *

+ * 测试前,先完整执行 {@link HbaseTemplatePutTest} + * + * @author Zhang Peng + * @date 2023-11-13 + */ +public class HbaseTemplateScanTest { + + public static final String TABLE_NAME = "test:test"; + + private static final HbaseTemplate HBASE_TEMPLATE; + + static { + try { + HBASE_TEMPLATE = HbaseFactory.newHbaseTemplate(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + @DisplayName("单列族分页查询") + public void page() throws IOException { + for (int page = 1; page <= 2; page++) { + SingleFamilyScan scan = new SingleFamilyScan(); + scan.setFamily("f1") + .setTableName(TABLE_NAME) + .setPage(page) + .setSize(2) + .setReversed(true); + PageData rowDoMap = HBASE_TEMPLATE.page(scan); + System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonStr(rowDoMap))); + Assertions.assertThat(rowDoMap).isNotNull(); + } + } + + @Test + @DisplayName("多列族分页查询") + public void page2() throws IOException { + Map> familyColumnMap = new HashMap<>(); + familyColumnMap.put("f1", Collections.singleton("id")); + familyColumnMap.put("f2", Collections.singleton("name")); + for (int page = 1; page <= 2; page++) { + MultiFamilyScan scan = new MultiFamilyScan(); + scan.setFamilyColumnMap(familyColumnMap) + .setTableName(TABLE_NAME) + .setPage(page) + .setSize(2) + .setReversed(true); + PageData rowDoMap = HBASE_TEMPLATE.page(scan); + System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonStr(rowDoMap))); + Assertions.assertThat(rowDoMap).isNotNull(); + } + } + + @Test + @DisplayName("查询实体列表") + public void getEntityPage() throws IOException { + SingleFamilyScan scan = new SingleFamilyScan(); + scan.setFamily("f1") + .setTableName(TABLE_NAME) + .setPage(1) + .setSize(2) + .setReversed(true); + PageData entityPage = HBASE_TEMPLATE.getEntityPage(scan, User.class); + System.out.println(StrUtil.format("查询实体列表: {}", JSONUtil.toJsonStr(entityPage))); + Assertions.assertThat(entityPage).isNotNull(); + } + + @Test + @DisplayName("单列族滚动查询") + public void scroll() throws IOException { + + SingleFamilyScan scan = new SingleFamilyScan(); + scan.setFamily("f1") + .setTableName(TABLE_NAME) + .setSize(1) + .setStartRow("test-key-1") + .setStopRow("test-key-9") + .setReversed(false); + ScrollData data = HBASE_TEMPLATE.scroll(scan); + System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(data))); + Assertions.assertThat(data).isNotNull(); + scan.setScrollRow(data.getScrollRow()); + + while (true) { + ScrollData next = HBASE_TEMPLATE.scroll(scan); + if (next == null || CollectionUtil.isEmpty(next.getContent())) { + break; + } + System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(next))); + scan.setScrollRow(next.getScrollRow()); + } + } + + @Test + @DisplayName("多列族滚动查询") + public void scroll2() throws IOException { + List userFields = Stream.of(ReflectUtil.getFields(User.class)) + .map(Field::getName).collect(Collectors.toList()); + List productFields = Stream.of(ReflectUtil.getFields(Product.class)) + .map(Field::getName).collect(Collectors.toList()); + Map> familyColumnMap = new HashMap<>(); + familyColumnMap.put("f1", userFields); + familyColumnMap.put("f2", productFields); + + MultiFamilyScan scan = new MultiFamilyScan(); + scan.setFamilyColumnMap(familyColumnMap) + .setTableName(TABLE_NAME) + .setSize(1) + .setStartRow("test-key-1") + .setStopRow("test-key-9") + .setReversed(true); + ScrollData data = HBASE_TEMPLATE.scroll(scan); + System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(data))); + Assertions.assertThat(data).isNotNull(); + scan.setScrollRow(data.getScrollRow()); + + while (true) { + ScrollData next = HBASE_TEMPLATE.scroll(scan); + if (next == null || CollectionUtil.isEmpty(next.getContent())) { + break; + } + System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(next))); + scan.setScrollRow(next.getScrollRow()); + } + } + + @Test + @DisplayName("滚动查询实体") + public void getEntityScroll() throws IOException { + + SingleFamilyScan scan = new SingleFamilyScan(); + scan.setFamily("f1") + .setTableName(TABLE_NAME) + .setSize(1) + .setStartRow("test-key-1") + .setStopRow("test-key-9") + .setReversed(false); + + ScrollData data = HBASE_TEMPLATE.getEntityScroll(scan, User.class); + System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(data))); + Assertions.assertThat(data).isNotNull(); + scan.setScrollRow(data.getScrollRow()); + + while (true) { + ScrollData next = HBASE_TEMPLATE.getEntityScroll(scan, User.class); + if (next == null || CollectionUtil.isEmpty(next.getContent())) { + break; + } + System.out.println(StrUtil.format("查询实体: {}", JSONUtil.toJsonPrettyStr(next))); + scan.setScrollRow(next.getScrollRow()); + } + } + +} diff --git a/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/Product.java b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/Product.java new file mode 100644 index 0000000..5586840 --- /dev/null +++ b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/Product.java @@ -0,0 +1,37 @@ +package io.github.dunwu.javadb.hbase; + +import io.github.dunwu.javadb.hbase.entity.BaseHbaseEntity; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.math.BigDecimal; + +/** + * 产品实体 + * + * @author Zhang Peng + * @date 2023-11-15 + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Product extends BaseHbaseEntity { + + private String id; + private String name; + private BigDecimal price; + + @Override + public String getId() { + return id; + } + + @Override + public String getIdKey() { + return "id"; + } + + private static final long serialVersionUID = -2596114168690429555L; + +} diff --git a/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/ProductMapper.java b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/ProductMapper.java new file mode 100644 index 0000000..b95c1ec --- /dev/null +++ b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/ProductMapper.java @@ -0,0 +1,28 @@ +package io.github.dunwu.javadb.hbase; + +/** + * @author Zhang Peng + * @date 2023-11-15 + */ +public class ProductMapper extends BaseHbaseMapper { + + public ProductMapper(HbaseTemplate hbaseTemplate, HbaseAdmin hbaseAdmin) { + super(hbaseTemplate, hbaseAdmin); + } + + @Override + public String getTableName() { + return "test"; + } + + @Override + public String getFamily() { + return "f1"; + } + + @Override + public Class getEntityClass() { + return Product.class; + } + +} diff --git a/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/User.java b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/User.java new file mode 100644 index 0000000..63cc86c --- /dev/null +++ b/codes/javadb/hbase/src/test/java/io/github/dunwu/javadb/hbase/User.java @@ -0,0 +1,16 @@ +package io.github.dunwu.javadb.hbase; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class User { + + private int id; + + private String name; + +} \ No newline at end of file