feat: 更新 elasticsearch 示例

master
dunwu 2024-02-21 22:32:55 +08:00
parent a1d04b36e4
commit 14b5853e4c
18 changed files with 1010 additions and 259 deletions

View File

@ -3,6 +3,12 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>o
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.7</version>
</parent>
<groupId>io.github.dunwu</groupId>
<artifactId>javadb-elasticsearch6</artifactId>
<version>1.0.0</version>
@ -16,6 +22,36 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.8</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
@ -23,31 +59,11 @@
<version>6.4.3</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.7.20</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.10</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.17.1</version>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.4.3</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

View File

@ -2,8 +2,6 @@ package io.github.dunwu.javadb.elasticsearch;
import io.github.dunwu.javadb.elasticsearch.entity.User;
import io.github.dunwu.javadb.elasticsearch.mapper.UserEsMapper;
import io.github.dunwu.javadb.elasticsearch.util.ElasticsearchUtil;
import org.elasticsearch.client.RestHighLevelClient;
import java.io.IOException;
import java.util.Arrays;
@ -11,12 +9,13 @@ import java.util.List;
public class Demo {
private static final String HOSTS = "127.0.0.1:9200";
private static final RestHighLevelClient restHighLevelClient = ElasticsearchUtil.newRestHighLevelClient(HOSTS);
private static final String env = "test";
private static final ElasticsearchTemplate elasticsearchTemplate
= ElasticsearchFactory.newElasticsearchTemplate(env);
public static void main(String[] args) throws IOException, InterruptedException {
UserEsMapper mapper = new UserEsMapper(restHighLevelClient);
UserEsMapper mapper = new UserEsMapper(elasticsearchTemplate);
System.out.println("索引是否存在:" + mapper.isIndexExists());
@ -24,7 +23,7 @@ public class Demo {
User tom = User.builder().id(2L).username("tom").age(20).build();
List<User> users = Arrays.asList(jack, tom);
System.out.println("批量插入:" + mapper.batchInsert(users));
System.out.println("批量插入:" + mapper.batchSave(users));
System.out.println("根据ID查询" + mapper.getById("1").toString());
System.out.println("根据ID查询" + mapper.pojoById("2").toString());
System.out.println("根据ID批量查询" + mapper.pojoListByIds(Arrays.asList("1", "2")).toString());

View File

@ -0,0 +1,166 @@
package io.github.dunwu.javadb.elasticsearch;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import java.util.List;
import java.util.stream.Collectors;
/**
* Elasticsearch
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-02-07
*/
@Slf4j
public class ElasticsearchFactory {
public static int CONNECT_TIMEOUT_MILLIS = 1000;
public static int SOCKET_TIMEOUT_MILLIS = 30000;
public static int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
public static int MAX_CONN_TOTAL = 30;
public static int MAX_CONN_PER_ROUTE = 10;
public static RestClient newRestClient() {
// 从配置中心读取环境变量
String env = "test";
return newRestClient(env);
}
public static RestClient newRestClient(String env) {
String hosts = getDefaultEsAddress(env);
return newRestClient(toHttpHostList(hosts));
}
public static RestClient newRestClient(HttpHost[] httpHosts) {
RestClientBuilder builder = getRestClientBuilder(httpHosts);
if (builder == null) {
return null;
}
try {
return builder.build();
} catch (Exception e) {
log.error("【ES】connect failed.", e);
return null;
}
}
public static RestHighLevelClient newRestHighLevelClient() {
// 从配置中心读取环境变量
String env = "test";
return newRestHighLevelClient(env);
}
public static RestHighLevelClient newRestHighLevelClient(String env) {
String hosts = getDefaultEsAddress(env);
return newRestHighLevelClient(toHttpHostList(hosts));
}
public static RestHighLevelClient newRestHighLevelClient(HttpHost[] httpHosts) {
RestClientBuilder builder = getRestClientBuilder(httpHosts);
if (builder == null) {
return null;
}
try {
return new RestHighLevelClient(builder);
} catch (Exception e) {
log.error("【ES】connect failed.", e);
return null;
}
}
public static ElasticsearchTemplate newElasticsearchTemplate() {
// 从配置中心读取环境变量
String env = "test";
return newElasticsearchTemplate(env);
}
public static ElasticsearchTemplate newElasticsearchTemplate(String env) {
String hosts = getDefaultEsAddress(env);
return newElasticsearchTemplate(toHttpHostList(hosts));
}
public static ElasticsearchTemplate newElasticsearchTemplate(HttpHost[] httpHosts) {
RestHighLevelClient client = newRestHighLevelClient(httpHosts);
if (client == null) {
return null;
}
return new ElasticsearchTemplate(client);
}
public static ElasticsearchTemplate newElasticsearchTemplate(RestHighLevelClient client) {
if (client == null) {
return null;
}
return new ElasticsearchTemplate(client);
}
public static RestClientBuilder getRestClientBuilder(HttpHost[] httpHosts) {
if (ArrayUtil.isEmpty(httpHosts)) {
log.error("【ES】connect failed. hosts are empty.");
return null;
}
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
restClientBuilder.setRequestConfigCallback(builder -> {
builder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
builder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
builder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
return builder;
});
restClientBuilder.setHttpClientConfigCallback(builder -> {
builder.setMaxConnTotal(MAX_CONN_TOTAL);
builder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE);
return builder;
});
return restClientBuilder;
}
private static HttpHost[] toHttpHostList(String hosts) {
if (StrUtil.isBlank(hosts)) {
return null;
}
List<String> strList = StrUtil.split(hosts, ",");
List<HttpHost> list = strList.stream().map(str -> {
List<String> params = StrUtil.split(str, ":");
return new HttpHost(params.get(0), Integer.parseInt(params.get(1)), "http");
}).collect(Collectors.toList());
if (CollectionUtil.isEmpty(list)) {
return new HttpHost[0];
}
return list.toArray(new HttpHost[0]);
}
public static String getDefaultEsAddress() {
// 从配置中心读取环境变量
String env = "test";
return getDefaultEsAddress(env);
}
private static String getDefaultEsAddress(String env) {
String defaultAddress;
switch (env) {
case "prd":
defaultAddress = "127.0.0.1:9200,127.0.0.2:9200,127.0.0.3:9200";
break;
case "pre":
defaultAddress = "127.0.0.1:9200";
break;
case "test":
default:
defaultAddress = "127.0.0.1:9200";
break;
}
return defaultAddress;
}
}

View File

@ -1,13 +1,14 @@
package io.github.dunwu.javadb.elasticsearch.util;
package io.github.dunwu.javadb.elasticsearch;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import io.github.dunwu.javadb.elasticsearch.entity.EsEntity;
import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity;
import io.github.dunwu.javadb.elasticsearch.entity.Page;
import io.github.dunwu.javadb.elasticsearch.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
@ -15,6 +16,7 @@ import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest;
@ -23,28 +25,26 @@ import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@ -57,37 +57,19 @@ import java.util.stream.Stream;
* @date 2023-06-27
*/
@Slf4j
public class ElasticsearchUtil {
public class ElasticsearchTemplate implements Closeable {
public static int CONNECT_TIMEOUT_MILLIS = 1000;
public static int SOCKET_TIMEOUT_MILLIS = 30000;
public static int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500;
public static int MAX_CONN_TOTAL = 30;
public static int MAX_CONN_PER_ROUTE = 10;
private final RestHighLevelClient client;
public static RestClient newRestClient(String hosts) {
HttpHost[] httpHosts = toHttpHostList(hosts);
RestClientBuilder builder = builder(httpHosts);
try {
return builder.build();
} catch (Exception e) {
log.error("【ES】connect failed.", e);
return null;
}
public ElasticsearchTemplate(RestHighLevelClient client) {
this.client = client;
}
public static RestHighLevelClient newRestHighLevelClient(String hosts) {
HttpHost[] httpHosts = toHttpHostList(hosts);
RestClientBuilder builder = builder(httpHosts);
try {
return new RestHighLevelClient(builder);
} catch (Exception e) {
log.error("【ES】connect failed.", e);
return null;
}
public RestHighLevelClient getClient() {
return client;
}
public static BulkProcessor newAsyncBulkProcessor(RestHighLevelClient client) {
public BulkProcessor newAsyncBulkProcessor() {
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
@ -96,7 +78,7 @@ public class ElasticsearchUtil {
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
log.error("Bulk [{}] executed with failures,response = {}", executionId,
log.error("【ES】Bulk [{}] executed with failures,response = {}", executionId,
response.buildFailureMessage());
}
}
@ -105,90 +87,87 @@ public class ElasticsearchUtil {
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
}
};
int bulkTimeout = 30;
int bulkActions = 1000;
int bulkSize = 5;
int concurrentRequests = 2;
int flushInterval = 1000;
int retryInterval = 100;
int retryLimit = 3;
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BackoffPolicy backoffPolicy =
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(retryInterval), retryLimit);
BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
// 1000条数据请求执行一次bulk
.setBulkActions(1000)
.setBulkActions(bulkActions)
// 5mb的数据刷新一次bulk
.setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
.setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
// 并发请求数量, 0不并发, 1并发允许执行
.setConcurrentRequests(2)
// 固定1s必须刷新一次
.setFlushInterval(TimeValue.timeValueMillis(1000L))
// 重试3次间隔100ms
.setBackoffPolicy(
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(200L),
3)).build();
.setConcurrentRequests(concurrentRequests)
// 刷新间隔时间
.setFlushInterval(TimeValue.timeValueMillis(flushInterval))
// 重试次数、间隔时间
.setBackoffPolicy(backoffPolicy).build();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
bulkProcessor.flush();
bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
bulkProcessor.awaitClose(bulkTimeout, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Failed to close bulkProcessor", e);
log.error("【ES】Failed to close bulkProcessor", e);
}
log.info("bulkProcessor closed!");
log.info("【ES】bulkProcessor closed!");
}));
return bulkProcessor;
}
public static HttpHost[] toHttpHostList(String hosts) {
if (StrUtil.isBlank(hosts)) {
public <T extends BaseEsEntity> T save(String index, String type, T entity) throws IOException {
if (entity == null) {
log.warn("【ES】save 实体为空!");
return null;
}
List<String> strList = StrUtil.split(hosts, ",");
List<HttpHost> list = strList.stream().map(str -> {
List<String> params = StrUtil.split(str, ":");
return new HttpHost(params.get(0), Integer.parseInt(params.get(1)), "http");
}).collect(Collectors.toList());
if (CollectionUtil.isEmpty(list)) {
return new HttpHost[0];
}
return list.toArray(new HttpHost[0]);
}
public static RestClientBuilder builder(HttpHost[] httpHosts) {
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
restClientBuilder.setRequestConfigCallback(builder -> {
builder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS);
builder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS);
builder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS);
return builder;
});
restClientBuilder.setHttpClientConfigCallback(builder -> {
builder.setMaxConnTotal(MAX_CONN_TOTAL);
builder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE);
return builder;
});
return restClientBuilder;
}
public static <T extends EsEntity> String insert(RestHighLevelClient client, String index, String type, T entity)
throws IOException {
Map<String, Object> map = toMap(entity);
if (MapUtil.isEmpty(map)) {
log.warn("【ES】save 实体数据为空!");
return null;
}
IndexRequest request = new IndexRequest(index, type).source(map);
if (entity.getDocId() != null) {
request.id(entity.getDocId());
}
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
if (response != null && response.getResult() == DocWriteResponse.Result.CREATED) {
return response.getId();
if (response == null) {
log.warn("【ES】save 响应结果为空!");
return null;
}
if (response.getResult() == DocWriteResponse.Result.CREATED
|| response.getResult() == DocWriteResponse.Result.UPDATED) {
return entity;
} else {
log.warn("【ES】save 响应结果无效result: {}", response.getResult());
return null;
}
}
public static <T extends EsEntity> boolean batchInsert(RestHighLevelClient client, String index, String type,
Collection<T> list) throws IOException {
public <T extends BaseEsEntity> boolean batchSave(String index, String type, Collection<T> list)
throws IOException {
if (CollectionUtil.isEmpty(list)) {
return true;
}
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (T entity : list) {
Map<String, Object> map = ElasticsearchUtil.toMap(entity);
Map<String, Object> map = toMap(entity);
if (MapUtil.isEmpty(map)) {
continue;
}
IndexRequest request = new IndexRequest(index, type).source(map);
if (entity.getDocId() != null) {
request.id(entity.getDocId());
@ -200,16 +179,20 @@ public class ElasticsearchUtil {
return response != null && !response.hasFailures();
}
public static <T extends EsEntity> void asyncBatchInsert(RestHighLevelClient client, String index, String type,
Collection<T> list, ActionListener<BulkResponse> listener) {
public <T extends BaseEsEntity> void asyncBatchSave(String index, String type, Collection<T> list,
ActionListener<BulkResponse> listener) {
if (CollectionUtil.isEmpty(list)) {
return;
}
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (T entity : list) {
Map<String, Object> map = ElasticsearchUtil.toMap(entity);
Map<String, Object> map = toMap(entity);
if (MapUtil.isEmpty(map)) {
continue;
}
IndexRequest request = new IndexRequest(index, type).source(map);
if (entity.getDocId() != null) {
request.id(entity.getDocId());
@ -220,84 +203,107 @@ public class ElasticsearchUtil {
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
}
public static <T extends EsEntity> boolean updateById(RestHighLevelClient client, String index, String type,
T entity) throws IOException {
public <T extends BaseEsEntity> T updateById(String index, String type, T entity) throws IOException {
if (entity == null) {
log.warn("【ES】updateById 实体为空!");
return null;
}
if (entity.getDocId() == null) {
log.warn("【ES】updateById docId 为空!");
return null;
}
Map<String, Object> map = toMap(entity);
if (MapUtil.isEmpty(map)) {
log.warn("【ES】updateById 实体数据为空!");
return null;
}
UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map);
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
if (response == null) {
log.warn("【ES】updateById 响应结果为空!");
return null;
}
if (response.getResult() == DocWriteResponse.Result.UPDATED) {
return entity;
} else {
log.warn("【ES】updateById 响应结果无效result: {}", response.getResult());
return null;
}
}
public <T extends BaseEsEntity> boolean batchUpdateById(String index, String type, Collection<T> list)
throws IOException {
if (CollectionUtil.isEmpty(list)) {
return true;
}
BulkRequest bulkRequest = toUpdateBulkRequest(index, type, list);
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
return response != null && !response.hasFailures();
}
public <T extends BaseEsEntity> void asyncBatchUpdateById(String index, String type, Collection<T> list,
ActionListener<BulkResponse> listener) {
if (CollectionUtil.isEmpty(list)) {
return;
}
BulkRequest bulkRequest = toUpdateBulkRequest(index, type, list);
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
}
private <T extends BaseEsEntity> BulkRequest toUpdateBulkRequest(String index, String type, Collection<T> list) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (T entity : list) {
if (entity == null || entity.getDocId() == null) {
continue;
}
Map<String, Object> map = toMap(entity);
if (MapUtil.isEmpty(map)) {
continue;
}
UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map);
bulkRequest.add(request);
}
return bulkRequest;
}
public boolean deleteById(String index, String type, String id) throws IOException {
return batchDeleteById(index, type, Collections.singleton(id));
}
public boolean batchDeleteById(String index, String type, Collection<String> ids) throws IOException {
if (CollectionUtil.isEmpty(ids)) {
return true;
}
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ids.stream().filter(Objects::nonNull).forEach(id -> {
DeleteRequest deleteRequest = new DeleteRequest(index, type, id);
bulkRequest.add(deleteRequest);
});
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (response == null) {
log.warn("【ES】batchDeleteById 响应结果为空!");
return false;
}
Map<String, Object> map = toMap(entity);
UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map);
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
return response != null && response.getResult() == DocWriteResponse.Result.UPDATED;
return !response.hasFailures();
}
public static <T extends EsEntity> boolean batchUpdateById(RestHighLevelClient client, String index, String type,
Collection<T> list) throws IOException {
if (CollectionUtil.isEmpty(list)) {
return true;
}
BulkRequest bulkRequest = new BulkRequest();
for (T entity : list) {
if (entity == null || entity.getDocId() == null) {
continue;
}
Map<String, Object> map = ElasticsearchUtil.toMap(entity);
UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map);
bulkRequest.add(request);
}
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
return response != null && !response.hasFailures();
}
public static <T extends EsEntity> void asyncBatchUpdateById(RestHighLevelClient client, String index,
String type, Collection<T> list, ActionListener<BulkResponse> listener) {
if (CollectionUtil.isEmpty(list)) {
return;
}
BulkRequest bulkRequest = new BulkRequest();
for (T entity : list) {
if (entity == null || entity.getDocId() == null) {
continue;
}
Map<String, Object> map = ElasticsearchUtil.toMap(entity);
UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map);
bulkRequest.add(request);
}
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
}
public static boolean deleteById(RestHighLevelClient client, String index, String type, String id)
throws IOException {
return batchDeleteById(client, index, type, Collections.singleton(id));
}
public static boolean batchDeleteById(RestHighLevelClient client, String index, String type, Collection<String> ids)
throws IOException {
if (CollectionUtil.isEmpty(ids)) {
return true;
}
BulkRequest bulkRequest = new BulkRequest();
ids.forEach(id -> {
DeleteRequest deleteRequest = new DeleteRequest(index, type, id);
bulkRequest.add(deleteRequest);
});
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
return response != null && !response.hasFailures();
}
public static void asyncBatchDeleteById(RestHighLevelClient client, String index, String type,
Collection<String> ids, ActionListener<BulkResponse> listener) {
public void asyncBatchDeleteById(String index, String type, Collection<String> ids,
ActionListener<BulkResponse> listener) {
if (CollectionUtil.isEmpty(ids)) {
return;
@ -312,31 +318,32 @@ public class ElasticsearchUtil {
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
}
public static SearchResponse getById(RestHighLevelClient client, String index, String type, String id)
throws IOException {
SearchRequest searchRequest = Requests.searchRequest(index).types(type);
QueryBuilder queryBuilder = QueryBuilders.idsQuery().addIds(id);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(queryBuilder);
searchRequest.source(sourceBuilder);
return client.search(searchRequest, RequestOptions.DEFAULT);
public GetResponse getById(String index, String type, String id) throws IOException {
return getById(index, type, id, null);
}
public static <T> T pojoById(RestHighLevelClient client, String index, String type, String id, Class<T> clazz)
throws IOException {
SearchResponse response = getById(client, index, type, id);
public GetResponse getById(String index, String type, String id, Long version) throws IOException {
GetRequest getRequest = new GetRequest(index, type, id);
if (version != null) {
getRequest.version(version);
}
return client.get(getRequest, RequestOptions.DEFAULT);
}
public <T> T pojoById(String index, String type, String id, Class<T> clazz) throws IOException {
return pojoById(index, type, id, null, clazz);
}
public <T> T pojoById(String index, String type, String id, Long version, Class<T> clazz) throws IOException {
GetResponse response = getById(index, type, id, version);
if (response == null) {
return null;
}
List<T> list = ElasticsearchUtil.toPojoList(response, clazz);
if (CollectionUtil.isEmpty(list)) {
return null;
}
return list.get(0);
return toPojo(response, clazz);
}
public static <T> List<T> pojoListByIds(RestHighLevelClient client, String index, String type,
Collection<String> ids, Class<T> clazz) throws IOException {
public <T> List<T> pojoListByIds(String index, String type, Collection<String> ids, Class<T> clazz)
throws IOException {
if (CollectionUtil.isEmpty(ids)) {
return null;
@ -359,7 +366,7 @@ public class ElasticsearchUtil {
if (itemResponse.isFailed()) {
log.error("通过id获取文档失败", itemResponse.getFailure().getFailure());
} else {
T entity = ElasticsearchUtil.toPojo(itemResponse.getResponse(), clazz);
T entity = toPojo(itemResponse.getResponse(), clazz);
if (entity != null) {
list.add(entity);
}
@ -368,36 +375,51 @@ public class ElasticsearchUtil {
return list;
}
public static <T> Page<T> pojoPage(RestHighLevelClient client, String index, String type,
SearchSourceBuilder builder, Class<T> clazz) throws IOException {
SearchResponse response = query(client, index, type, builder);
public <T> Page<T> pojoPage(String index, String type, SearchSourceBuilder builder, Class<T> clazz)
throws IOException {
SearchResponse response = query(index, type, builder);
if (response == null || response.status() != RestStatus.OK) {
return null;
}
List<T> content = toPojoList(response, clazz);
SearchHits searchHits = response.getHits();
return new Page<>(searchHits.getTotalHits(), builder.from(), builder.size(), content);
int offset = builder.from();
int size = builder.size();
int page = offset / size + (offset % size == 0 ? 0 : 1) + 1;
return new Page<>(page, size, searchHits.getTotalHits(), content);
}
public static SearchResponse query(RestHighLevelClient client, String index, String type,
SearchSourceBuilder builder) throws IOException {
public long count(String index, String type, SearchSourceBuilder builder) throws IOException {
SearchResponse response = query(index, type, builder);
if (response == null || response.status() != RestStatus.OK) {
return -1L;
}
SearchHits searchHits = response.getHits();
return searchHits.getTotalHits();
}
public SearchResponse query(String index, String type, SearchSourceBuilder builder) throws IOException {
SearchRequest request = new SearchRequest(index).types(type);
request.source(builder);
return client.search(request, RequestOptions.DEFAULT);
}
public static <T> T toPojo(GetResponse response, Class<T> clazz) {
public SearchResponse query(SearchRequest request) throws IOException {
return client.search(request, RequestOptions.DEFAULT);
}
public <T> T toPojo(GetResponse response, Class<T> clazz) {
if (null == response) {
return null;
} else if (StrUtil.isBlank(response.getSourceAsString())) {
return null;
} else {
return JSONUtil.toBean(response.getSourceAsString(), clazz);
return JsonUtil.toBean(response.getSourceAsString(), clazz);
}
}
public static <T> List<T> toPojoList(SearchResponse response, Class<T> clazz) {
public <T> List<T> toPojoList(SearchResponse response, Class<T> clazz) {
if (response == null || response.status() != RestStatus.OK) {
return new ArrayList<>();
@ -408,12 +430,20 @@ public class ElasticsearchUtil {
}
return Stream.of(response.getHits().getHits())
.map(hit -> JSONUtil.toBean(hit.getSourceAsString(), clazz))
.map(hit -> JsonUtil.toBean(hit.getSourceAsString(), clazz))
.collect(Collectors.toList());
}
public static <T> Map<String, Object> toMap(T entity) {
return JsonUtil.toMap(JsonUtil.toJson(entity));
public <T> Map<String, Object> toMap(T entity) {
return JsonUtil.toMap(JsonUtil.toString(entity));
}
@Override
public synchronized void close() {
if (null == client) {
return;
}
IoUtil.close(client);
}
}

View File

@ -0,0 +1,30 @@
package io.github.dunwu.javadb.elasticsearch.config;
import io.github.dunwu.javadb.elasticsearch.ElasticsearchFactory;
import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
* ES
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2024-02-07
*/
@Configuration
@ComponentScan(value = "io.github.dunwu.javadb.elasticsearch.mapper")
public class ElasticsearchConfig {
@Bean("restHighLevelClient")
public RestHighLevelClient restHighLevelClient() {
return ElasticsearchFactory.newRestHighLevelClient();
}
@Bean("elasticsearchTemplate")
public ElasticsearchTemplate elasticsearchTemplate() {
return ElasticsearchFactory.newElasticsearchTemplate();
}
}

View File

@ -0,0 +1,26 @@
package io.github.dunwu.javadb.elasticsearch.config;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.context.annotation.Import;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Elasticsearch
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-06-30
*/
@Target({ ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@EnableAspectJAutoProxy(
proxyTargetClass = false
)
@Import({ ElasticsearchConfig.class })
@Documented
public @interface EnableElasticsearch {
}

View File

@ -0,0 +1,27 @@
package io.github.dunwu.javadb.elasticsearch.entity;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;
/**
* ES
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2023-06-28
*/
@Data
@ToString
public abstract class BaseEsEntity implements Serializable {
/**
*
*/
protected Long version;
protected Float hitScore;
public abstract String getDocId();
}

View File

@ -1,16 +0,0 @@
package io.github.dunwu.javadb.elasticsearch.entity;
/**
* ES
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2023-06-28
*/
public interface EsEntity {
/**
* ES
*/
String getDocId();
}

View File

@ -20,13 +20,13 @@ public class Page<T> {
private int size;
private List<T> content = new ArrayList<>();
public Page(long total, int page, int size) {
public Page(int page, int size, long total) {
this.total = total;
this.page = page;
this.size = size;
}
public Page(long total, int page, int size, Collection<T> list) {
public Page(int page, int size, long total, Collection<T> list) {
this.total = total;
this.page = page;
this.size = size;

View File

@ -1,7 +1,10 @@
package io.github.dunwu.javadb.elasticsearch.entity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
/**
*
@ -11,7 +14,10 @@ import lombok.Data;
*/
@Data
@Builder
public class User implements EsEntity {
@EqualsAndHashCode(callSuper = true)
@AllArgsConstructor
@NoArgsConstructor
public class User extends BaseEsEntity {
private Long id;
private String username;
@ -21,7 +27,7 @@ public class User implements EsEntity {
@Override
public String getDocId() {
return null;
return String.valueOf(id);
}
}

View File

@ -1,14 +1,14 @@
package io.github.dunwu.javadb.elasticsearch.mapper;
import cn.hutool.core.lang.Assert;
import io.github.dunwu.javadb.elasticsearch.entity.EsEntity;
import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate;
import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity;
import io.github.dunwu.javadb.elasticsearch.entity.Page;
import io.github.dunwu.javadb.elasticsearch.util.ElasticsearchUtil;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.IndicesClient;
import org.elasticsearch.client.RequestOptions;
@ -26,26 +26,28 @@ import java.util.List;
* @date 2023-06-27
*/
@Slf4j
public abstract class BaseEsMapper<T extends EsEntity> implements EsMapper<T> {
public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T> {
private BulkProcessor bulkProcessor;
protected final RestHighLevelClient restHighLevelClient;
protected final ElasticsearchTemplate elasticsearchTemplate;
public BaseEsMapper(RestHighLevelClient restHighLevelClient) {
this.restHighLevelClient = restHighLevelClient;
public BaseEsMapper(ElasticsearchTemplate elasticsearchTemplate) {
this.elasticsearchTemplate = elasticsearchTemplate;
}
@Override
public RestHighLevelClient getClient() throws IOException {
Assert.notNull(restHighLevelClient, () -> new IOException("【ES】not connected."));
return restHighLevelClient;
public RestHighLevelClient getClient() {
if (elasticsearchTemplate == null) {
return null;
}
return elasticsearchTemplate.getClient();
}
@Override
public synchronized BulkProcessor getBulkProcessor() throws IOException {
public synchronized BulkProcessor getBulkProcessor() {
if (bulkProcessor == null) {
bulkProcessor = ElasticsearchUtil.newAsyncBulkProcessor(getClient());
bulkProcessor = elasticsearchTemplate.newAsyncBulkProcessor();
}
return bulkProcessor;
}
@ -59,37 +61,57 @@ public abstract class BaseEsMapper<T extends EsEntity> implements EsMapper<T> {
}
@Override
public SearchResponse getById(String id) throws IOException {
return ElasticsearchUtil.getById(getClient(), getIndex(), getType(), id);
public GetResponse getById(String id) throws IOException {
return getById(id, null);
}
@Override
public GetResponse getById(String id, Long version) throws IOException {
return elasticsearchTemplate.getById(getIndex(), getType(), id, version);
}
@Override
public T pojoById(String id) throws IOException {
return ElasticsearchUtil.pojoById(getClient(), getIndex(), getType(), id, getEntityClass());
return pojoById(id, null);
}
@Override
public T pojoById(String id, Long version) throws IOException {
return elasticsearchTemplate.pojoById(getIndex(), getType(), id, version, getEntityClass());
}
@Override
public List<T> pojoListByIds(Collection<String> ids) throws IOException {
return ElasticsearchUtil.pojoListByIds(getClient(), getIndex(), getType(), ids, getEntityClass());
return elasticsearchTemplate.pojoListByIds(getIndex(), getType(), ids, getEntityClass());
}
@Override
public Page<T> pojoPage(SearchSourceBuilder builder) throws IOException {
return ElasticsearchUtil.pojoPage(getClient(), getIndex(), getType(), builder, getEntityClass());
return elasticsearchTemplate.pojoPage(getIndex(), getType(), builder, getEntityClass());
}
@Override
public String insert(T entity) throws IOException {
return ElasticsearchUtil.insert(getClient(), getIndex(), getType(), entity);
public long count(SearchSourceBuilder builder) throws IOException {
return elasticsearchTemplate.count(getIndex(), getType(), builder);
}
@Override
public boolean batchInsert(Collection<T> list) throws IOException {
return ElasticsearchUtil.batchInsert(getClient(), getIndex(), getType(), list);
public SearchResponse query(SearchSourceBuilder builder) throws IOException {
return elasticsearchTemplate.query(getIndex(), getType(), builder);
}
@Override
public void asyncBatchInsert(Collection<T> list) throws IOException {
public T save(T entity) throws IOException {
return elasticsearchTemplate.save(getIndex(), getType(), entity);
}
@Override
public boolean batchSave(Collection<T> list) throws IOException {
return elasticsearchTemplate.batchSave(getIndex(), getType(), list);
}
@Override
public void asyncBatchSave(Collection<T> list) throws IOException {
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
@ -105,57 +127,37 @@ public abstract class BaseEsMapper<T extends EsEntity> implements EsMapper<T> {
log.error("【ES】异步批量插入异常", e);
}
};
asyncBatchInsert(list, listener);
asyncBatchSave(list, listener);
}
@Override
public void asyncBatchInsert(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException {
ElasticsearchUtil.asyncBatchInsert(getClient(), getIndex(), getType(), list, listener);
public void asyncBatchSave(Collection<T> list, ActionListener<BulkResponse> listener) {
elasticsearchTemplate.asyncBatchSave(getIndex(), getType(), list, listener);
}
@Override
public boolean updateById(T entity) throws IOException {
return ElasticsearchUtil.updateById(getClient(), getIndex(), getType(), entity);
public T updateById(T entity) throws IOException {
return elasticsearchTemplate.updateById(getIndex(), getType(), entity);
}
@Override
public boolean batchUpdateById(Collection<T> list) throws IOException {
return ElasticsearchUtil.batchUpdateById(getClient(), getIndex(), getType(), list);
return elasticsearchTemplate.batchUpdateById(getIndex(), getType(), list);
}
@Override
public void asyncBatchUpdateById(Collection<T> list) throws IOException {
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
if (response != null && !response.hasFailures()) {
log.info("【ES】异步批量更新成功");
} else {
log.warn("【ES】异步批量更新失败");
}
}
@Override
public void onFailure(Exception e) {
log.error("【ES】异步批量更新异常", e);
}
};
asyncBatchUpdateById(list, listener);
}
@Override
public void asyncBatchUpdateById(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException {
ElasticsearchUtil.asyncBatchUpdateById(getClient(), getIndex(), getType(), list, listener);
public void asyncBatchUpdateById(Collection<T> list, ActionListener<BulkResponse> listener) {
elasticsearchTemplate.asyncBatchUpdateById(getIndex(), getType(), list, listener);
}
@Override
public boolean deleteById(String id) throws IOException {
return ElasticsearchUtil.deleteById(getClient(), getIndex(), getType(), id);
return elasticsearchTemplate.deleteById(getIndex(), getType(), id);
}
@Override
public boolean batchDeleteById(Collection<String> ids) throws IOException {
return ElasticsearchUtil.batchDeleteById(getClient(), getIndex(), getType(), ids);
return elasticsearchTemplate.batchDeleteById(getIndex(), getType(), ids);
}
@Override
@ -180,7 +182,7 @@ public abstract class BaseEsMapper<T extends EsEntity> implements EsMapper<T> {
@Override
public void asyncBatchDeleteById(Collection<String> ids, ActionListener<BulkResponse> listener) throws IOException {
ElasticsearchUtil.asyncBatchDeleteById(getClient(), getIndex(), getType(), ids, listener);
elasticsearchTemplate.asyncBatchDeleteById(getIndex(), getType(), ids, listener);
}
}

View File

@ -1,17 +1,21 @@
package io.github.dunwu.javadb.elasticsearch.mapper;
import io.github.dunwu.javadb.elasticsearch.entity.EsEntity;
import cn.hutool.core.collection.CollectionUtil;
import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity;
import io.github.dunwu.javadb.elasticsearch.entity.Page;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* ES Mapper
@ -19,7 +23,7 @@ import java.util.List;
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-06-27
*/
public interface EsMapper<T extends EsEntity> {
public interface EsMapper<T extends BaseEsEntity> {
/**
*
@ -42,29 +46,48 @@ public interface EsMapper<T extends EsEntity> {
boolean isIndexExists() throws IOException;
SearchResponse getById(String id) throws IOException;
GetResponse getById(String id) throws IOException;
GetResponse getById(String id, Long version) throws IOException;
T pojoById(String id) throws IOException;
T pojoById(String id, Long version) throws IOException;
List<T> pojoListByIds(Collection<String> ids) throws IOException;
default Map<String, T> pojoMapByIds(Collection<String> ids) throws IOException {
List<T> list = pojoListByIds(ids);
if (CollectionUtil.isEmpty(list)) {
return new HashMap<>(0);
}
Map<String, T> map = new HashMap<>(list.size());
for (T entity : list) {
map.put(entity.getDocId(), entity);
}
return map;
}
Page<T> pojoPage(SearchSourceBuilder builder) throws IOException;
String insert(T entity) throws IOException;
long count(SearchSourceBuilder builder) throws IOException;
boolean batchInsert(Collection<T> list) throws IOException;
SearchResponse query(SearchSourceBuilder builder) throws IOException;
void asyncBatchInsert(Collection<T> list) throws IOException;
T save(T entity) throws IOException;
void asyncBatchInsert(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException;
boolean batchSave(Collection<T> list) throws IOException;
boolean updateById(T entity) throws IOException;
void asyncBatchSave(Collection<T> list) throws IOException;
void asyncBatchSave(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException;
T updateById(T entity) throws IOException;
boolean batchUpdateById(Collection<T> list) throws IOException;
void asyncBatchUpdateById(Collection<T> list) throws IOException;
void asyncBatchUpdateById(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException;
void asyncBatchUpdateById(Collection<T> list, ActionListener<BulkResponse> listener);
boolean deleteById(String id) throws IOException;

View File

@ -1,7 +1,8 @@
package io.github.dunwu.javadb.elasticsearch.mapper;
import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate;
import io.github.dunwu.javadb.elasticsearch.entity.User;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.stereotype.Component;
/**
* User ES Mapper
@ -9,10 +10,11 @@ import org.elasticsearch.client.RestHighLevelClient;
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-06-27
*/
@Component
public class UserEsMapper extends BaseEsMapper<User> {
public UserEsMapper(RestHighLevelClient restHighLevelClient) {
super(restHighLevelClient);
public UserEsMapper(ElasticsearchTemplate elasticsearchTemplate) {
super(elasticsearchTemplate);
}
@Override

View File

@ -81,7 +81,7 @@ public class JsonUtil {
return null;
}
public static <T> String toJson(T obj) {
public static <T> String toString(T obj) {
if (obj == null) {
return null;
}

View File

@ -0,0 +1,36 @@
package io.github.dunwu.javadb.elasticsearch;
import io.github.dunwu.javadb.elasticsearch.config.EnableElasticsearch;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.context.WebApplicationContext;
@EnableElasticsearch
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public abstract class BaseApplicationTests {
// ---------------------------------------------------------------------------- 测试常量数据
// ----------------------------------------------------------------------------
protected MockMvc mockMvc;
@Autowired
private WebApplicationContext context;
@BeforeEach
public void setUp() {
mockMvc = MockMvcBuilders.webAppContextSetup(context).build(); //构造MockMvc
}
@BeforeAll
public static void setEnvironmentInDev() {
}
}

View File

@ -0,0 +1,193 @@
package io.github.dunwu.javadb.elasticsearch;
import io.github.dunwu.javadb.elasticsearch.entity.Page;
import io.github.dunwu.javadb.elasticsearch.entity.User;
import io.github.dunwu.javadb.elasticsearch.util.JsonUtil;
import org.assertj.core.api.Assertions;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* ElasticsearchTemplate
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-13
*/
public class ElasticsearchTemplateTest {
public static final String INDEX = "user";
public static final String TYPE = "_doc";
public static final String TEST_ID_01 = "1";
public static final String TEST_ID_02 = "2";
private static final ElasticsearchTemplate TEMPLATE;
static {
TEMPLATE = ElasticsearchFactory.newElasticsearchTemplate();
}
@Test
@DisplayName("根据ID精确查询")
public void getById() throws IOException {
GetResponse response = TEMPLATE.getById(INDEX, TYPE, TEST_ID_01);
System.out.println("记录:" + JsonUtil.toString(response.getSourceAsMap()));
}
@Test
@DisplayName("根据ID精确查询POJO")
public void pojoById() throws IOException {
User entity = TEMPLATE.pojoById(INDEX, TYPE, TEST_ID_01, User.class);
System.out.println("记录:" + JsonUtil.toString(entity));
}
@Test
@DisplayName("根据ID精确批量查询POJO")
public void pojoListByIds() throws IOException {
List<String> ids = Arrays.asList(TEST_ID_01, TEST_ID_02);
List<User> list = TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class);
Assertions.assertThat(list).isNotEmpty();
Assertions.assertThat(list.size()).isEqualTo(2);
for (User entity : list) {
System.out.println("记录:" + JsonUtil.toString(entity));
}
}
@Test
@DisplayName("分页查询")
public void pojoPage() throws IOException {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.from(0);
searchSourceBuilder.size(10);
Page<User> page = TEMPLATE.pojoPage(INDEX, TYPE, searchSourceBuilder, User.class);
Assertions.assertThat(page).isNotNull();
Assertions.assertThat(page.getContent()).isNotEmpty();
for (User entity : page.getContent()) {
System.out.println("记录:" + JsonUtil.toString(entity));
}
}
@Test
@DisplayName("条件数量查询")
public void count() throws IOException {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.from(0);
searchSourceBuilder.size(10);
long total = TEMPLATE.count(INDEX, TYPE, searchSourceBuilder);
Assertions.assertThat(total).isNotZero();
System.out.println("符合条件的总记录数:" + total);
}
@Test
@DisplayName("条件查询")
public void query() throws IOException {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.from(0);
searchSourceBuilder.size(10);
SearchResponse response = TEMPLATE.query(INDEX, TYPE, searchSourceBuilder);
Assertions.assertThat(response).isNotNull();
Assertions.assertThat(response.getHits()).isNotNull();
for (SearchHit hit : response.getHits().getHits()) {
System.out.println("记录:" + hit.getSourceAsString());
Map<String, Object> map = hit.getSourceAsMap();
Assertions.assertThat(map).isNotNull();
Assertions.assertThat(map.get("theme")).isEqualTo(3);
}
}
@Nested
@DisplayName("写操作测试")
public class WriteTest {
String json1 =
"{\"id\":1,\"username\":\"user1\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user1@xxx.com\"}";
String json2 =
"{\"id\":2,\"username\":\"user2\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user2@xxx.com\"}";
@Test
@DisplayName("插入、更新")
public void saveAndUpdate() throws IOException, InterruptedException {
User origin = JsonUtil.toBean(json1, User.class);
if (origin == null) {
System.err.println("反序列化失败!");
return;
}
TEMPLATE.save(INDEX, TYPE, origin);
TimeUnit.SECONDS.sleep(1);
User expectEntity = TEMPLATE.pojoById(INDEX, TYPE, origin.getDocId(), User.class);
Assertions.assertThat(expectEntity).isNotNull();
expectEntity.setAge(20);
TEMPLATE.updateById(INDEX, TYPE, expectEntity);
TimeUnit.SECONDS.sleep(18);
User expectEntity2 =
TEMPLATE.pojoById(INDEX, TYPE, origin.getDocId(), User.class);
Assertions.assertThat(expectEntity2).isNotNull();
Assertions.assertThat(expectEntity2.getAge()).isEqualTo(20);
}
@Test
@DisplayName("批量插入、更新")
public void batchSaveAndUpdate() throws IOException, InterruptedException {
User origin1 = JsonUtil.toBean(json1, User.class);
if (origin1 == null) {
System.err.println("反序列化失败!");
return;
}
User origin2 = JsonUtil.toBean(json2, User.class);
if (origin2 == null) {
System.err.println("反序列化失败!");
return;
}
List<User> list = Arrays.asList(origin1, origin2);
List<String> ids = list.stream().map(User::getDocId).collect(Collectors.toList());
TEMPLATE.batchSave(INDEX, TYPE, list);
List<User> newList = TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class);
Assertions.assertThat(newList).isNotEmpty();
newList.forEach(entity -> {
entity.setAge(20);
});
TEMPLATE.batchUpdateById(INDEX, TYPE, newList);
TimeUnit.SECONDS.sleep(1);
List<User> expectList =
TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class);
Assertions.assertThat(expectList).isNotEmpty();
for (User item : expectList) {
Assertions.assertThat(item.getAge()).isEqualTo(20);
}
}
}
}

View File

@ -0,0 +1,20 @@
package io.github.dunwu.javadb.elasticsearch;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
@SpringBootApplication
public class TestApplication extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
return builder.sources(TestApplication.class);
}
}

View File

@ -0,0 +1,191 @@
package io.github.dunwu.javadb.elasticsearch.mapper;
import io.github.dunwu.javadb.elasticsearch.BaseApplicationTests;
import io.github.dunwu.javadb.elasticsearch.entity.Page;
import io.github.dunwu.javadb.elasticsearch.entity.User;
import io.github.dunwu.javadb.elasticsearch.util.JsonUtil;
import org.assertj.core.api.Assertions;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* ElasticsearchTemplate
*
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @date 2023-11-13
*/
public class UserEsMapperTest extends BaseApplicationTests {
@Autowired
private UserEsMapper mapper;
public static final String TEST_ID_01 = "1";
public static final String TEST_ID_02 = "2";
@Test
@DisplayName("根据ID精确查询")
public void getById() throws IOException {
GetResponse response = mapper.getById(TEST_ID_01);
System.out.println("记录:" + JsonUtil.toString(response.getSourceAsMap()));
}
@Test
@DisplayName("根据ID精确查询POJO")
public void pojoById() throws IOException {
User entity = mapper.pojoById(TEST_ID_01);
System.out.println("记录:" + JsonUtil.toString(entity));
}
@Test
@DisplayName("根据ID精确批量查询POJO")
public void pojoListByIds() throws IOException {
List<String> ids = Arrays.asList(TEST_ID_01, TEST_ID_02);
List<User> list = mapper.pojoListByIds(ids);
Assertions.assertThat(list).isNotEmpty();
Assertions.assertThat(list.size()).isEqualTo(2);
for (User entity : list) {
System.out.println("记录:" + JsonUtil.toString(entity));
}
}
@Test
@DisplayName("分页查询")
public void pojoPage() throws IOException {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery("id", 1));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.from(0);
searchSourceBuilder.size(10);
Page<User> page = mapper.pojoPage(searchSourceBuilder);
Assertions.assertThat(page).isNotNull();
Assertions.assertThat(page.getContent()).isNotEmpty();
for (User entity : page.getContent()) {
System.out.println("记录:" + JsonUtil.toString(entity));
}
}
@Test
@DisplayName("条件数量查询")
public void count() throws IOException {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery("id", 1));
// boolQueryBuilder.must(QueryBuilders.rangeQuery("age")
// .from(18)
// .to(25)
// .includeLower(true)
// .includeUpper(true));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.from(0);
searchSourceBuilder.size(10);
long total = mapper.count(searchSourceBuilder);
Assertions.assertThat(total).isNotZero();
System.out.println("符合条件的总记录数:" + total);
}
@Test
@DisplayName("条件查询")
public void query() throws IOException {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery("id", 1));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(boolQueryBuilder);
searchSourceBuilder.from(0);
searchSourceBuilder.size(10);
SearchResponse response = mapper.query(searchSourceBuilder);
Assertions.assertThat(response).isNotNull();
Assertions.assertThat(response.getHits()).isNotNull();
for (SearchHit hit : response.getHits().getHits()) {
System.out.println("记录:" + hit.getSourceAsString());
Map<String, Object> map = hit.getSourceAsMap();
Assertions.assertThat(map).isNotNull();
}
}
@Nested
@DisplayName("写操作测试")
public class WriteTest {
String json1 =
"{\"id\":1,\"username\":\"user1\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user1@xxx.com\"}";
String json2 =
"{\"id\":2,\"username\":\"user2\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user2@xxx.com\"}";
@Test
@DisplayName("插入、更新")
public void saveAndUpdate() throws IOException, InterruptedException {
User origin = JsonUtil.toBean(json1, User.class);
if (origin == null) {
System.err.println("反序列化失败!");
return;
}
mapper.save(origin);
TimeUnit.SECONDS.sleep(1);
User expectEntity = mapper.pojoById(origin.getDocId());
Assertions.assertThat(expectEntity).isNotNull();
expectEntity.setAge(20);
mapper.updateById(expectEntity);
TimeUnit.SECONDS.sleep(1);
User expectEntity2 = mapper.pojoById(origin.getDocId());
Assertions.assertThat(expectEntity2).isNotNull();
Assertions.assertThat(expectEntity2.getAge()).isEqualTo(20);
}
@Test
@DisplayName("批量插入、更新")
public void batchSaveAndUpdate() throws IOException, InterruptedException {
User origin1 = JsonUtil.toBean(json1, User.class);
if (origin1 == null) {
System.err.println("反序列化失败!");
return;
}
User origin2 = JsonUtil.toBean(json2, User.class);
if (origin2 == null) {
System.err.println("反序列化失败!");
return;
}
List<User> list = Arrays.asList(origin1, origin2);
List<String> ids = list.stream().map(User::getDocId).collect(Collectors.toList());
mapper.batchSave(list);
List<User> newList = mapper.pojoListByIds(ids);
Assertions.assertThat(newList).isNotEmpty();
newList.forEach(entity -> {
entity.setAge(20);
});
mapper.batchUpdateById(newList);
TimeUnit.SECONDS.sleep(1);
List<User> expectList = mapper.pojoListByIds(ids);
Assertions.assertThat(expectList).isNotEmpty();
for (User item : expectList) {
Assertions.assertThat(item.getAge()).isEqualTo(20);
}
}
}
}