diff --git a/codes/javadb/elasticsearch/elasticsearch6/pom.xml b/codes/javadb/elasticsearch/elasticsearch6/pom.xml index 759a461..ffd5de2 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/pom.xml +++ b/codes/javadb/elasticsearch/elasticsearch6/pom.xml @@ -32,6 +32,11 @@ hutool-all 5.7.20 + + com.fasterxml.jackson.core + jackson-databind + 2.15.2 + ch.qos.logback diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/Demo.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/Demo.java index 29ec08b..6e7523f 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/Demo.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/Demo.java @@ -2,15 +2,21 @@ package io.github.dunwu.javadb.elasticsearch; import io.github.dunwu.javadb.elasticsearch.entity.User; import io.github.dunwu.javadb.elasticsearch.mapper.UserEsMapper; +import io.github.dunwu.javadb.elasticsearch.util.ElasticsearchUtil; +import org.elasticsearch.client.RestHighLevelClient; import java.io.IOException; import java.util.Arrays; import java.util.List; - public class Demo { + + private static final String HOSTS = "127.0.0.1:9200"; + private static final RestHighLevelClient restHighLevelClient = ElasticsearchUtil.newRestHighLevelClient(HOSTS); + public static void main(String[] args) throws IOException, InterruptedException { - UserEsMapper mapper = new UserEsMapper(); + + UserEsMapper mapper = new UserEsMapper(restHighLevelClient); System.out.println("索引是否存在:" + mapper.isIndexExists()); @@ -24,6 +30,7 @@ public class Demo { System.out.println("根据ID批量查询:" + mapper.pojoListByIds(Arrays.asList("1", "2")).toString()); Thread.sleep(1000); - System.out.println("根据ID批量删除:" + mapper.deleteByIds(Arrays.asList("1", "2"))); + System.out.println("根据ID批量删除:" + mapper.batchDeleteById(Arrays.asList("1", "2"))); } + } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/EsEntity.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/EsEntity.java index 8956485..df6d973 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/EsEntity.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/EsEntity.java @@ -7,5 +7,10 @@ package io.github.dunwu.javadb.elasticsearch.entity; * @since 2023-06-28 */ public interface EsEntity { - Long getId(); + + /** + * 获取 ES 主键 + */ + String getDocId(); + } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/Page.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/Page.java new file mode 100644 index 0000000..76f178f --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/Page.java @@ -0,0 +1,36 @@ +package io.github.dunwu.javadb.elasticsearch.entity; + +import lombok.Data; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * 分页实体 + * + * @author Zhang Peng + * @date 2023-06-28 + */ +@Data +public class Page { + + private long total; + private int page; + private int size; + private List content = new ArrayList<>(); + + public Page(long total, int page, int size) { + this.total = total; + this.page = page; + this.size = size; + } + + public Page(long total, int page, int size, Collection list) { + this.total = total; + this.page = page; + this.size = size; + this.content.addAll(list); + } + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java index f0eddd1..58428a1 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java @@ -12,10 +12,16 @@ import lombok.Data; @Data @Builder public class User implements EsEntity { + private Long id; private String username; private String password; private Integer age; private String email; + @Override + public String getDocId() { + return null; + } + } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java index 685fa05..4033b4e 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java @@ -1,45 +1,23 @@ package io.github.dunwu.javadb.elasticsearch.mapper; -import cn.hutool.core.bean.BeanUtil; -import cn.hutool.core.bean.copier.CopyOptions; -import cn.hutool.core.collection.CollectionUtil; -import cn.hutool.core.io.IoUtil; import cn.hutool.core.lang.Assert; import io.github.dunwu.javadb.elasticsearch.entity.EsEntity; +import io.github.dunwu.javadb.elasticsearch.entity.Page; import io.github.dunwu.javadb.elasticsearch.util.ElasticsearchUtil; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; -import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; -import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.get.MultiGetItemResponse; -import org.elasticsearch.action.get.MultiGetRequest; -import org.elasticsearch.action.get.MultiGetResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.IndicesClient; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.Requests; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.common.unit.ByteSizeUnit; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.XContentBuilder; -import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.builder.SearchSourceBuilder; -import java.io.Closeable; import java.io.IOException; -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.function.BiConsumer; +import java.util.Collection; +import java.util.List; /** * ES Mapper 基础类 @@ -48,13 +26,15 @@ import java.util.function.BiConsumer; * @date 2023-06-27 */ @Slf4j -public abstract class BaseEsMapper implements EsMapper, Closeable { - - public static final String HOSTS = "127.0.0.1:9200"; +public abstract class BaseEsMapper implements EsMapper { private BulkProcessor bulkProcessor; - private final RestHighLevelClient restHighLevelClient = ElasticsearchUtil.newRestHighLevelClient(HOSTS); + protected final RestHighLevelClient restHighLevelClient; + + public BaseEsMapper(RestHighLevelClient restHighLevelClient) { + this.restHighLevelClient = restHighLevelClient; + } @Override public RestHighLevelClient getClient() throws IOException { @@ -62,11 +42,10 @@ public abstract class BaseEsMapper implements EsMapper, C return restHighLevelClient; } - @Override - public synchronized BulkProcessor getBulkProcessor() { + public synchronized BulkProcessor getBulkProcessor() throws IOException { if (bulkProcessor == null) { - bulkProcessor = newAsyncBulkProcessor(); + bulkProcessor = ElasticsearchUtil.newAsyncBulkProcessor(getClient()); } return bulkProcessor; } @@ -75,176 +54,133 @@ public abstract class BaseEsMapper implements EsMapper, C public boolean isIndexExists() throws IOException { IndicesClient indicesClient = getClient().indices(); GetIndexRequest request = new GetIndexRequest(); - request.indices(getIndexAlias()); + request.indices(getIndex()); return indicesClient.exists(request, RequestOptions.DEFAULT); } @Override public SearchResponse getById(String id) throws IOException { - SearchRequest searchRequest = Requests.searchRequest(getIndexAlias()); - QueryBuilder queryBuilder = QueryBuilders.idsQuery().addIds(id); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); - sourceBuilder.query(queryBuilder); - searchRequest.source(sourceBuilder); - return getClient().search(searchRequest, RequestOptions.DEFAULT); + return ElasticsearchUtil.getById(getClient(), getIndex(), getType(), id); } @Override public T pojoById(String id) throws IOException { - SearchResponse response = getById(id); - if (response == null) { - return null; - } - List list = ElasticsearchUtil.toPojoList(response, getEntityClass()); - if (CollectionUtil.isEmpty(list)) { - return null; - } - return list.get(0); + return ElasticsearchUtil.pojoById(getClient(), getIndex(), getType(), id, getEntityClass()); } @Override public List pojoListByIds(Collection ids) throws IOException { + return ElasticsearchUtil.pojoListByIds(getClient(), getIndex(), getType(), ids, getEntityClass()); + } - if (CollectionUtil.isEmpty(ids)) { - return null; - } - - MultiGetRequest request = new MultiGetRequest(); - for (String id : ids) { - request.add(new MultiGetRequest.Item(getIndexAlias(), getIndexType(), id)); - } - - MultiGetResponse multiGetResponse = getClient().mget(request, RequestOptions.DEFAULT); - if (null == multiGetResponse || multiGetResponse.getResponses() == null || multiGetResponse.getResponses().length <= 0) { - return new ArrayList<>(); - } - - List list = new ArrayList<>(); - for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) { - if (itemResponse.isFailed()) { - log.error("通过id获取文档失败", itemResponse.getFailure().getFailure()); - } else { - T entity = ElasticsearchUtil.toPojo(itemResponse.getResponse(), getEntityClass()); - if (entity != null) { - list.add(entity); - } - } - } - return list; + @Override + public Page pojoPage(SearchSourceBuilder builder) throws IOException { + return ElasticsearchUtil.pojoPage(getClient(), getIndex(), getType(), builder, getEntityClass()); } @Override public String insert(T entity) throws IOException { - Map map = new HashMap<>(); - BeanUtil.beanToMap(entity, map, CopyOptions.create().ignoreError()); - XContentBuilder builder = XContentFactory.jsonBuilder(); - builder.startObject(); - for (Map.Entry entry : map.entrySet()) { - String key = entry.getKey(); - Object value = entry.getValue(); - builder.field(key, value); - } - builder.endObject(); - - IndexRequest request = Requests.indexRequest(getIndexAlias()).type(getIndexType()).source(builder); - if (entity.getId() != null) { - request.id(entity.getId().toString()); - } - - IndexResponse response = getClient().index(request, RequestOptions.DEFAULT); - if (response == null) { - return null; - } - return response.getId(); + return ElasticsearchUtil.insert(getClient(), getIndex(), getType(), entity); } @Override public boolean batchInsert(Collection list) throws IOException { - - if (CollectionUtil.isEmpty(list)) { - return true; - } - - BulkRequest bulkRequest = new BulkRequest(); - for (T entity : list) { - Map map = ElasticsearchUtil.toMap(entity); - IndexRequest request = Requests.indexRequest(getIndexAlias()).type(getIndexType()).source(map); - if (entity.getId() != null) { - request.id(entity.getId().toString()); - } - bulkRequest.add(request); - } - - BulkResponse response = getClient().bulk(bulkRequest, RequestOptions.DEFAULT); - return !(response == null || response.hasFailures()); + return ElasticsearchUtil.batchInsert(getClient(), getIndex(), getType(), list); } @Override - public boolean deleteById(String id) throws IOException { - return deleteByIds(Collections.singleton(id)); - } - - @Override - public boolean deleteByIds(Collection ids) throws IOException { - - if (CollectionUtil.isEmpty(ids)) { - return true; - } - - BulkRequest bulkRequest = new BulkRequest(); - ids.forEach(id -> { - DeleteRequest deleteRequest = Requests.deleteRequest(getIndexAlias()).type(getIndexType()).id(id); - bulkRequest.add(deleteRequest); - }); - - BulkResponse response = getClient().bulk(bulkRequest, RequestOptions.DEFAULT); - return response != null && !response.hasFailures(); - } - - private BulkProcessor newAsyncBulkProcessor() { - BulkProcessor.Listener listener = new BulkProcessor.Listener() { + public void asyncBatchInsert(Collection list) throws IOException { + ActionListener listener = new ActionListener() { @Override - public void beforeBulk(long executionId, BulkRequest request) { - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - if (response.hasFailures()) { - log.error("Bulk [{}] executed with failures,response = {}", executionId, response.buildFailureMessage()); + public void onResponse(BulkResponse response) { + if (response != null && !response.hasFailures()) { + log.info("【ES】异步批量插入成功!"); + } else { + log.warn("【ES】异步批量插入失败!"); } } @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + public void onFailure(Exception e) { + log.error("【ES】异步批量插入异常!", e); } }; - BiConsumer> bulkConsumer = (request, bulkListener) -> restHighLevelClient.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); - bulkProcessor = BulkProcessor.builder(bulkConsumer, listener) - // 1000条数据请求执行一次bulk - .setBulkActions(1000) - // 5mb的数据刷新一次bulk - .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB)) - // 并发请求数量, 0不并发, 1并发允许执行 - .setConcurrentRequests(2) - // 固定1s必须刷新一次 - .setFlushInterval(TimeValue.timeValueMillis(1000L)) - // 重试3次,间隔100ms - .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(200L), 3)).build(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - try { - bulkProcessor.flush(); - bulkProcessor.awaitClose(30, TimeUnit.SECONDS); - } catch (Exception e) { - log.error("Failed to close bulkProcessor", e); - } - log.info("bulkProcessor closed!"); - })); - return bulkProcessor; + asyncBatchInsert(list, listener); } @Override - public void close() { - IoUtil.close(restHighLevelClient); + public void asyncBatchInsert(Collection list, ActionListener listener) throws IOException { + ElasticsearchUtil.asyncBatchInsert(getClient(), getIndex(), getType(), list, listener); + } + + @Override + public boolean updateById(T entity) throws IOException { + return ElasticsearchUtil.updateById(getClient(), getIndex(), getType(), entity); + } + + @Override + public boolean batchUpdateById(Collection list) throws IOException { + return ElasticsearchUtil.batchUpdateById(getClient(), getIndex(), getType(), list); + } + + @Override + public void asyncBatchUpdateById(Collection list) throws IOException { + ActionListener listener = new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + if (response != null && !response.hasFailures()) { + log.info("【ES】异步批量更新成功!"); + } else { + log.warn("【ES】异步批量更新失败!"); + } + } + + @Override + public void onFailure(Exception e) { + log.error("【ES】异步批量更新异常!", e); + } + }; + asyncBatchUpdateById(list, listener); + } + + @Override + public void asyncBatchUpdateById(Collection list, ActionListener listener) throws IOException { + ElasticsearchUtil.asyncBatchUpdateById(getClient(), getIndex(), getType(), list, listener); + } + + @Override + public boolean deleteById(String id) throws IOException { + return ElasticsearchUtil.deleteById(getClient(), getIndex(), getType(), id); + } + + @Override + public boolean batchDeleteById(Collection ids) throws IOException { + return ElasticsearchUtil.batchDeleteById(getClient(), getIndex(), getType(), ids); + } + + @Override + public void asyncBatchDeleteById(Collection ids) throws IOException { + ActionListener listener = new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + if (response != null && !response.hasFailures()) { + log.info("【ES】异步批量删除成功!"); + } else { + log.warn("【ES】异步批量删除失败!ids: {}", ids); + } + } + + @Override + public void onFailure(Exception e) { + log.error("【ES】异步批量删除异常!ids: {}", ids, e); + } + }; + asyncBatchDeleteById(ids, listener); + } + + @Override + public void asyncBatchDeleteById(Collection ids, ActionListener listener) throws IOException { + ElasticsearchUtil.asyncBatchDeleteById(getClient(), getIndex(), getType(), ids, listener); } } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java index 0ab269c..fa01d7c 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java @@ -1,9 +1,13 @@ package io.github.dunwu.javadb.elasticsearch.mapper; import io.github.dunwu.javadb.elasticsearch.entity.EsEntity; +import io.github.dunwu.javadb.elasticsearch.entity.Page; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.Collection; @@ -17,20 +21,15 @@ import java.util.List; */ public interface EsMapper { - /** - * 获取索引别名 - */ - String getIndexAlias(); - /** * 获取索引名 */ - String getIndexName(); + String getIndex(); /** * 获取索引类型 */ - String getIndexType(); + String getType(); /** * 获取实体类型 @@ -39,7 +38,7 @@ public interface EsMapper { RestHighLevelClient getClient() throws IOException; - BulkProcessor getBulkProcessor(); + BulkProcessor getBulkProcessor() throws IOException; boolean isIndexExists() throws IOException; @@ -49,11 +48,30 @@ public interface EsMapper { List pojoListByIds(Collection ids) throws IOException; + Page pojoPage(SearchSourceBuilder builder) throws IOException; + String insert(T entity) throws IOException; boolean batchInsert(Collection list) throws IOException; + void asyncBatchInsert(Collection list) throws IOException; + + void asyncBatchInsert(Collection list, ActionListener listener) throws IOException; + + boolean updateById(T entity) throws IOException; + + boolean batchUpdateById(Collection list) throws IOException; + + void asyncBatchUpdateById(Collection list) throws IOException; + + void asyncBatchUpdateById(Collection list, ActionListener listener) throws IOException; + boolean deleteById(String id) throws IOException; - boolean deleteByIds(Collection ids) throws IOException; + boolean batchDeleteById(Collection ids) throws IOException; + + void asyncBatchDeleteById(Collection ids) throws IOException; + + void asyncBatchDeleteById(Collection ids, ActionListener listener) throws IOException; + } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java index c99c2ed..c60af8a 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java @@ -1,6 +1,7 @@ package io.github.dunwu.javadb.elasticsearch.mapper; import io.github.dunwu.javadb.elasticsearch.entity.User; +import org.elasticsearch.client.RestHighLevelClient; /** * User ES Mapper @@ -10,18 +11,17 @@ import io.github.dunwu.javadb.elasticsearch.entity.User; */ public class UserEsMapper extends BaseEsMapper { + public UserEsMapper(RestHighLevelClient restHighLevelClient) { + super(restHighLevelClient); + } + @Override - public String getIndexAlias() { + public String getIndex() { return "user"; } @Override - public String getIndexName() { - return "user"; - } - - @Override - public String getIndexType() { + public String getType() { return "_doc"; } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/ElasticsearchUtil.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/ElasticsearchUtil.java index e3fa451..4865e05 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/ElasticsearchUtil.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/ElasticsearchUtil.java @@ -3,22 +3,57 @@ package io.github.dunwu.javadb.elasticsearch.util; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.bean.copier.CopyOptions; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.json.JSONUtil; +import io.github.dunwu.javadb.elasticsearch.entity.EsEntity; +import io.github.dunwu.javadb.elasticsearch.entity.Page; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpHost; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.get.MultiGetItemResponse; +import org.elasticsearch.action.get.MultiGetRequest; +import org.elasticsearch.action.get.MultiGetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.Requests; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.search.SearchHit; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * ES 工具类 @@ -57,6 +92,51 @@ public class ElasticsearchUtil { } } + public static BulkProcessor newAsyncBulkProcessor(RestHighLevelClient client) { + BulkProcessor.Listener listener = new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { + } + + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + if (response.hasFailures()) { + log.error("Bulk [{}] executed with failures,response = {}", executionId, + response.buildFailureMessage()); + } + } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + } + }; + BiConsumer> bulkConsumer = + (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); + BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener) + // 1000条数据请求执行一次bulk + .setBulkActions(1000) + // 5mb的数据刷新一次bulk + .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB)) + // 并发请求数量, 0不并发, 1并发允许执行 + .setConcurrentRequests(2) + // 固定1s必须刷新一次 + .setFlushInterval(TimeValue.timeValueMillis(1000L)) + // 重试3次,间隔100ms + .setBackoffPolicy( + BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(200L), + 3)).build(); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + try { + bulkProcessor.flush(); + bulkProcessor.awaitClose(30, TimeUnit.SECONDS); + } catch (Exception e) { + log.error("Failed to close bulkProcessor", e); + } + log.info("bulkProcessor closed!"); + })); + return bulkProcessor; + } + public static HttpHost[] toHttpHostList(String hosts) { if (StrUtil.isBlank(hosts)) { return null; @@ -88,6 +168,240 @@ public class ElasticsearchUtil { return restClientBuilder; } + public static String insert(RestHighLevelClient client, String index, String type, T entity) + throws IOException { + Map map = new HashMap<>(); + BeanUtil.beanToMap(entity, map, CopyOptions.create().ignoreError()); + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + for (Map.Entry entry : map.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + builder.field(key, value); + } + builder.endObject(); + + IndexRequest request = new IndexRequest(index, type).source(builder); + if (entity.getDocId() != null) { + request.id(entity.getDocId()); + } + + IndexResponse response = client.index(request, RequestOptions.DEFAULT); + if (response != null && response.getResult() == DocWriteResponse.Result.CREATED) { + return response.getId(); + } else { + return null; + } + } + + public static boolean batchInsert(RestHighLevelClient client, String index, String type, + Collection list) throws IOException { + + if (CollectionUtil.isEmpty(list)) { + return true; + } + + BulkRequest bulkRequest = new BulkRequest(); + for (T entity : list) { + Map map = ElasticsearchUtil.toMap(entity); + IndexRequest request = new IndexRequest(index, type).source(map); + if (entity.getDocId() != null) { + request.id(entity.getDocId()); + } + bulkRequest.add(request); + } + + BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); + return response != null && !response.hasFailures(); + } + + public static void asyncBatchInsert(RestHighLevelClient client, String index, String type, + Collection list, ActionListener listener) { + + if (CollectionUtil.isEmpty(list)) { + return; + } + + BulkRequest bulkRequest = new BulkRequest(); + for (T entity : list) { + Map map = ElasticsearchUtil.toMap(entity); + IndexRequest request = new IndexRequest(index, type).source(map); + if (entity.getDocId() != null) { + request.id(entity.getDocId()); + } + bulkRequest.add(request); + } + + client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener); + } + + public static boolean updateById(RestHighLevelClient client, String index, String type, + T entity) throws IOException { + + if (entity == null || entity.getDocId() == null) { + return false; + } + + Map map = toMap(entity); + UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map); + UpdateResponse response = client.update(request, RequestOptions.DEFAULT); + return response != null && response.getResult() == DocWriteResponse.Result.UPDATED; + } + + public static boolean batchUpdateById(RestHighLevelClient client, String index, String type, + Collection list) throws IOException { + + if (CollectionUtil.isEmpty(list)) { + return true; + } + + BulkRequest bulkRequest = new BulkRequest(); + for (T entity : list) { + if (entity == null || entity.getDocId() == null) { + continue; + } + Map map = ElasticsearchUtil.toMap(entity); + UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map); + bulkRequest.add(request); + } + + BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); + return response != null && !response.hasFailures(); + } + + public static void asyncBatchUpdateById(RestHighLevelClient client, String index, + String type, Collection list, ActionListener listener) { + + if (CollectionUtil.isEmpty(list)) { + return; + } + + BulkRequest bulkRequest = new BulkRequest(); + for (T entity : list) { + if (entity == null || entity.getDocId() == null) { + continue; + } + Map map = ElasticsearchUtil.toMap(entity); + UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map); + bulkRequest.add(request); + } + + client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener); + } + + public static boolean deleteById(RestHighLevelClient client, String index, String type, String id) + throws IOException { + return batchDeleteById(client, index, type, Collections.singleton(id)); + } + + public static boolean batchDeleteById(RestHighLevelClient client, String index, String type, Collection ids) + throws IOException { + + if (CollectionUtil.isEmpty(ids)) { + return true; + } + + BulkRequest bulkRequest = new BulkRequest(); + ids.forEach(id -> { + DeleteRequest deleteRequest = new DeleteRequest(index, type, id); + bulkRequest.add(deleteRequest); + }); + + BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); + return response != null && !response.hasFailures(); + } + + public static void asyncBatchDeleteById(RestHighLevelClient client, String index, String type, + Collection ids, ActionListener listener) { + + if (CollectionUtil.isEmpty(ids)) { + return; + } + + BulkRequest bulkRequest = new BulkRequest(); + ids.forEach(id -> { + DeleteRequest deleteRequest = new DeleteRequest(index, type, id); + bulkRequest.add(deleteRequest); + }); + + client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener); + } + + public static SearchResponse getById(RestHighLevelClient client, String index, String type, String id) + throws IOException { + SearchRequest searchRequest = Requests.searchRequest(index).types(type); + QueryBuilder queryBuilder = QueryBuilders.idsQuery().addIds(id); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.query(queryBuilder); + searchRequest.source(sourceBuilder); + return client.search(searchRequest, RequestOptions.DEFAULT); + } + + public static T pojoById(RestHighLevelClient client, String index, String type, String id, Class clazz) + throws IOException { + SearchResponse response = getById(client, index, type, id); + if (response == null) { + return null; + } + List list = ElasticsearchUtil.toPojoList(response, clazz); + if (CollectionUtil.isEmpty(list)) { + return null; + } + return list.get(0); + } + + public static List pojoListByIds(RestHighLevelClient client, String index, String type, + Collection ids, Class clazz) throws IOException { + + if (CollectionUtil.isEmpty(ids)) { + return null; + } + + MultiGetRequest request = new MultiGetRequest(); + for (String id : ids) { + request.add(new MultiGetRequest.Item(index, type, id)); + } + + MultiGetResponse multiGetResponse = client.mget(request, RequestOptions.DEFAULT); + if (null == multiGetResponse + || multiGetResponse.getResponses() == null + || multiGetResponse.getResponses().length <= 0) { + return new ArrayList<>(); + } + + List list = new ArrayList<>(); + for (MultiGetItemResponse itemResponse : multiGetResponse.getResponses()) { + if (itemResponse.isFailed()) { + log.error("通过id获取文档失败", itemResponse.getFailure().getFailure()); + } else { + T entity = ElasticsearchUtil.toPojo(itemResponse.getResponse(), clazz); + if (entity != null) { + list.add(entity); + } + } + } + return list; + } + + public static Page pojoPage(RestHighLevelClient client, String index, String type, + SearchSourceBuilder builder, Class clazz) throws IOException { + SearchResponse response = query(client, index, type, builder); + if (response == null || response.status() != RestStatus.OK) { + return null; + } + + List content = toPojoList(response, clazz); + SearchHits searchHits = response.getHits(); + return new Page<>(searchHits.getTotalHits(), builder.from(), builder.size(), content); + } + + public static SearchResponse query(RestHighLevelClient client, String index, String type, + SearchSourceBuilder builder) throws IOException { + SearchRequest request = new SearchRequest(index).types(type); + request.source(builder); + return client.search(request, RequestOptions.DEFAULT); + } + public static T toPojo(GetResponse response, Class clazz) { if (null == response) { return null; @@ -99,31 +413,22 @@ public class ElasticsearchUtil { } public static List toPojoList(SearchResponse response, Class clazz) { - List list = null; - try { - if (response != null) { - SearchHit[] searchHits = response.getHits().getHits(); - T entity; - list = new ArrayList(searchHits.length); - for (SearchHit hit : searchHits) { - if (null == hit) { - continue; - } - entity = JSONUtil.toBean(hit.getSourceAsString(), clazz); - list.add(entity); - } - } - } catch (Exception e) { - log.error("解析ES返回结果异常, response:{}", response); - throw e; + + if (response == null || response.status() != RestStatus.OK) { + return new ArrayList<>(); } - return list; + + if (ArrayUtil.isEmpty(response.getHits().getHits())) { + return new ArrayList<>(); + } + + return Stream.of(response.getHits().getHits()) + .map(hit -> JSONUtil.toBean(hit.getSourceAsString(), clazz)) + .collect(Collectors.toList()); } public static Map toMap(T entity) { - Map map = new HashMap<>(); - BeanUtil.beanToMap(entity, map, CopyOptions.create().ignoreError()); - return map; + return JsonUtil.toMap(JsonUtil.toJson(entity)); } } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/JsonUtil.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/JsonUtil.java new file mode 100644 index 0000000..8106f4e --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/JsonUtil.java @@ -0,0 +1,99 @@ +package io.github.dunwu.javadb.elasticsearch.util; + +import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import lombok.extern.slf4j.Slf4j; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * JSON 工具类 + * + * @author Zhang Peng + * @date 2023-06-29 + */ +@Slf4j +public class JsonUtil { + + private static final ObjectMapper MAPPER = + JsonMapper.builder() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true) + .configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true) + .serializationInclusion(JsonInclude.Include.ALWAYS) + .build(); + + public static List toList(String json, Class clazz) { + if (StrUtil.isBlank(json)) { + return null; + } + JavaType javaType = MAPPER.getTypeFactory().constructParametricType(List.class, clazz); + try { + return MAPPER.readValue(json, javaType); + } catch (Exception e) { + log.error("反序列化失败!json: {}, msg: {}", json, e.getMessage()); + } + return null; + } + + public static Map toMap(String json) { + if (StrUtil.isBlank(json)) { + return new HashMap<>(0); + } + try { + return MAPPER.readValue(json, new TypeReference>() { }); + } catch (Exception e) { + log.error("反序列化失败!json: {}, msg: {}", json, e.getMessage()); + } + return Collections.emptyMap(); + } + + public static T toBean(String json, Class clazz) { + if (StrUtil.isBlank(json)) { + return null; + } + try { + return MAPPER.readValue(json, clazz); + } catch (Exception e) { + log.error("反序列化失败!json: {}, msg: {}", json, e.getMessage()); + } + return null; + } + + public static T toBean(String json, TypeReference typeReference) { + if (StrUtil.isBlank(json)) { + return null; + } + try { + return (T) MAPPER.readValue(json, typeReference); + } catch (Exception e) { + log.error("反序列化失败!json: {}, msg: {}", json, e.getMessage()); + } + return null; + } + + public static String toJson(T obj) { + if (obj == null) { + return null; + } + if (obj instanceof String) { + return (String) obj; + } + try { + return MAPPER.writeValueAsString(obj); + } catch (Exception e) { + log.error("序列化失败!obj: {}, msg: {}", obj, e.getMessage()); + } + return null; + } + +}