HBase Client 使用示例

pull/1/head
Zhang Peng 2019-06-21 18:18:52 +08:00
parent 66acf688dd
commit 89a262a79f
9 changed files with 815 additions and 3 deletions

View File

@ -0,0 +1,62 @@
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>io.github.dunwu</groupId>
<artifactId>javadb-hbase</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<hbase.version>1.3.1</hbase.version>
<junit.version>4.12</junit.version>
<dunwu.version>0.4.1</dunwu.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
</dependency>
<dependency>
<groupId>io.github.dunwu</groupId>
<artifactId>dunwu-common</artifactId>
</dependency>
<!-- test begin -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<!-- test end -->
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>io.github.dunwu</groupId>
<artifactId>dunwu-common</artifactId>
<version>${dunwu.version}</version>
</dependency>
<!-- test begin -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- test end -->
</dependencies>
</dependencyManagement>
</project>

View File

@ -0,0 +1,24 @@
package io.github.dunwu.javadb;
public enum HBaseConstant {
HBASE_ZOOKEEPER_QUORUM("hbase.zookeeper.quorum"),
HBASE_ENABLE("hbase.enable"),
HBASE_MASTER("hbase.master"),
HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT("hbase.zookeeper.property.clientPort"),
HBASE_HCONNECTION_THREADS_MAX("hbase.hconnection.threads.max"),
HBASE_HCONNECTION_THREADS_CORE("hbase.hconnection.threads.core"),
ZOOKEEPER_ZNODE_PARENT("zookeeper.znode.parent"),
HBASE_COLUMN_FAMILY("hbase.column.family"),
HBASE_EXECUTOR_NUM("hbase.executor.num"),
HBASE_IPC_POOL_SIZE("hbase.client.ipc.pool.size");
private String key;
HBaseConstant(String key) {
this.key = key;
}
public String key() {
return key;
}
}

View File

@ -0,0 +1,78 @@
package io.github.dunwu.javadb;
/**
* HBase Cell
*
* @author Zhang Peng
* @date 2019-03-04
*/
public class HbaseCellEntity {
private String table;
private String row;
private String colFamily;
private String col;
private String val;
public HbaseCellEntity() {}
public HbaseCellEntity(String row, String colFamily, String col, String val) {
this.row = row;
this.colFamily = colFamily;
this.col = col;
this.val = val;
}
public HbaseCellEntity(String table, String row, String colFamily, String col, String val) {
this.table = table;
this.row = row;
this.colFamily = colFamily;
this.col = col;
this.val = val;
}
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
public String getRow() {
return row;
}
public void setRow(String row) {
this.row = row;
}
public String getColFamily() {
return colFamily;
}
public void setColFamily(String colFamily) {
this.colFamily = colFamily;
}
public String getCol() {
return col;
}
public void setCol(String col) {
this.col = col;
}
public String getVal() {
return val;
}
public void setVal(String val) {
this.val = val;
}
@Override
public String toString() {
return "HbaseCellEntity{" + "table='" + table + '\'' + ", row='" + row + '\'' + ", colFamily='" + colFamily
+ '\'' + ", col='" + col + '\'' + ", val='" + val + '\'' + '}';
}
}

View File

