mirror of https://github.com/dunwu/db-tutorial.git
feat: ES6 java 客户端示例
parent
8d1ef2086d
commit
aa66b1f3e1
|
@ -32,6 +32,11 @@
|
||||||
<artifactId>hutool-all</artifactId>
|
<artifactId>hutool-all</artifactId>
|
||||||
<version>5.7.20</version>
|
<version>5.7.20</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.fasterxml.jackson.core</groupId>
|
||||||
|
<artifactId>jackson-databind</artifactId>
|
||||||
|
<version>2.15.2</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>ch.qos.logback</groupId>
|
<groupId>ch.qos.logback</groupId>
|
||||||
|
|
|
@ -2,15 +2,21 @@ package io.github.dunwu.javadb.elasticsearch;
|
||||||
|
|
||||||
import io.github.dunwu.javadb.elasticsearch.entity.User;
|
import io.github.dunwu.javadb.elasticsearch.entity.User;
|
||||||
import io.github.dunwu.javadb.elasticsearch.mapper.UserEsMapper;
|
import io.github.dunwu.javadb.elasticsearch.mapper.UserEsMapper;
|
||||||
|
import io.github.dunwu.javadb.elasticsearch.util.ElasticsearchUtil;
|
||||||
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
|
||||||
public class Demo {
|
public class Demo {
|
||||||
|
|
||||||
|
private static final String HOSTS = "127.0.0.1:9200";
|
||||||
|
private static final RestHighLevelClient restHighLevelClient = ElasticsearchUtil.newRestHighLevelClient(HOSTS);
|
||||||
|
|
||||||
public static void main(String[] args) throws IOException, InterruptedException {
|
public static void main(String[] args) throws IOException, InterruptedException {
|
||||||
UserEsMapper mapper = new UserEsMapper();
|
|
||||||
|
UserEsMapper mapper = new UserEsMapper(restHighLevelClient);
|
||||||
|
|
||||||
System.out.println("索引是否存在:" + mapper.isIndexExists());
|
System.out.println("索引是否存在:" + mapper.isIndexExists());
|
||||||
|
|
||||||
|
@ -24,6 +30,7 @@ public class Demo {
|
||||||
System.out.println("根据ID批量查询:" + mapper.pojoListByIds(Arrays.asList("1", "2")).toString());
|
System.out.println("根据ID批量查询:" + mapper.pojoListByIds(Arrays.asList("1", "2")).toString());
|
||||||
|
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
System.out.println("根据ID批量删除:" + mapper.deleteByIds(Arrays.asList("1", "2")));
|
System.out.println("根据ID批量删除:" + mapper.batchDeleteById(Arrays.asList("1", "2")));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,5 +7,10 @@ package io.github.dunwu.javadb.elasticsearch.entity;
|
||||||
* @since 2023-06-28
|
* @since 2023-06-28
|
||||||
*/
|
*/
|
||||||
public interface EsEntity {
|
public interface EsEntity {
|
||||||
Long getId();
|
|
||||||
|
/**
|
||||||
|
* 获取 ES 主键
|
||||||
|
*/
|
||||||
|
String getDocId();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
package io.github.dunwu.javadb.elasticsearch.entity;
|
||||||
|
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 分页实体
|
||||||
|
*
|
||||||
|
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||||
|
* @date 2023-06-28
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class Page<T> {
|
||||||
|
|
||||||
|
private long total;
|
||||||
|
private int page;
|
||||||
|
private int size;
|
||||||
|
private List<T> content = new ArrayList<>();
|
||||||
|
|
||||||
|
public Page(long total, int page, int size) {
|
||||||
|
this.total = total;
|
||||||
|
this.page = page;
|
||||||
|
this.size = size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Page(long total, int page, int size, Collection<T> list) {
|
||||||
|
this.total = total;
|
||||||
|
this.page = page;
|
||||||
|
this.size = size;
|
||||||
|
this.content.addAll(list);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -12,10 +12,16 @@ import lombok.Data;
|
||||||
@Data
|
@Data
|
||||||
@Builder
|
@Builder
|
||||||
public class User implements EsEntity {
|
public class User implements EsEntity {
|
||||||
|
|
||||||
private Long id;
|
private Long id;
|
||||||
private String username;
|
private String username;
|
||||||
private String password;
|
private String password;
|
||||||
private Integer age;
|
private Integer age;
|
||||||
private String email;
|
private String email;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getDocId() {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,45 +1,23 @@
|
||||||
package io.github.dunwu.javadb.elasticsearch.mapper;
|
package io.github.dunwu.javadb.elasticsearch.mapper;
|
||||||
|
|
||||||
import cn.hutool.core.bean.BeanUtil;
|
|
||||||
import cn.hutool.core.bean.copier.CopyOptions;
|
|
||||||
import cn.hutool.core.collection.CollectionUtil;
|
|
||||||
import cn.hutool.core.io.IoUtil;
|
|
||||||
import cn.hutool.core.lang.Assert;
|
import cn.hutool.core.lang.Assert;
|
||||||
import io.github.dunwu.javadb.elasticsearch.entity.EsEntity;
|
import io.github.dunwu.javadb.elasticsearch.entity.EsEntity;
|
||||||
|
import io.github.dunwu.javadb.elasticsearch.entity.Page;
|
||||||
import io.github.dunwu.javadb.elasticsearch.util.ElasticsearchUtil;
|
import io.github.dunwu.javadb.elasticsearch.util.ElasticsearchUtil;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
||||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
|
||||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||||
import org.elasticsearch.action.bulk.BulkRequest;
|
|
||||||
import org.elasticsearch.action.bulk.BulkResponse;
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
import org.elasticsearch.action.delete.DeleteRequest;
|
|
||||||
import org.elasticsearch.action.get.MultiGetItemResponse;
|
|
||||||
import org.elasticsearch.action.get.MultiGetRequest;
|
|
||||||
import org.elasticsearch.action.get.MultiGetResponse;
|
|
||||||
import org.elasticsearch.action.index.IndexRequest;
|
|
||||||
import org.elasticsearch.action.index.IndexResponse;
|
|
||||||
import org.elasticsearch.action.search.SearchRequest;
|
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.client.IndicesClient;
|
import org.elasticsearch.client.IndicesClient;
|
||||||
import org.elasticsearch.client.RequestOptions;
|
import org.elasticsearch.client.RequestOptions;
|
||||||
import org.elasticsearch.client.Requests;
|
|
||||||
import org.elasticsearch.client.RestHighLevelClient;
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
|
||||||
import org.elasticsearch.index.query.QueryBuilder;
|
|
||||||
import org.elasticsearch.index.query.QueryBuilders;
|
|
||||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.*;
|
import java.util.Collection;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.List;
|
||||||
import java.util.function.BiConsumer;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ES Mapper 基础类
|
* ES Mapper 基础类
|
||||||
|
@ -48,13 +26,15 @@ import java.util.function.BiConsumer;
|
||||||
* @date 2023-06-27
|
* @date 2023-06-27
|
||||||
*/
|
*/
|
||||||
@Slf4j
|
@Slf4j
|
||||||
public abstract class BaseEsMapper<T extends EsEntity> implements EsMapper<T>, Closeable {
|
public abstract class BaseEsMapper<T extends EsEntity> implements EsMapper<T> {
|
||||||
|
|
||||||
public static final String HOSTS = "127.0.0.1:9200";
|
|
||||||
|
|
||||||
private BulkProcessor bulkProcessor;
|
private BulkProcessor bulkProcessor;
|
||||||
|
|
||||||
private final RestHighLevelClient restHighLevelClient = ElasticsearchUtil.newRestHighLevelClient(HOSTS);
|
protected final RestHighLevelClient restHighLevelClient;
|
||||||
|
|
||||||
|
public BaseEsMapper(RestHighLevelClient restHighLevelClient) {
|
||||||
|
this.restHighLevelClient = restHighLevelClient;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RestHighLevelClient getClient() throws IOException {
|
public RestHighLevelClient getClient() throws IOException {
|
||||||
|
@ -62,11 +42,10 @@ public abstract class BaseEsMapper<T extends EsEntity> implements EsMapper<T>, C
|
||||||
return restHighLevelClient;
|
return restHighLevelClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized BulkProcessor getBulkProcessor() {
|
public synchronized BulkProcessor getBulkProcessor() throws IOException {
|
||||||
if (bulkProcessor == null) {
|
if (bulkProcessor == null) {
|
||||||
bulkProcessor = newAsyncBulkProcessor();
|
bulkProcessor = ElasticsearchUtil.newAsyncBulkProcessor(getClient());
|
||||||
}
|
}
|
||||||
return bulkProcessor;
|
return bulkProcessor;
|
||||||
}
|
}
|
||||||
|
@ -75,176 +54,133 @@ public abstract class BaseEsMapper<T extends EsEntity> implements EsMapper<T>, C
|
||||||
public boolean isIndexExists() throws IOException {
|
public boolean isIndexExists() throws IOException {
|
||||||
IndicesClient indicesClient = getClient().indices();
|
IndicesClient indicesClient = getClient().indices();
|
||||||
GetIndexRequest request = new GetIndexRequest();
|
GetIndexRequest request = new GetIndexRequest();
|
||||||
request.indices(getIndexAlias());
|
request.indices(getIndex());
|
||||||
return indicesClient.exists(request, RequestOptions.DEFAULT);
|
return indicesClient.exists(request, RequestOptions.DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public SearchResponse getById(String id) throws IOException {
|
public SearchResponse getById(String id) throws IOException {
|
||||||
SearchRequest searchRequest = Requests.searchRequest(getIndexAlias());
|
return ElasticsearchUtil.getById(getClient(), getIndex(), getType(), id);
|
||||||
QueryBuilder queryBuilder = QueryBuilders.idsQuery().addIds(id);
|
|
||||||
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
|
||||||
sourceBuilder.query(queryBuilder);
|
|
||||||
searchRequest.source(sourceBuilder);
|
|
||||||
return getClient().search(searchRequest, RequestOptions.DEFAULT);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public T pojoById(String id) throws IOException {
|
public T pojoById(String id) throws IOException {
|
||||||
SearchResponse response = getById(id);
|
return ElasticsearchUtil.pojoById(getClient(), getIndex(), getType(), id, getEntityClass());
|
||||||
if (response == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
List<T> list = ElasticsearchUtil.toPojoList(response, getEntityClass());
|
|
||||||
if (CollectionUtil.isEmpty(list)) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return list.get(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<T> pojoListByIds(Collection<String> ids) throws IOException {
|
public List<T> pojoListByIds(Collection<String> ids) throws IOException {
|
||||||
|
return ElasticsearchUtil.pojoListByIds(getClient(), getIndex(), getType(), ids, getEntityClass());
|
||||||
|
}
|
||||||
|
|
||||||
if (CollectionUtil.isEmpty(ids)) {
|
@Override
|
||||||
return null;
|
public Page<T> pojoPage(SearchSourceBuilder builder) throws IOException {
|
||||||
}
|
return ElasticsearchUtil.pojoPage(getClient(), getIndex(), getType(), builder, getEntityClass());
|
||||||
|
|
||||||
MultiGetRequest request = new MultiGetRequest();
|
|
||||||
for (String id : ids) {
|
|
||||||
request.add(new MultiGetRequest.Item(getIndexAlias(), getIndexType(), id));
|
|
||||||
}
|
|
||||||
|
|
||||||
MultiGetResponse multiGetResponse = getClient().mget(request, RequestOptions.DEFAULT);
|
|
||||||
if (null == multiGetResponse || multiGetResponse.getResponses() == null || multiGetResponse.getResponses().length <= 0) {
|
|
||||||
return new ArrayList<>();
|
|
||||||
}
|
|
||||||
|
|
||||||
List<T> list = new ArrayList<>();
|
|
||||||
for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) {
|
|
||||||
if (itemResponse.isFailed()) {
|
|
||||||
log.error("通过id获取文档失败", itemResponse.getFailure().getFailure());
|
|
||||||
} else {
|
|
||||||
T entity = ElasticsearchUtil.toPojo(itemResponse.getResponse(), getEntityClass());
|
|
||||||
if (entity != null) {
|
|
||||||
list.add(entity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return list;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String insert(T entity) throws IOException {
|
public String insert(T entity) throws IOException {
|
||||||
Map<String, Object> map = new HashMap<>();
|
return ElasticsearchUtil.insert(getClient(), getIndex(), getType(), entity);
|
||||||
BeanUtil.beanToMap(entity, map, CopyOptions.create().ignoreError());
|
|
||||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
|
||||||
builder.startObject();
|
|
||||||
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
|
||||||
String key = entry.getKey();
|
|
||||||
Object value = entry.getValue();
|
|
||||||
builder.field(key, value);
|
|
||||||
}
|
|
||||||
builder.endObject();
|
|
||||||
|
|
||||||
IndexRequest request = Requests.indexRequest(getIndexAlias()).type(getIndexType()).source(builder);
|
|
||||||
if (entity.getId() != null) {
|
|
||||||
request.id(entity.getId().toString());
|
|
||||||
}
|
|
||||||
|
|
||||||
IndexResponse response = getClient().index(request, RequestOptions.DEFAULT);
|
|
||||||
if (response == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return response.getId();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean batchInsert(Collection<T> list) throws IOException {
|
public boolean batchInsert(Collection<T> list) throws IOException {
|
||||||
|
return ElasticsearchUtil.batchInsert(getClient(), getIndex(), getType(), list);
|
||||||
if (CollectionUtil.isEmpty(list)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
BulkRequest bulkRequest = new BulkRequest();
|
|
||||||
for (T entity : list) {
|
|
||||||
Map<String, Object> map = ElasticsearchUtil.toMap(entity);
|
|
||||||
IndexRequest request = Requests.indexRequest(getIndexAlias()).type(getIndexType()).source(map);
|
|
||||||
if (entity.getId() != null) {
|
|
||||||
request.id(entity.getId().toString());
|
|
||||||
}
|
|
||||||
bulkRequest.add(request);
|
|
||||||
}
|
|
||||||
|
|
||||||
BulkResponse response = getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
|
|
||||||
return !(response == null || response.hasFailures());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean deleteById(String id) throws IOException {
|
public void asyncBatchInsert(Collection<T> list) throws IOException {
|
||||||
return deleteByIds(Collections.singleton(id));
|
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean deleteByIds(Collection<String> ids) throws IOException {
|
|
||||||
|
|
||||||
if (CollectionUtil.isEmpty(ids)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
BulkRequest bulkRequest = new BulkRequest();
|
|
||||||
ids.forEach(id -> {
|
|
||||||
DeleteRequest deleteRequest = Requests.deleteRequest(getIndexAlias()).type(getIndexType()).id(id);
|
|
||||||
bulkRequest.add(deleteRequest);
|
|
||||||
});
|
|
||||||
|
|
||||||
BulkResponse response = getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
|
|
||||||
return response != null && !response.hasFailures();
|
|
||||||
}
|
|
||||||
|
|
||||||
private BulkProcessor newAsyncBulkProcessor() {
|
|
||||||
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
|
|
||||||
@Override
|
@Override
|
||||||
public void beforeBulk(long executionId, BulkRequest request) {
|
public void onResponse(BulkResponse response) {
|
||||||
}
|
if (response != null && !response.hasFailures()) {
|
||||||
|
log.info("【ES】异步批量插入成功!");
|
||||||
@Override
|
} else {
|
||||||
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
|
log.warn("【ES】异步批量插入失败!");
|
||||||
if (response.hasFailures()) {
|
|
||||||
log.error("Bulk [{}] executed with failures,response = {}", executionId, response.buildFailureMessage());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
|
public void onFailure(Exception e) {
|
||||||
|
log.error("【ES】异步批量插入异常!", e);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
|
asyncBatchInsert(list, listener);
|
||||||
bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
|
|
||||||
// 1000条数据请求执行一次bulk
|
|
||||||
.setBulkActions(1000)
|
|
||||||
// 5mb的数据刷新一次bulk
|
|
||||||
.setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
|
|
||||||
// 并发请求数量, 0不并发, 1并发允许执行
|
|
||||||
.setConcurrentRequests(2)
|
|
||||||
// 固定1s必须刷新一次
|
|
||||||
.setFlushInterval(TimeValue.timeValueMillis(1000L))
|
|
||||||
// 重试3次,间隔100ms
|
|
||||||
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(200L), 3)).build();
|
|
||||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
|
||||||
try {
|
|
||||||
bulkProcessor.flush();
|
|
||||||
bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("Failed to close bulkProcessor", e);
|
|
||||||
}
|
|
||||||
log.info("bulkProcessor closed!");
|
|
||||||
}));
|
|
||||||
return bulkProcessor;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void asyncBatchInsert(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException {
|
||||||
IoUtil.close(restHighLevelClient);
|
ElasticsearchUtil.asyncBatchInsert(getClient(), getIndex(), getType(), list, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean updateById(T entity) throws IOException {
|
||||||
|
return ElasticsearchUtil.updateById(getClient(), getIndex(), getType(), entity);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean batchUpdateById(Collection<T> list) throws IOException {
|
||||||
|
return ElasticsearchUtil.batchUpdateById(getClient(), getIndex(), getType(), list);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncBatchUpdateById(Collection<T> list) throws IOException {
|
||||||
|
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(BulkResponse response) {
|
||||||
|
if (response != null && !response.hasFailures()) {
|
||||||
|
log.info("【ES】异步批量更新成功!");
|
||||||
|
} else {
|
||||||
|
log.warn("【ES】异步批量更新失败!");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
log.error("【ES】异步批量更新异常!", e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
asyncBatchUpdateById(list, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncBatchUpdateById(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException {
|
||||||
|
ElasticsearchUtil.asyncBatchUpdateById(getClient(), getIndex(), getType(), list, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean deleteById(String id) throws IOException {
|
||||||
|
return ElasticsearchUtil.deleteById(getClient(), getIndex(), getType(), id);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean batchDeleteById(Collection<String> ids) throws IOException {
|
||||||
|
return ElasticsearchUtil.batchDeleteById(getClient(), getIndex(), getType(), ids);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncBatchDeleteById(Collection<String> ids) throws IOException {
|
||||||
|
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(BulkResponse response) {
|
||||||
|
if (response != null && !response.hasFailures()) {
|
||||||
|
log.info("【ES】异步批量删除成功!");
|
||||||
|
} else {
|
||||||
|
log.warn("【ES】异步批量删除失败!ids: {}", ids);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Exception e) {
|
||||||
|
log.error("【ES】异步批量删除异常!ids: {}", ids, e);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
asyncBatchDeleteById(ids, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void asyncBatchDeleteById(Collection<String> ids, ActionListener<BulkResponse> listener) throws IOException {
|
||||||
|
ElasticsearchUtil.asyncBatchDeleteById(getClient(), getIndex(), getType(), ids, listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,13 @@
|
||||||
package io.github.dunwu.javadb.elasticsearch.mapper;
|
package io.github.dunwu.javadb.elasticsearch.mapper;
|
||||||
|
|
||||||
import io.github.dunwu.javadb.elasticsearch.entity.EsEntity;
|
import io.github.dunwu.javadb.elasticsearch.entity.EsEntity;
|
||||||
|
import io.github.dunwu.javadb.elasticsearch.entity.Page;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||||
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.client.RestHighLevelClient;
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -17,20 +21,15 @@ import java.util.List;
|
||||||
*/
|
*/
|
||||||
public interface EsMapper<T extends EsEntity> {
|
public interface EsMapper<T extends EsEntity> {
|
||||||
|
|
||||||
/**
|
|
||||||
* 获取索引别名
|
|
||||||
*/
|
|
||||||
String getIndexAlias();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取索引名
|
* 获取索引名
|
||||||
*/
|
*/
|
||||||
String getIndexName();
|
String getIndex();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取索引类型
|
* 获取索引类型
|
||||||
*/
|
*/
|
||||||
String getIndexType();
|
String getType();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 获取实体类型
|
* 获取实体类型
|
||||||
|
@ -39,7 +38,7 @@ public interface EsMapper<T extends EsEntity> {
|
||||||
|
|
||||||
RestHighLevelClient getClient() throws IOException;
|
RestHighLevelClient getClient() throws IOException;
|
||||||
|
|
||||||
BulkProcessor getBulkProcessor();
|
BulkProcessor getBulkProcessor() throws IOException;
|
||||||
|
|
||||||
boolean isIndexExists() throws IOException;
|
boolean isIndexExists() throws IOException;
|
||||||
|
|
||||||
|
@ -49,11 +48,30 @@ public interface EsMapper<T extends EsEntity> {
|
||||||
|
|
||||||
List<T> pojoListByIds(Collection<String> ids) throws IOException;
|
List<T> pojoListByIds(Collection<String> ids) throws IOException;
|
||||||
|
|
||||||
|
Page<T> pojoPage(SearchSourceBuilder builder) throws IOException;
|
||||||
|
|
||||||
String insert(T entity) throws IOException;
|
String insert(T entity) throws IOException;
|
||||||
|
|
||||||
boolean batchInsert(Collection<T> list) throws IOException;
|
boolean batchInsert(Collection<T> list) throws IOException;
|
||||||
|
|
||||||
|
void asyncBatchInsert(Collection<T> list) throws IOException;
|
||||||
|
|
||||||
|
void asyncBatchInsert(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException;
|
||||||
|
|
||||||
|
boolean updateById(T entity) throws IOException;
|
||||||
|
|
||||||
|
boolean batchUpdateById(Collection<T> list) throws IOException;
|
||||||
|
|
||||||
|
void asyncBatchUpdateById(Collection<T> list) throws IOException;
|
||||||
|
|
||||||
|
void asyncBatchUpdateById(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException;
|
||||||
|
|
||||||
boolean deleteById(String id) throws IOException;
|
boolean deleteById(String id) throws IOException;
|
||||||
|
|
||||||
boolean deleteByIds(Collection<String> ids) throws IOException;
|
boolean batchDeleteById(Collection<String> ids) throws IOException;
|
||||||
|
|
||||||
|
void asyncBatchDeleteById(Collection<String> ids) throws IOException;
|
||||||
|
|
||||||
|
void asyncBatchDeleteById(Collection<String> ids, ActionListener<BulkResponse> listener) throws IOException;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package io.github.dunwu.javadb.elasticsearch.mapper;
|
package io.github.dunwu.javadb.elasticsearch.mapper;
|
||||||
|
|
||||||
import io.github.dunwu.javadb.elasticsearch.entity.User;
|
import io.github.dunwu.javadb.elasticsearch.entity.User;
|
||||||
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* User ES Mapper
|
* User ES Mapper
|
||||||
|
@ -10,18 +11,17 @@ import io.github.dunwu.javadb.elasticsearch.entity.User;
|
||||||
*/
|
*/
|
||||||
public class UserEsMapper extends BaseEsMapper<User> {
|
public class UserEsMapper extends BaseEsMapper<User> {
|
||||||
|
|
||||||
|
public UserEsMapper(RestHighLevelClient restHighLevelClient) {
|
||||||
|
super(restHighLevelClient);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getIndexAlias() {
|
public String getIndex() {
|
||||||
return "user";
|
return "user";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getIndexName() {
|
public String getType() {
|
||||||
return "user";
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String getIndexType() {
|
|
||||||
return "_doc";
|
return "_doc";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,22 +3,57 @@ package io.github.dunwu.javadb.elasticsearch.util;
|
||||||
import cn.hutool.core.bean.BeanUtil;
|
import cn.hutool.core.bean.BeanUtil;
|
||||||
import cn.hutool.core.bean.copier.CopyOptions;
|
import cn.hutool.core.bean.copier.CopyOptions;
|
||||||
import cn.hutool.core.collection.CollectionUtil;
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
|
import cn.hutool.core.util.ArrayUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import cn.hutool.json.JSONUtil;
|
import cn.hutool.json.JSONUtil;
|
||||||
|
import io.github.dunwu.javadb.elasticsearch.entity.EsEntity;
|
||||||
|
import io.github.dunwu.javadb.elasticsearch.entity.Page;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.http.HttpHost;
|
import org.apache.http.HttpHost;
|
||||||
|
import org.elasticsearch.action.ActionListener;
|
||||||
|
import org.elasticsearch.action.DocWriteResponse;
|
||||||
|
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||||
|
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||||
|
import org.elasticsearch.action.bulk.BulkRequest;
|
||||||
|
import org.elasticsearch.action.bulk.BulkResponse;
|
||||||
|
import org.elasticsearch.action.delete.DeleteRequest;
|
||||||
import org.elasticsearch.action.get.GetResponse;
|
import org.elasticsearch.action.get.GetResponse;
|
||||||
|
import org.elasticsearch.action.get.MultiGetItemResponse;
|
||||||
|
import org.elasticsearch.action.get.MultiGetRequest;
|
||||||
|
import org.elasticsearch.action.get.MultiGetResponse;
|
||||||
|
import org.elasticsearch.action.index.IndexRequest;
|
||||||
|
import org.elasticsearch.action.index.IndexResponse;
|
||||||
|
import org.elasticsearch.action.search.SearchRequest;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
|
import org.elasticsearch.action.update.UpdateRequest;
|
||||||
|
import org.elasticsearch.action.update.UpdateResponse;
|
||||||
|
import org.elasticsearch.client.RequestOptions;
|
||||||
|
import org.elasticsearch.client.Requests;
|
||||||
import org.elasticsearch.client.RestClient;
|
import org.elasticsearch.client.RestClient;
|
||||||
import org.elasticsearch.client.RestClientBuilder;
|
import org.elasticsearch.client.RestClientBuilder;
|
||||||
import org.elasticsearch.client.RestHighLevelClient;
|
import org.elasticsearch.client.RestHighLevelClient;
|
||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
|
import org.elasticsearch.index.query.QueryBuilder;
|
||||||
|
import org.elasticsearch.index.query.QueryBuilders;
|
||||||
|
import org.elasticsearch.rest.RestStatus;
|
||||||
|
import org.elasticsearch.search.SearchHits;
|
||||||
|
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.BiConsumer;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ES 工具类
|
* ES 工具类
|
||||||
|
@ -57,6 +92,51 @@ public class ElasticsearchUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static BulkProcessor newAsyncBulkProcessor(RestHighLevelClient client) {
|
||||||
|
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
|
||||||
|
@Override
|
||||||
|
public void beforeBulk(long executionId, BulkRequest request) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
|
||||||
|
if (response.hasFailures()) {
|
||||||
|
log.error("Bulk [{}] executed with failures,response = {}", executionId,
|
||||||
|
response.buildFailureMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
|
||||||
|
(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
|
||||||
|
BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
|
||||||
|
// 1000条数据请求执行一次bulk
|
||||||
|
.setBulkActions(1000)
|
||||||
|
// 5mb的数据刷新一次bulk
|
||||||
|
.setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
|
||||||
|
// 并发请求数量, 0不并发, 1并发允许执行
|
||||||
|
.setConcurrentRequests(2)
|
||||||
|
// 固定1s必须刷新一次
|
||||||
|
.setFlushInterval(TimeValue.timeValueMillis(1000L))
|
||||||
|
// 重试3次,间隔100ms
|
||||||
|
.setBackoffPolicy(
|
||||||
|
BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(200L),
|
||||||
|
3)).build();
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||||
|
try {
|
||||||
|
bulkProcessor.flush();
|
||||||
|
bulkProcessor.awaitClose(30, TimeUnit.SECONDS);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to close bulkProcessor", e);
|
||||||
|
}
|
||||||
|
log.info("bulkProcessor closed!");
|
||||||
|
}));
|
||||||
|
return bulkProcessor;
|
||||||
|
}
|
||||||
|
|
||||||
public static HttpHost[] toHttpHostList(String hosts) {
|
public static HttpHost[] toHttpHostList(String hosts) {
|
||||||
if (StrUtil.isBlank(hosts)) {
|
if (StrUtil.isBlank(hosts)) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -88,6 +168,240 @@ public class ElasticsearchUtil {
|
||||||
return restClientBuilder;
|
return restClientBuilder;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static <T extends EsEntity> String insert(RestHighLevelClient client, String index, String type, T entity)
|
||||||
|
throws IOException {
|
||||||
|
Map<String, Object> map = new HashMap<>();
|
||||||
|
BeanUtil.beanToMap(entity, map, CopyOptions.create().ignoreError());
|
||||||
|
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||||
|
builder.startObject();
|
||||||
|
for (Map.Entry<String, Object> entry : map.entrySet()) {
|
||||||
|
String key = entry.getKey();
|
||||||
|
Object value = entry.getValue();
|
||||||
|
builder.field(key, value);
|
||||||
|
}
|
||||||
|
builder.endObject();
|
||||||
|
|
||||||
|
IndexRequest request = new IndexRequest(index, type).source(builder);
|
||||||
|
if (entity.getDocId() != null) {
|
||||||
|
request.id(entity.getDocId());
|
||||||
|
}
|
||||||
|
|
||||||
|
IndexResponse response = client.index(request, RequestOptions.DEFAULT);
|
||||||
|
if (response != null && response.getResult() == DocWriteResponse.Result.CREATED) {
|
||||||
|
return response.getId();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T extends EsEntity> boolean batchInsert(RestHighLevelClient client, String index, String type,
|
||||||
|
Collection<T> list) throws IOException {
|
||||||
|
|
||||||
|
if (CollectionUtil.isEmpty(list)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
for (T entity : list) {
|
||||||
|
Map<String, Object> map = ElasticsearchUtil.toMap(entity);
|
||||||
|
IndexRequest request = new IndexRequest(index, type).source(map);
|
||||||
|
if (entity.getDocId() != null) {
|
||||||
|
request.id(entity.getDocId());
|
||||||
|
}
|
||||||
|
bulkRequest.add(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
|
||||||
|
return response != null && !response.hasFailures();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T extends EsEntity> void asyncBatchInsert(RestHighLevelClient client, String index, String type,
|
||||||
|
Collection<T> list, ActionListener<BulkResponse> listener) {
|
||||||
|
|
||||||
|
if (CollectionUtil.isEmpty(list)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
for (T entity : list) {
|
||||||
|
Map<String, Object> map = ElasticsearchUtil.toMap(entity);
|
||||||
|
IndexRequest request = new IndexRequest(index, type).source(map);
|
||||||
|
if (entity.getDocId() != null) {
|
||||||
|
request.id(entity.getDocId());
|
||||||
|
}
|
||||||
|
bulkRequest.add(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T extends EsEntity> boolean updateById(RestHighLevelClient client, String index, String type,
|
||||||
|
T entity) throws IOException {
|
||||||
|
|
||||||
|
if (entity == null || entity.getDocId() == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, Object> map = toMap(entity);
|
||||||
|
UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map);
|
||||||
|
UpdateResponse response = client.update(request, RequestOptions.DEFAULT);
|
||||||
|
return response != null && response.getResult() == DocWriteResponse.Result.UPDATED;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T extends EsEntity> boolean batchUpdateById(RestHighLevelClient client, String index, String type,
|
||||||
|
Collection<T> list) throws IOException {
|
||||||
|
|
||||||
|
if (CollectionUtil.isEmpty(list)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
for (T entity : list) {
|
||||||
|
if (entity == null || entity.getDocId() == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Map<String, Object> map = ElasticsearchUtil.toMap(entity);
|
||||||
|
UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map);
|
||||||
|
bulkRequest.add(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
|
||||||
|
return response != null && !response.hasFailures();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T extends EsEntity> void asyncBatchUpdateById(RestHighLevelClient client, String index,
|
||||||
|
String type, Collection<T> list, ActionListener<BulkResponse> listener) {
|
||||||
|
|
||||||
|
if (CollectionUtil.isEmpty(list)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
for (T entity : list) {
|
||||||
|
if (entity == null || entity.getDocId() == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Map<String, Object> map = ElasticsearchUtil.toMap(entity);
|
||||||
|
UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map);
|
||||||
|
bulkRequest.add(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean deleteById(RestHighLevelClient client, String index, String type, String id)
|
||||||
|
throws IOException {
|
||||||
|
return batchDeleteById(client, index, type, Collections.singleton(id));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean batchDeleteById(RestHighLevelClient client, String index, String type, Collection<String> ids)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
if (CollectionUtil.isEmpty(ids)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
ids.forEach(id -> {
|
||||||
|
DeleteRequest deleteRequest = new DeleteRequest(index, type, id);
|
||||||
|
bulkRequest.add(deleteRequest);
|
||||||
|
});
|
||||||
|
|
||||||
|
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
|
||||||
|
return response != null && !response.hasFailures();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void asyncBatchDeleteById(RestHighLevelClient client, String index, String type,
|
||||||
|
Collection<String> ids, ActionListener<BulkResponse> listener) {
|
||||||
|
|
||||||
|
if (CollectionUtil.isEmpty(ids)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
BulkRequest bulkRequest = new BulkRequest();
|
||||||
|
ids.forEach(id -> {
|
||||||
|
DeleteRequest deleteRequest = new DeleteRequest(index, type, id);
|
||||||
|
bulkRequest.add(deleteRequest);
|
||||||
|
});
|
||||||
|
|
||||||
|
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static SearchResponse getById(RestHighLevelClient client, String index, String type, String id)
|
||||||
|
throws IOException {
|
||||||
|
SearchRequest searchRequest = Requests.searchRequest(index).types(type);
|
||||||
|
QueryBuilder queryBuilder = QueryBuilders.idsQuery().addIds(id);
|
||||||
|
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
|
||||||
|
sourceBuilder.query(queryBuilder);
|
||||||
|
searchRequest.source(sourceBuilder);
|
||||||
|
return client.search(searchRequest, RequestOptions.DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> T pojoById(RestHighLevelClient client, String index, String type, String id, Class<T> clazz)
|
||||||
|
throws IOException {
|
||||||
|
SearchResponse response = getById(client, index, type, id);
|
||||||
|
if (response == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
List<T> list = ElasticsearchUtil.toPojoList(response, clazz);
|
||||||
|
if (CollectionUtil.isEmpty(list)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return list.get(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> List<T> pojoListByIds(RestHighLevelClient client, String index, String type,
|
||||||
|
Collection<String> ids, Class<T> clazz) throws IOException {
|
||||||
|
|
||||||
|
if (CollectionUtil.isEmpty(ids)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
MultiGetRequest request = new MultiGetRequest();
|
||||||
|
for (String id : ids) {
|
||||||
|
request.add(new MultiGetRequest.Item(index, type, id));
|
||||||
|
}
|
||||||
|
|
||||||
|
MultiGetResponse multiGetResponse = client.mget(request, RequestOptions.DEFAULT);
|
||||||
|
if (null == multiGetResponse
|
||||||
|
|| multiGetResponse.getResponses() == null
|
||||||
|
|| multiGetResponse.getResponses().length <= 0) {
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
List<T> list = new ArrayList<>();
|
||||||
|
for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) {
|
||||||
|
if (itemResponse.isFailed()) {
|
||||||
|
log.error("通过id获取文档失败", itemResponse.getFailure().getFailure());
|
||||||
|
} else {
|
||||||
|
T entity = ElasticsearchUtil.toPojo(itemResponse.getResponse(), clazz);
|
||||||
|
if (entity != null) {
|
||||||
|
list.add(entity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return list;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> Page<T> pojoPage(RestHighLevelClient client, String index, String type,
|
||||||
|
SearchSourceBuilder builder, Class<T> clazz) throws IOException {
|
||||||
|
SearchResponse response = query(client, index, type, builder);
|
||||||
|
if (response == null || response.status() != RestStatus.OK) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<T> content = toPojoList(response, clazz);
|
||||||
|
SearchHits searchHits = response.getHits();
|
||||||
|
return new Page<>(searchHits.getTotalHits(), builder.from(), builder.size(), content);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static SearchResponse query(RestHighLevelClient client, String index, String type,
|
||||||
|
SearchSourceBuilder builder) throws IOException {
|
||||||
|
SearchRequest request = new SearchRequest(index).types(type);
|
||||||
|
request.source(builder);
|
||||||
|
return client.search(request, RequestOptions.DEFAULT);
|
||||||
|
}
|
||||||
|
|
||||||
public static <T> T toPojo(GetResponse response, Class<T> clazz) {
|
public static <T> T toPojo(GetResponse response, Class<T> clazz) {
|
||||||
if (null == response) {
|
if (null == response) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -99,31 +413,22 @@ public class ElasticsearchUtil {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> List<T> toPojoList(SearchResponse response, Class<T> clazz) {
|
public static <T> List<T> toPojoList(SearchResponse response, Class<T> clazz) {
|
||||||
List<T> list = null;
|
|
||||||
try {
|
if (response == null || response.status() != RestStatus.OK) {
|
||||||
if (response != null) {
|
return new ArrayList<>();
|
||||||
SearchHit[] searchHits = response.getHits().getHits();
|
|
||||||
T entity;
|
|
||||||
list = new ArrayList<T>(searchHits.length);
|
|
||||||
for (SearchHit hit : searchHits) {
|
|
||||||
if (null == hit) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
entity = JSONUtil.toBean(hit.getSourceAsString(), clazz);
|
|
||||||
list.add(entity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.error("解析ES返回结果异常, response:{}", response);
|
|
||||||
throw e;
|
|
||||||
}
|
}
|
||||||
return list;
|
|
||||||
|
if (ArrayUtil.isEmpty(response.getHits().getHits())) {
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Stream.of(response.getHits().getHits())
|
||||||
|
.map(hit -> JSONUtil.toBean(hit.getSourceAsString(), clazz))
|
||||||
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static <T> Map<String, Object> toMap(T entity) {
|
public static <T> Map<String, Object> toMap(T entity) {
|
||||||
Map<String, Object> map = new HashMap<>();
|
return JsonUtil.toMap(JsonUtil.toJson(entity));
|
||||||
BeanUtil.beanToMap(entity, map, CopyOptions.create().ignoreError());
|
|
||||||
return map;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,99 @@
|
||||||
|
package io.github.dunwu.javadb.elasticsearch.util;
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
|
import com.fasterxml.jackson.core.JsonParser;
|
||||||
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
|
import com.fasterxml.jackson.databind.DeserializationFeature;
|
||||||
|
import com.fasterxml.jackson.databind.JavaType;
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.fasterxml.jackson.databind.json.JsonMapper;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* JSON 工具类
|
||||||
|
*
|
||||||
|
* @author Zhang Peng
|
||||||
|
* @date 2023-06-29
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
public class JsonUtil {
|
||||||
|
|
||||||
|
private static final ObjectMapper MAPPER =
|
||||||
|
JsonMapper.builder()
|
||||||
|
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||||
|
.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
|
||||||
|
.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true)
|
||||||
|
.serializationInclusion(JsonInclude.Include.ALWAYS)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
public static <T> List<T> toList(String json, Class<T> clazz) {
|
||||||
|
if (StrUtil.isBlank(json)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, clazz);
|
||||||
|
try {
|
||||||
|
return MAPPER.readValue(json, javaType);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("反序列化失败!json: {}, msg: {}", json, e.getMessage());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <K, V> Map<K, V> toMap(String json) {
|
||||||
|
if (StrUtil.isBlank(json)) {
|
||||||
|
return new HashMap<>(0);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return MAPPER.readValue(json, new TypeReference<Map<K, V>>() { });
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("反序列化失败!json: {}, msg: {}", json, e.getMessage());
|
||||||
|
}
|
||||||
|
return Collections.emptyMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> T toBean(String json, Class<T> clazz) {
|
||||||
|
if (StrUtil.isBlank(json)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return MAPPER.readValue(json, clazz);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("反序列化失败!json: {}, msg: {}", json, e.getMessage());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> T toBean(String json, TypeReference<T> typeReference) {
|
||||||
|
if (StrUtil.isBlank(json)) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return (T) MAPPER.readValue(json, typeReference);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("反序列化失败!json: {}, msg: {}", json, e.getMessage());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T> String toJson(T obj) {
|
||||||
|
if (obj == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (obj instanceof String) {
|
||||||
|
return (String) obj;
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
return MAPPER.writeValueAsString(obj);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("序列化失败!obj: {}, msg: {}", obj, e.getMessage());
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue