diff --git a/codes/javadb/elasticsearch/elasticsearch6/pom.xml b/codes/javadb/elasticsearch/elasticsearch6/pom.xml index b9d8859..72683b1 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/pom.xml +++ b/codes/javadb/elasticsearch/elasticsearch6/pom.xml @@ -3,7 +3,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 - o + org.springframework.boot spring-boot-starter-parent 2.7.7 @@ -42,7 +42,7 @@ cn.hutool hutool-all - 5.8.8 + 5.8.25 diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/Demo.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/Demo.java deleted file mode 100644 index 3f97a58..0000000 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/Demo.java +++ /dev/null @@ -1,35 +0,0 @@ -package io.github.dunwu.javadb.elasticsearch; - -import io.github.dunwu.javadb.elasticsearch.entity.User; -import io.github.dunwu.javadb.elasticsearch.mapper.UserEsMapper; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -public class Demo { - - private static final String env = "test"; - private static final ElasticsearchTemplate elasticsearchTemplate - = ElasticsearchFactory.newElasticsearchTemplate(env); - - public static void main(String[] args) throws IOException, InterruptedException { - - UserEsMapper mapper = new UserEsMapper(elasticsearchTemplate); - - System.out.println("索引是否存在:" + mapper.isIndexExists()); - - User jack = User.builder().id(1L).username("jack").age(18).build(); - User tom = User.builder().id(2L).username("tom").age(20).build(); - List users = Arrays.asList(jack, tom); - - System.out.println("批量插入:" + mapper.batchSave(users)); - System.out.println("根据ID查询:" + mapper.getById("1").toString()); - System.out.println("根据ID查询:" + mapper.pojoById("2").toString()); - System.out.println("根据ID批量查询:" + mapper.pojoListByIds(Arrays.asList("1", "2")).toString()); - - Thread.sleep(1000); - System.out.println("根据ID批量删除:" + mapper.batchDeleteById(Arrays.asList("1", "2"))); - } - -} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchFactory.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchFactory.java index 8582fdc..33109cb 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchFactory.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchFactory.java @@ -9,6 +9,7 @@ import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; @@ -38,11 +39,13 @@ public class ElasticsearchFactory { } public static RestClient newRestClient(String env) { - String hosts = getDefaultEsAddress(env); - return newRestClient(toHttpHostList(hosts)); + String hostsConfig = getDefaultEsAddress(env); + List hosts = StrUtil.split(hostsConfig, ","); + return newRestClient(hosts); } - public static RestClient newRestClient(HttpHost[] httpHosts) { + public static RestClient newRestClient(Collection hosts) { + HttpHost[] httpHosts = toHttpHostList(hosts); RestClientBuilder builder = getRestClientBuilder(httpHosts); if (builder == null) { return null; @@ -62,11 +65,13 @@ public class ElasticsearchFactory { } public static RestHighLevelClient newRestHighLevelClient(String env) { - String hosts = getDefaultEsAddress(env); - return newRestHighLevelClient(toHttpHostList(hosts)); + String hostsConfig = getDefaultEsAddress(env); + List hosts = StrUtil.split(hostsConfig, ","); + return newRestHighLevelClient(hosts); } - public static RestHighLevelClient newRestHighLevelClient(HttpHost[] httpHosts) { + public static RestHighLevelClient newRestHighLevelClient(Collection hosts) { + HttpHost[] httpHosts = toHttpHostList(hosts); RestClientBuilder builder = getRestClientBuilder(httpHosts); if (builder == null) { return null; @@ -86,12 +91,13 @@ public class ElasticsearchFactory { } public static ElasticsearchTemplate newElasticsearchTemplate(String env) { - String hosts = getDefaultEsAddress(env); - return newElasticsearchTemplate(toHttpHostList(hosts)); + String hostsConfig = getDefaultEsAddress(env); + List hosts = StrUtil.split(hostsConfig, ","); + return newElasticsearchTemplate(hosts); } - public static ElasticsearchTemplate newElasticsearchTemplate(HttpHost[] httpHosts) { - RestHighLevelClient client = newRestHighLevelClient(httpHosts); + public static ElasticsearchTemplate newElasticsearchTemplate(Collection hosts) { + RestHighLevelClient client = newRestHighLevelClient(hosts); if (client == null) { return null; } @@ -125,21 +131,22 @@ public class ElasticsearchFactory { return restClientBuilder; } - private static HttpHost[] toHttpHostList(String hosts) { - if (StrUtil.isBlank(hosts)) { - return null; + private static HttpHost[] toHttpHostList(Collection hosts) { + if (CollectionUtil.isEmpty(hosts)) { + return new HttpHost[0]; } - List strList = StrUtil.split(hosts, ","); - List list = strList.stream().map(str -> { - List params = StrUtil.split(str, ":"); - return new HttpHost(params.get(0), Integer.parseInt(params.get(1)), "http"); - }).collect(Collectors.toList()); + List list = hosts.stream().map(ElasticsearchFactory::toHttpHost).collect(Collectors.toList()); if (CollectionUtil.isEmpty(list)) { return new HttpHost[0]; } return list.toArray(new HttpHost[0]); } + public static HttpHost toHttpHost(String host) { + List params = StrUtil.split(host, ":"); + return new HttpHost(params.get(0), Integer.parseInt(params.get(1)), "http"); + } + public static String getDefaultEsAddress() { // 从配置中心读取环境变量 String env = "test"; diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplate.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplate.java index cbe281a..730f9f1 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplate.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplate.java @@ -6,11 +6,19 @@ import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.StrUtil; import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity; -import io.github.dunwu.javadb.elasticsearch.entity.Page; +import io.github.dunwu.javadb.elasticsearch.entity.common.PageData; +import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData; import io.github.dunwu.javadb.elasticsearch.util.JsonUtil; import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.indices.alias.Alias; +import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.get.GetIndexRequest; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; @@ -23,19 +31,31 @@ import org.elasticsearch.action.get.MultiGetRequest; import org.elasticsearch.action.get.MultiGetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; import java.io.Closeable; import java.io.IOException; @@ -122,6 +142,99 @@ public class ElasticsearchTemplate implements Closeable { return bulkProcessor; } + // ==================================================================== + // 索引管理操作 + // ==================================================================== + + public void createIndex(String index, String type, String alias, int shard, int replica) throws IOException { + + if (StrUtil.isBlank(index) || StrUtil.isBlank(type)) { + throw new ElasticsearchException("【ES】index、type 不能为空!"); + } + + CreateIndexRequest request = new CreateIndexRequest(index); + if (StrUtil.isNotBlank(alias)) { + request.alias(new Alias(alias)); + } + + Settings.Builder settings = + Settings.builder().put("index.number_of_shards", shard).put("index.number_of_replicas", replica); + request.settings(settings); + AcknowledgedResponse response = client.indices().create(request, RequestOptions.DEFAULT); + if (!response.isAcknowledged()) { + String msg = StrUtil.format("【ES】创建索引失败!index: {}, type: {}", index, type); + throw new ElasticsearchException(msg); + } + } + + public void deleteIndex(String index) throws IOException { + DeleteIndexRequest request = new DeleteIndexRequest(index); + AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT); + if (!response.isAcknowledged()) { + String msg = StrUtil.format("【ES】删除索引失败!index: {}", index); + throw new ElasticsearchException(msg); + } + } + + public void updateAlias(String index, String alias) throws IOException { + IndicesAliasesRequest request = new IndicesAliasesRequest(); + IndicesAliasesRequest.AliasActions aliasAction = + new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD).index(index) + .alias(alias); + request.addAliasAction(aliasAction); + AcknowledgedResponse response = client.indices().updateAliases(request, RequestOptions.DEFAULT); + if (!response.isAcknowledged()) { + String msg = StrUtil.format("【ES】更新索引别名失败!index: {}, alias: {}", index, alias); + throw new ElasticsearchException(msg); + } + } + + public boolean isIndexExists(String index) throws IOException { + GetIndexRequest request = new GetIndexRequest(); + return client.indices().exists(request.indices(index), RequestOptions.DEFAULT); + } + + public void setMapping(String index, String type, Map propertiesMap) throws IOException { + + if (MapUtil.isEmpty(propertiesMap)) { + throw new ElasticsearchException("【ES】设置 mapping 的 properties 不能为空!"); + } + + PutMappingRequest request = new PutMappingRequest(index).type(type); + XContentBuilder builder = XContentFactory.jsonBuilder(); + builder.startObject(); + builder.startObject(type); + builder.startObject("properties"); + + for (Map.Entry entry : propertiesMap.entrySet()) { + + String field = entry.getKey(); + String fieldType = entry.getValue(); + if (StrUtil.isBlank(field) || StrUtil.isBlank(fieldType)) { + continue; + } + + builder.startObject(field); + { + builder.field("type", fieldType); + } + builder.endObject(); + } + + builder.endObject(); + builder.endObject(); + builder.endObject(); + request.source(builder); + AcknowledgedResponse response = client.indices().putMapping(request, RequestOptions.DEFAULT); + if (!response.isAcknowledged()) { + throw new ElasticsearchException("【ES】设置 mapping 失败!"); + } + } + + // ==================================================================== + // CRUD 操作 + // ==================================================================== + public T save(String index, String type, T entity) throws IOException { if (entity == null) { @@ -154,7 +267,7 @@ public class ElasticsearchTemplate implements Closeable { } } - public boolean batchSave(String index, String type, Collection list) + public boolean saveBatch(String index, String type, Collection list) throws IOException { if (CollectionUtil.isEmpty(list)) { @@ -179,7 +292,7 @@ public class ElasticsearchTemplate implements Closeable { return response != null && !response.hasFailures(); } - public void asyncBatchSave(String index, String type, Collection list, + public void asyncSaveBatch(String index, String type, Collection list, ActionListener listener) { if (CollectionUtil.isEmpty(list)) { @@ -236,7 +349,7 @@ public class ElasticsearchTemplate implements Closeable { } } - public boolean batchUpdateById(String index, String type, Collection list) + public boolean updateBatchIds(String index, String type, Collection list) throws IOException { if (CollectionUtil.isEmpty(list)) { @@ -248,7 +361,7 @@ public class ElasticsearchTemplate implements Closeable { return response != null && !response.hasFailures(); } - public void asyncBatchUpdateById(String index, String type, Collection list, + public void asyncUpdateBatchIds(String index, String type, Collection list, ActionListener listener) { if (CollectionUtil.isEmpty(list)) { @@ -277,10 +390,10 @@ public class ElasticsearchTemplate implements Closeable { } public boolean deleteById(String index, String type, String id) throws IOException { - return batchDeleteById(index, type, Collections.singleton(id)); + return deleteBatchIds(index, type, Collections.singleton(id)); } - public boolean batchDeleteById(String index, String type, Collection ids) throws IOException { + public boolean deleteBatchIds(String index, String type, Collection ids) throws IOException { if (CollectionUtil.isEmpty(ids)) { return true; @@ -302,7 +415,7 @@ public class ElasticsearchTemplate implements Closeable { return !response.hasFailures(); } - public void asyncBatchDeleteById(String index, String type, Collection ids, + public void asyncDeleteBatchIds(String index, String type, Collection ids, ActionListener listener) { if (CollectionUtil.isEmpty(ids)) { @@ -375,21 +488,6 @@ public class ElasticsearchTemplate implements Closeable { return list; } - public Page pojoPage(String index, String type, SearchSourceBuilder builder, Class clazz) - throws IOException { - SearchResponse response = query(index, type, builder); - if (response == null || response.status() != RestStatus.OK) { - return null; - } - - List content = toPojoList(response, clazz); - SearchHits searchHits = response.getHits(); - int offset = builder.from(); - int size = builder.size(); - int page = offset / size + (offset % size == 0 ? 0 : 1) + 1; - return new Page<>(page, size, searchHits.getTotalHits(), content); - } - public long count(String index, String type, SearchSourceBuilder builder) throws IOException { SearchResponse response = query(index, type, builder); if (response == null || response.status() != RestStatus.OK) { @@ -399,6 +497,12 @@ public class ElasticsearchTemplate implements Closeable { return searchHits.getTotalHits(); } + public long count(String index, String type, QueryBuilder queryBuilder) throws IOException { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + return count(index, type, searchSourceBuilder); + } + public SearchResponse query(String index, String type, SearchSourceBuilder builder) throws IOException { SearchRequest request = new SearchRequest(index).types(type); request.source(builder); @@ -409,6 +513,131 @@ public class ElasticsearchTemplate implements Closeable { return client.search(request, RequestOptions.DEFAULT); } + /** + * from+size 分页 + *

+ * 注:在深分页的场景下,效率很低(一般超过 1万条数据就不适用了) + */ + public PageData pojoPage(String index, String type, SearchSourceBuilder builder, Class clazz) + throws IOException { + SearchResponse response = query(index, type, builder); + if (response == null || response.status() != RestStatus.OK) { + return null; + } + + List content = toPojoList(response, clazz); + SearchHits searchHits = response.getHits(); + int from = builder.from(); + int size = builder.size(); + int page = from / size + (from % size == 0 ? 0 : 1) + 1; + return new PageData<>(page, size, searchHits.getTotalHits(), content); + } + + /** + * from+size 分页 + *

+ * 注:在深分页的场景下,效率很低(一般超过 1万条数据就不适用了) + */ + public PageData pojoPage(String index, String type, int from, int size, QueryBuilder queryBuilder, + Class clazz) throws IOException { + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.from(from); + searchSourceBuilder.size(size); + return pojoPage(index, type, searchSourceBuilder, clazz); + } + + /** + * search after 分页 + */ + public ScrollData pojoPageByLastId(String index, String type, String lastId, int size, + QueryBuilder queryBuilder, Class clazz) throws IOException { + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.size(size); + searchSourceBuilder.sort(BaseEsEntity.DOC_ID, SortOrder.ASC); + if (StrUtil.isNotBlank(lastId)) { + BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); + boolQueryBuilder.must(queryBuilder).must(QueryBuilders.rangeQuery(BaseEsEntity.DOC_ID).gt(lastId)); + searchSourceBuilder.query(boolQueryBuilder); + } else { + searchSourceBuilder.query(queryBuilder); + } + + SearchResponse response = query(index, type, searchSourceBuilder); + if (response == null || response.status() != RestStatus.OK) { + return null; + } + List content = toPojoList(response, clazz); + ScrollData scrollData = new ScrollData<>(); + scrollData.setSize(size); + scrollData.setTotal(response.getHits().getTotalHits()); + scrollData.setContent(content); + if (CollectionUtil.isNotEmpty(content)) { + T lastEntity = content.get(content.size() - 1); + scrollData.setScrollId(lastEntity.getDocId()); + } + return scrollData; + } + + /** + * 首次滚动查询批量查询,但是不适用与搜索,仅用于批查询 + **/ + public ScrollData pojoScrollBegin(String index, String type, SearchSourceBuilder searchBuilder, + Class clazz) throws IOException { + + int scrollTime = 10; + final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(scrollTime)); + SearchRequest request = new SearchRequest(index); + request.types(type); + request.source(searchBuilder); + request.scroll(scroll); + SearchResponse response = client.search(request, RequestOptions.DEFAULT); + if (response == null || response.status() != RestStatus.OK) { + return null; + } + List content = toPojoList(response, clazz); + ScrollData scrollData = new ScrollData<>(); + scrollData.setSize(searchBuilder.size()); + scrollData.setTotal(response.getHits().getTotalHits()); + scrollData.setScrollId(response.getScrollId()); + scrollData.setContent(content); + return scrollData; + } + + /** + * 知道ScrollId之后,后续根据scrollId批量查询 + **/ + public ScrollData pojoScroll(String scrollId, SearchSourceBuilder searchBuilder, Class clazz) + throws IOException { + + int scrollTime = 10; + final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(scrollTime)); + SearchScrollRequest request = new SearchScrollRequest(scrollId); + request.scroll(scroll); + SearchResponse response = client.scroll(request, RequestOptions.DEFAULT); + if (response == null || response.status() != RestStatus.OK) { + return null; + } + List content = toPojoList(response, clazz); + ScrollData scrollData = new ScrollData<>(); + scrollData.setSize(searchBuilder.size()); + scrollData.setTotal(response.getHits().getTotalHits()); + scrollData.setScrollId(response.getScrollId()); + scrollData.setContent(content); + return scrollData; + } + + public boolean pojoScrollEnd(String scrollId) throws IOException { + ClearScrollRequest request = new ClearScrollRequest(); + request.addScrollId(scrollId); + ClearScrollResponse response = client.clearScroll(request, RequestOptions.DEFAULT); + if (response != null) { + return response.isSucceeded(); + } + return false; + } + public T toPojo(GetResponse response, Class clazz) { if (null == response) { return null; diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/ElasticsearchConfig.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/ElasticsearchConfig.java index bdd2f38..791bbf1 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/ElasticsearchConfig.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/config/ElasticsearchConfig.java @@ -1,12 +1,17 @@ package io.github.dunwu.javadb.elasticsearch.config; +import cn.hutool.core.util.StrUtil; import io.github.dunwu.javadb.elasticsearch.ElasticsearchFactory; import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate; import org.elasticsearch.client.RestHighLevelClient; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; +import java.util.List; + /** * ES 配置 * @@ -17,14 +22,23 @@ import org.springframework.context.annotation.Configuration; @ComponentScan(value = "io.github.dunwu.javadb.elasticsearch.mapper") public class ElasticsearchConfig { + @Value("${es.hosts:#{null}}") + private String hostsConfig; + @Bean("restHighLevelClient") + @ConditionalOnMissingBean public RestHighLevelClient restHighLevelClient() { - return ElasticsearchFactory.newRestHighLevelClient(); + if (hostsConfig == null) { + return ElasticsearchFactory.newRestHighLevelClient(); + } else { + List hosts = StrUtil.split(hostsConfig, ","); + return ElasticsearchFactory.newRestHighLevelClient(hosts); + } } @Bean("elasticsearchTemplate") - public ElasticsearchTemplate elasticsearchTemplate() { - return ElasticsearchFactory.newElasticsearchTemplate(); + public ElasticsearchTemplate elasticsearchTemplate(RestHighLevelClient restHighLevelClient) { + return ElasticsearchFactory.newElasticsearchTemplate(restHighLevelClient); } } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/constant/CodeMsg.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/constant/CodeMsg.java new file mode 100644 index 0000000..96e46f6 --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/constant/CodeMsg.java @@ -0,0 +1,15 @@ +package io.github.dunwu.javadb.elasticsearch.constant; + +/** + * 请求 / 应答状态接口 + * + * @author Zhang Peng + * @since 2019-06-06 + */ +public interface CodeMsg { + + int getCode(); + + String getMsg(); + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/constant/ResultCode.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/constant/ResultCode.java new file mode 100644 index 0000000..d4822fb --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/constant/ResultCode.java @@ -0,0 +1,97 @@ +package io.github.dunwu.javadb.elasticsearch.constant; + +import cn.hutool.core.util.StrUtil; + +import java.util.stream.Stream; + +/** + * 系统级错误码 + * + * @author Zhang Peng + * @see HTTP 状态码 + * @see 腾讯开放平台错误码 + * @see 新浪开放平台错误码 + * @see 支付宝开放平台API + * @see 微信开放平台错误码 + * @since 2019-04-11 + */ +public enum ResultCode implements CodeMsg { + + OK(0, "成功"), + + PART_OK(1, "部分成功"), + + FAIL(-1, "失败"), + + // ----------------------------------------------------- + // 系统级错误码 + // ----------------------------------------------------- + + ERROR(1000, "服务器错误"), + + PARAM_ERROR(1001, "参数错误"), + + TASK_ERROR(1001, "调度任务错误"), + + CONFIG_ERROR(1003, "配置错误"), + + REQUEST_ERROR(1004, "请求错误"), + + IO_ERROR(1005, "IO 错误"), + + // ----------------------------------------------------- + // 2000 ~ 2999 数据库错误 + // ----------------------------------------------------- + + DATA_ERROR(2000, "数据库错误"), + + // ----------------------------------------------------- + // 3000 ~ 3999 三方错误 + // ----------------------------------------------------- + + THIRD_PART_ERROR(3000, "三方错误"), + + // ----------------------------------------------------- + // 3000 ~ 3999 认证错误 + // ----------------------------------------------------- + + AUTH_ERROR(4000, "认证错误"); + + private final int code; + + private final String msg; + + ResultCode(int code, String msg) { + this.code = code; + this.msg = msg; + } + + @Override + public int getCode() { + return code; + } + + @Override + public String getMsg() { + return msg; + } + + public static String getNameByCode(int code) { + return Stream.of(ResultCode.values()).filter(item -> item.getCode() == code).findFirst() + .map(ResultCode::getMsg).orElse(null); + } + + public static ResultCode getEnumByCode(int code) { + return Stream.of(ResultCode.values()).filter(item -> item.getCode() == code).findFirst().orElse(null); + } + + public static String getTypeInfo() { + StringBuilder sb = new StringBuilder(); + ResultCode[] types = ResultCode.values(); + for (ResultCode type : types) { + sb.append(StrUtil.format("{}:{}, ", type.getCode(), type.getMsg())); + } + return sb.toString(); + } +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/BaseEsEntity.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/BaseEsEntity.java index a775976..3220606 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/BaseEsEntity.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/BaseEsEntity.java @@ -4,6 +4,8 @@ import lombok.Data; import lombok.ToString; import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; /** * ES 实体接口 @@ -15,6 +17,8 @@ import java.io.Serializable; @ToString public abstract class BaseEsEntity implements Serializable { + public static final String DOC_ID = "docId"; + /** * 获取版本 */ @@ -24,4 +28,10 @@ public abstract class BaseEsEntity implements Serializable { public abstract String getDocId(); + public static Map getPropertiesMap() { + Map map = new LinkedHashMap<>(1); + map.put(BaseEsEntity.DOC_ID, "keyword"); + return map; + } + } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/Page.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/Page.java deleted file mode 100644 index 8baef8b..0000000 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/Page.java +++ /dev/null @@ -1,36 +0,0 @@ -package io.github.dunwu.javadb.elasticsearch.entity; - -import lombok.Data; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -/** - * 分页实体 - * - * @author Zhang Peng - * @date 2023-06-28 - */ -@Data -public class Page { - - private long total; - private int page; - private int size; - private List content = new ArrayList<>(); - - public Page(int page, int size, long total) { - this.total = total; - this.page = page; - this.size = size; - } - - public Page(int page, int size, long total, Collection list) { - this.total = total; - this.page = page; - this.size = size; - this.content.addAll(list); - } - -} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java index 1594772..b21b229 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/User.java @@ -2,32 +2,43 @@ package io.github.dunwu.javadb.elasticsearch.entity; import lombok.AllArgsConstructor; import lombok.Builder; -import lombok.Data; -import lombok.EqualsAndHashCode; +import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; /** - * 用户实体 + * 短剧、长视频消费数据 ES 实体 * * @author Zhang Peng - * @since 2023-06-28 + * @date 2024-04-02 */ -@Data @Builder -@EqualsAndHashCode(callSuper = true) -@AllArgsConstructor +@Getter +@Setter @NoArgsConstructor -public class User extends BaseEsEntity { +@AllArgsConstructor +public class User extends BaseEsEntity implements Serializable { - private Long id; - private String username; - private String password; + private String id; + private String name; private Integer age; - private String email; @Override public String getDocId() { - return String.valueOf(id); + return id; + } + + public static Map getPropertiesMap() { + Map map = new LinkedHashMap<>(); + map.put(BaseEsEntity.DOC_ID, "keyword"); + map.put("id", "long"); + map.put("name", "keyword"); + map.put("age", "integer"); + return map; } } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/common/PageData.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/common/PageData.java new file mode 100644 index 0000000..e436f8b --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/common/PageData.java @@ -0,0 +1,37 @@ +package io.github.dunwu.javadb.elasticsearch.entity.common; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +/** + * 分页实体 + * + * @author Zhang Peng + * @date 2023-06-28 + */ +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class PageData implements Serializable { + + private int page; + private int size; + private long total; + private List content = new ArrayList<>(); + + public PageData(int page, int size, long total) { + this.total = total; + this.page = page; + this.size = size; + } + + private static final long serialVersionUID = 1L; + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/common/ScrollData.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/common/ScrollData.java new file mode 100644 index 0000000..4f90cb8 --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/entity/common/ScrollData.java @@ -0,0 +1,30 @@ +package io.github.dunwu.javadb.elasticsearch.entity.common; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.io.Serializable; +import java.util.Collection; + +/** + * Hbase 滚动数据实体 + * + * @author Zhang Peng + * @date 2023-11-16 + */ +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class ScrollData implements Serializable { + + private String scrollId; + private int size = 10; + private long total = 0L; + private Collection content; + + private static final long serialVersionUID = 1L; + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/exception/CodeMsgException.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/exception/CodeMsgException.java new file mode 100644 index 0000000..98ab199 --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/exception/CodeMsgException.java @@ -0,0 +1,128 @@ +package io.github.dunwu.javadb.elasticsearch.exception; + +import cn.hutool.core.util.StrUtil; +import io.github.dunwu.javadb.elasticsearch.constant.CodeMsg; +import io.github.dunwu.javadb.elasticsearch.constant.ResultCode; + +/** + * 基础异常 + * + * @author Zhang Peng + * @since 2021-09-25 + */ +public class CodeMsgException extends RuntimeException implements CodeMsg { + + private static final long serialVersionUID = 6146660782281445735L; + + /** + * 状态码 + */ + protected int code; + + /** + * 响应信息 + */ + protected String msg; + + /** + * 提示信息 + */ + protected String toast; + + public CodeMsgException() { + this(ResultCode.FAIL); + } + + public CodeMsgException(CodeMsg codeMsg) { + this(codeMsg.getCode(), codeMsg.getMsg()); + } + + public CodeMsgException(CodeMsg codeMsg, String msg) { + this(codeMsg.getCode(), msg, null); + } + + public CodeMsgException(CodeMsg codeMsg, String msg, String toast) { + this(codeMsg.getCode(), msg, toast); + } + + public CodeMsgException(String msg) { + this(ResultCode.FAIL, msg); + } + + public CodeMsgException(int code, String msg) { + this(code, msg, msg); + } + + public CodeMsgException(int code, String msg, String toast) { + super(msg); + setCode(code); + setMsg(msg); + setToast(toast); + } + + public CodeMsgException(Throwable cause) { + this(cause, ResultCode.FAIL); + } + + public CodeMsgException(Throwable cause, String msg) { + this(cause, ResultCode.FAIL, msg); + } + + public CodeMsgException(Throwable cause, CodeMsg codeMsg) { + this(cause, codeMsg.getCode(), codeMsg.getMsg()); + } + + public CodeMsgException(Throwable cause, CodeMsg codeMsg, String msg) { + this(cause, codeMsg.getCode(), msg, null); + } + + public CodeMsgException(Throwable cause, CodeMsg codeMsg, String msg, String toast) { + this(cause, codeMsg.getCode(), msg, toast); + } + + public CodeMsgException(Throwable cause, int code, String msg) { + this(cause, code, msg, null); + } + + public CodeMsgException(Throwable cause, int code, String msg, String toast) { + super(msg, cause); + setCode(code); + setMsg(msg); + setToast(toast); + } + + @Override + public String getMessage() { + if (StrUtil.isNotBlank(msg)) { + return StrUtil.format("[{}]{}", code, msg); + } + return super.getMessage(); + } + + @Override + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + + @Override + public String getMsg() { + return msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public String getToast() { + return toast; + } + + public void setToast(String toast) { + this.toast = toast; + } + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/exception/DefaultException.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/exception/DefaultException.java new file mode 100644 index 0000000..14908e3 --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/exception/DefaultException.java @@ -0,0 +1,72 @@ +package io.github.dunwu.javadb.elasticsearch.exception; + +import io.github.dunwu.javadb.elasticsearch.constant.CodeMsg; +import io.github.dunwu.javadb.elasticsearch.constant.ResultCode; + +/** + * 默认异常 + * + * @author Zhang Peng + * @since 2021-12-30 + */ +public class DefaultException extends CodeMsgException { + + private static final long serialVersionUID = -7027578114976830416L; + + public DefaultException() { + this(ResultCode.FAIL); + } + + public DefaultException(CodeMsg codeMsg) { + this(codeMsg.getCode(), codeMsg.getMsg()); + } + + public DefaultException(CodeMsg codeMsg, String msg) { + this(codeMsg.getCode(), msg, null); + } + + public DefaultException(CodeMsg codeMsg, String msg, String toast) { + this(codeMsg.getCode(), msg, toast); + } + + public DefaultException(String msg) { + this(ResultCode.FAIL, msg); + } + + public DefaultException(int code, String msg) { + this(code, msg, msg); + } + + public DefaultException(int code, String msg, String toast) { + super(code, msg, toast); + } + + public DefaultException(Throwable cause) { + this(cause, ResultCode.FAIL); + } + + public DefaultException(Throwable cause, String msg) { + this(cause, ResultCode.FAIL, msg); + } + + public DefaultException(Throwable cause, CodeMsg codeMsg) { + this(cause, codeMsg.getCode(), codeMsg.getMsg()); + } + + public DefaultException(Throwable cause, CodeMsg codeMsg, String msg) { + this(cause, codeMsg.getCode(), msg, null); + } + + public DefaultException(Throwable cause, CodeMsg codeMsg, String msg, String toast) { + this(cause, codeMsg.getCode(), msg, toast); + } + + public DefaultException(Throwable cause, int code, String msg) { + this(cause, code, msg, null); + } + + public DefaultException(Throwable cause, int code, String msg, String toast) { + super(cause, code, msg, toast); + } + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseDynamicEsMapper.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseDynamicEsMapper.java new file mode 100644 index 0000000..710594d --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseDynamicEsMapper.java @@ -0,0 +1,210 @@ +package io.github.dunwu.javadb.elasticsearch.mapper; + +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateTime; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.StrUtil; +import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate; +import io.github.dunwu.javadb.elasticsearch.constant.ResultCode; +import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity; +import io.github.dunwu.javadb.elasticsearch.entity.common.PageData; +import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData; +import io.github.dunwu.javadb.elasticsearch.exception.DefaultException; +import lombok.extern.slf4j.Slf4j; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * 动态 ES Mapper 基础类(以时间为维度动态创建、删除 index),用于数据量特别大,需要按照日期分片的索引。 + *

+ * 注:使用此 Mapper 的索引、别名必须遵循命名格式:索引名 = 别名_yyyyMMdd + * + * @author Zhang Peng + * @date 2024-04-07 + */ +@Slf4j +public abstract class BaseDynamicEsMapper extends BaseEsMapper { + + public BaseDynamicEsMapper(ElasticsearchTemplate elasticsearchTemplate) { + super(elasticsearchTemplate); + } + + // ==================================================================== + // 索引管理操作 + // ==================================================================== + + public String getIndex(String day) { + + String alias = getAlias(); + if (StrUtil.isBlank(day)) { + String msg = StrUtil.format("【ES】获取 {} 索引失败!day 不能为空!", alias); + throw new DefaultException(ResultCode.PARAM_ERROR, msg); + } + + DateTime date; + try { + date = DateUtil.parse(day, DatePattern.NORM_DATE_PATTERN); + } catch (Exception e) { + String msg = StrUtil.format("【ES】获取 {} 索引失败!day: {} 不符合日期格式 {}!", + alias, day, DatePattern.NORM_DATE_PATTERN); + throw new DefaultException(e, ResultCode.PARAM_ERROR, msg); + } + + String formatDate = DateUtil.format(date, DatePattern.PURE_DATE_FORMAT); + return alias + "_" + formatDate; + } + + public boolean isIndexExistsInDay(String day) throws IOException { + return elasticsearchTemplate.isIndexExists(getIndex(day)); + } + + public String createIndexInDay(String day) throws IOException, DefaultException { + String index = getIndex(day); + boolean indexExists = isIndexExistsInDay(day); + if (indexExists) { + return index; + } + elasticsearchTemplate.createIndex(index, getType(), getAlias(), getShard(), getReplica()); + Map map = getPropertiesMap(); + if (MapUtil.isNotEmpty(map)) { + elasticsearchTemplate.setMapping(index, getType(), map); + } + return index; + } + + public void deleteIndexInDay(String day) throws IOException { + elasticsearchTemplate.deleteIndex(getIndex(day)); + } + + public void updateAliasInDay(String day) throws IOException { + elasticsearchTemplate.updateAlias(getIndex(day), getAlias()); + } + + // ==================================================================== + // CRUD 操作 + // ==================================================================== + + public GetResponse getByIdInDay(String day, String id) throws IOException { + return elasticsearchTemplate.getById(getIndex(day), getType(), id, null); + } + + public T pojoByIdInDay(String day, String id) throws IOException { + return elasticsearchTemplate.pojoById(getIndex(day), getType(), id, null, getEntityClass()); + } + + public List pojoListByIdsInDay(String day, Collection ids) throws IOException { + return elasticsearchTemplate.pojoListByIds(getIndex(day), getType(), ids, getEntityClass()); + } + + public long countInDay(String day, SearchSourceBuilder builder) throws IOException { + return elasticsearchTemplate.count(getIndex(day), getType(), builder); + } + + public SearchResponse queryInDay(String day, SearchSourceBuilder builder) throws IOException { + return elasticsearchTemplate.query(getIndex(day), getType(), builder); + } + + public PageData pojoPageInDay(String day, SearchSourceBuilder builder) throws IOException { + return elasticsearchTemplate.pojoPage(getIndex(day), getType(), builder, getEntityClass()); + } + + public ScrollData pojoPageByLastIdInDay(String day, String lastId, int size, QueryBuilder queryBuilder) + throws IOException { + return elasticsearchTemplate.pojoPageByLastId(getIndex(day), getType(), lastId, size, + queryBuilder, getEntityClass()); + } + + public ScrollData pojoScrollBeginInDay(String day, SearchSourceBuilder builder) throws IOException { + return elasticsearchTemplate.pojoScrollBegin(getIndex(day), getType(), builder, getEntityClass()); + } + + /** + * 根据日期动态选择索引并更新 + * + * @param day 日期,格式为:yyyy-MM-dd + * @param entity 待更新的数据 + * @return / + */ + public boolean saveInDay(String day, T entity) throws IOException, DefaultException { + String index = checkIndex(day); + checkData(entity); + elasticsearchTemplate.save(index, getType(), entity); + return true; + } + + /** + * 根据日期动态选择索引并批量更新 + * + * @param day 日期,格式为:yyyy-MM-dd + * @param list 待更新的数据 + * @return / + */ + public boolean saveBatchInDay(String day, Collection list) throws IOException, DefaultException { + String index = checkIndex(day); + checkData(list); + elasticsearchTemplate.saveBatch(index, getType(), list); + return true; + } + + public void asyncSaveBatchInDay(String day, Collection list) throws IOException { + String index = checkIndex(day); + checkData(list); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(BulkResponse response) { + if (response != null && !response.hasFailures()) { + String msg = StrUtil.format("【ES】按日期异步批量保存 {} 成功!", index); + log.info(msg); + } else { + String msg = StrUtil.format("【ES】按日期异步批量保存 {} 失败!", index); + log.warn(msg); + } + } + + @Override + public void onFailure(Exception e) { + String msg = StrUtil.format("【ES】按日期异步批量保存 {} 异常!", index); + log.error(msg, e); + } + }; + asyncSaveBatchInDay(day, list, listener); + } + + public void asyncSaveBatchInDay(String day, Collection list, ActionListener listener) + throws IOException { + String index = checkIndex(day); + checkData(list); + elasticsearchTemplate.asyncSaveBatch(getIndex(day), getType(), list, listener); + } + + public boolean deleteByIdInDay(String day, String id) throws IOException { + return elasticsearchTemplate.deleteById(getIndex(day), getType(), id); + } + + public boolean deleteBatchIdsInDay(String day, Collection ids) throws IOException { + return elasticsearchTemplate.deleteBatchIds(getIndex(day), getType(), ids); + } + + protected String checkIndex(String day) throws IOException { + if (!enableAutoCreateIndex()) { + return getIndex(day); + } + String index = createIndexInDay(day); + if (StrUtil.isBlank(index)) { + String msg = StrUtil.format("【ES】按日期批量保存 {} 失败!索引找不到且创建失败!", index); + throw new DefaultException(ResultCode.ERROR, msg); + } + return index; + } + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java index cd7e53a..3232f9e 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/BaseEsMapper.java @@ -1,23 +1,31 @@ package io.github.dunwu.javadb.elasticsearch.mapper; +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.map.MapUtil; +import cn.hutool.core.util.ReflectUtil; +import cn.hutool.core.util.StrUtil; import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate; +import io.github.dunwu.javadb.elasticsearch.constant.ResultCode; import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity; -import io.github.dunwu.javadb.elasticsearch.entity.Page; +import io.github.dunwu.javadb.elasticsearch.entity.common.PageData; +import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData; +import io.github.dunwu.javadb.elasticsearch.exception.DefaultException; import lombok.extern.slf4j.Slf4j; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.client.IndicesClient; -import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; +import java.lang.reflect.Method; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * ES Mapper 基础类 @@ -28,7 +36,7 @@ import java.util.List; @Slf4j public abstract class BaseEsMapper implements EsMapper { - private BulkProcessor bulkProcessor; + protected BulkProcessor bulkProcessor; protected final ElasticsearchTemplate elasticsearchTemplate; @@ -36,6 +44,21 @@ public abstract class BaseEsMapper implements EsMapper implements EsMapper getPropertiesMap() { + + Class clazz = getEntityClass(); + Method method; + try { + method = clazz.getMethod("getPropertiesMap"); + } catch (NoSuchMethodException e) { + String msg = StrUtil.format("【ES】检查并创建 {} 索引失败!day 不能为空!", getAlias()); + throw new DefaultException(e, ResultCode.ERROR, msg); + } + + Object result = ReflectUtil.invokeStatic(method); + if (result == null) { + return new HashMap<>(0); + } + return (Map) result; + } + + // ==================================================================== + // 索引管理操作 + // ==================================================================== + @Override public boolean isIndexExists() throws IOException { - IndicesClient indicesClient = getClient().indices(); - GetIndexRequest request = new GetIndexRequest(); - request.indices(getIndex()); - return indicesClient.exists(request, RequestOptions.DEFAULT); + return elasticsearchTemplate.isIndexExists(getIndex()); } + @Override + public String createIndexIfNotExists() throws IOException { + String index = getIndex(); + boolean exists = elasticsearchTemplate.isIndexExists(index); + if (exists) { + return index; + } + elasticsearchTemplate.createIndex(index, getType(), getAlias(), getShard(), getReplica()); + Map propertiesMap = getPropertiesMap(); + if (MapUtil.isNotEmpty(propertiesMap)) { + elasticsearchTemplate.setMapping(index, getType(), propertiesMap); + } + return index; + } + + @Override + public void deleteIndex() throws IOException { + elasticsearchTemplate.deleteIndex(getIndex()); + } + + @Override + public void updateAlias() throws IOException { + elasticsearchTemplate.updateAlias(getIndex(), getAlias()); + } + + // ==================================================================== + // CRUD 操作 + // ==================================================================== + @Override public GetResponse getById(String id) throws IOException { return getById(id, null); @@ -85,11 +157,6 @@ public abstract class BaseEsMapper implements EsMapper pojoPage(SearchSourceBuilder builder) throws IOException { - return elasticsearchTemplate.pojoPage(getIndex(), getType(), builder, getEntityClass()); - } - @Override public long count(SearchSourceBuilder builder) throws IOException { return elasticsearchTemplate.count(getIndex(), getType(), builder); @@ -99,55 +166,95 @@ public abstract class BaseEsMapper implements EsMapper pojoPage(SearchSourceBuilder builder) throws IOException { + return elasticsearchTemplate.pojoPage(getIndex(), getType(), builder, getEntityClass()); + } + + @Override + public ScrollData pojoPageByLastId(String lastId, int size, QueryBuilder queryBuilder) throws IOException { + return elasticsearchTemplate.pojoPageByLastId(getIndex(), getType(), lastId, size, + queryBuilder, getEntityClass()); + } + + @Override + public ScrollData pojoScrollBegin(SearchSourceBuilder builder) throws IOException { + return elasticsearchTemplate.pojoScrollBegin(getIndex(), getType(), builder, getEntityClass()); + } + + @Override + public ScrollData pojoScroll(String scrollId, SearchSourceBuilder builder) throws IOException { + return elasticsearchTemplate.pojoScroll(scrollId, builder, getEntityClass()); + } + + @Override + public boolean pojoScrollEnd(String scrollId) throws IOException { + return elasticsearchTemplate.pojoScrollEnd(scrollId); + } + @Override public T save(T entity) throws IOException { - return elasticsearchTemplate.save(getIndex(), getType(), entity); + String index = checkIndex(); + checkData(entity); + return elasticsearchTemplate.save(index, getType(), entity); } @Override - public boolean batchSave(Collection list) throws IOException { - return elasticsearchTemplate.batchSave(getIndex(), getType(), list); + public boolean saveBatch(Collection list) throws IOException { + String index = checkIndex(); + checkData(list); + return elasticsearchTemplate.saveBatch(index, getType(), list); } @Override - public void asyncBatchSave(Collection list) throws IOException { + public void asyncSaveBatch(Collection list) throws IOException { + String index = checkIndex(); + checkData(list); ActionListener listener = new ActionListener() { @Override public void onResponse(BulkResponse response) { if (response != null && !response.hasFailures()) { - log.info("【ES】异步批量插入成功!"); + String msg = StrUtil.format("【ES】异步批量保存 {} 成功!", index); + log.info(msg); } else { - log.warn("【ES】异步批量插入失败!"); + String msg = StrUtil.format("【ES】异步批量保存 {} 失败!", index); + log.warn(msg); } } @Override public void onFailure(Exception e) { - log.error("【ES】异步批量插入异常!", e); + String msg = StrUtil.format("【ES】异步批量保存 {} 异常!", index); + log.error(msg, e); } }; - asyncBatchSave(list, listener); + asyncSaveBatch(list, listener); } @Override - public void asyncBatchSave(Collection list, ActionListener listener) { - elasticsearchTemplate.asyncBatchSave(getIndex(), getType(), list, listener); + public void asyncSaveBatch(Collection list, ActionListener listener) throws IOException { + String index = checkIndex(); + checkData(list); + elasticsearchTemplate.asyncSaveBatch(index, getType(), list, listener); } @Override public T updateById(T entity) throws IOException { + checkData(entity); return elasticsearchTemplate.updateById(getIndex(), getType(), entity); } @Override - public boolean batchUpdateById(Collection list) throws IOException { - return elasticsearchTemplate.batchUpdateById(getIndex(), getType(), list); + public boolean updateBatchIds(Collection list) throws IOException { + checkData(list); + return elasticsearchTemplate.updateBatchIds(getIndex(), getType(), list); } @Override - public void asyncBatchUpdateById(Collection list, ActionListener listener) { - elasticsearchTemplate.asyncBatchUpdateById(getIndex(), getType(), list, listener); + public void asyncUpdateBatchIds(Collection list, ActionListener listener) { + checkData(list); + elasticsearchTemplate.asyncUpdateBatchIds(getIndex(), getType(), list, listener); } @Override @@ -156,12 +263,12 @@ public abstract class BaseEsMapper implements EsMapper ids) throws IOException { - return elasticsearchTemplate.batchDeleteById(getIndex(), getType(), ids); + public boolean deleteBatchIds(Collection ids) throws IOException { + return elasticsearchTemplate.deleteBatchIds(getIndex(), getType(), ids); } @Override - public void asyncBatchDeleteById(Collection ids) throws IOException { + public void asyncDeleteBatchIds(Collection ids) throws IOException { ActionListener listener = new ActionListener() { @Override public void onResponse(BulkResponse response) { @@ -177,12 +284,38 @@ public abstract class BaseEsMapper implements EsMapper ids, ActionListener listener) throws IOException { - elasticsearchTemplate.asyncBatchDeleteById(getIndex(), getType(), ids, listener); + public void asyncDeleteBatchIds(Collection ids, ActionListener listener) throws IOException { + elasticsearchTemplate.asyncDeleteBatchIds(getIndex(), getType(), ids, listener); + } + + protected String checkIndex() throws IOException { + if (!enableAutoCreateIndex()) { + return getIndex(); + } + String index = createIndexIfNotExists(); + if (StrUtil.isBlank(index)) { + String msg = StrUtil.format("【ES】索引找不到且创建失败!", index); + throw new DefaultException(ResultCode.ERROR, msg); + } + return index; + } + + protected void checkData(Collection list) { + if (CollectionUtil.isEmpty(list)) { + String msg = StrUtil.format("【ES】写入 {} 失败!list 不能为空!", getIndex()); + throw new DefaultException(ResultCode.PARAM_ERROR, msg); + } + } + + protected void checkData(T entity) { + if (entity == null) { + String msg = StrUtil.format("【ES】写入 {} 失败!entity 不能为空!", getIndex()); + throw new DefaultException(ResultCode.PARAM_ERROR, msg); + } } } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java index 53eb9b2..bacabff 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/EsMapper.java @@ -2,13 +2,15 @@ package io.github.dunwu.javadb.elasticsearch.mapper; import cn.hutool.core.collection.CollectionUtil; import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity; -import io.github.dunwu.javadb.elasticsearch.entity.Page; +import io.github.dunwu.javadb.elasticsearch.entity.common.PageData; +import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import java.io.IOException; @@ -25,6 +27,11 @@ import java.util.Map; */ public interface EsMapper { + /** + * 获取别名 + */ + String getAlias(); + /** * 获取索引名 */ @@ -35,6 +42,16 @@ public interface EsMapper { */ String getType(); + /** + * 获取分片数 + */ + int getShard(); + + /** + * 获取副本数 + */ + int getReplica(); + /** * 获取实体类型 */ @@ -46,6 +63,12 @@ public interface EsMapper { boolean isIndexExists() throws IOException; + String createIndexIfNotExists() throws IOException; + + void deleteIndex() throws IOException; + + void updateAlias() throws IOException; + GetResponse getById(String id) throws IOException; GetResponse getById(String id, Long version) throws IOException; @@ -69,32 +92,40 @@ public interface EsMapper { return map; } - Page pojoPage(SearchSourceBuilder builder) throws IOException; - long count(SearchSourceBuilder builder) throws IOException; SearchResponse query(SearchSourceBuilder builder) throws IOException; + PageData pojoPage(SearchSourceBuilder builder) throws IOException; + + ScrollData pojoPageByLastId(String lastId, int size, QueryBuilder queryBuilder) throws IOException; + + ScrollData pojoScrollBegin(SearchSourceBuilder builder) throws IOException; + + ScrollData pojoScroll(String scrollId, SearchSourceBuilder builder) throws IOException; + + boolean pojoScrollEnd(String scrollId) throws IOException; + T save(T entity) throws IOException; - boolean batchSave(Collection list) throws IOException; + boolean saveBatch(Collection list) throws IOException; - void asyncBatchSave(Collection list) throws IOException; + void asyncSaveBatch(Collection list) throws IOException; - void asyncBatchSave(Collection list, ActionListener listener) throws IOException; + void asyncSaveBatch(Collection list, ActionListener listener) throws IOException; T updateById(T entity) throws IOException; - boolean batchUpdateById(Collection list) throws IOException; + boolean updateBatchIds(Collection list) throws IOException; - void asyncBatchUpdateById(Collection list, ActionListener listener); + void asyncUpdateBatchIds(Collection list, ActionListener listener); boolean deleteById(String id) throws IOException; - boolean batchDeleteById(Collection ids) throws IOException; + boolean deleteBatchIds(Collection ids) throws IOException; - void asyncBatchDeleteById(Collection ids) throws IOException; + void asyncDeleteBatchIds(Collection ids) throws IOException; - void asyncBatchDeleteById(Collection ids, ActionListener listener) throws IOException; + void asyncDeleteBatchIds(Collection ids, ActionListener listener) throws IOException; } diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java index 970b1b4..de6d1b7 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapper.java @@ -1,27 +1,39 @@ package io.github.dunwu.javadb.elasticsearch.mapper; +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateUtil; import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate; import io.github.dunwu.javadb.elasticsearch.entity.User; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.Date; + /** - * User ES Mapper + * open_applet_consume_yyyyMMdd ES Mapper * * @author Zhang Peng * @date 2023-06-27 */ +@Slf4j @Component -public class UserEsMapper extends BaseEsMapper { +public class UserEsMapper extends BaseDynamicEsMapper { public UserEsMapper(ElasticsearchTemplate elasticsearchTemplate) { super(elasticsearchTemplate); } @Override - public String getIndex() { + public String getAlias() { return "user"; } + @Override + public String getIndex() { + String date = DateUtil.format(new Date(), DatePattern.PURE_DATE_FORMAT); + return getAlias() + "_" + date; + } + @Override public String getType() { return "_doc"; diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseApplicationTests.java b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseApplicationTests.java index 8d43d1b..1fadeff 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseApplicationTests.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseApplicationTests.java @@ -16,9 +16,6 @@ import org.springframework.web.context.WebApplicationContext; @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public abstract class BaseApplicationTests { - // ---------------------------------------------------------------------------- 测试常量数据 - - // ---------------------------------------------------------------------------- protected MockMvc mockMvc; @Autowired @@ -26,7 +23,7 @@ public abstract class BaseApplicationTests { @BeforeEach public void setUp() { - mockMvc = MockMvcBuilders.webAppContextSetup(context).build(); //构造MockMvc + mockMvc = MockMvcBuilders.webAppContextSetup(context).build(); } @BeforeAll diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseElasticsearchTemplateTest.java b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseElasticsearchTemplateTest.java new file mode 100644 index 0000000..05b95f7 --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseElasticsearchTemplateTest.java @@ -0,0 +1,241 @@ +package io.github.dunwu.javadb.elasticsearch; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.StrUtil; +import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity; +import io.github.dunwu.javadb.elasticsearch.entity.common.PageData; +import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData; +import io.github.dunwu.javadb.elasticsearch.util.JsonUtil; +import lombok.extern.slf4j.Slf4j; +import org.assertj.core.api.Assertions; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +/** + * ElasticsearchTemplate 测试 + * + * @author Zhang Peng + * @date 2023-11-13 + */ +@Slf4j +public abstract class BaseElasticsearchTemplateTest { + + static final int FROM = 0; + static final int SIZE = 10; + static final String TEST_ID_01 = "1"; + static final String TEST_ID_02 = "2"; + + protected ElasticsearchTemplate TEMPLATE = ElasticsearchFactory.newElasticsearchTemplate(); + + protected abstract String getAlias(); + + protected abstract String getIndex(); + + protected abstract String getType(); + + protected abstract int getShard(); + + protected abstract int getReplica(); + + protected abstract Class getEntityClass(); + + protected abstract Map getPropertiesMap(); + + protected abstract T getOneMockData(String id); + + protected abstract List getMockList(int num); + + protected void deleteIndex() throws IOException { + boolean exists = TEMPLATE.isIndexExists(getIndex()); + if (!exists) { + return; + } + TEMPLATE.deleteIndex(getIndex()); + exists = TEMPLATE.isIndexExists(getIndex()); + Assertions.assertThat(exists).isFalse(); + } + + protected void createIndex() throws IOException { + boolean exists = TEMPLATE.isIndexExists(getIndex()); + if (exists) { + return; + } + TEMPLATE.createIndex(getIndex(), getType(), getAlias(), getShard(), getReplica()); + TEMPLATE.setMapping(getIndex(), getType(), getPropertiesMap()); + exists = TEMPLATE.isIndexExists(getIndex()); + Assertions.assertThat(exists).isTrue(); + } + + protected void save() throws IOException { + String id = "1"; + T entity = getOneMockData(id); + TEMPLATE.save(getIndex(), getType(), entity); + T newEntity = TEMPLATE.pojoById(getIndex(), getType(), id, getEntityClass()); + log.info("记录:{}", JsonUtil.toString(newEntity)); + Assertions.assertThat(newEntity).isNotNull(); + } + + protected void saveBatch() throws IOException { + int total = 10000; + List> listGroup = CollectionUtil.split(getMockList(total), 1000); + for (List list : listGroup) { + TEMPLATE.saveBatch(getIndex(), getType(), list); + } + long count = TEMPLATE.count(getIndex(), getType(), new SearchSourceBuilder()); + log.info("批量更新记录数: {}", count); + Assertions.assertThat(count).isEqualTo(total); + } + + protected void getById() throws IOException { + GetResponse response = TEMPLATE.getById(getIndex(), getType(), TEST_ID_01); + Assertions.assertThat(response).isNotNull(); + log.info("记录:{}", JsonUtil.toString(response.getSourceAsMap())); + } + + protected void pojoById() throws IOException { + T entity = TEMPLATE.pojoById(getIndex(), getType(), TEST_ID_01, getEntityClass()); + Assertions.assertThat(entity).isNotNull(); + log.info("记录:{}", JsonUtil.toString(entity)); + } + + protected void pojoListByIds() throws IOException { + List ids = Arrays.asList(TEST_ID_01, TEST_ID_02); + List list = TEMPLATE.pojoListByIds(getIndex(), getType(), ids, getEntityClass()); + Assertions.assertThat(list).isNotEmpty(); + Assertions.assertThat(list.size()).isEqualTo(2); + for (T entity : list) { + log.info("记录:{}", JsonUtil.toString(entity)); + } + } + + protected void count() throws IOException { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + long total = TEMPLATE.count(getIndex(), getType(), searchSourceBuilder); + Assertions.assertThat(total).isNotZero(); + log.info("符合条件的记录数:{}", total); + } + + protected void query() throws IOException { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.from(FROM); + searchSourceBuilder.size(SIZE); + SearchResponse response = TEMPLATE.query(getIndex(), getType(), searchSourceBuilder); + Assertions.assertThat(response).isNotNull(); + Assertions.assertThat(response.getHits()).isNotNull(); + for (SearchHit hit : response.getHits().getHits()) { + log.info("记录:{}", hit.getSourceAsString()); + Map map = hit.getSourceAsMap(); + Assertions.assertThat(map).isNotNull(); + Assertions.assertThat(Integer.valueOf((String) map.get("docId"))).isLessThan(100); + } + } + + protected void pojoPage() throws IOException { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.from(FROM); + searchSourceBuilder.size(SIZE); + PageData page = TEMPLATE.pojoPage(getIndex(), getType(), searchSourceBuilder, getEntityClass()); + Assertions.assertThat(page).isNotNull(); + Assertions.assertThat(page.getContent()).isNotEmpty(); + for (T entity : page.getContent()) { + log.info("记录:{}", JsonUtil.toString(entity)); + } + } + + protected void pojoPageByLastId() throws IOException { + + BoolQueryBuilder queryBuilder = new BoolQueryBuilder(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + + long total = TEMPLATE.count(getIndex(), getType(), queryBuilder); + ScrollData scrollData = + TEMPLATE.pojoPageByLastId(getIndex(), getType(), null, SIZE, queryBuilder, getEntityClass()); + if (scrollData == null || scrollData.getScrollId() == null) { + return; + } + Assertions.assertThat(scrollData.getTotal()).isEqualTo(total); + + long count = 0L; + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + count += scrollData.getContent().size(); + + String scrollId = scrollData.getScrollId(); + while (CollectionUtil.isNotEmpty(scrollData.getContent())) { + scrollData = + TEMPLATE.pojoPageByLastId(getIndex(), getType(), scrollId, SIZE, queryBuilder, getEntityClass()); + if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) { + break; + } + if (StrUtil.isNotBlank(scrollData.getScrollId())) { + scrollId = scrollData.getScrollId(); + } + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + count += scrollData.getContent().size(); + } + log.info("total: {}", total); + Assertions.assertThat(count).isEqualTo(total); + } + + protected void pojoScroll() throws IOException { + + BoolQueryBuilder queryBuilder = new BoolQueryBuilder(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.size(SIZE).query(queryBuilder).trackScores(false); + + long total = TEMPLATE.count(getIndex(), getType(), queryBuilder); + ScrollData scrollData = + TEMPLATE.pojoScrollBegin(getIndex(), getType(), searchSourceBuilder, getEntityClass()); + if (scrollData == null || scrollData.getScrollId() == null) { + return; + } + Assertions.assertThat(scrollData.getTotal()).isEqualTo(total); + + long count = 0L; + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + count += scrollData.getContent().size(); + + String scrollId = scrollData.getScrollId(); + while (CollectionUtil.isNotEmpty(scrollData.getContent())) { + scrollData = TEMPLATE.pojoScroll(scrollId, searchSourceBuilder, getEntityClass()); + if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) { + break; + } + if (StrUtil.isNotBlank(scrollData.getScrollId())) { + scrollId = scrollData.getScrollId(); + } + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + count += scrollData.getContent().size(); + } + TEMPLATE.pojoScrollEnd(scrollId); + log.info("total: {}", total); + Assertions.assertThat(count).isEqualTo(total); + } + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplateTest.java b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplateTest.java deleted file mode 100644 index bbddf70..0000000 --- a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplateTest.java +++ /dev/null @@ -1,193 +0,0 @@ -package io.github.dunwu.javadb.elasticsearch; - -import io.github.dunwu.javadb.elasticsearch.entity.Page; -import io.github.dunwu.javadb.elasticsearch.entity.User; -import io.github.dunwu.javadb.elasticsearch.util.JsonUtil; -import org.assertj.core.api.Assertions; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.index.query.BoolQueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Nested; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -/** - * ElasticsearchTemplate 测试 - * - * @author Zhang Peng - * @date 2023-11-13 - */ -public class ElasticsearchTemplateTest { - - public static final String INDEX = "user"; - public static final String TYPE = "_doc"; - public static final String TEST_ID_01 = "1"; - public static final String TEST_ID_02 = "2"; - - private static final ElasticsearchTemplate TEMPLATE; - - static { - TEMPLATE = ElasticsearchFactory.newElasticsearchTemplate(); - } - - @Test - @DisplayName("根据ID精确查询") - public void getById() throws IOException { - GetResponse response = TEMPLATE.getById(INDEX, TYPE, TEST_ID_01); - System.out.println("记录:" + JsonUtil.toString(response.getSourceAsMap())); - } - - @Test - @DisplayName("根据ID精确查询POJO") - public void pojoById() throws IOException { - User entity = TEMPLATE.pojoById(INDEX, TYPE, TEST_ID_01, User.class); - System.out.println("记录:" + JsonUtil.toString(entity)); - } - - @Test - @DisplayName("根据ID精确批量查询POJO") - public void pojoListByIds() throws IOException { - List ids = Arrays.asList(TEST_ID_01, TEST_ID_02); - List list = TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class); - Assertions.assertThat(list).isNotEmpty(); - Assertions.assertThat(list.size()).isEqualTo(2); - for (User entity : list) { - System.out.println("记录:" + JsonUtil.toString(entity)); - } - } - - @Test - @DisplayName("分页查询") - public void pojoPage() throws IOException { - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(boolQueryBuilder); - searchSourceBuilder.from(0); - searchSourceBuilder.size(10); - - Page page = TEMPLATE.pojoPage(INDEX, TYPE, searchSourceBuilder, User.class); - Assertions.assertThat(page).isNotNull(); - Assertions.assertThat(page.getContent()).isNotEmpty(); - for (User entity : page.getContent()) { - System.out.println("记录:" + JsonUtil.toString(entity)); - } - } - - @Test - @DisplayName("条件数量查询") - public void count() throws IOException { - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(boolQueryBuilder); - searchSourceBuilder.from(0); - searchSourceBuilder.size(10); - long total = TEMPLATE.count(INDEX, TYPE, searchSourceBuilder); - Assertions.assertThat(total).isNotZero(); - System.out.println("符合条件的总记录数:" + total); - } - - @Test - @DisplayName("条件查询") - public void query() throws IOException { - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(boolQueryBuilder); - searchSourceBuilder.from(0); - searchSourceBuilder.size(10); - SearchResponse response = TEMPLATE.query(INDEX, TYPE, searchSourceBuilder); - Assertions.assertThat(response).isNotNull(); - Assertions.assertThat(response.getHits()).isNotNull(); - for (SearchHit hit : response.getHits().getHits()) { - System.out.println("记录:" + hit.getSourceAsString()); - Map map = hit.getSourceAsMap(); - Assertions.assertThat(map).isNotNull(); - Assertions.assertThat(map.get("theme")).isEqualTo(3); - } - } - - @Nested - @DisplayName("写操作测试") - public class WriteTest { - - String json1 = - "{\"id\":1,\"username\":\"user1\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user1@xxx.com\"}"; - String json2 = - "{\"id\":2,\"username\":\"user2\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user2@xxx.com\"}"; - - @Test - @DisplayName("插入、更新") - public void saveAndUpdate() throws IOException, InterruptedException { - - User origin = JsonUtil.toBean(json1, User.class); - if (origin == null) { - System.err.println("反序列化失败!"); - return; - } - - TEMPLATE.save(INDEX, TYPE, origin); - TimeUnit.SECONDS.sleep(1); - User expectEntity = TEMPLATE.pojoById(INDEX, TYPE, origin.getDocId(), User.class); - Assertions.assertThat(expectEntity).isNotNull(); - - expectEntity.setAge(20); - TEMPLATE.updateById(INDEX, TYPE, expectEntity); - TimeUnit.SECONDS.sleep(18); - User expectEntity2 = - TEMPLATE.pojoById(INDEX, TYPE, origin.getDocId(), User.class); - Assertions.assertThat(expectEntity2).isNotNull(); - Assertions.assertThat(expectEntity2.getAge()).isEqualTo(20); - } - - @Test - @DisplayName("批量插入、更新") - public void batchSaveAndUpdate() throws IOException, InterruptedException { - - User origin1 = JsonUtil.toBean(json1, User.class); - if (origin1 == null) { - System.err.println("反序列化失败!"); - return; - } - - User origin2 = JsonUtil.toBean(json2, User.class); - if (origin2 == null) { - System.err.println("反序列化失败!"); - return; - } - - List list = Arrays.asList(origin1, origin2); - List ids = list.stream().map(User::getDocId).collect(Collectors.toList()); - - TEMPLATE.batchSave(INDEX, TYPE, list); - List newList = TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class); - Assertions.assertThat(newList).isNotEmpty(); - - newList.forEach(entity -> { - entity.setAge(20); - }); - TEMPLATE.batchUpdateById(INDEX, TYPE, newList); - TimeUnit.SECONDS.sleep(1); - - List expectList = - TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class); - Assertions.assertThat(expectList).isNotEmpty(); - for (User item : expectList) { - Assertions.assertThat(item.getAge()).isEqualTo(20); - } - } - - } - -} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/UserElasticsearchTemplateTest.java b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/UserElasticsearchTemplateTest.java new file mode 100644 index 0000000..3724246 --- /dev/null +++ b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/UserElasticsearchTemplateTest.java @@ -0,0 +1,108 @@ +package io.github.dunwu.javadb.elasticsearch; + +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.util.RandomUtil; +import io.github.dunwu.javadb.elasticsearch.entity.User; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * 使用 ElasticsearchTemplate 对 user 索引进行测试 + * + * @author Zhang Peng + * @date 2024-04-09 + */ +@Slf4j +public class UserElasticsearchTemplateTest extends BaseElasticsearchTemplateTest { + + @Override + protected String getAlias() { + return "user"; + } + + @Override + protected String getIndex() { + String date = DateUtil.format(new Date(), DatePattern.PURE_DATE_FORMAT); + return getAlias() + "_" + date; + } + + @Override + protected String getType() { + return "_doc"; + } + + @Override + protected int getShard() { + return 5; + } + + @Override + protected int getReplica() { + return 1; + } + + @Override + protected Class getEntityClass() { + return User.class; + } + + @Override + protected Map getPropertiesMap() { + return User.getPropertiesMap(); + } + + @Override + protected User getOneMockData(String id) { + return User.builder() + .id(id) + .name("测试数据" + id) + .age(RandomUtil.randomInt(1, 100)) + .build(); + } + + @Override + protected List getMockList(int num) { + List list = new LinkedList<>(); + for (int i = 1; i <= num; i++) { + User entity = getOneMockData(String.valueOf(i)); + list.add(entity); + } + return list; + } + + @Test + @DisplayName("索引管理测试") + public void indexTest() throws IOException { + super.deleteIndex(); + super.createIndex(); + } + + @Test + @DisplayName("写数据测试") + protected void writeTest() throws IOException { + super.save(); + super.saveBatch(); + } + + @Test + @DisplayName("读数据测试") + public void readTest() throws IOException { + super.getById(); + super.pojoById(); + super.pojoListByIds(); + super.count(); + super.query(); + super.pojoPage(); + super.pojoPageByLastId(); + super.pojoScroll(); + } + +} diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapperTest.java b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapperTest.java index 65c7927..ccc7b39 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapperTest.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/mapper/UserEsMapperTest.java @@ -1,27 +1,31 @@ package io.github.dunwu.javadb.elasticsearch.mapper; +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.util.RandomUtil; +import cn.hutool.core.util.StrUtil; import io.github.dunwu.javadb.elasticsearch.BaseApplicationTests; -import io.github.dunwu.javadb.elasticsearch.entity.Page; import io.github.dunwu.javadb.elasticsearch.entity.User; +import io.github.dunwu.javadb.elasticsearch.entity.common.PageData; +import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData; import io.github.dunwu.javadb.elasticsearch.util.JsonUtil; +import lombok.extern.slf4j.Slf4j; import org.assertj.core.api.Assertions; -import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.SortOrder; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import java.io.IOException; -import java.util.Arrays; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * ElasticsearchTemplate 测试 @@ -29,163 +33,441 @@ import java.util.stream.Collectors; * @author Zhang Peng * @date 2023-11-13 */ +@Slf4j public class UserEsMapperTest extends BaseApplicationTests { + static final int FROM = 0; + static final int SIZE = 10; + private static final String day = "2024-04-07"; + @Autowired private UserEsMapper mapper; - public static final String TEST_ID_01 = "1"; - public static final String TEST_ID_02 = "2"; - @Test - @DisplayName("根据ID精确查询") - public void getById() throws IOException { - GetResponse response = mapper.getById(TEST_ID_01); - System.out.println("记录:" + JsonUtil.toString(response.getSourceAsMap())); - } + @Nested + @DisplayName("删除索引测试") + class DeleteIndexTest { - @Test - @DisplayName("根据ID精确查询POJO") - public void pojoById() throws IOException { - User entity = mapper.pojoById(TEST_ID_01); - System.out.println("记录:" + JsonUtil.toString(entity)); - } - - @Test - @DisplayName("根据ID精确批量查询POJO") - public void pojoListByIds() throws IOException { - List ids = Arrays.asList(TEST_ID_01, TEST_ID_02); - List list = mapper.pojoListByIds(ids); - Assertions.assertThat(list).isNotEmpty(); - Assertions.assertThat(list.size()).isEqualTo(2); - for (User entity : list) { - System.out.println("记录:" + JsonUtil.toString(entity)); + @Test + @DisplayName("删除当天索引") + public void deleteIndex() throws IOException { + String index = mapper.getIndex(); + boolean indexExists = mapper.isIndexExists(); + if (!indexExists) { + log.info("【ES】{} 不存在!", index); + return; + } + mapper.deleteIndex(); + indexExists = mapper.isIndexExists(); + Assertions.assertThat(indexExists).isFalse(); } - } - @Test - @DisplayName("分页查询") - public void pojoPage() throws IOException { - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - boolQueryBuilder.must(QueryBuilders.termQuery("id", 1)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(boolQueryBuilder); - searchSourceBuilder.from(0); - searchSourceBuilder.size(10); - - Page page = mapper.pojoPage(searchSourceBuilder); - Assertions.assertThat(page).isNotNull(); - Assertions.assertThat(page.getContent()).isNotEmpty(); - for (User entity : page.getContent()) { - System.out.println("记录:" + JsonUtil.toString(entity)); + @Test + @DisplayName("根据日期删除索引") + public void deleteIndexInDay() throws IOException { + String index = mapper.getIndex(day); + boolean indexExists = mapper.isIndexExistsInDay(day); + if (!indexExists) { + log.info("【ES】{} 不存在!", index); + return; + } + mapper.deleteIndexInDay(day); + indexExists = mapper.isIndexExistsInDay(day); + Assertions.assertThat(indexExists).isFalse(); } + } - @Test - @DisplayName("条件数量查询") - public void count() throws IOException { - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - boolQueryBuilder.must(QueryBuilders.termQuery("id", 1)); - // boolQueryBuilder.must(QueryBuilders.rangeQuery("age") - // .from(18) - // .to(25) - // .includeLower(true) - // .includeUpper(true)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(boolQueryBuilder); - searchSourceBuilder.from(0); - searchSourceBuilder.size(10); - long total = mapper.count(searchSourceBuilder); - Assertions.assertThat(total).isNotZero(); - System.out.println("符合条件的总记录数:" + total); - } + @Nested + @DisplayName("创建索引测试") + class CreateIndexTest { - @Test - @DisplayName("条件查询") - public void query() throws IOException { - BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); - boolQueryBuilder.must(QueryBuilders.termQuery("id", 1)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(boolQueryBuilder); - searchSourceBuilder.from(0); - searchSourceBuilder.size(10); - SearchResponse response = mapper.query(searchSourceBuilder); - Assertions.assertThat(response).isNotNull(); - Assertions.assertThat(response.getHits()).isNotNull(); - for (SearchHit hit : response.getHits().getHits()) { - System.out.println("记录:" + hit.getSourceAsString()); - Map map = hit.getSourceAsMap(); - Assertions.assertThat(map).isNotNull(); + @Test + @DisplayName("创建当天索引") + public void createIndex() throws IOException { + + String index = mapper.getIndex(); + boolean indexExists = mapper.isIndexExists(); + if (indexExists) { + log.info("【ES】{} 已存在!", index); + return; + } + + mapper.createIndexIfNotExists(); + indexExists = mapper.isIndexExists(); + Assertions.assertThat(indexExists).isTrue(); } + + @Test + @DisplayName("根据日期创建索引") + public void createIndexInDay() throws IOException { + + String index = mapper.getIndex(day); + boolean indexExists = mapper.isIndexExistsInDay(day); + if (indexExists) { + log.info("【ES】{} 已存在!", index); + return; + } + + mapper.createIndexInDay(day); + indexExists = mapper.isIndexExistsInDay(day); + Assertions.assertThat(indexExists).isTrue(); + } + } @Nested @DisplayName("写操作测试") - public class WriteTest { - - String json1 = - "{\"id\":1,\"username\":\"user1\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user1@xxx.com\"}"; - String json2 = - "{\"id\":2,\"username\":\"user2\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user2@xxx.com\"}"; + class WriteTest { @Test - @DisplayName("插入、更新") - public void saveAndUpdate() throws IOException, InterruptedException { - - User origin = JsonUtil.toBean(json1, User.class); - if (origin == null) { - System.err.println("反序列化失败!"); - return; - } - - mapper.save(origin); - TimeUnit.SECONDS.sleep(1); - User expectEntity = mapper.pojoById(origin.getDocId()); - Assertions.assertThat(expectEntity).isNotNull(); - - expectEntity.setAge(20); - mapper.updateById(expectEntity); - TimeUnit.SECONDS.sleep(1); - User expectEntity2 = mapper.pojoById(origin.getDocId()); - Assertions.assertThat(expectEntity2).isNotNull(); - Assertions.assertThat(expectEntity2.getAge()).isEqualTo(20); + @DisplayName("保存当天数据") + public void save() throws IOException { + String id = "1"; + User entity = getOneMockData(id); + mapper.save(entity); + User newEntity = mapper.pojoById(id); + log.info("entity: {}", JsonUtil.toString(newEntity)); + Assertions.assertThat(newEntity).isNotNull(); } @Test - @DisplayName("批量插入、更新") - public void batchSaveAndUpdate() throws IOException, InterruptedException { + @DisplayName("保存指定日期数据") + public void saveInDay() throws IOException { + String id = "1"; + User entity = getOneMockData(id); + mapper.saveInDay(day, entity); + User newEntity = mapper.pojoByIdInDay(day, id); + log.info("entity: {}", JsonUtil.toString(newEntity)); + Assertions.assertThat(newEntity).isNotNull(); + } - User origin1 = JsonUtil.toBean(json1, User.class); - if (origin1 == null) { - System.err.println("反序列化失败!"); - return; + @Test + @DisplayName("批量保存当天数据") + public void batchSave() throws IOException, InterruptedException { + int total = 10000; + List> listGroup = CollectionUtil.split(getMockList(total), 1000); + for (List list : listGroup) { + mapper.asyncSaveBatch(list); } + TimeUnit.SECONDS.sleep(20); + long count = mapper.count(new SearchSourceBuilder()); + log.info("count: {}", count); + Assertions.assertThat(count).isEqualTo(10 * 1000); + } - User origin2 = JsonUtil.toBean(json2, User.class); - if (origin2 == null) { - System.err.println("反序列化失败!"); - return; - } - - List list = Arrays.asList(origin1, origin2); - List ids = list.stream().map(User::getDocId).collect(Collectors.toList()); - - mapper.batchSave(list); - List newList = mapper.pojoListByIds(ids); - Assertions.assertThat(newList).isNotEmpty(); - - newList.forEach(entity -> { - entity.setAge(20); - }); - mapper.batchUpdateById(newList); - TimeUnit.SECONDS.sleep(1); - - List expectList = mapper.pojoListByIds(ids); - Assertions.assertThat(expectList).isNotEmpty(); - for (User item : expectList) { - Assertions.assertThat(item.getAge()).isEqualTo(20); + @Test + @DisplayName("批量保存指定日期数据") + public void batchSaveInDay() throws IOException, InterruptedException { + int total = 10000; + List> listGroup = CollectionUtil.split(getMockList(total), 1000); + for (List list : listGroup) { + mapper.asyncSaveBatchInDay(day, list); } + TimeUnit.SECONDS.sleep(20); + long count = mapper.countInDay(day, new SearchSourceBuilder()); + log.info("count: {}", count); + Assertions.assertThat(count).isEqualTo(10 * 1000); } } + @Nested + @DisplayName("读操作测试") + class ReadTest { + + @Test + @DisplayName("根据ID查找当日数据") + public void pojoById() throws IOException { + String id = "1"; + User newEntity = mapper.pojoById(id); + log.info("entity: {}", JsonUtil.toString(newEntity)); + Assertions.assertThat(newEntity).isNotNull(); + } + + @Test + @DisplayName("根据ID查找指定日期数据") + public void pojoByIdInDay() throws IOException { + String id = "1"; + User newEntity = mapper.pojoByIdInDay(day, id); + log.info("entity: {}", JsonUtil.toString(newEntity)); + Assertions.assertThat(newEntity).isNotNull(); + } + + @Test + @DisplayName("获取匹配条件的记录数") + public void count() throws IOException { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + long total = mapper.count(searchSourceBuilder); + Assertions.assertThat(total).isNotZero(); + log.info("符合条件的记录数:{}", total); + } + + @Test + @DisplayName("获取匹配条件的指定日期记录数") + public void countInDay() throws IOException { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + long total = mapper.countInDay(day, searchSourceBuilder); + Assertions.assertThat(total).isNotZero(); + log.info("符合条件的记录数:{}", total); + } + + @Test + @DisplayName("获取匹配条件的记录") + public void query() throws IOException { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.from(FROM); + searchSourceBuilder.size(SIZE); + SearchResponse response = mapper.query(searchSourceBuilder); + Assertions.assertThat(response).isNotNull(); + Assertions.assertThat(response.getHits()).isNotNull(); + for (SearchHit hit : response.getHits().getHits()) { + log.info("记录:{}", hit.getSourceAsString()); + Map map = hit.getSourceAsMap(); + Assertions.assertThat(map).isNotNull(); + Assertions.assertThat(Integer.valueOf((String) map.get("docId"))).isLessThan(100); + } + } + + @Test + @DisplayName("获取匹配条件的指定日期记录") + public void queryInDay() throws IOException { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.from(FROM); + searchSourceBuilder.size(SIZE); + SearchResponse response = mapper.queryInDay(day, searchSourceBuilder); + Assertions.assertThat(response).isNotNull(); + Assertions.assertThat(response.getHits()).isNotNull(); + for (SearchHit hit : response.getHits().getHits()) { + log.info("记录:{}", hit.getSourceAsString()); + Map map = hit.getSourceAsMap(); + Assertions.assertThat(map).isNotNull(); + Assertions.assertThat(Integer.valueOf((String) map.get("docId"))).isLessThan(100); + } + } + + @Test + @DisplayName("from + size 分页查询当日数据") + public void pojoPage() throws IOException { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.from(FROM); + searchSourceBuilder.size(SIZE); + PageData page = mapper.pojoPage(searchSourceBuilder); + Assertions.assertThat(page).isNotNull(); + Assertions.assertThat(page.getContent()).isNotEmpty(); + for (User entity : page.getContent()) { + log.info("记录:{}", JsonUtil.toString(entity)); + } + } + + @Test + @DisplayName("from + size 分页查询指定日期数据") + public void pojoPageInDay() throws IOException { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.from(FROM); + searchSourceBuilder.size(SIZE); + PageData page = mapper.pojoPageInDay(day, searchSourceBuilder); + Assertions.assertThat(page).isNotNull(); + Assertions.assertThat(page.getContent()).isNotEmpty(); + for (User entity : page.getContent()) { + log.info("记录:{}", JsonUtil.toString(entity)); + } + } + + @Test + @DisplayName("search after 分页查询当日数据") + protected void pojoPageByLastId() throws IOException { + + BoolQueryBuilder queryBuilder = new BoolQueryBuilder(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + long total = mapper.count(searchSourceBuilder); + ScrollData scrollData = mapper.pojoPageByLastId(null, SIZE, queryBuilder); + if (scrollData == null || scrollData.getScrollId() == null) { + return; + } + Assertions.assertThat(scrollData.getTotal()).isEqualTo(total); + + long count = 0L; + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + count += scrollData.getContent().size(); + + String scrollId = scrollData.getScrollId(); + while (CollectionUtil.isNotEmpty(scrollData.getContent())) { + scrollData = mapper.pojoPageByLastId(scrollId, SIZE, queryBuilder); + if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) { + break; + } + if (StrUtil.isNotBlank(scrollData.getScrollId())) { + scrollId = scrollData.getScrollId(); + } + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + count += scrollData.getContent().size(); + } + log.info("total: {}", total); + Assertions.assertThat(count).isEqualTo(total); + } + + @Test + @DisplayName("search after 分页查询指定日期数据") + protected void pojoPageByLastIdInDay() throws IOException { + + BoolQueryBuilder queryBuilder = new BoolQueryBuilder(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + long total = mapper.count(searchSourceBuilder); + ScrollData scrollData = mapper.pojoPageByLastIdInDay(day, null, SIZE, queryBuilder); + if (scrollData == null || scrollData.getScrollId() == null) { + return; + } + Assertions.assertThat(scrollData.getTotal()).isEqualTo(total); + + long count = 0L; + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + count += scrollData.getContent().size(); + + String scrollId = scrollData.getScrollId(); + while (CollectionUtil.isNotEmpty(scrollData.getContent())) { + scrollData = mapper.pojoPageByLastIdInDay(day, scrollId, SIZE, queryBuilder); + if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) { + break; + } + if (StrUtil.isNotBlank(scrollData.getScrollId())) { + scrollId = scrollData.getScrollId(); + } + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + count += scrollData.getContent().size(); + } + log.info("total: {}", total); + Assertions.assertThat(count).isEqualTo(total); + } + + @Test + @DisplayName("滚动翻页当日数据") + public void pojoScroll() throws IOException { + + final int size = 100; + + BoolQueryBuilder queryBuilder = new BoolQueryBuilder(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.size(size).sort("docId", SortOrder.ASC).query(queryBuilder).trackScores(false); + + long total = mapper.count(searchSourceBuilder); + log.info("total: {}", total); + + ScrollData scrollData = mapper.pojoScrollBegin(searchSourceBuilder); + if (scrollData == null || scrollData.getScrollId() == null) { + return; + } + + long count = 0L; + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + Assertions.assertThat(scrollData.getTotal()).isEqualTo(total); + count += scrollData.getContent().size(); + + String scrollId = scrollData.getScrollId(); + while (CollectionUtil.isNotEmpty(scrollData.getContent())) { + scrollData = mapper.pojoScroll(scrollId, searchSourceBuilder); + if (scrollData != null && StrUtil.isNotBlank(scrollData.getScrollId())) { + scrollId = scrollData.getScrollId(); + } + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + count += scrollData.getContent().size(); + } + mapper.pojoScrollEnd(scrollId); + Assertions.assertThat(count).isEqualTo(total); + } + + @Test + @DisplayName("滚动翻页指定日期数据") + public void pojoScrollInDay() throws IOException { + + final int size = 100; + + BoolQueryBuilder queryBuilder = new BoolQueryBuilder(); + queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.size(size).sort("docId", SortOrder.ASC).query(queryBuilder).trackScores(false); + + long total = mapper.countInDay(day, searchSourceBuilder); + log.info("total: {}", total); + + ScrollData scrollData = mapper.pojoScrollBeginInDay(day, searchSourceBuilder); + if (scrollData == null || scrollData.getScrollId() == null) { + return; + } + + long count = 0L; + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + Assertions.assertThat(scrollData.getTotal()).isEqualTo(total); + count += scrollData.getContent().size(); + + String scrollId = scrollData.getScrollId(); + while (CollectionUtil.isNotEmpty(scrollData.getContent())) { + scrollData = mapper.pojoScroll(scrollId, searchSourceBuilder); + if (scrollData != null && StrUtil.isNotBlank(scrollData.getScrollId())) { + scrollId = scrollData.getScrollId(); + } + scrollData.getContent().forEach(data -> { + log.info("docId: {}", data.getDocId()); + }); + count += scrollData.getContent().size(); + } + mapper.pojoScrollEnd(scrollId); + Assertions.assertThat(count).isEqualTo(total); + } + + } + + public User getOneMockData(String id) { + return User.builder() + .id(id) + .name("测试数据" + id) + .age(RandomUtil.randomInt(1, 100)) + .build(); + } + + public List getMockList(int num) { + List list = new LinkedList<>(); + for (int i = 1; i <= num; i++) { + User entity = getOneMockData(String.valueOf(i)); + list.add(entity); + } + return list; + } + }