@ -0,0 +1,379 @@
package io.github.dunwu.javadb;
import io.github.dunwu.util.base.PropertiesUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
/**
* HBase
*
* @author Zhang Peng
* @date 2019-03-01
*/
public class HbaseHelper {
private HbaseProperties hbaseProperties;
private Connection connection;
private static final String FIRST_CONFIG = "classpath://config//hbase.properties";
private static final String SECOND_CONFIG = "classpath://application.properties";
public HbaseHelper() throws Exception {
// 初始化参数
Properties properties = loadConfigFile();
if (properties == null) {
throw new Exception("读取 Hbase 配置失败,无法建立连接");
}
Boolean enable = PropertiesUtil.getBoolean(properties, HBaseConstant.HBASE_ENABLE.key(), true);
if (!enable) {
return;
}
String quorum = PropertiesUtil.getString(properties, HBaseConstant.HBASE_ZOOKEEPER_QUORUM.key(), "");
String hbaseMaster = PropertiesUtil.getString(properties, HBaseConstant.HBASE_MASTER.key(), "");
String clientPort = PropertiesUtil.getString(properties,
HBaseConstant.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT.key(), "");
String znodeParent = PropertiesUtil.getString(properties, HBaseConstant.ZOOKEEPER_ZNODE_PARENT.key(), "");
String maxThreads = PropertiesUtil.getString(properties, HBaseConstant.HBASE_HCONNECTION_THREADS_MAX.key(), "");
String coreThreads = PropertiesUtil.getString(properties, HBaseConstant.HBASE_HCONNECTION_THREADS_CORE.key(),
"");
String columnFamily = PropertiesUtil.getString(properties, HBaseConstant.HBASE_COLUMN_FAMILY.key(), "");
String hbaseExecutorsNum = PropertiesUtil.getString(properties, HBaseConstant.HBASE_EXECUTOR_NUM.key(), "10");
String ipcPoolSize = PropertiesUtil.getString(properties, HBaseConstant.HBASE_IPC_POOL_SIZE.key(), "1");
hbaseProperties = new HbaseProperties(hbaseMaster, quorum, clientPort, znodeParent, maxThreads, coreThreads,
columnFamily, hbaseExecutorsNum, ipcPoolSize);
init(hbaseProperties);
}
private Properties loadConfigFile() {
Properties properties = null;
try {
properties = PropertiesUtil.loadFromFile(FIRST_CONFIG);
} catch (Exception e) {
e.printStackTrace();
}
if (properties == null) {
try {
properties = PropertiesUtil.loadFromFile(SECOND_CONFIG);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
return properties;
}
public HbaseHelper(HbaseProperties hbaseProperties) throws Exception {
this.hbaseProperties = hbaseProperties;
init(hbaseProperties);
}
private void init(HbaseProperties hbaseProperties) throws Exception {
try {
// @formatter:off
Configuration configuration = HBaseConfiguration.create();
configuration.set(HBaseConstant.HBASE_ZOOKEEPER_QUORUM.key(), hbaseProperties.getQuorum());
configuration.set(HBaseConstant.HBASE_MASTER.key(), hbaseProperties.getHbaseMaster());
configuration.set(HBaseConstant.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT.key(),
hbaseProperties.getClientPort());
configuration.set(HBaseConstant.HBASE_HCONNECTION_THREADS_MAX.key(),
hbaseProperties.getMaxThreads());
configuration.set(HBaseConstant.HBASE_HCONNECTION_THREADS_CORE.key(),
hbaseProperties.getCoreThreads());
configuration.set(HBaseConstant.ZOOKEEPER_ZNODE_PARENT.key(), hbaseProperties.getZnodeParent());
configuration.set(HBaseConstant.HBASE_COLUMN_FAMILY.key(), hbaseProperties.getColumnFamily());
configuration.set(HBaseConstant.HBASE_IPC_POOL_SIZE.key(), hbaseProperties.getIpcPoolSize());
// @formatter:on
connection = ConnectionFactory.createConnection(configuration);
} catch (Exception e) {
throw new Exception("hbase链接未创建", e);
}
}
public void destory() {
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public HTableDescriptor[] listTables() throws Exception {
return listTables(null);
}
public HTableDescriptor[] listTables(String tableName) throws Exception {
if (connection == null) {
throw new Exception("hbase链接未创建");
}
HTableDescriptor[] hTableDescriptors = new HTableDescriptor[0];
try {
if (StringUtils.isEmpty(tableName)) {
hTableDescriptors = connection.getAdmin()
.listTables();
} else {
hTableDescriptors = connection.getAdmin()
.listTables(tableName);
}
} catch (IOException e) {
throw new Exception("执行失败", e);
}
return hTableDescriptors;
}
/**
*
* <p>
*
* <ul>
* <li>create 'tablename','family1','family2','family3'...</li>
* </ul>
*/
public void createTable(String tableName) throws Exception {
createTable(tableName, new String[] {hbaseProperties.getColumnFamily()});
}
/**
*
* <p>
*
* <ul>
* <li>create 'tablename','family1','family2','family3'...</li>
* </ul>
*/
public void createTable(String tableName, String[] colFamilies) throws Exception {
if (connection == null) {
throw new Exception("hbase链接未创建");
}
try {
TableName tablename = TableName.valueOf(tableName);
// 如果表存在,先删除
if (connection.getAdmin()
.isTableAvailable(tablename)) {
dropTable(tableName);
}
HTableDescriptor tableDescriptor = new HTableDescriptor(tablename);
for (String famliy : colFamilies) {
tableDescriptor.addFamily(new HColumnDescriptor(famliy));
}
connection.getAdmin()
.createTable(tableDescriptor);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
*
* <p>
*
* <ul>
* <li>disable 'tablename'</li>
* <li>drop 't1'</li>
* </ul>
*
* @param name
*/
public void dropTable(String name) throws Exception {
if (connection == null) {
throw new Exception("hbase链接未创建");
}
Admin admin = null;
try {
admin = connection.getAdmin();
TableName tableName = TableName.valueOf(name);
// 如果表存在,先删除
if (admin.isTableAvailable(tableName)) {
admin.disableTable(tableName);
admin.deleteTable(tableName);
}
} catch (IOException e) {
e.printStackTrace();
}
}
private Put toPut(HbaseCellEntity hBaseTableDTO) throws Exception {
if (connection == null) {
throw new Exception("hbase链接未创建");
}
Put put = new Put(Bytes.toBytes(hBaseTableDTO.getRow()));
put.addColumn(Bytes.toBytes(hBaseTableDTO.getColFamily()), Bytes.toBytes(hBaseTableDTO.getCol()),
Bytes.toBytes(hBaseTableDTO.getVal()));
return put;
}
public void delete(String tableName, String rowKey) throws Exception {
if (connection == null) {
throw new Exception("hbase链接未创建");
}
Table table = null;
try {
table = connection.getTable(TableName.valueOf(tableName));
Delete delete = new Delete(Bytes.toBytes(rowKey));
table.delete(delete);
} catch (IOException e) {
e.printStackTrace();
throw new Exception("delete失败");
}
}
public String resultToString(Result result) {
if (result == null) {
return null;
}
Cell[] cells = result.rawCells();
StringBuilder sb = new StringBuilder();
for (Cell cell : cells) {
sb.append("{ ");
sb.append("RowName -> ")
.append(new String(CellUtil.cloneRow(cell)));
sb.append(", Timetamp -> ")
.append(cell.getTimestamp());
sb.append(", Column Family -> ")
.append(new String(CellUtil.cloneFamily(cell)));
sb.append(", Row Name -> ")
.append(new String(CellUtil.cloneQualifier(cell)));
sb.append(", value -> ")
.append(new String(CellUtil.cloneValue(cell)));
sb.append(" }\n");
}
return sb.toString();
}
public Result get(String tableName, String rowKey) throws Exception {
return get(tableName, rowKey, null, null);
}
public Result get(String tableName, String rowKey, String colFamily, String qualifier) throws Exception {
if (connection == null) {
throw new Exception("hbase链接未创建");
}
if (connection.isClosed()) {
throw new Exception("hbase 连接已关闭");
}
if (StringUtils.isEmpty(tableName) || StringUtils.isEmpty(rowKey)) {
return null;
}
Result result = null;
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Get get = new Get(Bytes.toBytes(rowKey));
if (StringUtils.isNotEmpty(colFamily)) {
if (StringUtils.isNotEmpty(qualifier)) {
get.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(qualifier));
} else {
get.addFamily(Bytes.toBytes(colFamily));
}
}
result = table.get(get);
} catch (IOException e) {
throw new Exception("查询时发生异常");
}
return result;
}
public Result get(String tableName, String rowKey, String colFamily) throws Exception {
return get(tableName, rowKey, colFamily, null);
}
public Result[] scan(String tableName) throws Exception {
return scan(tableName, null, null, null, null);
}
public Result[] scan(String tableName, String colFamily, String qualifier, String startRow, String stopRow)
throws Exception {
if (connection == null) {
throw new Exception("hbase链接未创建");
}
if (StringUtils.isEmpty(tableName)) {
return null;
}
ResultScanner resultScanner = null;
List<Result> list = new ArrayList<>();
try {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
if (StringUtils.isNotEmpty(colFamily)) {
if (StringUtils.isNotEmpty(qualifier)) {
scan.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(qualifier));
}
scan.addFamily(Bytes.toBytes(colFamily));
}
if (StringUtils.isNotEmpty(startRow)) {
scan.setStartRow(Bytes.toBytes(startRow));
}
if (StringUtils.isNotEmpty(stopRow)) {
scan.setStopRow(Bytes.toBytes(stopRow));
}
resultScanner = table.getScanner(scan);
Result result = resultScanner.next();
while (result != null) {
list.add(result);
result = resultScanner.next();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
if (resultScanner != null) {
resultScanner.close();
}
}
return list.toArray(new Result[0]);
}
public Result[] scan(String tableName, String colFamily) throws Exception {
return scan(tableName, colFamily, null, null, null);
}
public Result[] scan(String tableName, String colFamily, String qualifier) throws Exception {
return scan(tableName, colFamily, qualifier, null, null);
}
private List<Result> resultScannerToResults(ResultScanner resultScanner) {
if (resultScanner == null) {
return null;
}
List<Result> list = new ArrayList<>();
Result result = null;
try {
result = resultScanner.next();
while (result != null) {
list.add(result);
result = resultScanner.next();
}
} catch (IOException e) {
e.printStackTrace();
}
return list;
}
public HbaseProperties getHbaseProperties() {
return hbaseProperties;
}
}

View File

@ -0,0 +1,126 @@
package io.github.dunwu.javadb;
import java.io.Serializable;
/**
* Hbase
* @author Zhang Peng
*/
public class HbaseProperties implements Serializable {
private static final long serialVersionUID = 2930639554689310736L;
private String hbaseMaster;
private String quorum;
private String clientPort;
private String znodeParent;
private String maxThreads;
private String coreThreads;
private String columnFamily;
private String hbaseExecutorsNum = "10";
private String ipcPoolSize;
public HbaseProperties() {
}
public HbaseProperties(String hbaseMaster, String quorum, String clientPort, String znodeParent, String maxThreads,
String coreThreads, String columnFamily, String hbaseExecutorsNum, String ipcPoolSize) {
this.hbaseMaster = hbaseMaster;
this.quorum = quorum;
this.clientPort = clientPort;
this.znodeParent = znodeParent;
this.maxThreads = maxThreads;
this.coreThreads = coreThreads;
this.columnFamily = columnFamily;
this.hbaseExecutorsNum = hbaseExecutorsNum;
this.ipcPoolSize = ipcPoolSize;
}
public String getHbaseMaster() {
return hbaseMaster;
}
public void setHbaseMaster(String hbaseMaster) {
this.hbaseMaster = hbaseMaster;
}
public String getQuorum() {
return quorum;
}
public void setQuorum(String quorum) {
this.quorum = quorum;
}
public String getClientPort() {
return clientPort;
}
public void setClientPort(String clientPort) {
this.clientPort = clientPort;
}
public String getZnodeParent() {
return znodeParent;
}
public void setZnodeParent(String znodeParent) {
this.znodeParent = znodeParent;
}
public String getMaxThreads() {
return maxThreads;
}
public void setMaxThreads(String maxThreads) {
this.maxThreads = maxThreads;
}
public String getCoreThreads() {
return coreThreads;
}
public void setCoreThreads(String coreThreads) {
this.coreThreads = coreThreads;
}
public String getColumnFamily() {
return columnFamily;
}
public void setColumnFamily(String columnFamily) {
this.columnFamily = columnFamily;
}
public String getHbaseExecutorsNum() {
return hbaseExecutorsNum;
}
public void setHbaseExecutorsNum(String hbaseExecutorsNum) {
this.hbaseExecutorsNum = hbaseExecutorsNum;
}
public String getIpcPoolSize() {
return ipcPoolSize;
}
public void setIpcPoolSize(String ipcPoolSize) {
this.ipcPoolSize = ipcPoolSize;
}
@Override
public String toString() {
return "HbaseProperties{" + "quorum='" + quorum + '\'' + ", clientPort='" + clientPort + '\''
+ ", znodeParent='" + znodeParent + '\'' + ", maxThreads='" + maxThreads + '\'' + ", coreThreads='"
+ coreThreads + '\'' + ", columnFamily='" + columnFamily + '\'' + ", hbaseExecutorsNum='"
+ hbaseExecutorsNum + '\'' + '}';
}
}

View File

@ -0,0 +1,109 @@
package io.github.dunwu.javadb;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Result;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* @author Zhang Peng
* @date 2019-03-29
*/
public class HbaseHelperTest {
private static HbaseHelper hbaseHelper;
@BeforeClass
public static void BeforeClass() {
try {
hbaseHelper = new HbaseHelper();
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void listTable() throws Exception {
HTableDescriptor[] hTableDescriptors = hbaseHelper.listTables();
if (hTableDescriptors == null || hTableDescriptors.length <= 0) {
Assert.fail();
}
System.out.println("Tables");
for (HTableDescriptor item : hTableDescriptors) {
System.out.println(item.getTableName());
}
}
@Test
public void createTable() throws Exception {
hbaseHelper.createTable("table1", new String[] {"columnFamliy1", "columnFamliy2"});
HTableDescriptor[] table1s = hbaseHelper.listTables("table1");
if (table1s == null || table1s.length <= 0) {
Assert.fail();
}
hbaseHelper.createTable("table2", new String[] {"columnFamliy1", "columnFamliy2"});
table1s = hbaseHelper.listTables("table2");
if (table1s == null || table1s.length <= 0) {
Assert.fail();
}
}
@Test
public void dropTable() throws Exception {
hbaseHelper.dropTable("table1");
HTableDescriptor[] table1s = hbaseHelper.listTables("table1");
if (table1s != null && table1s.length > 0) {
Assert.fail();
}
}
@Test
public void get() throws Exception {
Result result = hbaseHelper.get("table1", "row1");
System.out.println(hbaseHelper.resultToString(result));
result = hbaseHelper.get("table1", "row2", "columnFamliy1");
System.out.println(hbaseHelper.resultToString(result));
}
@Test
public void scan() throws Exception {
Result[] results = hbaseHelper.scan("table1");
System.out.println("HbaseUtil.scan(\"table1\") result: ");
if (results.length > 0) {
for (Result r : results) {
System.out.println(hbaseHelper.resultToString(r));
}
}
results = hbaseHelper.scan("table1", "columnFamliy1");
System.out.println("HbaseUtil.scan(\"table1\", \"columnFamliy1\" result: ");
if (results.length > 0) {
for (Result r : results) {
System.out.println(hbaseHelper.resultToString(r));
}
}
results = hbaseHelper.scan("table1", "columnFamliy1", "a");
System.out.println("HbaseUtil.scan(\"table1\", \"columnFamliy1\", \"a\") result: ");
if (results.length > 0) {
for (Result r : results) {
System.out.println(hbaseHelper.resultToString(r));
}
}
}
@Test
public void delete() throws Exception {
Result result = hbaseHelper.get("table1", "row1");
System.out.println(result.toString());
hbaseHelper.delete("table1", "row1");
result = hbaseHelper.get("table1", "row1");
System.out.println(result.toString());
}
}

View File

@ -0,0 +1,7 @@
hbase.enable = true
hbase.zookeeper.quorum = localhost,xxxx,xxxx
hbase.zookeeper.property.clientPort = 2181
zookeeper.znode.parent = /hbase
hbase.hconnection.threads.max = 256
hbase.hconnection.threads.core = 32
hbase.column.family = F

View File

@ -9,9 +9,8 @@
<version>2.1.1.RELEASE</version>
</parent>
<groupId>io.github.dunwu.db</groupId>
<artifactId>sqlite-demo</artifactId>
<name>SQLite Demo</name>
<groupId>io.github.dunwu</groupId>
<artifactId>javadb-sqlite</artifactId>
<version>1.0.0</version>
<packaging>jar</packaging>
@ -45,6 +44,16 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>io.github.dunwu.db.SqliteApplication</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.github.dunwu</groupId>
<artifactId>javadb</artifactId>
<version>1.0.0</version>
<packaging>pom</packaging>
<modules>
<module>javadb-h2</module>
<module>javadb-hbase</module>
<module>javadb-mysql</module>
<module>javadb-redis</module>
<module>javadb-sqlite</module>
</modules>
</project>