From 14b5853e4c6fec2e4e86388e93ecc1c48c8bffc8 Mon Sep 17 00:00:00 2001 From: dunwu Date: Wed, 21 Feb 2024 22:32:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9B=B4=E6=96=B0=20elasticsearch=20?= =?UTF-8?q?=E7=A4=BA=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../elasticsearch/elasticsearch6/pom.xml | 50 ++- .../dunwu/javadb/elasticsearch/Demo.java | 11 +- .../elasticsearch/ElasticsearchFactory.java | 166 +++++++++ ...chUtil.java => ElasticsearchTemplate.java} | 326 ++++++++++-------- .../config/ElasticsearchConfig.java | 30 ++ .../config/EnableElasticsearch.java | 26 ++ .../elasticsearch/entity/BaseEsEntity.java | 27 ++ .../javadb/elasticsearch/entity/EsEntity.java | 16 - .../javadb/elasticsearch/entity/Page.java | 4 +- .../javadb/elasticsearch/entity/User.java | 10 +- .../elasticsearch/mapper/BaseEsMapper.java | 108 +++--- .../javadb/elasticsearch/mapper/EsMapper.java | 45 ++- .../elasticsearch/mapper/UserEsMapper.java | 8 +- .../javadb/elasticsearch/util/JsonUtil.java | 2 +- .../elasticsearch/BaseApplicationTests.java | 36 ++ .../ElasticsearchTemplateTest.java | 193 +++++++++++ .../javadb/elasticsearch/TestApplication.java | 20 ++ .../mapper/UserEsMapperTest.java | 191 ++++++++++ 18 files changed, 1010 insertions(+), 259 deletions(-) create mode 100644 codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchFactory.java rename codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/{util/ElasticsearchUtil.java => ElasticsearchTemplate.java} (53%) create mode 100644 codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/ElasticsearchConfig.java create mode 100644 codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/EnableElasticsearch.java create mode 100644 codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/BaseEsEntity.java delete mode 100644 codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/EsEntity.java create mode 100644 codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseApplicationTests.java create mode 100644 codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplateTest.java create mode 100644 codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/TestApplication.java create mode 100644 codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapperTest.java diff --git a/codes/javadb/elasticsearch/elasticsearch6/pom.xml b/codes/javadb/elasticsearch/elasticsearch6/pom.xml index ffd5de2..b9d8859 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/pom.xml +++ b/codes/javadb/elasticsearch/elasticsearch6/pom.xml @@ -1,8 +1,14 @@ + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 + o + org.springframework.boot + spring-boot-starter-parent + 2.7.7 + + io.github.dunwu javadb-elasticsearch6 1.0.0 @@ -17,37 +23,47 @@ + + org.springframework.boot + spring-boot-starter-aop + + + org.springframework.boot + spring-boot-starter-web + org.elasticsearch.client elasticsearch-rest-high-level-client - 6.4.3 org.projectlombok lombok - 1.18.22 cn.hutool hutool-all - 5.7.20 - - - com.fasterxml.jackson.core - jackson-databind - 2.15.2 + 5.8.8 - ch.qos.logback - logback-classic - 1.2.10 - - - org.apache.logging.log4j - log4j-to-slf4j - 2.17.1 + org.springframework.boot + spring-boot-starter-test + test + + + + org.elasticsearch.client + elasticsearch-rest-high-level-client + 6.4.3 + + + org.elasticsearch + elasticsearch + 6.4.3 + + + diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/Demo.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/Demo.java index 6e7523f..3f97a58 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/Demo.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/Demo.java @@ -2,8 +2,6 @@ package io.github.dunwu.javadb.elasticsearch; import io.github.dunwu.javadb.elasticsearch.entity.User; import io.github.dunwu.javadb.elasticsearch.mapper.UserEsMapper; -import io.github.dunwu.javadb.elasticsearch.util.ElasticsearchUtil; -import org.elasticsearch.client.RestHighLevelClient; import java.io.IOException; import java.util.Arrays; @@ -11,12 +9,13 @@ import java.util.List; public class Demo { - private static final String HOSTS = "127.0.0.1:9200"; - private static final RestHighLevelClient restHighLevelClient = ElasticsearchUtil.newRestHighLevelClient(HOSTS); + private static final String env = "test"; + private static final ElasticsearchTemplate elasticsearchTemplate + = ElasticsearchFactory.newElasticsearchTemplate(env); public static void main(String[] args) throws IOException, InterruptedException { - UserEsMapper mapper = new UserEsMapper(restHighLevelClient); + UserEsMapper mapper = new UserEsMapper(elasticsearchTemplate); System.out.println("索引是否存在:" + mapper.isIndexExists()); @@ -24,7 +23,7 @@ public class Demo { User tom = User.builder().id(2L).username("tom").age(20).build(); List users = Arrays.asList(jack, tom); - System.out.println("批量插入:" + mapper.batchInsert(users)); + System.out.println("批量插入:" + mapper.batchSave(users)); System.out.println("根据ID查询:" + mapper.getById("1").toString()); System.out.println("根据ID查询:" + mapper.pojoById("2").toString()); System.out.println("根据ID批量查询:" + mapper.pojoListByIds(Arrays.asList("1", "2")).toString()); diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchFactory.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchFactory.java new file mode 100644 index 0000000..8582fdc --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchFactory.java @@ -0,0 +1,166 @@ +package io.github.dunwu.javadb.elasticsearch; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.ArrayUtil; +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Elasticsearch 客户端实例工厂 + * + * @author Zhang Peng + * @date 2024-02-07 + */ +@Slf4j +public class ElasticsearchFactory { + + public static int CONNECT_TIMEOUT_MILLIS = 1000; + + public static int SOCKET_TIMEOUT_MILLIS = 30000; + + public static int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500; + + public static int MAX_CONN_TOTAL = 30; + + public static int MAX_CONN_PER_ROUTE = 10; + + public static RestClient newRestClient() { + // 从配置中心读取环境变量 + String env = "test"; + return newRestClient(env); + } + + public static RestClient newRestClient(String env) { + String hosts = getDefaultEsAddress(env); + return newRestClient(toHttpHostList(hosts)); + } + + public static RestClient newRestClient(HttpHost[] httpHosts) { + RestClientBuilder builder = getRestClientBuilder(httpHosts); + if (builder == null) { + return null; + } + try { + return builder.build(); + } catch (Exception e) { + log.error("【ES】connect failed.", e); + return null; + } + } + + public static RestHighLevelClient newRestHighLevelClient() { + // 从配置中心读取环境变量 + String env = "test"; + return newRestHighLevelClient(env); + } + + public static RestHighLevelClient newRestHighLevelClient(String env) { + String hosts = getDefaultEsAddress(env); + return newRestHighLevelClient(toHttpHostList(hosts)); + } + + public static RestHighLevelClient newRestHighLevelClient(HttpHost[] httpHosts) { + RestClientBuilder builder = getRestClientBuilder(httpHosts); + if (builder == null) { + return null; + } + try { + return new RestHighLevelClient(builder); + } catch (Exception e) { + log.error("【ES】connect failed.", e); + return null; + } + } + + public static ElasticsearchTemplate newElasticsearchTemplate() { + // 从配置中心读取环境变量 + String env = "test"; + return newElasticsearchTemplate(env); + } + + public static ElasticsearchTemplate newElasticsearchTemplate(String env) { + String hosts = getDefaultEsAddress(env); + return newElasticsearchTemplate(toHttpHostList(hosts)); + } + + public static ElasticsearchTemplate newElasticsearchTemplate(HttpHost[] httpHosts) { + RestHighLevelClient client = newRestHighLevelClient(httpHosts); + if (client == null) { + return null; + } + return new ElasticsearchTemplate(client); + } + + public static ElasticsearchTemplate newElasticsearchTemplate(RestHighLevelClient client) { + if (client == null) { + return null; + } + return new ElasticsearchTemplate(client); + } + + public static RestClientBuilder getRestClientBuilder(HttpHost[] httpHosts) { + if (ArrayUtil.isEmpty(httpHosts)) { + log.error("【ES】connect failed. hosts are empty."); + return null; + } + RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); + restClientBuilder.setRequestConfigCallback(builder -> { + builder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS); + builder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS); + builder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS); + return builder; + }); + restClientBuilder.setHttpClientConfigCallback(builder -> { + builder.setMaxConnTotal(MAX_CONN_TOTAL); + builder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE); + return builder; + }); + return restClientBuilder; + } + + private static HttpHost[] toHttpHostList(String hosts) { + if (StrUtil.isBlank(hosts)) { + return null; + } + List strList = StrUtil.split(hosts, ","); + List list = strList.stream().map(str -> { + List params = StrUtil.split(str, ":"); + return new HttpHost(params.get(0), Integer.parseInt(params.get(1)), "http"); + }).collect(Collectors.toList()); + if (CollectionUtil.isEmpty(list)) { + return new HttpHost[0]; + } + return list.toArray(new HttpHost[0]); + } + + public static String getDefaultEsAddress() { + // 从配置中心读取环境变量 + String env = "test"; + return getDefaultEsAddress(env); + } + + private static String getDefaultEsAddress(String env) { + String defaultAddress; + switch (env) { + case "prd": + defaultAddress = "127.0.0.1:9200,127.0.0.2:9200,127.0.0.3:9200"; + break; + case "pre": + defaultAddress = "127.0.0.1:9200"; + break; + case "test": + default: + defaultAddress = "127.0.0.1:9200"; + break; + } + return defaultAddress; + } + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/ElasticsearchUtil.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplate.java similarity index 53% rename from codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/ElasticsearchUtil.java rename to codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplate.java index e335208..cbe281a 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/ElasticsearchUtil.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplate.java @@ -1,13 +1,14 @@ -package io.github.dunwu.javadb.elasticsearch.util; +package io.github.dunwu.javadb.elasticsearch; import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.io.IoUtil; +import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.StrUtil; -import cn.hutool.json.JSONUtil; -import io.github.dunwu.javadb.elasticsearch.entity.EsEntity; +import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity; import io.github.dunwu.javadb.elasticsearch.entity.Page; +import io.github.dunwu.javadb.elasticsearch.util.JsonUtil; import lombok.extern.slf4j.Slf4j; -import org.apache.http.HttpHost; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.bulk.BackoffPolicy; @@ -15,6 +16,7 @@ import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequest; +import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetRequest; @@ -23,28 +25,26 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -57,37 +57,19 @@ import java.util.stream.Stream; * @date 2023-06-27 */ @Slf4j -public class ElasticsearchUtil { +public class ElasticsearchTemplate implements Closeable { - public static int CONNECT_TIMEOUT_MILLIS = 1000; - public static int SOCKET_TIMEOUT_MILLIS = 30000; - public static int CONNECTION_REQUEST_TIMEOUT_MILLIS = 500; - public static int MAX_CONN_TOTAL = 30; - public static int MAX_CONN_PER_ROUTE = 10; + private final RestHighLevelClient client; - public static RestClient newRestClient(String hosts) { - HttpHost[] httpHosts = toHttpHostList(hosts); - RestClientBuilder builder = builder(httpHosts); - try { - return builder.build(); - } catch (Exception e) { - log.error("【ES】connect failed.", e); - return null; - } + public ElasticsearchTemplate(RestHighLevelClient client) { + this.client = client; } - public static RestHighLevelClient newRestHighLevelClient(String hosts) { - HttpHost[] httpHosts = toHttpHostList(hosts); - RestClientBuilder builder = builder(httpHosts); - try { - return new RestHighLevelClient(builder); - } catch (Exception e) { - log.error("【ES】connect failed.", e); - return null; - } + public RestHighLevelClient getClient() { + return client; } - public static BulkProcessor newAsyncBulkProcessor(RestHighLevelClient client) { + public BulkProcessor newAsyncBulkProcessor() { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { @@ -96,7 +78,7 @@ public class ElasticsearchUtil { @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { - log.error("Bulk [{}] executed with failures,response = {}", executionId, + log.error("【ES】Bulk [{}] executed with failures,response = {}", executionId, response.buildFailureMessage()); } } @@ -105,90 +87,87 @@ public class ElasticsearchUtil { public void afterBulk(long executionId, BulkRequest request, Throwable failure) { } }; + + int bulkTimeout = 30; + int bulkActions = 1000; + int bulkSize = 5; + int concurrentRequests = 2; + int flushInterval = 1000; + int retryInterval = 100; + int retryLimit = 3; BiConsumer> bulkConsumer = (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener); + BackoffPolicy backoffPolicy = + BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(retryInterval), retryLimit); BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener) // 1000条数据请求执行一次bulk - .setBulkActions(1000) + .setBulkActions(bulkActions) // 5mb的数据刷新一次bulk - .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB)) + .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB)) // 并发请求数量, 0不并发, 1并发允许执行 - .setConcurrentRequests(2) - // 固定1s必须刷新一次 - .setFlushInterval(TimeValue.timeValueMillis(1000L)) - // 重试3次,间隔100ms - .setBackoffPolicy( - BackoffPolicy.constantBackoff(TimeValue.timeValueMillis(200L), - 3)).build(); + .setConcurrentRequests(concurrentRequests) + // 刷新间隔时间 + .setFlushInterval(TimeValue.timeValueMillis(flushInterval)) + // 重试次数、间隔时间 + .setBackoffPolicy(backoffPolicy).build(); Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { bulkProcessor.flush(); - bulkProcessor.awaitClose(30, TimeUnit.SECONDS); + bulkProcessor.awaitClose(bulkTimeout, TimeUnit.SECONDS); } catch (Exception e) { - log.error("Failed to close bulkProcessor", e); + log.error("【ES】Failed to close bulkProcessor", e); } - log.info("bulkProcessor closed!"); + log.info("【ES】bulkProcessor closed!"); })); return bulkProcessor; } - public static HttpHost[] toHttpHostList(String hosts) { - if (StrUtil.isBlank(hosts)) { + public T save(String index, String type, T entity) throws IOException { + + if (entity == null) { + log.warn("【ES】save 实体为空!"); return null; } - List strList = StrUtil.split(hosts, ","); - List list = strList.stream().map(str -> { - List params = StrUtil.split(str, ":"); - return new HttpHost(params.get(0), Integer.parseInt(params.get(1)), "http"); - }).collect(Collectors.toList()); - if (CollectionUtil.isEmpty(list)) { - return new HttpHost[0]; - } - return list.toArray(new HttpHost[0]); - } - public static RestClientBuilder builder(HttpHost[] httpHosts) { - RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); - restClientBuilder.setRequestConfigCallback(builder -> { - builder.setConnectTimeout(CONNECT_TIMEOUT_MILLIS); - builder.setSocketTimeout(SOCKET_TIMEOUT_MILLIS); - builder.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS); - return builder; - }); - restClientBuilder.setHttpClientConfigCallback(builder -> { - builder.setMaxConnTotal(MAX_CONN_TOTAL); - builder.setMaxConnPerRoute(MAX_CONN_PER_ROUTE); - return builder; - }); - return restClientBuilder; - } - - public static String insert(RestHighLevelClient client, String index, String type, T entity) - throws IOException { Map map = toMap(entity); + if (MapUtil.isEmpty(map)) { + log.warn("【ES】save 实体数据为空!"); + return null; + } + IndexRequest request = new IndexRequest(index, type).source(map); if (entity.getDocId() != null) { request.id(entity.getDocId()); } - IndexResponse response = client.index(request, RequestOptions.DEFAULT); - if (response != null && response.getResult() == DocWriteResponse.Result.CREATED) { - return response.getId(); + if (response == null) { + log.warn("【ES】save 响应结果为空!"); + return null; + } + + if (response.getResult() == DocWriteResponse.Result.CREATED + || response.getResult() == DocWriteResponse.Result.UPDATED) { + return entity; } else { + log.warn("【ES】save 响应结果无效!result: {}", response.getResult()); return null; } } - public static boolean batchInsert(RestHighLevelClient client, String index, String type, - Collection list) throws IOException { + public boolean batchSave(String index, String type, Collection list) + throws IOException { if (CollectionUtil.isEmpty(list)) { return true; } BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (T entity : list) { - Map map = ElasticsearchUtil.toMap(entity); + Map map = toMap(entity); + if (MapUtil.isEmpty(map)) { + continue; + } IndexRequest request = new IndexRequest(index, type).source(map); if (entity.getDocId() != null) { request.id(entity.getDocId()); @@ -200,16 +179,20 @@ public class ElasticsearchUtil { return response != null && !response.hasFailures(); } - public static void asyncBatchInsert(RestHighLevelClient client, String index, String type, - Collection list, ActionListener listener) { + public void asyncBatchSave(String index, String type, Collection list, + ActionListener listener) { if (CollectionUtil.isEmpty(list)) { return; } BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (T entity : list) { - Map map = ElasticsearchUtil.toMap(entity); + Map map = toMap(entity); + if (MapUtil.isEmpty(map)) { + continue; + } IndexRequest request = new IndexRequest(index, type).source(map); if (entity.getDocId() != null) { request.id(entity.getDocId()); @@ -220,84 +203,107 @@ public class ElasticsearchUtil { client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener); } - public static boolean updateById(RestHighLevelClient client, String index, String type, - T entity) throws IOException { + public T updateById(String index, String type, T entity) throws IOException { - if (entity == null || entity.getDocId() == null) { - return false; + if (entity == null) { + log.warn("【ES】updateById 实体为空!"); + return null; + } + + if (entity.getDocId() == null) { + log.warn("【ES】updateById docId 为空!"); + return null; } Map map = toMap(entity); + if (MapUtil.isEmpty(map)) { + log.warn("【ES】updateById 实体数据为空!"); + return null; + } + UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map); UpdateResponse response = client.update(request, RequestOptions.DEFAULT); - return response != null && response.getResult() == DocWriteResponse.Result.UPDATED; + if (response == null) { + log.warn("【ES】updateById 响应结果为空!"); + return null; + } + + if (response.getResult() == DocWriteResponse.Result.UPDATED) { + return entity; + } else { + log.warn("【ES】updateById 响应结果无效!result: {}", response.getResult()); + return null; + } } - public static boolean batchUpdateById(RestHighLevelClient client, String index, String type, - Collection list) throws IOException { + public boolean batchUpdateById(String index, String type, Collection list) + throws IOException { if (CollectionUtil.isEmpty(list)) { return true; } - BulkRequest bulkRequest = new BulkRequest(); - for (T entity : list) { - if (entity == null || entity.getDocId() == null) { - continue; - } - Map map = ElasticsearchUtil.toMap(entity); - UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map); - bulkRequest.add(request); - } - + BulkRequest bulkRequest = toUpdateBulkRequest(index, type, list); BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); return response != null && !response.hasFailures(); } - public static void asyncBatchUpdateById(RestHighLevelClient client, String index, - String type, Collection list, ActionListener listener) { + public void asyncBatchUpdateById(String index, String type, Collection list, + ActionListener listener) { if (CollectionUtil.isEmpty(list)) { return; } + BulkRequest bulkRequest = toUpdateBulkRequest(index, type, list); + client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener); + } + + private BulkRequest toUpdateBulkRequest(String index, String type, Collection list) { BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (T entity : list) { if (entity == null || entity.getDocId() == null) { continue; } - Map map = ElasticsearchUtil.toMap(entity); + Map map = toMap(entity); + if (MapUtil.isEmpty(map)) { + continue; + } UpdateRequest request = new UpdateRequest(index, type, entity.getDocId()).doc(map); bulkRequest.add(request); } - - client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener); + return bulkRequest; } - public static boolean deleteById(RestHighLevelClient client, String index, String type, String id) - throws IOException { - return batchDeleteById(client, index, type, Collections.singleton(id)); + public boolean deleteById(String index, String type, String id) throws IOException { + return batchDeleteById(index, type, Collections.singleton(id)); } - public static boolean batchDeleteById(RestHighLevelClient client, String index, String type, Collection ids) - throws IOException { + public boolean batchDeleteById(String index, String type, Collection ids) throws IOException { if (CollectionUtil.isEmpty(ids)) { return true; } BulkRequest bulkRequest = new BulkRequest(); - ids.forEach(id -> { + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + ids.stream().filter(Objects::nonNull).forEach(id -> { DeleteRequest deleteRequest = new DeleteRequest(index, type, id); bulkRequest.add(deleteRequest); }); BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); - return response != null && !response.hasFailures(); + if (response == null) { + log.warn("【ES】batchDeleteById 响应结果为空!"); + return false; + } + + return !response.hasFailures(); } - public static void asyncBatchDeleteById(RestHighLevelClient client, String index, String type, - Collection ids, ActionListener listener) { + public void asyncBatchDeleteById(String index, String type, Collection ids, + ActionListener listener) { if (CollectionUtil.isEmpty(ids)) { return; @@ -312,31 +318,32 @@ public class ElasticsearchUtil { client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener); } - public static SearchResponse getById(RestHighLevelClient client, String index, String type, String id) - throws IOException { - SearchRequest searchRequest = Requests.searchRequest(index).types(type); - QueryBuilder queryBuilder = QueryBuilders.idsQuery().addIds(id); - SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); - sourceBuilder.query(queryBuilder); - searchRequest.source(sourceBuilder); - return client.search(searchRequest, RequestOptions.DEFAULT); + public GetResponse getById(String index, String type, String id) throws IOException { + return getById(index, type, id, null); } - public static T pojoById(RestHighLevelClient client, String index, String type, String id, Class clazz) - throws IOException { - SearchResponse response = getById(client, index, type, id); + public GetResponse getById(String index, String type, String id, Long version) throws IOException { + GetRequest getRequest = new GetRequest(index, type, id); + if (version != null) { + getRequest.version(version); + } + return client.get(getRequest, RequestOptions.DEFAULT); + } + + public T pojoById(String index, String type, String id, Class clazz) throws IOException { + return pojoById(index, type, id, null, clazz); + } + + public T pojoById(String index, String type, String id, Long version, Class clazz) throws IOException { + GetResponse response = getById(index, type, id, version); if (response == null) { return null; } - List list = ElasticsearchUtil.toPojoList(response, clazz); - if (CollectionUtil.isEmpty(list)) { - return null; - } - return list.get(0); + return toPojo(response, clazz); } - public static List pojoListByIds(RestHighLevelClient client, String index, String type, - Collection ids, Class clazz) throws IOException { + public List pojoListByIds(String index, String type, Collection ids, Class clazz) + throws IOException { if (CollectionUtil.isEmpty(ids)) { return null; @@ -359,7 +366,7 @@ public class ElasticsearchUtil { if (itemResponse.isFailed()) { log.error("通过id获取文档失败", itemResponse.getFailure().getFailure()); } else { - T entity = ElasticsearchUtil.toPojo(itemResponse.getResponse(), clazz); + T entity = toPojo(itemResponse.getResponse(), clazz); if (entity != null) { list.add(entity); } @@ -368,36 +375,51 @@ public class ElasticsearchUtil { return list; } - public static Page pojoPage(RestHighLevelClient client, String index, String type, - SearchSourceBuilder builder, Class clazz) throws IOException { - SearchResponse response = query(client, index, type, builder); + public Page pojoPage(String index, String type, SearchSourceBuilder builder, Class clazz) + throws IOException { + SearchResponse response = query(index, type, builder); if (response == null || response.status() != RestStatus.OK) { return null; } List content = toPojoList(response, clazz); SearchHits searchHits = response.getHits(); - return new Page<>(searchHits.getTotalHits(), builder.from(), builder.size(), content); + int offset = builder.from(); + int size = builder.size(); + int page = offset / size + (offset % size == 0 ? 0 : 1) + 1; + return new Page<>(page, size, searchHits.getTotalHits(), content); } - public static SearchResponse query(RestHighLevelClient client, String index, String type, - SearchSourceBuilder builder) throws IOException { + public long count(String index, String type, SearchSourceBuilder builder) throws IOException { + SearchResponse response = query(index, type, builder); + if (response == null || response.status() != RestStatus.OK) { + return -1L; + } + SearchHits searchHits = response.getHits(); + return searchHits.getTotalHits(); + } + + public SearchResponse query(String index, String type, SearchSourceBuilder builder) throws IOException { SearchRequest request = new SearchRequest(index).types(type); request.source(builder); return client.search(request, RequestOptions.DEFAULT); } - public static T toPojo(GetResponse response, Class clazz) { + public SearchResponse query(SearchRequest request) throws IOException { + return client.search(request, RequestOptions.DEFAULT); + } + + public T toPojo(GetResponse response, Class clazz) { if (null == response) { return null; } else if (StrUtil.isBlank(response.getSourceAsString())) { return null; } else { - return JSONUtil.toBean(response.getSourceAsString(), clazz); + return JsonUtil.toBean(response.getSourceAsString(), clazz); } } - public static List toPojoList(SearchResponse response, Class clazz) { + public List toPojoList(SearchResponse response, Class clazz) { if (response == null || response.status() != RestStatus.OK) { return new ArrayList<>(); @@ -408,12 +430,20 @@ public class ElasticsearchUtil { } return Stream.of(response.getHits().getHits()) - .map(hit -> JSONUtil.toBean(hit.getSourceAsString(), clazz)) + .map(hit -> JsonUtil.toBean(hit.getSourceAsString(), clazz)) .collect(Collectors.toList()); } - public static Map toMap(T entity) { - return JsonUtil.toMap(JsonUtil.toJson(entity)); + public Map toMap(T entity) { + return JsonUtil.toMap(JsonUtil.toString(entity)); + } + + @Override + public synchronized void close() { + if (null == client) { + return; + } + IoUtil.close(client); } } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/ElasticsearchConfig.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/ElasticsearchConfig.java new file mode 100644 index 0000000..bdd2f38 --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/ElasticsearchConfig.java @@ -0,0 +1,30 @@ +package io.github.dunwu.javadb.elasticsearch.config; + +import io.github.dunwu.javadb.elasticsearch.ElasticsearchFactory; +import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate; +import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +/** + * ES 配置 + * + * @author Zhang Peng + * @date 2024-02-07 + */ +@Configuration +@ComponentScan(value = "io.github.dunwu.javadb.elasticsearch.mapper") +public class ElasticsearchConfig { + + @Bean("restHighLevelClient") + public RestHighLevelClient restHighLevelClient() { + return ElasticsearchFactory.newRestHighLevelClient(); + } + + @Bean("elasticsearchTemplate") + public ElasticsearchTemplate elasticsearchTemplate() { + return ElasticsearchFactory.newElasticsearchTemplate(); + } + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/EnableElasticsearch.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/EnableElasticsearch.java new file mode 100644 index 0000000..c2c2447 --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/EnableElasticsearch.java @@ -0,0 +1,26 @@ +package io.github.dunwu.javadb.elasticsearch.config; + +import org.springframework.context.annotation.EnableAspectJAutoProxy; +import org.springframework.context.annotation.Import; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * 启动 Elasticsearch 配置注解 + * + * @author Zhang Peng + * @date 2023-06-30 + */ +@Target({ ElementType.TYPE }) +@Retention(RetentionPolicy.RUNTIME) +@EnableAspectJAutoProxy( + proxyTargetClass = false +) +@Import({ ElasticsearchConfig.class }) +@Documented +public @interface EnableElasticsearch { +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/BaseEsEntity.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/BaseEsEntity.java new file mode 100644 index 0000000..a775976 --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/BaseEsEntity.java @@ -0,0 +1,27 @@ +package io.github.dunwu.javadb.elasticsearch.entity; + +import lombok.Data; +import lombok.ToString; + +import java.io.Serializable; + +/** + * ES 实体接口 + * + * @author Zhang Peng + * @since 2023-06-28 + */ +@Data +@ToString +public abstract class BaseEsEntity implements Serializable { + + /** + * 获取版本 + */ + protected Long version; + + protected Float hitScore; + + public abstract String getDocId(); + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/EsEntity.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/EsEntity.java deleted file mode 100644 index df6d973..0000000 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/EsEntity.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.github.dunwu.javadb.elasticsearch.entity; - -/** - * ES 实体接口 - * - * @author Zhang Peng - * @since 2023-06-28 - */ -public interface EsEntity { - - /** - * 获取 ES 主键 - */ - String getDocId(); - -} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/Page.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/Page.java index 76f178f..8baef8b 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/Page.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/Page.java @@ -20,13 +20,13 @@ public class Page { private int size; private List content = new ArrayList<>(); - public Page(long total, int page, int size) { + public Page(int page, int size, long total) { this.total = total; this.page = page; this.size = size; } - public Page(long total, int page, int size, Collection list) { + public Page(int page, int size, long total, Collection list) { this.total = total; this.page = page; this.size = size; diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java index 58428a1..1594772 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java @@ -1,7 +1,10 @@ package io.github.dunwu.javadb.elasticsearch.entity; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; /** * 用户实体 @@ -11,7 +14,10 @@ import lombok.Data; */ @Data @Builder -public class User implements EsEntity { +@EqualsAndHashCode(callSuper = true) +@AllArgsConstructor +@NoArgsConstructor +public class User extends BaseEsEntity { private Long id; private String username; @@ -21,7 +27,7 @@ public class User implements EsEntity { @Override public String getDocId() { - return null; + return String.valueOf(id); } } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java index 4033b4e..cd7e53a 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java @@ -1,14 +1,14 @@ package io.github.dunwu.javadb.elasticsearch.mapper; -import cn.hutool.core.lang.Assert; -import io.github.dunwu.javadb.elasticsearch.entity.EsEntity; +import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate; +import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity; import io.github.dunwu.javadb.elasticsearch.entity.Page; -import io.github.dunwu.javadb.elasticsearch.util.ElasticsearchUtil; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.IndicesClient; import org.elasticsearch.client.RequestOptions; @@ -26,26 +26,28 @@ import java.util.List; * @date 2023-06-27 */ @Slf4j -public abstract class BaseEsMapper implements EsMapper { +public abstract class BaseEsMapper implements EsMapper { private BulkProcessor bulkProcessor; - protected final RestHighLevelClient restHighLevelClient; + protected final ElasticsearchTemplate elasticsearchTemplate; - public BaseEsMapper(RestHighLevelClient restHighLevelClient) { - this.restHighLevelClient = restHighLevelClient; + public BaseEsMapper(ElasticsearchTemplate elasticsearchTemplate) { + this.elasticsearchTemplate = elasticsearchTemplate; } @Override - public RestHighLevelClient getClient() throws IOException { - Assert.notNull(restHighLevelClient, () -> new IOException("【ES】not connected.")); - return restHighLevelClient; + public RestHighLevelClient getClient() { + if (elasticsearchTemplate == null) { + return null; + } + return elasticsearchTemplate.getClient(); } @Override - public synchronized BulkProcessor getBulkProcessor() throws IOException { + public synchronized BulkProcessor getBulkProcessor() { if (bulkProcessor == null) { - bulkProcessor = ElasticsearchUtil.newAsyncBulkProcessor(getClient()); + bulkProcessor = elasticsearchTemplate.newAsyncBulkProcessor(); } return bulkProcessor; } @@ -59,37 +61,57 @@ public abstract class BaseEsMapper implements EsMapper { } @Override - public SearchResponse getById(String id) throws IOException { - return ElasticsearchUtil.getById(getClient(), getIndex(), getType(), id); + public GetResponse getById(String id) throws IOException { + return getById(id, null); + } + + @Override + public GetResponse getById(String id, Long version) throws IOException { + return elasticsearchTemplate.getById(getIndex(), getType(), id, version); } @Override public T pojoById(String id) throws IOException { - return ElasticsearchUtil.pojoById(getClient(), getIndex(), getType(), id, getEntityClass()); + return pojoById(id, null); + } + + @Override + public T pojoById(String id, Long version) throws IOException { + return elasticsearchTemplate.pojoById(getIndex(), getType(), id, version, getEntityClass()); } @Override public List pojoListByIds(Collection ids) throws IOException { - return ElasticsearchUtil.pojoListByIds(getClient(), getIndex(), getType(), ids, getEntityClass()); + return elasticsearchTemplate.pojoListByIds(getIndex(), getType(), ids, getEntityClass()); } @Override public Page pojoPage(SearchSourceBuilder builder) throws IOException { - return ElasticsearchUtil.pojoPage(getClient(), getIndex(), getType(), builder, getEntityClass()); + return elasticsearchTemplate.pojoPage(getIndex(), getType(), builder, getEntityClass()); } @Override - public String insert(T entity) throws IOException { - return ElasticsearchUtil.insert(getClient(), getIndex(), getType(), entity); + public long count(SearchSourceBuilder builder) throws IOException { + return elasticsearchTemplate.count(getIndex(), getType(), builder); } @Override - public boolean batchInsert(Collection list) throws IOException { - return ElasticsearchUtil.batchInsert(getClient(), getIndex(), getType(), list); + public SearchResponse query(SearchSourceBuilder builder) throws IOException { + return elasticsearchTemplate.query(getIndex(), getType(), builder); } @Override - public void asyncBatchInsert(Collection list) throws IOException { + public T save(T entity) throws IOException { + return elasticsearchTemplate.save(getIndex(), getType(), entity); + } + + @Override + public boolean batchSave(Collection list) throws IOException { + return elasticsearchTemplate.batchSave(getIndex(), getType(), list); + } + + @Override + public void asyncBatchSave(Collection list) throws IOException { ActionListener listener = new ActionListener() { @Override public void onResponse(BulkResponse response) { @@ -105,57 +127,37 @@ public abstract class BaseEsMapper implements EsMapper { log.error("【ES】异步批量插入异常!", e); } }; - asyncBatchInsert(list, listener); + asyncBatchSave(list, listener); } @Override - public void asyncBatchInsert(Collection list, ActionListener listener) throws IOException { - ElasticsearchUtil.asyncBatchInsert(getClient(), getIndex(), getType(), list, listener); + public void asyncBatchSave(Collection list, ActionListener listener) { + elasticsearchTemplate.asyncBatchSave(getIndex(), getType(), list, listener); } @Override - public boolean updateById(T entity) throws IOException { - return ElasticsearchUtil.updateById(getClient(), getIndex(), getType(), entity); + public T updateById(T entity) throws IOException { + return elasticsearchTemplate.updateById(getIndex(), getType(), entity); } @Override public boolean batchUpdateById(Collection list) throws IOException { - return ElasticsearchUtil.batchUpdateById(getClient(), getIndex(), getType(), list); + return elasticsearchTemplate.batchUpdateById(getIndex(), getType(), list); } @Override - public void asyncBatchUpdateById(Collection list) throws IOException { - ActionListener listener = new ActionListener() { - @Override - public void onResponse(BulkResponse response) { - if (response != null && !response.hasFailures()) { - log.info("【ES】异步批量更新成功!"); - } else { - log.warn("【ES】异步批量更新失败!"); - } - } - - @Override - public void onFailure(Exception e) { - log.error("【ES】异步批量更新异常!", e); - } - }; - asyncBatchUpdateById(list, listener); - } - - @Override - public void asyncBatchUpdateById(Collection list, ActionListener listener) throws IOException { - ElasticsearchUtil.asyncBatchUpdateById(getClient(), getIndex(), getType(), list, listener); + public void asyncBatchUpdateById(Collection list, ActionListener listener) { + elasticsearchTemplate.asyncBatchUpdateById(getIndex(), getType(), list, listener); } @Override public boolean deleteById(String id) throws IOException { - return ElasticsearchUtil.deleteById(getClient(), getIndex(), getType(), id); + return elasticsearchTemplate.deleteById(getIndex(), getType(), id); } @Override public boolean batchDeleteById(Collection ids) throws IOException { - return ElasticsearchUtil.batchDeleteById(getClient(), getIndex(), getType(), ids); + return elasticsearchTemplate.batchDeleteById(getIndex(), getType(), ids); } @Override @@ -180,7 +182,7 @@ public abstract class BaseEsMapper implements EsMapper { @Override public void asyncBatchDeleteById(Collection ids, ActionListener listener) throws IOException { - ElasticsearchUtil.asyncBatchDeleteById(getClient(), getIndex(), getType(), ids, listener); + elasticsearchTemplate.asyncBatchDeleteById(getIndex(), getType(), ids, listener); } } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java index fa01d7c..53eb9b2 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java @@ -1,17 +1,21 @@ package io.github.dunwu.javadb.elasticsearch.mapper; -import io.github.dunwu.javadb.elasticsearch.entity.EsEntity; +import cn.hutool.core.collection.CollectionUtil; +import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity; import io.github.dunwu.javadb.elasticsearch.entity.Page; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * ES Mapper @@ -19,7 +23,7 @@ import java.util.List; * @author Zhang Peng * @date 2023-06-27 */ -public interface EsMapper { +public interface EsMapper { /** * 获取索引名 @@ -42,29 +46,48 @@ public interface EsMapper { boolean isIndexExists() throws IOException; - SearchResponse getById(String id) throws IOException; + GetResponse getById(String id) throws IOException; + + GetResponse getById(String id, Long version) throws IOException; T pojoById(String id) throws IOException; + T pojoById(String id, Long version) throws IOException; + List pojoListByIds(Collection ids) throws IOException; + default Map pojoMapByIds(Collection ids) throws IOException { + List list = pojoListByIds(ids); + if (CollectionUtil.isEmpty(list)) { + return new HashMap<>(0); + } + + Map map = new HashMap<>(list.size()); + for (T entity : list) { + map.put(entity.getDocId(), entity); + } + return map; + } + Page pojoPage(SearchSourceBuilder builder) throws IOException; - String insert(T entity) throws IOException; + long count(SearchSourceBuilder builder) throws IOException; - boolean batchInsert(Collection list) throws IOException; + SearchResponse query(SearchSourceBuilder builder) throws IOException; - void asyncBatchInsert(Collection list) throws IOException; + T save(T entity) throws IOException; - void asyncBatchInsert(Collection list, ActionListener listener) throws IOException; + boolean batchSave(Collection list) throws IOException; - boolean updateById(T entity) throws IOException; + void asyncBatchSave(Collection list) throws IOException; + + void asyncBatchSave(Collection list, ActionListener listener) throws IOException; + + T updateById(T entity) throws IOException; boolean batchUpdateById(Collection list) throws IOException; - void asyncBatchUpdateById(Collection list) throws IOException; - - void asyncBatchUpdateById(Collection list, ActionListener listener) throws IOException; + void asyncBatchUpdateById(Collection list, ActionListener listener); boolean deleteById(String id) throws IOException; diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java index c60af8a..970b1b4 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java @@ -1,7 +1,8 @@ package io.github.dunwu.javadb.elasticsearch.mapper; +import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate; import io.github.dunwu.javadb.elasticsearch.entity.User; -import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.stereotype.Component; /** * User ES Mapper @@ -9,10 +10,11 @@ import org.elasticsearch.client.RestHighLevelClient; * @author Zhang Peng * @date 2023-06-27 */ +@Component public class UserEsMapper extends BaseEsMapper { - public UserEsMapper(RestHighLevelClient restHighLevelClient) { - super(restHighLevelClient); + public UserEsMapper(ElasticsearchTemplate elasticsearchTemplate) { + super(elasticsearchTemplate); } @Override diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/JsonUtil.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/JsonUtil.java index 0ed7ddb..dabe0df 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/JsonUtil.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/util/JsonUtil.java @@ -81,7 +81,7 @@ public class JsonUtil { return null; } - public static String toJson(T obj) { + public static String toString(T obj) { if (obj == null) { return null; } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseApplicationTests.java b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseApplicationTests.java new file mode 100644 index 0000000..8d43d1b --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseApplicationTests.java @@ -0,0 +1,36 @@ +package io.github.dunwu.javadb.elasticsearch; + +import io.github.dunwu.javadb.elasticsearch.config.EnableElasticsearch; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.setup.MockMvcBuilders; +import org.springframework.web.context.WebApplicationContext; + +@EnableElasticsearch +@ExtendWith(SpringExtension.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public abstract class BaseApplicationTests { + + // ---------------------------------------------------------------------------- 测试常量数据 + + // ---------------------------------------------------------------------------- + protected MockMvc mockMvc; + + @Autowired + private WebApplicationContext context; + + @BeforeEach + public void setUp() { + mockMvc = MockMvcBuilders.webAppContextSetup(context).build(); //构造MockMvc + } + + @BeforeAll + public static void setEnvironmentInDev() { + } + +} \ No newline at end of file diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplateTest.java b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplateTest.java new file mode 100644 index 0000000..bbddf70 --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplateTest.java @@ -0,0 +1,193 @@ +package io.github.dunwu.javadb.elasticsearch; + +import io.github.dunwu.javadb.elasticsearch.entity.Page; +import io.github.dunwu.javadb.elasticsearch.entity.User; +import io.github.dunwu.javadb.elasticsearch.util.JsonUtil; +import org.assertj.core.api.Assertions; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * ElasticsearchTemplate 测试 + * + * @author Zhang Peng + * @date 2023-11-13 + */ +public class ElasticsearchTemplateTest { + + public static final String INDEX = "user"; + public static final String TYPE = "_doc"; + public static final String TEST_ID_01 = "1"; + public static final String TEST_ID_02 = "2"; + + private static final ElasticsearchTemplate TEMPLATE; + + static { + TEMPLATE = ElasticsearchFactory.newElasticsearchTemplate(); + } + + @Test + @DisplayName("根据ID精确查询") + public void getById() throws IOException { + GetResponse response = TEMPLATE.getById(INDEX, TYPE, TEST_ID_01); + System.out.println("记录:" + JsonUtil.toString(response.getSourceAsMap())); + } + + @Test + @DisplayName("根据ID精确查询POJO") + public void pojoById() throws IOException { + User entity = TEMPLATE.pojoById(INDEX, TYPE, TEST_ID_01, User.class); + System.out.println("记录:" + JsonUtil.toString(entity)); + } + + @Test + @DisplayName("根据ID精确批量查询POJO") + public void pojoListByIds() throws IOException { + List ids = Arrays.asList(TEST_ID_01, TEST_ID_02); + List list = TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class); + Assertions.assertThat(list).isNotEmpty(); + Assertions.assertThat(list.size()).isEqualTo(2); + for (User entity : list) { + System.out.println("记录:" + JsonUtil.toString(entity)); + } + } + + @Test + @DisplayName("分页查询") + public void pojoPage() throws IOException { + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(boolQueryBuilder); + searchSourceBuilder.from(0); + searchSourceBuilder.size(10); + + Page page = TEMPLATE.pojoPage(INDEX, TYPE, searchSourceBuilder, User.class); + Assertions.assertThat(page).isNotNull(); + Assertions.assertThat(page.getContent()).isNotEmpty(); + for (User entity : page.getContent()) { + System.out.println("记录:" + JsonUtil.toString(entity)); + } + } + + @Test + @DisplayName("条件数量查询") + public void count() throws IOException { + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(boolQueryBuilder); + searchSourceBuilder.from(0); + searchSourceBuilder.size(10); + long total = TEMPLATE.count(INDEX, TYPE, searchSourceBuilder); + Assertions.assertThat(total).isNotZero(); + System.out.println("符合条件的总记录数:" + total); + } + + @Test + @DisplayName("条件查询") + public void query() throws IOException { + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(boolQueryBuilder); + searchSourceBuilder.from(0); + searchSourceBuilder.size(10); + SearchResponse response = TEMPLATE.query(INDEX, TYPE, searchSourceBuilder); + Assertions.assertThat(response).isNotNull(); + Assertions.assertThat(response.getHits()).isNotNull(); + for (SearchHit hit : response.getHits().getHits()) { + System.out.println("记录:" + hit.getSourceAsString()); + Map map = hit.getSourceAsMap(); + Assertions.assertThat(map).isNotNull(); + Assertions.assertThat(map.get("theme")).isEqualTo(3); + } + } + + @Nested + @DisplayName("写操作测试") + public class WriteTest { + + String json1 = + "{\"id\":1,\"username\":\"user1\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user1@xxx.com\"}"; + String json2 = + "{\"id\":2,\"username\":\"user2\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user2@xxx.com\"}"; + + @Test + @DisplayName("插入、更新") + public void saveAndUpdate() throws IOException, InterruptedException { + + User origin = JsonUtil.toBean(json1, User.class); + if (origin == null) { + System.err.println("反序列化失败!"); + return; + } + + TEMPLATE.save(INDEX, TYPE, origin); + TimeUnit.SECONDS.sleep(1); + User expectEntity = TEMPLATE.pojoById(INDEX, TYPE, origin.getDocId(), User.class); + Assertions.assertThat(expectEntity).isNotNull(); + + expectEntity.setAge(20); + TEMPLATE.updateById(INDEX, TYPE, expectEntity); + TimeUnit.SECONDS.sleep(18); + User expectEntity2 = + TEMPLATE.pojoById(INDEX, TYPE, origin.getDocId(), User.class); + Assertions.assertThat(expectEntity2).isNotNull(); + Assertions.assertThat(expectEntity2.getAge()).isEqualTo(20); + } + + @Test + @DisplayName("批量插入、更新") + public void batchSaveAndUpdate() throws IOException, InterruptedException { + + User origin1 = JsonUtil.toBean(json1, User.class); + if (origin1 == null) { + System.err.println("反序列化失败!"); + return; + } + + User origin2 = JsonUtil.toBean(json2, User.class); + if (origin2 == null) { + System.err.println("反序列化失败!"); + return; + } + + List list = Arrays.asList(origin1, origin2); + List ids = list.stream().map(User::getDocId).collect(Collectors.toList()); + + TEMPLATE.batchSave(INDEX, TYPE, list); + List newList = TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class); + Assertions.assertThat(newList).isNotEmpty(); + + newList.forEach(entity -> { + entity.setAge(20); + }); + TEMPLATE.batchUpdateById(INDEX, TYPE, newList); + TimeUnit.SECONDS.sleep(1); + + List expectList = + TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class); + Assertions.assertThat(expectList).isNotEmpty(); + for (User item : expectList) { + Assertions.assertThat(item.getAge()).isEqualTo(20); + } + } + + } + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/TestApplication.java b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/TestApplication.java new file mode 100644 index 0000000..e9ae33c --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/TestApplication.java @@ -0,0 +1,20 @@ +package io.github.dunwu.javadb.elasticsearch; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; + +@SpringBootApplication +public class TestApplication extends SpringBootServletInitializer { + + public static void main(String[] args) { + SpringApplication.run(TestApplication.class, args); + } + + @Override + protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) { + return builder.sources(TestApplication.class); + } + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapperTest.java b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapperTest.java new file mode 100644 index 0000000..65c7927 --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapperTest.java @@ -0,0 +1,191 @@ +package io.github.dunwu.javadb.elasticsearch.mapper; + +import io.github.dunwu.javadb.elasticsearch.BaseApplicationTests; +import io.github.dunwu.javadb.elasticsearch.entity.Page; +import io.github.dunwu.javadb.elasticsearch.entity.User; +import io.github.dunwu.javadb.elasticsearch.util.JsonUtil; +import org.assertj.core.api.Assertions; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * ElasticsearchTemplate 测试 + * + * @author Zhang Peng + * @date 2023-11-13 + */ +public class UserEsMapperTest extends BaseApplicationTests { + + @Autowired + private UserEsMapper mapper; + public static final String TEST_ID_01 = "1"; + public static final String TEST_ID_02 = "2"; + + @Test + @DisplayName("根据ID精确查询") + public void getById() throws IOException { + GetResponse response = mapper.getById(TEST_ID_01); + System.out.println("记录:" + JsonUtil.toString(response.getSourceAsMap())); + } + + @Test + @DisplayName("根据ID精确查询POJO") + public void pojoById() throws IOException { + User entity = mapper.pojoById(TEST_ID_01); + System.out.println("记录:" + JsonUtil.toString(entity)); + } + + @Test + @DisplayName("根据ID精确批量查询POJO") + public void pojoListByIds() throws IOException { + List ids = Arrays.asList(TEST_ID_01, TEST_ID_02); + List list = mapper.pojoListByIds(ids); + Assertions.assertThat(list).isNotEmpty(); + Assertions.assertThat(list.size()).isEqualTo(2); + for (User entity : list) { + System.out.println("记录:" + JsonUtil.toString(entity)); + } + } + + @Test + @DisplayName("分页查询") + public void pojoPage() throws IOException { + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + boolQueryBuilder.must(QueryBuilders.termQuery("id", 1)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(boolQueryBuilder); + searchSourceBuilder.from(0); + searchSourceBuilder.size(10); + + Page page = mapper.pojoPage(searchSourceBuilder); + Assertions.assertThat(page).isNotNull(); + Assertions.assertThat(page.getContent()).isNotEmpty(); + for (User entity : page.getContent()) { + System.out.println("记录:" + JsonUtil.toString(entity)); + } + } + + @Test + @DisplayName("条件数量查询") + public void count() throws IOException { + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + boolQueryBuilder.must(QueryBuilders.termQuery("id", 1)); + // boolQueryBuilder.must(QueryBuilders.rangeQuery("age") + // .from(18) + // .to(25) + // .includeLower(true) + // .includeUpper(true)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(boolQueryBuilder); + searchSourceBuilder.from(0); + searchSourceBuilder.size(10); + long total = mapper.count(searchSourceBuilder); + Assertions.assertThat(total).isNotZero(); + System.out.println("符合条件的总记录数:" + total); + } + + @Test + @DisplayName("条件查询") + public void query() throws IOException { + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + boolQueryBuilder.must(QueryBuilders.termQuery("id", 1)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(boolQueryBuilder); + searchSourceBuilder.from(0); + searchSourceBuilder.size(10); + SearchResponse response = mapper.query(searchSourceBuilder); + Assertions.assertThat(response).isNotNull(); + Assertions.assertThat(response.getHits()).isNotNull(); + for (SearchHit hit : response.getHits().getHits()) { + System.out.println("记录:" + hit.getSourceAsString()); + Map map = hit.getSourceAsMap(); + Assertions.assertThat(map).isNotNull(); + } + } + + @Nested + @DisplayName("写操作测试") + public class WriteTest { + + String json1 = + "{\"id\":1,\"username\":\"user1\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user1@xxx.com\"}"; + String json2 = + "{\"id\":2,\"username\":\"user2\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user2@xxx.com\"}"; + + @Test + @DisplayName("插入、更新") + public void saveAndUpdate() throws IOException, InterruptedException { + + User origin = JsonUtil.toBean(json1, User.class); + if (origin == null) { + System.err.println("反序列化失败!"); + return; + } + + mapper.save(origin); + TimeUnit.SECONDS.sleep(1); + User expectEntity = mapper.pojoById(origin.getDocId()); + Assertions.assertThat(expectEntity).isNotNull(); + + expectEntity.setAge(20); + mapper.updateById(expectEntity); + TimeUnit.SECONDS.sleep(1); + User expectEntity2 = mapper.pojoById(origin.getDocId()); + Assertions.assertThat(expectEntity2).isNotNull(); + Assertions.assertThat(expectEntity2.getAge()).isEqualTo(20); + } + + @Test + @DisplayName("批量插入、更新") + public void batchSaveAndUpdate() throws IOException, InterruptedException { + + User origin1 = JsonUtil.toBean(json1, User.class); + if (origin1 == null) { + System.err.println("反序列化失败!"); + return; + } + + User origin2 = JsonUtil.toBean(json2, User.class); + if (origin2 == null) { + System.err.println("反序列化失败!"); + return; + } + + List list = Arrays.asList(origin1, origin2); + List ids = list.stream().map(User::getDocId).collect(Collectors.toList()); + + mapper.batchSave(list); + List newList = mapper.pojoListByIds(ids); + Assertions.assertThat(newList).isNotEmpty(); + + newList.forEach(entity -> { + entity.setAge(20); + }); + mapper.batchUpdateById(newList); + TimeUnit.SECONDS.sleep(1); + + List expectList = mapper.pojoListByIds(ids); + Assertions.assertThat(expectList).isNotEmpty(); + for (User item : expectList) { + Assertions.assertThat(item.getAge()).isEqualTo(20); + } + } + + } + +}