mirror of https://github.com/dunwu/db-tutorial.git
feat: elasticsearch 6.x 示例
parent
14b5853e4c
commit
72e3c1142e
|
@ -3,7 +3,7 @@
|
|||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<parent>o
|
||||
<parent>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-parent</artifactId>
|
||||
<version>2.7.7</version>
|
||||
|
@ -42,7 +42,7 @@
|
|||
<dependency>
|
||||
<groupId>cn.hutool</groupId>
|
||||
<artifactId>hutool-all</artifactId>
|
||||
<version>5.8.8</version>
|
||||
<version>5.8.25</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
package io.github.dunwu.javadb.elasticsearch;
|
||||
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.User;
|
||||
import io.github.dunwu.javadb.elasticsearch.mapper.UserEsMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
public class Demo {
|
||||
|
||||
private static final String env = "test";
|
||||
private static final ElasticsearchTemplate elasticsearchTemplate
|
||||
= ElasticsearchFactory.newElasticsearchTemplate(env);
|
||||
|
||||
public static void main(String[] args) throws IOException, InterruptedException {
|
||||
|
||||
UserEsMapper mapper = new UserEsMapper(elasticsearchTemplate);
|
||||
|
||||
System.out.println("索引是否存在:" + mapper.isIndexExists());
|
||||
|
||||
User jack = User.builder().id(1L).username("jack").age(18).build();
|
||||
User tom = User.builder().id(2L).username("tom").age(20).build();
|
||||
List<User> users = Arrays.asList(jack, tom);
|
||||
|
||||
System.out.println("批量插入:" + mapper.batchSave(users));
|
||||
System.out.println("根据ID查询:" + mapper.getById("1").toString());
|
||||
System.out.println("根据ID查询:" + mapper.pojoById("2").toString());
|
||||
System.out.println("根据ID批量查询:" + mapper.pojoListByIds(Arrays.asList("1", "2")).toString());
|
||||
|
||||
Thread.sleep(1000);
|
||||
System.out.println("根据ID批量删除:" + mapper.batchDeleteById(Arrays.asList("1", "2")));
|
||||
}
|
||||
|
||||
}
|
|
@ -9,6 +9,7 @@ import org.elasticsearch.client.RestClient;
|
|||
import org.elasticsearch.client.RestClientBuilder;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -38,11 +39,13 @@ public class ElasticsearchFactory {
|
|||
}
|
||||
|
||||
public static RestClient newRestClient(String env) {
|
||||
String hosts = getDefaultEsAddress(env);
|
||||
return newRestClient(toHttpHostList(hosts));
|
||||
String hostsConfig = getDefaultEsAddress(env);
|
||||
List<String> hosts = StrUtil.split(hostsConfig, ",");
|
||||
return newRestClient(hosts);
|
||||
}
|
||||
|
||||
public static RestClient newRestClient(HttpHost[] httpHosts) {
|
||||
public static RestClient newRestClient(Collection<String> hosts) {
|
||||
HttpHost[] httpHosts = toHttpHostList(hosts);
|
||||
RestClientBuilder builder = getRestClientBuilder(httpHosts);
|
||||
if (builder == null) {
|
||||
return null;
|
||||
|
@ -62,11 +65,13 @@ public class ElasticsearchFactory {
|
|||
}
|
||||
|
||||
public static RestHighLevelClient newRestHighLevelClient(String env) {
|
||||
String hosts = getDefaultEsAddress(env);
|
||||
return newRestHighLevelClient(toHttpHostList(hosts));
|
||||
String hostsConfig = getDefaultEsAddress(env);
|
||||
List<String> hosts = StrUtil.split(hostsConfig, ",");
|
||||
return newRestHighLevelClient(hosts);
|
||||
}
|
||||
|
||||
public static RestHighLevelClient newRestHighLevelClient(HttpHost[] httpHosts) {
|
||||
public static RestHighLevelClient newRestHighLevelClient(Collection<String> hosts) {
|
||||
HttpHost[] httpHosts = toHttpHostList(hosts);
|
||||
RestClientBuilder builder = getRestClientBuilder(httpHosts);
|
||||
if (builder == null) {
|
||||
return null;
|
||||
|
@ -86,12 +91,13 @@ public class ElasticsearchFactory {
|
|||
}
|
||||
|
||||
public static ElasticsearchTemplate newElasticsearchTemplate(String env) {
|
||||
String hosts = getDefaultEsAddress(env);
|
||||
return newElasticsearchTemplate(toHttpHostList(hosts));
|
||||
String hostsConfig = getDefaultEsAddress(env);
|
||||
List<String> hosts = StrUtil.split(hostsConfig, ",");
|
||||
return newElasticsearchTemplate(hosts);
|
||||
}
|
||||
|
||||
public static ElasticsearchTemplate newElasticsearchTemplate(HttpHost[] httpHosts) {
|
||||
RestHighLevelClient client = newRestHighLevelClient(httpHosts);
|
||||
public static ElasticsearchTemplate newElasticsearchTemplate(Collection<String> hosts) {
|
||||
RestHighLevelClient client = newRestHighLevelClient(hosts);
|
||||
if (client == null) {
|
||||
return null;
|
||||
}
|
||||
|
@ -125,21 +131,22 @@ public class ElasticsearchFactory {
|
|||
return restClientBuilder;
|
||||
}
|
||||
|
||||
private static HttpHost[] toHttpHostList(String hosts) {
|
||||
if (StrUtil.isBlank(hosts)) {
|
||||
return null;
|
||||
private static HttpHost[] toHttpHostList(Collection<String> hosts) {
|
||||
if (CollectionUtil.isEmpty(hosts)) {
|
||||
return new HttpHost[0];
|
||||
}
|
||||
List<String> strList = StrUtil.split(hosts, ",");
|
||||
List<HttpHost> list = strList.stream().map(str -> {
|
||||
List<String> params = StrUtil.split(str, ":");
|
||||
return new HttpHost(params.get(0), Integer.parseInt(params.get(1)), "http");
|
||||
}).collect(Collectors.toList());
|
||||
List<HttpHost> list = hosts.stream().map(ElasticsearchFactory::toHttpHost).collect(Collectors.toList());
|
||||
if (CollectionUtil.isEmpty(list)) {
|
||||
return new HttpHost[0];
|
||||
}
|
||||
return list.toArray(new HttpHost[0]);
|
||||
}
|
||||
|
||||
public static HttpHost toHttpHost(String host) {
|
||||
List<String> params = StrUtil.split(host, ":");
|
||||
return new HttpHost(params.get(0), Integer.parseInt(params.get(1)), "http");
|
||||
}
|
||||
|
||||
public static String getDefaultEsAddress() {
|
||||
// 从配置中心读取环境变量
|
||||
String env = "test";
|
||||
|
|
|
@ -6,11 +6,19 @@ import cn.hutool.core.map.MapUtil;
|
|||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.Page;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.PageData;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData;
|
||||
import io.github.dunwu.javadb.elasticsearch.util.JsonUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.DocWriteResponse;
|
||||
import org.elasticsearch.action.admin.indices.alias.Alias;
|
||||
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
|
||||
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
||||
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
|
||||
import org.elasticsearch.action.bulk.BackoffPolicy;
|
||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||
import org.elasticsearch.action.bulk.BulkRequest;
|
||||
|
@ -23,19 +31,31 @@ import org.elasticsearch.action.get.MultiGetRequest;
|
|||
import org.elasticsearch.action.get.MultiGetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
import org.elasticsearch.action.search.ClearScrollRequest;
|
||||
import org.elasticsearch.action.search.ClearScrollResponse;
|
||||
import org.elasticsearch.action.search.SearchRequest;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.action.search.SearchScrollRequest;
|
||||
import org.elasticsearch.action.support.WriteRequest;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.action.update.UpdateRequest;
|
||||
import org.elasticsearch.action.update.UpdateResponse;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.Scroll;
|
||||
import org.elasticsearch.search.SearchHits;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -122,6 +142,99 @@ public class ElasticsearchTemplate implements Closeable {
|
|||
return bulkProcessor;
|
||||
}
|
||||
|
||||
// ====================================================================
|
||||
// 索引管理操作
|
||||
// ====================================================================
|
||||
|
||||
public void createIndex(String index, String type, String alias, int shard, int replica) throws IOException {
|
||||
|
||||
if (StrUtil.isBlank(index) || StrUtil.isBlank(type)) {
|
||||
throw new ElasticsearchException("【ES】index、type 不能为空!");
|
||||
}
|
||||
|
||||
CreateIndexRequest request = new CreateIndexRequest(index);
|
||||
if (StrUtil.isNotBlank(alias)) {
|
||||
request.alias(new Alias(alias));
|
||||
}
|
||||
|
||||
Settings.Builder settings =
|
||||
Settings.builder().put("index.number_of_shards", shard).put("index.number_of_replicas", replica);
|
||||
request.settings(settings);
|
||||
AcknowledgedResponse response = client.indices().create(request, RequestOptions.DEFAULT);
|
||||
if (!response.isAcknowledged()) {
|
||||
String msg = StrUtil.format("【ES】创建索引失败!index: {}, type: {}", index, type);
|
||||
throw new ElasticsearchException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
public void deleteIndex(String index) throws IOException {
|
||||
DeleteIndexRequest request = new DeleteIndexRequest(index);
|
||||
AcknowledgedResponse response = client.indices().delete(request, RequestOptions.DEFAULT);
|
||||
if (!response.isAcknowledged()) {
|
||||
String msg = StrUtil.format("【ES】删除索引失败!index: {}", index);
|
||||
throw new ElasticsearchException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
public void updateAlias(String index, String alias) throws IOException {
|
||||
IndicesAliasesRequest request = new IndicesAliasesRequest();
|
||||
IndicesAliasesRequest.AliasActions aliasAction =
|
||||
new IndicesAliasesRequest.AliasActions(IndicesAliasesRequest.AliasActions.Type.ADD).index(index)
|
||||
.alias(alias);
|
||||
request.addAliasAction(aliasAction);
|
||||
AcknowledgedResponse response = client.indices().updateAliases(request, RequestOptions.DEFAULT);
|
||||
if (!response.isAcknowledged()) {
|
||||
String msg = StrUtil.format("【ES】更新索引别名失败!index: {}, alias: {}", index, alias);
|
||||
throw new ElasticsearchException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isIndexExists(String index) throws IOException {
|
||||
GetIndexRequest request = new GetIndexRequest();
|
||||
return client.indices().exists(request.indices(index), RequestOptions.DEFAULT);
|
||||
}
|
||||
|
||||
public void setMapping(String index, String type, Map<String, String> propertiesMap) throws IOException {
|
||||
|
||||
if (MapUtil.isEmpty(propertiesMap)) {
|
||||
throw new ElasticsearchException("【ES】设置 mapping 的 properties 不能为空!");
|
||||
}
|
||||
|
||||
PutMappingRequest request = new PutMappingRequest(index).type(type);
|
||||
XContentBuilder builder = XContentFactory.jsonBuilder();
|
||||
builder.startObject();
|
||||
builder.startObject(type);
|
||||
builder.startObject("properties");
|
||||
|
||||
for (Map.Entry<String, String> entry : propertiesMap.entrySet()) {
|
||||
|
||||
String field = entry.getKey();
|
||||
String fieldType = entry.getValue();
|
||||
if (StrUtil.isBlank(field) || StrUtil.isBlank(fieldType)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
builder.startObject(field);
|
||||
{
|
||||
builder.field("type", fieldType);
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
builder.endObject();
|
||||
builder.endObject();
|
||||
builder.endObject();
|
||||
request.source(builder);
|
||||
AcknowledgedResponse response = client.indices().putMapping(request, RequestOptions.DEFAULT);
|
||||
if (!response.isAcknowledged()) {
|
||||
throw new ElasticsearchException("【ES】设置 mapping 失败!");
|
||||
}
|
||||
}
|
||||
|
||||
// ====================================================================
|
||||
// CRUD 操作
|
||||
// ====================================================================
|
||||
|
||||
public <T extends BaseEsEntity> T save(String index, String type, T entity) throws IOException {
|
||||
|
||||
if (entity == null) {
|
||||
|
@ -154,7 +267,7 @@ public class ElasticsearchTemplate implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
public <T extends BaseEsEntity> boolean batchSave(String index, String type, Collection<T> list)
|
||||
public <T extends BaseEsEntity> boolean saveBatch(String index, String type, Collection<T> list)
|
||||
throws IOException {
|
||||
|
||||
if (CollectionUtil.isEmpty(list)) {
|
||||
|
@ -179,7 +292,7 @@ public class ElasticsearchTemplate implements Closeable {
|
|||
return response != null && !response.hasFailures();
|
||||
}
|
||||
|
||||
public <T extends BaseEsEntity> void asyncBatchSave(String index, String type, Collection<T> list,
|
||||
public <T extends BaseEsEntity> void asyncSaveBatch(String index, String type, Collection<T> list,
|
||||
ActionListener<BulkResponse> listener) {
|
||||
|
||||
if (CollectionUtil.isEmpty(list)) {
|
||||
|
@ -236,7 +349,7 @@ public class ElasticsearchTemplate implements Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
public <T extends BaseEsEntity> boolean batchUpdateById(String index, String type, Collection<T> list)
|
||||
public <T extends BaseEsEntity> boolean updateBatchIds(String index, String type, Collection<T> list)
|
||||
throws IOException {
|
||||
|
||||
if (CollectionUtil.isEmpty(list)) {
|
||||
|
@ -248,7 +361,7 @@ public class ElasticsearchTemplate implements Closeable {
|
|||
return response != null && !response.hasFailures();
|
||||
}
|
||||
|
||||
public <T extends BaseEsEntity> void asyncBatchUpdateById(String index, String type, Collection<T> list,
|
||||
public <T extends BaseEsEntity> void asyncUpdateBatchIds(String index, String type, Collection<T> list,
|
||||
ActionListener<BulkResponse> listener) {
|
||||
|
||||
if (CollectionUtil.isEmpty(list)) {
|
||||
|
@ -277,10 +390,10 @@ public class ElasticsearchTemplate implements Closeable {
|
|||
}
|
||||
|
||||
public boolean deleteById(String index, String type, String id) throws IOException {
|
||||
return batchDeleteById(index, type, Collections.singleton(id));
|
||||
return deleteBatchIds(index, type, Collections.singleton(id));
|
||||
}
|
||||
|
||||
public boolean batchDeleteById(String index, String type, Collection<String> ids) throws IOException {
|
||||
public boolean deleteBatchIds(String index, String type, Collection<String> ids) throws IOException {
|
||||
|
||||
if (CollectionUtil.isEmpty(ids)) {
|
||||
return true;
|
||||
|
@ -302,7 +415,7 @@ public class ElasticsearchTemplate implements Closeable {
|
|||
return !response.hasFailures();
|
||||
}
|
||||
|
||||
public void asyncBatchDeleteById(String index, String type, Collection<String> ids,
|
||||
public void asyncDeleteBatchIds(String index, String type, Collection<String> ids,
|
||||
ActionListener<BulkResponse> listener) {
|
||||
|
||||
if (CollectionUtil.isEmpty(ids)) {
|
||||
|
@ -375,21 +488,6 @@ public class ElasticsearchTemplate implements Closeable {
|
|||
return list;
|
||||
}
|
||||
|
||||
public <T> Page<T> pojoPage(String index, String type, SearchSourceBuilder builder, Class<T> clazz)
|
||||
throws IOException {
|
||||
SearchResponse response = query(index, type, builder);
|
||||
if (response == null || response.status() != RestStatus.OK) {
|
||||
return null;
|
||||
}
|
||||
|
||||
List<T> content = toPojoList(response, clazz);
|
||||
SearchHits searchHits = response.getHits();
|
||||
int offset = builder.from();
|
||||
int size = builder.size();
|
||||
int page = offset / size + (offset % size == 0 ? 0 : 1) + 1;
|
||||
return new Page<>(page, size, searchHits.getTotalHits(), content);
|
||||
}
|
||||
|
||||
public long count(String index, String type, SearchSourceBuilder builder) throws IOException {
|
||||
SearchResponse response = query(index, type, builder);
|
||||
if (response == null || response.status() != RestStatus.OK) {
|
||||
|
@ -399,6 +497,12 @@ public class ElasticsearchTemplate implements Closeable {
|
|||
return searchHits.getTotalHits();
|
||||
}
|
||||
|
||||
public long count(String index, String type, QueryBuilder queryBuilder) throws IOException {
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
return count(index, type, searchSourceBuilder);
|
||||
}
|
||||
|
||||
public SearchResponse query(String index, String type, SearchSourceBuilder builder) throws IOException {
|
||||
SearchRequest request = new SearchRequest(index).types(type);
|
||||
request.source(builder);
|
||||
|
@ -409,6 +513,131 @@ public class ElasticsearchTemplate implements Closeable {
|
|||
return client.search(request, RequestOptions.DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
* from+size 分页
|
||||
* <p>
|
||||
* 注:在深分页的场景下,效率很低(一般超过 1万条数据就不适用了)
|
||||
*/
|
||||
public <T> PageData<T> pojoPage(String index, String type, SearchSourceBuilder builder, Class<T> clazz)
|
||||
throws IOException {
|
||||
SearchResponse response = query(index, type, builder);
|
||||
if (response == null || response.status() != RestStatus.OK) {
|
||||
return null;
|
||||
}
|
||||
|
||||
List<T> content = toPojoList(response, clazz);
|
||||
SearchHits searchHits = response.getHits();
|
||||
int from = builder.from();
|
||||
int size = builder.size();
|
||||
int page = from / size + (from % size == 0 ? 0 : 1) + 1;
|
||||
return new PageData<>(page, size, searchHits.getTotalHits(), content);
|
||||
}
|
||||
|
||||
/**
|
||||
* from+size 分页
|
||||
* <p>
|
||||
* 注:在深分页的场景下,效率很低(一般超过 1万条数据就不适用了)
|
||||
*/
|
||||
public <T> PageData<T> pojoPage(String index, String type, int from, int size, QueryBuilder queryBuilder,
|
||||
Class<T> clazz) throws IOException {
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
searchSourceBuilder.from(from);
|
||||
searchSourceBuilder.size(size);
|
||||
return pojoPage(index, type, searchSourceBuilder, clazz);
|
||||
}
|
||||
|
||||
/**
|
||||
* search after 分页
|
||||
*/
|
||||
public <T extends BaseEsEntity> ScrollData<T> pojoPageByLastId(String index, String type, String lastId, int size,
|
||||
QueryBuilder queryBuilder, Class<T> clazz) throws IOException {
|
||||
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.size(size);
|
||||
searchSourceBuilder.sort(BaseEsEntity.DOC_ID, SortOrder.ASC);
|
||||
if (StrUtil.isNotBlank(lastId)) {
|
||||
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
||||
boolQueryBuilder.must(queryBuilder).must(QueryBuilders.rangeQuery(BaseEsEntity.DOC_ID).gt(lastId));
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
} else {
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
}
|
||||
|
||||
SearchResponse response = query(index, type, searchSourceBuilder);
|
||||
if (response == null || response.status() != RestStatus.OK) {
|
||||
return null;
|
||||
}
|
||||
List<T> content = toPojoList(response, clazz);
|
||||
ScrollData<T> scrollData = new ScrollData<>();
|
||||
scrollData.setSize(size);
|
||||
scrollData.setTotal(response.getHits().getTotalHits());
|
||||
scrollData.setContent(content);
|
||||
if (CollectionUtil.isNotEmpty(content)) {
|
||||
T lastEntity = content.get(content.size() - 1);
|
||||
scrollData.setScrollId(lastEntity.getDocId());
|
||||
}
|
||||
return scrollData;
|
||||
}
|
||||
|
||||
/**
|
||||
* 首次滚动查询批量查询,但是不适用与搜索,仅用于批查询
|
||||
**/
|
||||
public <T> ScrollData<T> pojoScrollBegin(String index, String type, SearchSourceBuilder searchBuilder,
|
||||
Class<T> clazz) throws IOException {
|
||||
|
||||
int scrollTime = 10;
|
||||
final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(scrollTime));
|
||||
SearchRequest request = new SearchRequest(index);
|
||||
request.types(type);
|
||||
request.source(searchBuilder);
|
||||
request.scroll(scroll);
|
||||
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
|
||||
if (response == null || response.status() != RestStatus.OK) {
|
||||
return null;
|
||||
}
|
||||
List<T> content = toPojoList(response, clazz);
|
||||
ScrollData<T> scrollData = new ScrollData<>();
|
||||
scrollData.setSize(searchBuilder.size());
|
||||
scrollData.setTotal(response.getHits().getTotalHits());
|
||||
scrollData.setScrollId(response.getScrollId());
|
||||
scrollData.setContent(content);
|
||||
return scrollData;
|
||||
}
|
||||
|
||||
/**
|
||||
* 知道ScrollId之后,后续根据scrollId批量查询
|
||||
**/
|
||||
public <T> ScrollData<T> pojoScroll(String scrollId, SearchSourceBuilder searchBuilder, Class<T> clazz)
|
||||
throws IOException {
|
||||
|
||||
int scrollTime = 10;
|
||||
final Scroll scroll = new Scroll(TimeValue.timeValueSeconds(scrollTime));
|
||||
SearchScrollRequest request = new SearchScrollRequest(scrollId);
|
||||
request.scroll(scroll);
|
||||
SearchResponse response = client.scroll(request, RequestOptions.DEFAULT);
|
||||
if (response == null || response.status() != RestStatus.OK) {
|
||||
return null;
|
||||
}
|
||||
List<T> content = toPojoList(response, clazz);
|
||||
ScrollData<T> scrollData = new ScrollData<>();
|
||||
scrollData.setSize(searchBuilder.size());
|
||||
scrollData.setTotal(response.getHits().getTotalHits());
|
||||
scrollData.setScrollId(response.getScrollId());
|
||||
scrollData.setContent(content);
|
||||
return scrollData;
|
||||
}
|
||||
|
||||
public boolean pojoScrollEnd(String scrollId) throws IOException {
|
||||
ClearScrollRequest request = new ClearScrollRequest();
|
||||
request.addScrollId(scrollId);
|
||||
ClearScrollResponse response = client.clearScroll(request, RequestOptions.DEFAULT);
|
||||
if (response != null) {
|
||||
return response.isSucceeded();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public <T> T toPojo(GetResponse response, Class<T> clazz) {
|
||||
if (null == response) {
|
||||
return null;
|
||||
|
|
|
@ -1,12 +1,17 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.config;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import io.github.dunwu.javadb.elasticsearch.ElasticsearchFactory;
|
||||
import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* ES 配置
|
||||
*
|
||||
|
@ -17,14 +22,23 @@ import org.springframework.context.annotation.Configuration;
|
|||
@ComponentScan(value = "io.github.dunwu.javadb.elasticsearch.mapper")
|
||||
public class ElasticsearchConfig {
|
||||
|
||||
@Value("${es.hosts:#{null}}")
|
||||
private String hostsConfig;
|
||||
|
||||
@Bean("restHighLevelClient")
|
||||
@ConditionalOnMissingBean
|
||||
public RestHighLevelClient restHighLevelClient() {
|
||||
if (hostsConfig == null) {
|
||||
return ElasticsearchFactory.newRestHighLevelClient();
|
||||
} else {
|
||||
List<String> hosts = StrUtil.split(hostsConfig, ",");
|
||||
return ElasticsearchFactory.newRestHighLevelClient(hosts);
|
||||
}
|
||||
}
|
||||
|
||||
@Bean("elasticsearchTemplate")
|
||||
public ElasticsearchTemplate elasticsearchTemplate() {
|
||||
return ElasticsearchFactory.newElasticsearchTemplate();
|
||||
public ElasticsearchTemplate elasticsearchTemplate(RestHighLevelClient restHighLevelClient) {
|
||||
return ElasticsearchFactory.newElasticsearchTemplate(restHighLevelClient);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.constant;
|
||||
|
||||
/**
|
||||
* 请求 / 应答状态接口
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @since 2019-06-06
|
||||
*/
|
||||
public interface CodeMsg {
|
||||
|
||||
int getCode();
|
||||
|
||||
String getMsg();
|
||||
|
||||
}
|
|
@ -0,0 +1,97 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.constant;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
|
||||
import java.util.stream.Stream;
|
||||
|
||||
/**
|
||||
* 系统级错误码
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @see <a href="https://httpstatuses.com/">HTTP 状态码</a>
|
||||
* @see <a href="http://wiki.open.qq.com/wiki/%E9%94%99%E8%AF%AF%E7%A0%81">腾讯开放平台错误码</a>
|
||||
* @see <a href="https://open.weibo.com/wiki/Error_code">新浪开放平台错误码</a>
|
||||
* @see <a href= "https://docs.open.alipay.com/api_1/alipay.trade.order.settle/">支付宝开放平台API</a>
|
||||
* @see <a href=
|
||||
* "https://open.weixin.qq.com/cgi-bin/showdocument?action=dir_list&t=resource/res_list&verify=1&id=open1419318634&token=&lang=zh_CN">微信开放平台错误码</a>
|
||||
* @since 2019-04-11
|
||||
*/
|
||||
public enum ResultCode implements CodeMsg {
|
||||
|
||||
OK(0, "成功"),
|
||||
|
||||
PART_OK(1, "部分成功"),
|
||||
|
||||
FAIL(-1, "失败"),
|
||||
|
||||
// -----------------------------------------------------
|
||||
// 系统级错误码
|
||||
// -----------------------------------------------------
|
||||
|
||||
ERROR(1000, "服务器错误"),
|
||||
|
||||
PARAM_ERROR(1001, "参数错误"),
|
||||
|
||||
TASK_ERROR(1001, "调度任务错误"),
|
||||
|
||||
CONFIG_ERROR(1003, "配置错误"),
|
||||
|
||||
REQUEST_ERROR(1004, "请求错误"),
|
||||
|
||||
IO_ERROR(1005, "IO 错误"),
|
||||
|
||||
// -----------------------------------------------------
|
||||
// 2000 ~ 2999 数据库错误
|
||||
// -----------------------------------------------------
|
||||
|
||||
DATA_ERROR(2000, "数据库错误"),
|
||||
|
||||
// -----------------------------------------------------
|
||||
// 3000 ~ 3999 三方错误
|
||||
// -----------------------------------------------------
|
||||
|
||||
THIRD_PART_ERROR(3000, "三方错误"),
|
||||
|
||||
// -----------------------------------------------------
|
||||
// 3000 ~ 3999 认证错误
|
||||
// -----------------------------------------------------
|
||||
|
||||
AUTH_ERROR(4000, "认证错误");
|
||||
|
||||
private final int code;
|
||||
|
||||
private final String msg;
|
||||
|
||||
ResultCode(int code, String msg) {
|
||||
this.code = code;
|
||||
this.msg = msg;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCode() {
|
||||
return code;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMsg() {
|
||||
return msg;
|
||||
}
|
||||
|
||||
public static String getNameByCode(int code) {
|
||||
return Stream.of(ResultCode.values()).filter(item -> item.getCode() == code).findFirst()
|
||||
.map(ResultCode::getMsg).orElse(null);
|
||||
}
|
||||
|
||||
public static ResultCode getEnumByCode(int code) {
|
||||
return Stream.of(ResultCode.values()).filter(item -> item.getCode() == code).findFirst().orElse(null);
|
||||
}
|
||||
|
||||
public static String getTypeInfo() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
ResultCode[] types = ResultCode.values();
|
||||
for (ResultCode type : types) {
|
||||
sb.append(StrUtil.format("{}:{}, ", type.getCode(), type.getMsg()));
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -4,6 +4,8 @@ import lombok.Data;
|
|||
import lombok.ToString;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* ES 实体接口
|
||||
|
@ -15,6 +17,8 @@ import java.io.Serializable;
|
|||
@ToString
|
||||
public abstract class BaseEsEntity implements Serializable {
|
||||
|
||||
public static final String DOC_ID = "docId";
|
||||
|
||||
/**
|
||||
* 获取版本
|
||||
*/
|
||||
|
@ -24,4 +28,10 @@ public abstract class BaseEsEntity implements Serializable {
|
|||
|
||||
public abstract String getDocId();
|
||||
|
||||
public static Map<String, String> getPropertiesMap() {
|
||||
Map<String, String> map = new LinkedHashMap<>(1);
|
||||
map.put(BaseEsEntity.DOC_ID, "keyword");
|
||||
return map;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,36 +0,0 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.entity;
|
||||
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 分页实体
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @date 2023-06-28
|
||||
*/
|
||||
@Data
|
||||
public class Page<T> {
|
||||
|
||||
private long total;
|
||||
private int page;
|
||||
private int size;
|
||||
private List<T> content = new ArrayList<>();
|
||||
|
||||
public Page(int page, int size, long total) {
|
||||
this.total = total;
|
||||
this.page = page;
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
public Page(int page, int size, long total, Collection<T> list) {
|
||||
this.total = total;
|
||||
this.page = page;
|
||||
this.size = size;
|
||||
this.content.addAll(list);
|
||||
}
|
||||
|
||||
}
|
|
@ -2,32 +2,43 @@ package io.github.dunwu.javadb.elasticsearch.entity;
|
|||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 用户实体
|
||||
* 短剧、长视频消费数据 ES 实体
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @since 2023-06-28
|
||||
* @date 2024-04-02
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
@AllArgsConstructor
|
||||
@Getter
|
||||
@Setter
|
||||
@NoArgsConstructor
|
||||
public class User extends BaseEsEntity {
|
||||
@AllArgsConstructor
|
||||
public class User extends BaseEsEntity implements Serializable {
|
||||
|
||||
private Long id;
|
||||
private String username;
|
||||
private String password;
|
||||
private String id;
|
||||
private String name;
|
||||
private Integer age;
|
||||
private String email;
|
||||
|
||||
@Override
|
||||
public String getDocId() {
|
||||
return String.valueOf(id);
|
||||
return id;
|
||||
}
|
||||
|
||||
public static Map<String, String> getPropertiesMap() {
|
||||
Map<String, String> map = new LinkedHashMap<>();
|
||||
map.put(BaseEsEntity.DOC_ID, "keyword");
|
||||
map.put("id", "long");
|
||||
map.put("name", "keyword");
|
||||
map.put("age", "integer");
|
||||
return map;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,37 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.entity.common;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 分页实体
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @date 2023-06-28
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class PageData<T> implements Serializable {
|
||||
|
||||
private int page;
|
||||
private int size;
|
||||
private long total;
|
||||
private List<T> content = new ArrayList<>();
|
||||
|
||||
public PageData(int page, int size, long total) {
|
||||
this.total = total;
|
||||
this.page = page;
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.entity.common;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.Setter;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
|
||||
/**
|
||||
* Hbase 滚动数据实体
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @date 2023-11-16
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class ScrollData<T> implements Serializable {
|
||||
|
||||
private String scrollId;
|
||||
private int size = 10;
|
||||
private long total = 0L;
|
||||
private Collection<T> content;
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
}
|
|
@ -0,0 +1,128 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.exception;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import io.github.dunwu.javadb.elasticsearch.constant.CodeMsg;
|
||||
import io.github.dunwu.javadb.elasticsearch.constant.ResultCode;
|
||||
|
||||
/**
|
||||
* 基础异常
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @since 2021-09-25
|
||||
*/
|
||||
public class CodeMsgException extends RuntimeException implements CodeMsg {
|
||||
|
||||
private static final long serialVersionUID = 6146660782281445735L;
|
||||
|
||||
/**
|
||||
* 状态码
|
||||
*/
|
||||
protected int code;
|
||||
|
||||
/**
|
||||
* 响应信息
|
||||
*/
|
||||
protected String msg;
|
||||
|
||||
/**
|
||||
* 提示信息
|
||||
*/
|
||||
protected String toast;
|
||||
|
||||
public CodeMsgException() {
|
||||
this(ResultCode.FAIL);
|
||||
}
|
||||
|
||||
public CodeMsgException(CodeMsg codeMsg) {
|
||||
this(codeMsg.getCode(), codeMsg.getMsg());
|
||||
}
|
||||
|
||||
public CodeMsgException(CodeMsg codeMsg, String msg) {
|
||||
this(codeMsg.getCode(), msg, null);
|
||||
}
|
||||
|
||||
public CodeMsgException(CodeMsg codeMsg, String msg, String toast) {
|
||||
this(codeMsg.getCode(), msg, toast);
|
||||
}
|
||||
|
||||
public CodeMsgException(String msg) {
|
||||
this(ResultCode.FAIL, msg);
|
||||
}
|
||||
|
||||
public CodeMsgException(int code, String msg) {
|
||||
this(code, msg, msg);
|
||||
}
|
||||
|
||||
public CodeMsgException(int code, String msg, String toast) {
|
||||
super(msg);
|
||||
setCode(code);
|
||||
setMsg(msg);
|
||||
setToast(toast);
|
||||
}
|
||||
|
||||
public CodeMsgException(Throwable cause) {
|
||||
this(cause, ResultCode.FAIL);
|
||||
}
|
||||
|
||||
public CodeMsgException(Throwable cause, String msg) {
|
||||
this(cause, ResultCode.FAIL, msg);
|
||||
}
|
||||
|
||||
public CodeMsgException(Throwable cause, CodeMsg codeMsg) {
|
||||
this(cause, codeMsg.getCode(), codeMsg.getMsg());
|
||||
}
|
||||
|
||||
public CodeMsgException(Throwable cause, CodeMsg codeMsg, String msg) {
|
||||
this(cause, codeMsg.getCode(), msg, null);
|
||||
}
|
||||
|
||||
public CodeMsgException(Throwable cause, CodeMsg codeMsg, String msg, String toast) {
|
||||
this(cause, codeMsg.getCode(), msg, toast);
|
||||
}
|
||||
|
||||
public CodeMsgException(Throwable cause, int code, String msg) {
|
||||
this(cause, code, msg, null);
|
||||
}
|
||||
|
||||
public CodeMsgException(Throwable cause, int code, String msg, String toast) {
|
||||
super(msg, cause);
|
||||
setCode(code);
|
||||
setMsg(msg);
|
||||
setToast(toast);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMessage() {
|
||||
if (StrUtil.isNotBlank(msg)) {
|
||||
return StrUtil.format("[{}]{}", code, msg);
|
||||
}
|
||||
return super.getMessage();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getCode() {
|
||||
return code;
|
||||
}
|
||||
|
||||
public void setCode(int code) {
|
||||
this.code = code;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getMsg() {
|
||||
return msg;
|
||||
}
|
||||
|
||||
public void setMsg(String msg) {
|
||||
this.msg = msg;
|
||||
}
|
||||
|
||||
public String getToast() {
|
||||
return toast;
|
||||
}
|
||||
|
||||
public void setToast(String toast) {
|
||||
this.toast = toast;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.exception;
|
||||
|
||||
import io.github.dunwu.javadb.elasticsearch.constant.CodeMsg;
|
||||
import io.github.dunwu.javadb.elasticsearch.constant.ResultCode;
|
||||
|
||||
/**
|
||||
* 默认异常
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @since 2021-12-30
|
||||
*/
|
||||
public class DefaultException extends CodeMsgException {
|
||||
|
||||
private static final long serialVersionUID = -7027578114976830416L;
|
||||
|
||||
public DefaultException() {
|
||||
this(ResultCode.FAIL);
|
||||
}
|
||||
|
||||
public DefaultException(CodeMsg codeMsg) {
|
||||
this(codeMsg.getCode(), codeMsg.getMsg());
|
||||
}
|
||||
|
||||
public DefaultException(CodeMsg codeMsg, String msg) {
|
||||
this(codeMsg.getCode(), msg, null);
|
||||
}
|
||||
|
||||
public DefaultException(CodeMsg codeMsg, String msg, String toast) {
|
||||
this(codeMsg.getCode(), msg, toast);
|
||||
}
|
||||
|
||||
public DefaultException(String msg) {
|
||||
this(ResultCode.FAIL, msg);
|
||||
}
|
||||
|
||||
public DefaultException(int code, String msg) {
|
||||
this(code, msg, msg);
|
||||
}
|
||||
|
||||
public DefaultException(int code, String msg, String toast) {
|
||||
super(code, msg, toast);
|
||||
}
|
||||
|
||||
public DefaultException(Throwable cause) {
|
||||
this(cause, ResultCode.FAIL);
|
||||
}
|
||||
|
||||
public DefaultException(Throwable cause, String msg) {
|
||||
this(cause, ResultCode.FAIL, msg);
|
||||
}
|
||||
|
||||
public DefaultException(Throwable cause, CodeMsg codeMsg) {
|
||||
this(cause, codeMsg.getCode(), codeMsg.getMsg());
|
||||
}
|
||||
|
||||
public DefaultException(Throwable cause, CodeMsg codeMsg, String msg) {
|
||||
this(cause, codeMsg.getCode(), msg, null);
|
||||
}
|
||||
|
||||
public DefaultException(Throwable cause, CodeMsg codeMsg, String msg, String toast) {
|
||||
this(cause, codeMsg.getCode(), msg, toast);
|
||||
}
|
||||
|
||||
public DefaultException(Throwable cause, int code, String msg) {
|
||||
this(cause, code, msg, null);
|
||||
}
|
||||
|
||||
public DefaultException(Throwable cause, int code, String msg, String toast) {
|
||||
super(cause, code, msg, toast);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,210 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.mapper;
|
||||
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.DateTime;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate;
|
||||
import io.github.dunwu.javadb.elasticsearch.constant.ResultCode;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.PageData;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData;
|
||||
import io.github.dunwu.javadb.elasticsearch.exception.DefaultException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 动态 ES Mapper 基础类(以时间为维度动态创建、删除 index),用于数据量特别大,需要按照日期分片的索引。
|
||||
* <p>
|
||||
* 注:使用此 Mapper 的索引、别名必须遵循命名格式:索引名 = 别名_yyyyMMdd
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @date 2024-04-07
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class BaseDynamicEsMapper<T extends BaseEsEntity> extends BaseEsMapper<T> {
|
||||
|
||||
public BaseDynamicEsMapper(ElasticsearchTemplate elasticsearchTemplate) {
|
||||
super(elasticsearchTemplate);
|
||||
}
|
||||
|
||||
// ====================================================================
|
||||
// 索引管理操作
|
||||
// ====================================================================
|
||||
|
||||
public String getIndex(String day) {
|
||||
|
||||
String alias = getAlias();
|
||||
if (StrUtil.isBlank(day)) {
|
||||
String msg = StrUtil.format("【ES】获取 {} 索引失败!day 不能为空!", alias);
|
||||
throw new DefaultException(ResultCode.PARAM_ERROR, msg);
|
||||
}
|
||||
|
||||
DateTime date;
|
||||
try {
|
||||
date = DateUtil.parse(day, DatePattern.NORM_DATE_PATTERN);
|
||||
} catch (Exception e) {
|
||||
String msg = StrUtil.format("【ES】获取 {} 索引失败!day: {} 不符合日期格式 {}!",
|
||||
alias, day, DatePattern.NORM_DATE_PATTERN);
|
||||
throw new DefaultException(e, ResultCode.PARAM_ERROR, msg);
|
||||
}
|
||||
|
||||
String formatDate = DateUtil.format(date, DatePattern.PURE_DATE_FORMAT);
|
||||
return alias + "_" + formatDate;
|
||||
}
|
||||
|
||||
public boolean isIndexExistsInDay(String day) throws IOException {
|
||||
return elasticsearchTemplate.isIndexExists(getIndex(day));
|
||||
}
|
||||
|
||||
public String createIndexInDay(String day) throws IOException, DefaultException {
|
||||
String index = getIndex(day);
|
||||
boolean indexExists = isIndexExistsInDay(day);
|
||||
if (indexExists) {
|
||||
return index;
|
||||
}
|
||||
elasticsearchTemplate.createIndex(index, getType(), getAlias(), getShard(), getReplica());
|
||||
Map<String, String> map = getPropertiesMap();
|
||||
if (MapUtil.isNotEmpty(map)) {
|
||||
elasticsearchTemplate.setMapping(index, getType(), map);
|
||||
}
|
||||
return index;
|
||||
}
|
||||
|
||||
public void deleteIndexInDay(String day) throws IOException {
|
||||
elasticsearchTemplate.deleteIndex(getIndex(day));
|
||||
}
|
||||
|
||||
public void updateAliasInDay(String day) throws IOException {
|
||||
elasticsearchTemplate.updateAlias(getIndex(day), getAlias());
|
||||
}
|
||||
|
||||
// ====================================================================
|
||||
// CRUD 操作
|
||||
// ====================================================================
|
||||
|
||||
public GetResponse getByIdInDay(String day, String id) throws IOException {
|
||||
return elasticsearchTemplate.getById(getIndex(day), getType(), id, null);
|
||||
}
|
||||
|
||||
public T pojoByIdInDay(String day, String id) throws IOException {
|
||||
return elasticsearchTemplate.pojoById(getIndex(day), getType(), id, null, getEntityClass());
|
||||
}
|
||||
|
||||
public List<T> pojoListByIdsInDay(String day, Collection<String> ids) throws IOException {
|
||||
return elasticsearchTemplate.pojoListByIds(getIndex(day), getType(), ids, getEntityClass());
|
||||
}
|
||||
|
||||
public long countInDay(String day, SearchSourceBuilder builder) throws IOException {
|
||||
return elasticsearchTemplate.count(getIndex(day), getType(), builder);
|
||||
}
|
||||
|
||||
public SearchResponse queryInDay(String day, SearchSourceBuilder builder) throws IOException {
|
||||
return elasticsearchTemplate.query(getIndex(day), getType(), builder);
|
||||
}
|
||||
|
||||
public PageData<T> pojoPageInDay(String day, SearchSourceBuilder builder) throws IOException {
|
||||
return elasticsearchTemplate.pojoPage(getIndex(day), getType(), builder, getEntityClass());
|
||||
}
|
||||
|
||||
public ScrollData<T> pojoPageByLastIdInDay(String day, String lastId, int size, QueryBuilder queryBuilder)
|
||||
throws IOException {
|
||||
return elasticsearchTemplate.pojoPageByLastId(getIndex(day), getType(), lastId, size,
|
||||
queryBuilder, getEntityClass());
|
||||
}
|
||||
|
||||
public ScrollData<T> pojoScrollBeginInDay(String day, SearchSourceBuilder builder) throws IOException {
|
||||
return elasticsearchTemplate.pojoScrollBegin(getIndex(day), getType(), builder, getEntityClass());
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据日期动态选择索引并更新
|
||||
*
|
||||
* @param day 日期,格式为:yyyy-MM-dd
|
||||
* @param entity 待更新的数据
|
||||
* @return /
|
||||
*/
|
||||
public boolean saveInDay(String day, T entity) throws IOException, DefaultException {
|
||||
String index = checkIndex(day);
|
||||
checkData(entity);
|
||||
elasticsearchTemplate.save(index, getType(), entity);
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据日期动态选择索引并批量更新
|
||||
*
|
||||
* @param day 日期,格式为:yyyy-MM-dd
|
||||
* @param list 待更新的数据
|
||||
* @return /
|
||||
*/
|
||||
public boolean saveBatchInDay(String day, Collection<T> list) throws IOException, DefaultException {
|
||||
String index = checkIndex(day);
|
||||
checkData(list);
|
||||
elasticsearchTemplate.saveBatch(index, getType(), list);
|
||||
return true;
|
||||
}
|
||||
|
||||
public void asyncSaveBatchInDay(String day, Collection<T> list) throws IOException {
|
||||
String index = checkIndex(day);
|
||||
checkData(list);
|
||||
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse response) {
|
||||
if (response != null && !response.hasFailures()) {
|
||||
String msg = StrUtil.format("【ES】按日期异步批量保存 {} 成功!", index);
|
||||
log.info(msg);
|
||||
} else {
|
||||
String msg = StrUtil.format("【ES】按日期异步批量保存 {} 失败!", index);
|
||||
log.warn(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
String msg = StrUtil.format("【ES】按日期异步批量保存 {} 异常!", index);
|
||||
log.error(msg, e);
|
||||
}
|
||||
};
|
||||
asyncSaveBatchInDay(day, list, listener);
|
||||
}
|
||||
|
||||
public void asyncSaveBatchInDay(String day, Collection<T> list, ActionListener<BulkResponse> listener)
|
||||
throws IOException {
|
||||
String index = checkIndex(day);
|
||||
checkData(list);
|
||||
elasticsearchTemplate.asyncSaveBatch(getIndex(day), getType(), list, listener);
|
||||
}
|
||||
|
||||
public boolean deleteByIdInDay(String day, String id) throws IOException {
|
||||
return elasticsearchTemplate.deleteById(getIndex(day), getType(), id);
|
||||
}
|
||||
|
||||
public boolean deleteBatchIdsInDay(String day, Collection<String> ids) throws IOException {
|
||||
return elasticsearchTemplate.deleteBatchIds(getIndex(day), getType(), ids);
|
||||
}
|
||||
|
||||
protected String checkIndex(String day) throws IOException {
|
||||
if (!enableAutoCreateIndex()) {
|
||||
return getIndex(day);
|
||||
}
|
||||
String index = createIndexInDay(day);
|
||||
if (StrUtil.isBlank(index)) {
|
||||
String msg = StrUtil.format("【ES】按日期批量保存 {} 失败!索引找不到且创建失败!", index);
|
||||
throw new DefaultException(ResultCode.ERROR, msg);
|
||||
}
|
||||
return index;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,23 +1,31 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.mapper;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate;
|
||||
import io.github.dunwu.javadb.elasticsearch.constant.ResultCode;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.Page;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.PageData;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData;
|
||||
import io.github.dunwu.javadb.elasticsearch.exception.DefaultException;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
|
||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.IndicesClient;
|
||||
import org.elasticsearch.client.RequestOptions;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* ES Mapper 基础类
|
||||
|
@ -28,7 +36,7 @@ import java.util.List;
|
|||
@Slf4j
|
||||
public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T> {
|
||||
|
||||
private BulkProcessor bulkProcessor;
|
||||
protected BulkProcessor bulkProcessor;
|
||||
|
||||
protected final ElasticsearchTemplate elasticsearchTemplate;
|
||||
|
||||
|
@ -36,6 +44,21 @@ public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T
|
|||
this.elasticsearchTemplate = elasticsearchTemplate;
|
||||
}
|
||||
|
||||
public int getShard() {
|
||||
return 5;
|
||||
}
|
||||
|
||||
public int getReplica() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* 如果开启,添加 ES 数据时,如果索引不存在,会自动创建索引
|
||||
*/
|
||||
public boolean enableAutoCreateIndex() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RestHighLevelClient getClient() {
|
||||
if (elasticsearchTemplate == null) {
|
||||
|
@ -52,14 +75,63 @@ public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T
|
|||
return bulkProcessor;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public Map<String, String> getPropertiesMap() {
|
||||
|
||||
Class<T> clazz = getEntityClass();
|
||||
Method method;
|
||||
try {
|
||||
method = clazz.getMethod("getPropertiesMap");
|
||||
} catch (NoSuchMethodException e) {
|
||||
String msg = StrUtil.format("【ES】检查并创建 {} 索引失败!day 不能为空!", getAlias());
|
||||
throw new DefaultException(e, ResultCode.ERROR, msg);
|
||||
}
|
||||
|
||||
Object result = ReflectUtil.invokeStatic(method);
|
||||
if (result == null) {
|
||||
return new HashMap<>(0);
|
||||
}
|
||||
return (Map<String, String>) result;
|
||||
}
|
||||
|
||||
// ====================================================================
|
||||
// 索引管理操作
|
||||
// ====================================================================
|
||||
|
||||
@Override
|
||||
public boolean isIndexExists() throws IOException {
|
||||
IndicesClient indicesClient = getClient().indices();
|
||||
GetIndexRequest request = new GetIndexRequest();
|
||||
request.indices(getIndex());
|
||||
return indicesClient.exists(request, RequestOptions.DEFAULT);
|
||||
return elasticsearchTemplate.isIndexExists(getIndex());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String createIndexIfNotExists() throws IOException {
|
||||
String index = getIndex();
|
||||
boolean exists = elasticsearchTemplate.isIndexExists(index);
|
||||
if (exists) {
|
||||
return index;
|
||||
}
|
||||
elasticsearchTemplate.createIndex(index, getType(), getAlias(), getShard(), getReplica());
|
||||
Map<String, String> propertiesMap = getPropertiesMap();
|
||||
if (MapUtil.isNotEmpty(propertiesMap)) {
|
||||
elasticsearchTemplate.setMapping(index, getType(), propertiesMap);
|
||||
}
|
||||
return index;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void deleteIndex() throws IOException {
|
||||
elasticsearchTemplate.deleteIndex(getIndex());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateAlias() throws IOException {
|
||||
elasticsearchTemplate.updateAlias(getIndex(), getAlias());
|
||||
}
|
||||
|
||||
// ====================================================================
|
||||
// CRUD 操作
|
||||
// ====================================================================
|
||||
|
||||
@Override
|
||||
public GetResponse getById(String id) throws IOException {
|
||||
return getById(id, null);
|
||||
|
@ -85,11 +157,6 @@ public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T
|
|||
return elasticsearchTemplate.pojoListByIds(getIndex(), getType(), ids, getEntityClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Page<T> pojoPage(SearchSourceBuilder builder) throws IOException {
|
||||
return elasticsearchTemplate.pojoPage(getIndex(), getType(), builder, getEntityClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long count(SearchSourceBuilder builder) throws IOException {
|
||||
return elasticsearchTemplate.count(getIndex(), getType(), builder);
|
||||
|
@ -99,55 +166,95 @@ public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T
|
|||
public SearchResponse query(SearchSourceBuilder builder) throws IOException {
|
||||
return elasticsearchTemplate.query(getIndex(), getType(), builder);
|
||||
}
|
||||
@Override
|
||||
public PageData<T> pojoPage(SearchSourceBuilder builder) throws IOException {
|
||||
return elasticsearchTemplate.pojoPage(getIndex(), getType(), builder, getEntityClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScrollData<T> pojoPageByLastId(String lastId, int size, QueryBuilder queryBuilder) throws IOException {
|
||||
return elasticsearchTemplate.pojoPageByLastId(getIndex(), getType(), lastId, size,
|
||||
queryBuilder, getEntityClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScrollData<T> pojoScrollBegin(SearchSourceBuilder builder) throws IOException {
|
||||
return elasticsearchTemplate.pojoScrollBegin(getIndex(), getType(), builder, getEntityClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScrollData<T> pojoScroll(String scrollId, SearchSourceBuilder builder) throws IOException {
|
||||
return elasticsearchTemplate.pojoScroll(scrollId, builder, getEntityClass());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean pojoScrollEnd(String scrollId) throws IOException {
|
||||
return elasticsearchTemplate.pojoScrollEnd(scrollId);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public T save(T entity) throws IOException {
|
||||
return elasticsearchTemplate.save(getIndex(), getType(), entity);
|
||||
String index = checkIndex();
|
||||
checkData(entity);
|
||||
return elasticsearchTemplate.save(index, getType(), entity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean batchSave(Collection<T> list) throws IOException {
|
||||
return elasticsearchTemplate.batchSave(getIndex(), getType(), list);
|
||||
public boolean saveBatch(Collection<T> list) throws IOException {
|
||||
String index = checkIndex();
|
||||
checkData(list);
|
||||
return elasticsearchTemplate.saveBatch(index, getType(), list);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncBatchSave(Collection<T> list) throws IOException {
|
||||
public void asyncSaveBatch(Collection<T> list) throws IOException {
|
||||
String index = checkIndex();
|
||||
checkData(list);
|
||||
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse response) {
|
||||
if (response != null && !response.hasFailures()) {
|
||||
log.info("【ES】异步批量插入成功!");
|
||||
String msg = StrUtil.format("【ES】异步批量保存 {} 成功!", index);
|
||||
log.info(msg);
|
||||
} else {
|
||||
log.warn("【ES】异步批量插入失败!");
|
||||
String msg = StrUtil.format("【ES】异步批量保存 {} 失败!", index);
|
||||
log.warn(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
log.error("【ES】异步批量插入异常!", e);
|
||||
String msg = StrUtil.format("【ES】异步批量保存 {} 异常!", index);
|
||||
log.error(msg, e);
|
||||
}
|
||||
};
|
||||
asyncBatchSave(list, listener);
|
||||
asyncSaveBatch(list, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncBatchSave(Collection<T> list, ActionListener<BulkResponse> listener) {
|
||||
elasticsearchTemplate.asyncBatchSave(getIndex(), getType(), list, listener);
|
||||
public void asyncSaveBatch(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException {
|
||||
String index = checkIndex();
|
||||
checkData(list);
|
||||
elasticsearchTemplate.asyncSaveBatch(index, getType(), list, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public T updateById(T entity) throws IOException {
|
||||
checkData(entity);
|
||||
return elasticsearchTemplate.updateById(getIndex(), getType(), entity);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean batchUpdateById(Collection<T> list) throws IOException {
|
||||
return elasticsearchTemplate.batchUpdateById(getIndex(), getType(), list);
|
||||
public boolean updateBatchIds(Collection<T> list) throws IOException {
|
||||
checkData(list);
|
||||
return elasticsearchTemplate.updateBatchIds(getIndex(), getType(), list);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncBatchUpdateById(Collection<T> list, ActionListener<BulkResponse> listener) {
|
||||
elasticsearchTemplate.asyncBatchUpdateById(getIndex(), getType(), list, listener);
|
||||
public void asyncUpdateBatchIds(Collection<T> list, ActionListener<BulkResponse> listener) {
|
||||
checkData(list);
|
||||
elasticsearchTemplate.asyncUpdateBatchIds(getIndex(), getType(), list, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -156,12 +263,12 @@ public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean batchDeleteById(Collection<String> ids) throws IOException {
|
||||
return elasticsearchTemplate.batchDeleteById(getIndex(), getType(), ids);
|
||||
public boolean deleteBatchIds(Collection<String> ids) throws IOException {
|
||||
return elasticsearchTemplate.deleteBatchIds(getIndex(), getType(), ids);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncBatchDeleteById(Collection<String> ids) throws IOException {
|
||||
public void asyncDeleteBatchIds(Collection<String> ids) throws IOException {
|
||||
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
|
||||
@Override
|
||||
public void onResponse(BulkResponse response) {
|
||||
|
@ -177,12 +284,38 @@ public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T
|
|||
log.error("【ES】异步批量删除异常!ids: {}", ids, e);
|
||||
}
|
||||
};
|
||||
asyncBatchDeleteById(ids, listener);
|
||||
asyncDeleteBatchIds(ids, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void asyncBatchDeleteById(Collection<String> ids, ActionListener<BulkResponse> listener) throws IOException {
|
||||
elasticsearchTemplate.asyncBatchDeleteById(getIndex(), getType(), ids, listener);
|
||||
public void asyncDeleteBatchIds(Collection<String> ids, ActionListener<BulkResponse> listener) throws IOException {
|
||||
elasticsearchTemplate.asyncDeleteBatchIds(getIndex(), getType(), ids, listener);
|
||||
}
|
||||
|
||||
protected String checkIndex() throws IOException {
|
||||
if (!enableAutoCreateIndex()) {
|
||||
return getIndex();
|
||||
}
|
||||
String index = createIndexIfNotExists();
|
||||
if (StrUtil.isBlank(index)) {
|
||||
String msg = StrUtil.format("【ES】索引找不到且创建失败!", index);
|
||||
throw new DefaultException(ResultCode.ERROR, msg);
|
||||
}
|
||||
return index;
|
||||
}
|
||||
|
||||
protected void checkData(Collection<T> list) {
|
||||
if (CollectionUtil.isEmpty(list)) {
|
||||
String msg = StrUtil.format("【ES】写入 {} 失败!list 不能为空!", getIndex());
|
||||
throw new DefaultException(ResultCode.PARAM_ERROR, msg);
|
||||
}
|
||||
}
|
||||
|
||||
protected void checkData(T entity) {
|
||||
if (entity == null) {
|
||||
String msg = StrUtil.format("【ES】写入 {} 失败!entity 不能为空!", getIndex());
|
||||
throw new DefaultException(ResultCode.PARAM_ERROR, msg);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2,13 +2,15 @@ package io.github.dunwu.javadb.elasticsearch.mapper;
|
|||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.Page;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.PageData;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.bulk.BulkProcessor;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.RestHighLevelClient;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -25,6 +27,11 @@ import java.util.Map;
|
|||
*/
|
||||
public interface EsMapper<T extends BaseEsEntity> {
|
||||
|
||||
/**
|
||||
* 获取别名
|
||||
*/
|
||||
String getAlias();
|
||||
|
||||
/**
|
||||
* 获取索引名
|
||||
*/
|
||||
|
@ -35,6 +42,16 @@ public interface EsMapper<T extends BaseEsEntity> {
|
|||
*/
|
||||
String getType();
|
||||
|
||||
/**
|
||||
* 获取分片数
|
||||
*/
|
||||
int getShard();
|
||||
|
||||
/**
|
||||
* 获取副本数
|
||||
*/
|
||||
int getReplica();
|
||||
|
||||
/**
|
||||
* 获取实体类型
|
||||
*/
|
||||
|
@ -46,6 +63,12 @@ public interface EsMapper<T extends BaseEsEntity> {
|
|||
|
||||
boolean isIndexExists() throws IOException;
|
||||
|
||||
String createIndexIfNotExists() throws IOException;
|
||||
|
||||
void deleteIndex() throws IOException;
|
||||
|
||||
void updateAlias() throws IOException;
|
||||
|
||||
GetResponse getById(String id) throws IOException;
|
||||
|
||||
GetResponse getById(String id, Long version) throws IOException;
|
||||
|
@ -69,32 +92,40 @@ public interface EsMapper<T extends BaseEsEntity> {
|
|||
return map;
|
||||
}
|
||||
|
||||
Page<T> pojoPage(SearchSourceBuilder builder) throws IOException;
|
||||
|
||||
long count(SearchSourceBuilder builder) throws IOException;
|
||||
|
||||
SearchResponse query(SearchSourceBuilder builder) throws IOException;
|
||||
|
||||
PageData<T> pojoPage(SearchSourceBuilder builder) throws IOException;
|
||||
|
||||
ScrollData<T> pojoPageByLastId(String lastId, int size, QueryBuilder queryBuilder) throws IOException;
|
||||
|
||||
ScrollData<T> pojoScrollBegin(SearchSourceBuilder builder) throws IOException;
|
||||
|
||||
ScrollData<T> pojoScroll(String scrollId, SearchSourceBuilder builder) throws IOException;
|
||||
|
||||
boolean pojoScrollEnd(String scrollId) throws IOException;
|
||||
|
||||
T save(T entity) throws IOException;
|
||||
|
||||
boolean batchSave(Collection<T> list) throws IOException;
|
||||
boolean saveBatch(Collection<T> list) throws IOException;
|
||||
|
||||
void asyncBatchSave(Collection<T> list) throws IOException;
|
||||
void asyncSaveBatch(Collection<T> list) throws IOException;
|
||||
|
||||
void asyncBatchSave(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException;
|
||||
void asyncSaveBatch(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException;
|
||||
|
||||
T updateById(T entity) throws IOException;
|
||||
|
||||
boolean batchUpdateById(Collection<T> list) throws IOException;
|
||||
boolean updateBatchIds(Collection<T> list) throws IOException;
|
||||
|
||||
void asyncBatchUpdateById(Collection<T> list, ActionListener<BulkResponse> listener);
|
||||
void asyncUpdateBatchIds(Collection<T> list, ActionListener<BulkResponse> listener);
|
||||
|
||||
boolean deleteById(String id) throws IOException;
|
||||
|
||||
boolean batchDeleteById(Collection<String> ids) throws IOException;
|
||||
boolean deleteBatchIds(Collection<String> ids) throws IOException;
|
||||
|
||||
void asyncBatchDeleteById(Collection<String> ids) throws IOException;
|
||||
void asyncDeleteBatchIds(Collection<String> ids) throws IOException;
|
||||
|
||||
void asyncBatchDeleteById(Collection<String> ids, ActionListener<BulkResponse> listener) throws IOException;
|
||||
void asyncDeleteBatchIds(Collection<String> ids, ActionListener<BulkResponse> listener) throws IOException;
|
||||
|
||||
}
|
||||
|
|
|
@ -1,27 +1,39 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.mapper;
|
||||
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.User;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
/**
|
||||
* User ES Mapper
|
||||
* open_applet_consume_yyyyMMdd ES Mapper
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @date 2023-06-27
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
public class UserEsMapper extends BaseEsMapper<User> {
|
||||
public class UserEsMapper extends BaseDynamicEsMapper<User> {
|
||||
|
||||
public UserEsMapper(ElasticsearchTemplate elasticsearchTemplate) {
|
||||
super(elasticsearchTemplate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getIndex() {
|
||||
public String getAlias() {
|
||||
return "user";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getIndex() {
|
||||
String date = DateUtil.format(new Date(), DatePattern.PURE_DATE_FORMAT);
|
||||
return getAlias() + "_" + date;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getType() {
|
||||
return "_doc";
|
||||
|
|
|
@ -16,9 +16,6 @@ import org.springframework.web.context.WebApplicationContext;
|
|||
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
|
||||
public abstract class BaseApplicationTests {
|
||||
|
||||
// ---------------------------------------------------------------------------- 测试常量数据
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
protected MockMvc mockMvc;
|
||||
|
||||
@Autowired
|
||||
|
@ -26,7 +23,7 @@ public abstract class BaseApplicationTests {
|
|||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
mockMvc = MockMvcBuilders.webAppContextSetup(context).build(); //构造MockMvc
|
||||
mockMvc = MockMvcBuilders.webAppContextSetup(context).build();
|
||||
}
|
||||
|
||||
@BeforeAll
|
||||
|
|
|
@ -0,0 +1,241 @@
|
|||
package io.github.dunwu.javadb.elasticsearch;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.PageData;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData;
|
||||
import io.github.dunwu.javadb.elasticsearch.util.JsonUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* ElasticsearchTemplate 测试
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @date 2023-11-13
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class BaseElasticsearchTemplateTest<T extends BaseEsEntity> {
|
||||
|
||||
static final int FROM = 0;
|
||||
static final int SIZE = 10;
|
||||
static final String TEST_ID_01 = "1";
|
||||
static final String TEST_ID_02 = "2";
|
||||
|
||||
protected ElasticsearchTemplate TEMPLATE = ElasticsearchFactory.newElasticsearchTemplate();
|
||||
|
||||
protected abstract String getAlias();
|
||||
|
||||
protected abstract String getIndex();
|
||||
|
||||
protected abstract String getType();
|
||||
|
||||
protected abstract int getShard();
|
||||
|
||||
protected abstract int getReplica();
|
||||
|
||||
protected abstract Class<T> getEntityClass();
|
||||
|
||||
protected abstract Map<String, String> getPropertiesMap();
|
||||
|
||||
protected abstract T getOneMockData(String id);
|
||||
|
||||
protected abstract List<T> getMockList(int num);
|
||||
|
||||
protected void deleteIndex() throws IOException {
|
||||
boolean exists = TEMPLATE.isIndexExists(getIndex());
|
||||
if (!exists) {
|
||||
return;
|
||||
}
|
||||
TEMPLATE.deleteIndex(getIndex());
|
||||
exists = TEMPLATE.isIndexExists(getIndex());
|
||||
Assertions.assertThat(exists).isFalse();
|
||||
}
|
||||
|
||||
protected void createIndex() throws IOException {
|
||||
boolean exists = TEMPLATE.isIndexExists(getIndex());
|
||||
if (exists) {
|
||||
return;
|
||||
}
|
||||
TEMPLATE.createIndex(getIndex(), getType(), getAlias(), getShard(), getReplica());
|
||||
TEMPLATE.setMapping(getIndex(), getType(), getPropertiesMap());
|
||||
exists = TEMPLATE.isIndexExists(getIndex());
|
||||
Assertions.assertThat(exists).isTrue();
|
||||
}
|
||||
|
||||
protected void save() throws IOException {
|
||||
String id = "1";
|
||||
T entity = getOneMockData(id);
|
||||
TEMPLATE.save(getIndex(), getType(), entity);
|
||||
T newEntity = TEMPLATE.pojoById(getIndex(), getType(), id, getEntityClass());
|
||||
log.info("记录:{}", JsonUtil.toString(newEntity));
|
||||
Assertions.assertThat(newEntity).isNotNull();
|
||||
}
|
||||
|
||||
protected void saveBatch() throws IOException {
|
||||
int total = 10000;
|
||||
List<List<T>> listGroup = CollectionUtil.split(getMockList(total), 1000);
|
||||
for (List<T> list : listGroup) {
|
||||
TEMPLATE.saveBatch(getIndex(), getType(), list);
|
||||
}
|
||||
long count = TEMPLATE.count(getIndex(), getType(), new SearchSourceBuilder());
|
||||
log.info("批量更新记录数: {}", count);
|
||||
Assertions.assertThat(count).isEqualTo(total);
|
||||
}
|
||||
|
||||
protected void getById() throws IOException {
|
||||
GetResponse response = TEMPLATE.getById(getIndex(), getType(), TEST_ID_01);
|
||||
Assertions.assertThat(response).isNotNull();
|
||||
log.info("记录:{}", JsonUtil.toString(response.getSourceAsMap()));
|
||||
}
|
||||
|
||||
protected void pojoById() throws IOException {
|
||||
T entity = TEMPLATE.pojoById(getIndex(), getType(), TEST_ID_01, getEntityClass());
|
||||
Assertions.assertThat(entity).isNotNull();
|
||||
log.info("记录:{}", JsonUtil.toString(entity));
|
||||
}
|
||||
|
||||
protected void pojoListByIds() throws IOException {
|
||||
List<String> ids = Arrays.asList(TEST_ID_01, TEST_ID_02);
|
||||
List<T> list = TEMPLATE.pojoListByIds(getIndex(), getType(), ids, getEntityClass());
|
||||
Assertions.assertThat(list).isNotEmpty();
|
||||
Assertions.assertThat(list.size()).isEqualTo(2);
|
||||
for (T entity : list) {
|
||||
log.info("记录:{}", JsonUtil.toString(entity));
|
||||
}
|
||||
}
|
||||
|
||||
protected void count() throws IOException {
|
||||
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
long total = TEMPLATE.count(getIndex(), getType(), searchSourceBuilder);
|
||||
Assertions.assertThat(total).isNotZero();
|
||||
log.info("符合条件的记录数:{}", total);
|
||||
}
|
||||
|
||||
protected void query() throws IOException {
|
||||
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
searchSourceBuilder.from(FROM);
|
||||
searchSourceBuilder.size(SIZE);
|
||||
SearchResponse response = TEMPLATE.query(getIndex(), getType(), searchSourceBuilder);
|
||||
Assertions.assertThat(response).isNotNull();
|
||||
Assertions.assertThat(response.getHits()).isNotNull();
|
||||
for (SearchHit hit : response.getHits().getHits()) {
|
||||
log.info("记录:{}", hit.getSourceAsString());
|
||||
Map<String, Object> map = hit.getSourceAsMap();
|
||||
Assertions.assertThat(map).isNotNull();
|
||||
Assertions.assertThat(Integer.valueOf((String) map.get("docId"))).isLessThan(100);
|
||||
}
|
||||
}
|
||||
|
||||
protected void pojoPage() throws IOException {
|
||||
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
searchSourceBuilder.from(FROM);
|
||||
searchSourceBuilder.size(SIZE);
|
||||
PageData<T> page = TEMPLATE.pojoPage(getIndex(), getType(), searchSourceBuilder, getEntityClass());
|
||||
Assertions.assertThat(page).isNotNull();
|
||||
Assertions.assertThat(page.getContent()).isNotEmpty();
|
||||
for (T entity : page.getContent()) {
|
||||
log.info("记录:{}", JsonUtil.toString(entity));
|
||||
}
|
||||
}
|
||||
|
||||
protected void pojoPageByLastId() throws IOException {
|
||||
|
||||
BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
|
||||
long total = TEMPLATE.count(getIndex(), getType(), queryBuilder);
|
||||
ScrollData<T> scrollData =
|
||||
TEMPLATE.pojoPageByLastId(getIndex(), getType(), null, SIZE, queryBuilder, getEntityClass());
|
||||
if (scrollData == null || scrollData.getScrollId() == null) {
|
||||
return;
|
||||
}
|
||||
Assertions.assertThat(scrollData.getTotal()).isEqualTo(total);
|
||||
|
||||
long count = 0L;
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
count += scrollData.getContent().size();
|
||||
|
||||
String scrollId = scrollData.getScrollId();
|
||||
while (CollectionUtil.isNotEmpty(scrollData.getContent())) {
|
||||
scrollData =
|
||||
TEMPLATE.pojoPageByLastId(getIndex(), getType(), scrollId, SIZE, queryBuilder, getEntityClass());
|
||||
if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) {
|
||||
break;
|
||||
}
|
||||
if (StrUtil.isNotBlank(scrollData.getScrollId())) {
|
||||
scrollId = scrollData.getScrollId();
|
||||
}
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
count += scrollData.getContent().size();
|
||||
}
|
||||
log.info("total: {}", total);
|
||||
Assertions.assertThat(count).isEqualTo(total);
|
||||
}
|
||||
|
||||
protected void pojoScroll() throws IOException {
|
||||
|
||||
BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.size(SIZE).query(queryBuilder).trackScores(false);
|
||||
|
||||
long total = TEMPLATE.count(getIndex(), getType(), queryBuilder);
|
||||
ScrollData<T> scrollData =
|
||||
TEMPLATE.pojoScrollBegin(getIndex(), getType(), searchSourceBuilder, getEntityClass());
|
||||
if (scrollData == null || scrollData.getScrollId() == null) {
|
||||
return;
|
||||
}
|
||||
Assertions.assertThat(scrollData.getTotal()).isEqualTo(total);
|
||||
|
||||
long count = 0L;
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
count += scrollData.getContent().size();
|
||||
|
||||
String scrollId = scrollData.getScrollId();
|
||||
while (CollectionUtil.isNotEmpty(scrollData.getContent())) {
|
||||
scrollData = TEMPLATE.pojoScroll(scrollId, searchSourceBuilder, getEntityClass());
|
||||
if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) {
|
||||
break;
|
||||
}
|
||||
if (StrUtil.isNotBlank(scrollData.getScrollId())) {
|
||||
scrollId = scrollData.getScrollId();
|
||||
}
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
count += scrollData.getContent().size();
|
||||
}
|
||||
TEMPLATE.pojoScrollEnd(scrollId);
|
||||
log.info("total: {}", total);
|
||||
Assertions.assertThat(count).isEqualTo(total);
|
||||
}
|
||||
|
||||
}
|
|
@ -1,193 +0,0 @@
|
|||
package io.github.dunwu.javadb.elasticsearch;
|
||||
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.Page;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.User;
|
||||
import io.github.dunwu.javadb.elasticsearch.util.JsonUtil;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* ElasticsearchTemplate 测试
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @date 2023-11-13
|
||||
*/
|
||||
public class ElasticsearchTemplateTest {
|
||||
|
||||
public static final String INDEX = "user";
|
||||
public static final String TYPE = "_doc";
|
||||
public static final String TEST_ID_01 = "1";
|
||||
public static final String TEST_ID_02 = "2";
|
||||
|
||||
private static final ElasticsearchTemplate TEMPLATE;
|
||||
|
||||
static {
|
||||
TEMPLATE = ElasticsearchFactory.newElasticsearchTemplate();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("根据ID精确查询")
|
||||
public void getById() throws IOException {
|
||||
GetResponse response = TEMPLATE.getById(INDEX, TYPE, TEST_ID_01);
|
||||
System.out.println("记录:" + JsonUtil.toString(response.getSourceAsMap()));
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("根据ID精确查询POJO")
|
||||
public void pojoById() throws IOException {
|
||||
User entity = TEMPLATE.pojoById(INDEX, TYPE, TEST_ID_01, User.class);
|
||||
System.out.println("记录:" + JsonUtil.toString(entity));
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("根据ID精确批量查询POJO")
|
||||
public void pojoListByIds() throws IOException {
|
||||
List<String> ids = Arrays.asList(TEST_ID_01, TEST_ID_02);
|
||||
List<User> list = TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class);
|
||||
Assertions.assertThat(list).isNotEmpty();
|
||||
Assertions.assertThat(list.size()).isEqualTo(2);
|
||||
for (User entity : list) {
|
||||
System.out.println("记录:" + JsonUtil.toString(entity));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("分页查询")
|
||||
public void pojoPage() throws IOException {
|
||||
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
||||
boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
searchSourceBuilder.from(0);
|
||||
searchSourceBuilder.size(10);
|
||||
|
||||
Page<User> page = TEMPLATE.pojoPage(INDEX, TYPE, searchSourceBuilder, User.class);
|
||||
Assertions.assertThat(page).isNotNull();
|
||||
Assertions.assertThat(page.getContent()).isNotEmpty();
|
||||
for (User entity : page.getContent()) {
|
||||
System.out.println("记录:" + JsonUtil.toString(entity));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("条件数量查询")
|
||||
public void count() throws IOException {
|
||||
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
||||
boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
searchSourceBuilder.from(0);
|
||||
searchSourceBuilder.size(10);
|
||||
long total = TEMPLATE.count(INDEX, TYPE, searchSourceBuilder);
|
||||
Assertions.assertThat(total).isNotZero();
|
||||
System.out.println("符合条件的总记录数:" + total);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("条件查询")
|
||||
public void query() throws IOException {
|
||||
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
||||
boolQueryBuilder.must(QueryBuilders.termQuery("theme", 3));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
searchSourceBuilder.from(0);
|
||||
searchSourceBuilder.size(10);
|
||||
SearchResponse response = TEMPLATE.query(INDEX, TYPE, searchSourceBuilder);
|
||||
Assertions.assertThat(response).isNotNull();
|
||||
Assertions.assertThat(response.getHits()).isNotNull();
|
||||
for (SearchHit hit : response.getHits().getHits()) {
|
||||
System.out.println("记录:" + hit.getSourceAsString());
|
||||
Map<String, Object> map = hit.getSourceAsMap();
|
||||
Assertions.assertThat(map).isNotNull();
|
||||
Assertions.assertThat(map.get("theme")).isEqualTo(3);
|
||||
}
|
||||
}
|
||||
|
||||
@Nested
|
||||
@DisplayName("写操作测试")
|
||||
public class WriteTest {
|
||||
|
||||
String json1 =
|
||||
"{\"id\":1,\"username\":\"user1\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user1@xxx.com\"}";
|
||||
String json2 =
|
||||
"{\"id\":2,\"username\":\"user2\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user2@xxx.com\"}";
|
||||
|
||||
@Test
|
||||
@DisplayName("插入、更新")
|
||||
public void saveAndUpdate() throws IOException, InterruptedException {
|
||||
|
||||
User origin = JsonUtil.toBean(json1, User.class);
|
||||
if (origin == null) {
|
||||
System.err.println("反序列化失败!");
|
||||
return;
|
||||
}
|
||||
|
||||
TEMPLATE.save(INDEX, TYPE, origin);
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
User expectEntity = TEMPLATE.pojoById(INDEX, TYPE, origin.getDocId(), User.class);
|
||||
Assertions.assertThat(expectEntity).isNotNull();
|
||||
|
||||
expectEntity.setAge(20);
|
||||
TEMPLATE.updateById(INDEX, TYPE, expectEntity);
|
||||
TimeUnit.SECONDS.sleep(18);
|
||||
User expectEntity2 =
|
||||
TEMPLATE.pojoById(INDEX, TYPE, origin.getDocId(), User.class);
|
||||
Assertions.assertThat(expectEntity2).isNotNull();
|
||||
Assertions.assertThat(expectEntity2.getAge()).isEqualTo(20);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("批量插入、更新")
|
||||
public void batchSaveAndUpdate() throws IOException, InterruptedException {
|
||||
|
||||
User origin1 = JsonUtil.toBean(json1, User.class);
|
||||
if (origin1 == null) {
|
||||
System.err.println("反序列化失败!");
|
||||
return;
|
||||
}
|
||||
|
||||
User origin2 = JsonUtil.toBean(json2, User.class);
|
||||
if (origin2 == null) {
|
||||
System.err.println("反序列化失败!");
|
||||
return;
|
||||
}
|
||||
|
||||
List<User> list = Arrays.asList(origin1, origin2);
|
||||
List<String> ids = list.stream().map(User::getDocId).collect(Collectors.toList());
|
||||
|
||||
TEMPLATE.batchSave(INDEX, TYPE, list);
|
||||
List<User> newList = TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class);
|
||||
Assertions.assertThat(newList).isNotEmpty();
|
||||
|
||||
newList.forEach(entity -> {
|
||||
entity.setAge(20);
|
||||
});
|
||||
TEMPLATE.batchUpdateById(INDEX, TYPE, newList);
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
|
||||
List<User> expectList =
|
||||
TEMPLATE.pojoListByIds(INDEX, TYPE, ids, User.class);
|
||||
Assertions.assertThat(expectList).isNotEmpty();
|
||||
for (User item : expectList) {
|
||||
Assertions.assertThat(item.getAge()).isEqualTo(20);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,108 @@
|
|||
package io.github.dunwu.javadb.elasticsearch;
|
||||
|
||||
import cn.hutool.core.date.DatePattern;
|
||||
import cn.hutool.core.date.DateUtil;
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.User;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Date;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 使用 ElasticsearchTemplate 对 user 索引进行测试
|
||||
*
|
||||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @date 2024-04-09
|
||||
*/
|
||||
@Slf4j
|
||||
public class UserElasticsearchTemplateTest extends BaseElasticsearchTemplateTest<User> {
|
||||
|
||||
@Override
|
||||
protected String getAlias() {
|
||||
return "user";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getIndex() {
|
||||
String date = DateUtil.format(new Date(), DatePattern.PURE_DATE_FORMAT);
|
||||
return getAlias() + "_" + date;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getType() {
|
||||
return "_doc";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getShard() {
|
||||
return 5;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getReplica() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Class<User> getEntityClass() {
|
||||
return User.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Map<String, String> getPropertiesMap() {
|
||||
return User.getPropertiesMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected User getOneMockData(String id) {
|
||||
return User.builder()
|
||||
.id(id)
|
||||
.name("测试数据" + id)
|
||||
.age(RandomUtil.randomInt(1, 100))
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<User> getMockList(int num) {
|
||||
List<User> list = new LinkedList<>();
|
||||
for (int i = 1; i <= num; i++) {
|
||||
User entity = getOneMockData(String.valueOf(i));
|
||||
list.add(entity);
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("索引管理测试")
|
||||
public void indexTest() throws IOException {
|
||||
super.deleteIndex();
|
||||
super.createIndex();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("写数据测试")
|
||||
protected void writeTest() throws IOException {
|
||||
super.save();
|
||||
super.saveBatch();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("读数据测试")
|
||||
public void readTest() throws IOException {
|
||||
super.getById();
|
||||
super.pojoById();
|
||||
super.pojoListByIds();
|
||||
super.count();
|
||||
super.query();
|
||||
super.pojoPage();
|
||||
super.pojoPageByLastId();
|
||||
super.pojoScroll();
|
||||
}
|
||||
|
||||
}
|
|
@ -1,27 +1,31 @@
|
|||
package io.github.dunwu.javadb.elasticsearch.mapper;
|
||||
|
||||
import cn.hutool.core.collection.CollectionUtil;
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import io.github.dunwu.javadb.elasticsearch.BaseApplicationTests;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.Page;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.User;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.PageData;
|
||||
import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData;
|
||||
import io.github.dunwu.javadb.elasticsearch.util.JsonUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.assertj.core.api.Assertions;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.index.query.BoolQueryBuilder;
|
||||
import org.elasticsearch.index.query.QueryBuilders;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
import org.elasticsearch.search.sort.SortOrder;
|
||||
import org.junit.jupiter.api.DisplayName;
|
||||
import org.junit.jupiter.api.Nested;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* ElasticsearchTemplate 测试
|
||||
|
@ -29,163 +33,441 @@ import java.util.stream.Collectors;
|
|||
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
|
||||
* @date 2023-11-13
|
||||
*/
|
||||
@Slf4j
|
||||
public class UserEsMapperTest extends BaseApplicationTests {
|
||||
|
||||
static final int FROM = 0;
|
||||
static final int SIZE = 10;
|
||||
private static final String day = "2024-04-07";
|
||||
|
||||
@Autowired
|
||||
private UserEsMapper mapper;
|
||||
public static final String TEST_ID_01 = "1";
|
||||
public static final String TEST_ID_02 = "2";
|
||||
|
||||
@Nested
|
||||
@DisplayName("删除索引测试")
|
||||
class DeleteIndexTest {
|
||||
|
||||
@Test
|
||||
@DisplayName("根据ID精确查询")
|
||||
public void getById() throws IOException {
|
||||
GetResponse response = mapper.getById(TEST_ID_01);
|
||||
System.out.println("记录:" + JsonUtil.toString(response.getSourceAsMap()));
|
||||
@DisplayName("删除当天索引")
|
||||
public void deleteIndex() throws IOException {
|
||||
String index = mapper.getIndex();
|
||||
boolean indexExists = mapper.isIndexExists();
|
||||
if (!indexExists) {
|
||||
log.info("【ES】{} 不存在!", index);
|
||||
return;
|
||||
}
|
||||
mapper.deleteIndex();
|
||||
indexExists = mapper.isIndexExists();
|
||||
Assertions.assertThat(indexExists).isFalse();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("根据ID精确查询POJO")
|
||||
public void pojoById() throws IOException {
|
||||
User entity = mapper.pojoById(TEST_ID_01);
|
||||
System.out.println("记录:" + JsonUtil.toString(entity));
|
||||
@DisplayName("根据日期删除索引")
|
||||
public void deleteIndexInDay() throws IOException {
|
||||
String index = mapper.getIndex(day);
|
||||
boolean indexExists = mapper.isIndexExistsInDay(day);
|
||||
if (!indexExists) {
|
||||
log.info("【ES】{} 不存在!", index);
|
||||
return;
|
||||
}
|
||||
mapper.deleteIndexInDay(day);
|
||||
indexExists = mapper.isIndexExistsInDay(day);
|
||||
Assertions.assertThat(indexExists).isFalse();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Nested
|
||||
@DisplayName("创建索引测试")
|
||||
class CreateIndexTest {
|
||||
|
||||
@Test
|
||||
@DisplayName("创建当天索引")
|
||||
public void createIndex() throws IOException {
|
||||
|
||||
String index = mapper.getIndex();
|
||||
boolean indexExists = mapper.isIndexExists();
|
||||
if (indexExists) {
|
||||
log.info("【ES】{} 已存在!", index);
|
||||
return;
|
||||
}
|
||||
|
||||
mapper.createIndexIfNotExists();
|
||||
indexExists = mapper.isIndexExists();
|
||||
Assertions.assertThat(indexExists).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("根据ID精确批量查询POJO")
|
||||
public void pojoListByIds() throws IOException {
|
||||
List<String> ids = Arrays.asList(TEST_ID_01, TEST_ID_02);
|
||||
List<User> list = mapper.pojoListByIds(ids);
|
||||
Assertions.assertThat(list).isNotEmpty();
|
||||
Assertions.assertThat(list.size()).isEqualTo(2);
|
||||
for (User entity : list) {
|
||||
System.out.println("记录:" + JsonUtil.toString(entity));
|
||||
}
|
||||
@DisplayName("根据日期创建索引")
|
||||
public void createIndexInDay() throws IOException {
|
||||
|
||||
String index = mapper.getIndex(day);
|
||||
boolean indexExists = mapper.isIndexExistsInDay(day);
|
||||
if (indexExists) {
|
||||
log.info("【ES】{} 已存在!", index);
|
||||
return;
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("分页查询")
|
||||
public void pojoPage() throws IOException {
|
||||
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
||||
boolQueryBuilder.must(QueryBuilders.termQuery("id", 1));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
searchSourceBuilder.from(0);
|
||||
searchSourceBuilder.size(10);
|
||||
|
||||
Page<User> page = mapper.pojoPage(searchSourceBuilder);
|
||||
Assertions.assertThat(page).isNotNull();
|
||||
Assertions.assertThat(page.getContent()).isNotEmpty();
|
||||
for (User entity : page.getContent()) {
|
||||
System.out.println("记录:" + JsonUtil.toString(entity));
|
||||
}
|
||||
mapper.createIndexInDay(day);
|
||||
indexExists = mapper.isIndexExistsInDay(day);
|
||||
Assertions.assertThat(indexExists).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("条件数量查询")
|
||||
public void count() throws IOException {
|
||||
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
||||
boolQueryBuilder.must(QueryBuilders.termQuery("id", 1));
|
||||
// boolQueryBuilder.must(QueryBuilders.rangeQuery("age")
|
||||
// .from(18)
|
||||
// .to(25)
|
||||
// .includeLower(true)
|
||||
// .includeUpper(true));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
searchSourceBuilder.from(0);
|
||||
searchSourceBuilder.size(10);
|
||||
long total = mapper.count(searchSourceBuilder);
|
||||
Assertions.assertThat(total).isNotZero();
|
||||
System.out.println("符合条件的总记录数:" + total);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("条件查询")
|
||||
public void query() throws IOException {
|
||||
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
|
||||
boolQueryBuilder.must(QueryBuilders.termQuery("id", 1));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(boolQueryBuilder);
|
||||
searchSourceBuilder.from(0);
|
||||
searchSourceBuilder.size(10);
|
||||
SearchResponse response = mapper.query(searchSourceBuilder);
|
||||
Assertions.assertThat(response).isNotNull();
|
||||
Assertions.assertThat(response.getHits()).isNotNull();
|
||||
for (SearchHit hit : response.getHits().getHits()) {
|
||||
System.out.println("记录:" + hit.getSourceAsString());
|
||||
Map<String, Object> map = hit.getSourceAsMap();
|
||||
Assertions.assertThat(map).isNotNull();
|
||||
}
|
||||
}
|
||||
|
||||
@Nested
|
||||
@DisplayName("写操作测试")
|
||||
public class WriteTest {
|
||||
|
||||
String json1 =
|
||||
"{\"id\":1,\"username\":\"user1\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user1@xxx.com\"}";
|
||||
String json2 =
|
||||
"{\"id\":2,\"username\":\"user2\",\"password\":\"xxxxxx\",\"age\":18,\"email\":\"user2@xxx.com\"}";
|
||||
class WriteTest {
|
||||
|
||||
@Test
|
||||
@DisplayName("插入、更新")
|
||||
public void saveAndUpdate() throws IOException, InterruptedException {
|
||||
|
||||
User origin = JsonUtil.toBean(json1, User.class);
|
||||
if (origin == null) {
|
||||
System.err.println("反序列化失败!");
|
||||
return;
|
||||
}
|
||||
|
||||
mapper.save(origin);
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
User expectEntity = mapper.pojoById(origin.getDocId());
|
||||
Assertions.assertThat(expectEntity).isNotNull();
|
||||
|
||||
expectEntity.setAge(20);
|
||||
mapper.updateById(expectEntity);
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
User expectEntity2 = mapper.pojoById(origin.getDocId());
|
||||
Assertions.assertThat(expectEntity2).isNotNull();
|
||||
Assertions.assertThat(expectEntity2.getAge()).isEqualTo(20);
|
||||
@DisplayName("保存当天数据")
|
||||
public void save() throws IOException {
|
||||
String id = "1";
|
||||
User entity = getOneMockData(id);
|
||||
mapper.save(entity);
|
||||
User newEntity = mapper.pojoById(id);
|
||||
log.info("entity: {}", JsonUtil.toString(newEntity));
|
||||
Assertions.assertThat(newEntity).isNotNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("批量插入、更新")
|
||||
public void batchSaveAndUpdate() throws IOException, InterruptedException {
|
||||
|
||||
User origin1 = JsonUtil.toBean(json1, User.class);
|
||||
if (origin1 == null) {
|
||||
System.err.println("反序列化失败!");
|
||||
return;
|
||||
@DisplayName("保存指定日期数据")
|
||||
public void saveInDay() throws IOException {
|
||||
String id = "1";
|
||||
User entity = getOneMockData(id);
|
||||
mapper.saveInDay(day, entity);
|
||||
User newEntity = mapper.pojoByIdInDay(day, id);
|
||||
log.info("entity: {}", JsonUtil.toString(newEntity));
|
||||
Assertions.assertThat(newEntity).isNotNull();
|
||||
}
|
||||
|
||||
User origin2 = JsonUtil.toBean(json2, User.class);
|
||||
if (origin2 == null) {
|
||||
System.err.println("反序列化失败!");
|
||||
return;
|
||||
@Test
|
||||
@DisplayName("批量保存当天数据")
|
||||
public void batchSave() throws IOException, InterruptedException {
|
||||
int total = 10000;
|
||||
List<List<User>> listGroup = CollectionUtil.split(getMockList(total), 1000);
|
||||
for (List<User> list : listGroup) {
|
||||
mapper.asyncSaveBatch(list);
|
||||
}
|
||||
TimeUnit.SECONDS.sleep(20);
|
||||
long count = mapper.count(new SearchSourceBuilder());
|
||||
log.info("count: {}", count);
|
||||
Assertions.assertThat(count).isEqualTo(10 * 1000);
|
||||
}
|
||||
|
||||
List<User> list = Arrays.asList(origin1, origin2);
|
||||
List<String> ids = list.stream().map(User::getDocId).collect(Collectors.toList());
|
||||
@Test
|
||||
@DisplayName("批量保存指定日期数据")
|
||||
public void batchSaveInDay() throws IOException, InterruptedException {
|
||||
int total = 10000;
|
||||
List<List<User>> listGroup = CollectionUtil.split(getMockList(total), 1000);
|
||||
for (List<User> list : listGroup) {
|
||||
mapper.asyncSaveBatchInDay(day, list);
|
||||
}
|
||||
TimeUnit.SECONDS.sleep(20);
|
||||
long count = mapper.countInDay(day, new SearchSourceBuilder());
|
||||
log.info("count: {}", count);
|
||||
Assertions.assertThat(count).isEqualTo(10 * 1000);
|
||||
}
|
||||
|
||||
mapper.batchSave(list);
|
||||
List<User> newList = mapper.pojoListByIds(ids);
|
||||
Assertions.assertThat(newList).isNotEmpty();
|
||||
}
|
||||
|
||||
newList.forEach(entity -> {
|
||||
entity.setAge(20);
|
||||
@Nested
|
||||
@DisplayName("读操作测试")
|
||||
class ReadTest {
|
||||
|
||||
@Test
|
||||
@DisplayName("根据ID查找当日数据")
|
||||
public void pojoById() throws IOException {
|
||||
String id = "1";
|
||||
User newEntity = mapper.pojoById(id);
|
||||
log.info("entity: {}", JsonUtil.toString(newEntity));
|
||||
Assertions.assertThat(newEntity).isNotNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("根据ID查找指定日期数据")
|
||||
public void pojoByIdInDay() throws IOException {
|
||||
String id = "1";
|
||||
User newEntity = mapper.pojoByIdInDay(day, id);
|
||||
log.info("entity: {}", JsonUtil.toString(newEntity));
|
||||
Assertions.assertThat(newEntity).isNotNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("获取匹配条件的记录数")
|
||||
public void count() throws IOException {
|
||||
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
long total = mapper.count(searchSourceBuilder);
|
||||
Assertions.assertThat(total).isNotZero();
|
||||
log.info("符合条件的记录数:{}", total);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("获取匹配条件的指定日期记录数")
|
||||
public void countInDay() throws IOException {
|
||||
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
long total = mapper.countInDay(day, searchSourceBuilder);
|
||||
Assertions.assertThat(total).isNotZero();
|
||||
log.info("符合条件的记录数:{}", total);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("获取匹配条件的记录")
|
||||
public void query() throws IOException {
|
||||
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
searchSourceBuilder.from(FROM);
|
||||
searchSourceBuilder.size(SIZE);
|
||||
SearchResponse response = mapper.query(searchSourceBuilder);
|
||||
Assertions.assertThat(response).isNotNull();
|
||||
Assertions.assertThat(response.getHits()).isNotNull();
|
||||
for (SearchHit hit : response.getHits().getHits()) {
|
||||
log.info("记录:{}", hit.getSourceAsString());
|
||||
Map<String, Object> map = hit.getSourceAsMap();
|
||||
Assertions.assertThat(map).isNotNull();
|
||||
Assertions.assertThat(Integer.valueOf((String) map.get("docId"))).isLessThan(100);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("获取匹配条件的指定日期记录")
|
||||
public void queryInDay() throws IOException {
|
||||
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
searchSourceBuilder.from(FROM);
|
||||
searchSourceBuilder.size(SIZE);
|
||||
SearchResponse response = mapper.queryInDay(day, searchSourceBuilder);
|
||||
Assertions.assertThat(response).isNotNull();
|
||||
Assertions.assertThat(response.getHits()).isNotNull();
|
||||
for (SearchHit hit : response.getHits().getHits()) {
|
||||
log.info("记录:{}", hit.getSourceAsString());
|
||||
Map<String, Object> map = hit.getSourceAsMap();
|
||||
Assertions.assertThat(map).isNotNull();
|
||||
Assertions.assertThat(Integer.valueOf((String) map.get("docId"))).isLessThan(100);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("from + size 分页查询当日数据")
|
||||
public void pojoPage() throws IOException {
|
||||
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
searchSourceBuilder.from(FROM);
|
||||
searchSourceBuilder.size(SIZE);
|
||||
PageData<User> page = mapper.pojoPage(searchSourceBuilder);
|
||||
Assertions.assertThat(page).isNotNull();
|
||||
Assertions.assertThat(page.getContent()).isNotEmpty();
|
||||
for (User entity : page.getContent()) {
|
||||
log.info("记录:{}", JsonUtil.toString(entity));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("from + size 分页查询指定日期数据")
|
||||
public void pojoPageInDay() throws IOException {
|
||||
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
searchSourceBuilder.from(FROM);
|
||||
searchSourceBuilder.size(SIZE);
|
||||
PageData<User> page = mapper.pojoPageInDay(day, searchSourceBuilder);
|
||||
Assertions.assertThat(page).isNotNull();
|
||||
Assertions.assertThat(page.getContent()).isNotEmpty();
|
||||
for (User entity : page.getContent()) {
|
||||
log.info("记录:{}", JsonUtil.toString(entity));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("search after 分页查询当日数据")
|
||||
protected void pojoPageByLastId() throws IOException {
|
||||
|
||||
BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
long total = mapper.count(searchSourceBuilder);
|
||||
ScrollData<User> scrollData = mapper.pojoPageByLastId(null, SIZE, queryBuilder);
|
||||
if (scrollData == null || scrollData.getScrollId() == null) {
|
||||
return;
|
||||
}
|
||||
Assertions.assertThat(scrollData.getTotal()).isEqualTo(total);
|
||||
|
||||
long count = 0L;
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
mapper.batchUpdateById(newList);
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
count += scrollData.getContent().size();
|
||||
|
||||
List<User> expectList = mapper.pojoListByIds(ids);
|
||||
Assertions.assertThat(expectList).isNotEmpty();
|
||||
for (User item : expectList) {
|
||||
Assertions.assertThat(item.getAge()).isEqualTo(20);
|
||||
String scrollId = scrollData.getScrollId();
|
||||
while (CollectionUtil.isNotEmpty(scrollData.getContent())) {
|
||||
scrollData = mapper.pojoPageByLastId(scrollId, SIZE, queryBuilder);
|
||||
if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) {
|
||||
break;
|
||||
}
|
||||
if (StrUtil.isNotBlank(scrollData.getScrollId())) {
|
||||
scrollId = scrollData.getScrollId();
|
||||
}
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
count += scrollData.getContent().size();
|
||||
}
|
||||
log.info("total: {}", total);
|
||||
Assertions.assertThat(count).isEqualTo(total);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("search after 分页查询指定日期数据")
|
||||
protected void pojoPageByLastIdInDay() throws IOException {
|
||||
|
||||
BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.query(queryBuilder);
|
||||
long total = mapper.count(searchSourceBuilder);
|
||||
ScrollData<User> scrollData = mapper.pojoPageByLastIdInDay(day, null, SIZE, queryBuilder);
|
||||
if (scrollData == null || scrollData.getScrollId() == null) {
|
||||
return;
|
||||
}
|
||||
Assertions.assertThat(scrollData.getTotal()).isEqualTo(total);
|
||||
|
||||
long count = 0L;
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
count += scrollData.getContent().size();
|
||||
|
||||
String scrollId = scrollData.getScrollId();
|
||||
while (CollectionUtil.isNotEmpty(scrollData.getContent())) {
|
||||
scrollData = mapper.pojoPageByLastIdInDay(day, scrollId, SIZE, queryBuilder);
|
||||
if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) {
|
||||
break;
|
||||
}
|
||||
if (StrUtil.isNotBlank(scrollData.getScrollId())) {
|
||||
scrollId = scrollData.getScrollId();
|
||||
}
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
count += scrollData.getContent().size();
|
||||
}
|
||||
log.info("total: {}", total);
|
||||
Assertions.assertThat(count).isEqualTo(total);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("滚动翻页当日数据")
|
||||
public void pojoScroll() throws IOException {
|
||||
|
||||
final int size = 100;
|
||||
|
||||
BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.size(size).sort("docId", SortOrder.ASC).query(queryBuilder).trackScores(false);
|
||||
|
||||
long total = mapper.count(searchSourceBuilder);
|
||||
log.info("total: {}", total);
|
||||
|
||||
ScrollData<User> scrollData = mapper.pojoScrollBegin(searchSourceBuilder);
|
||||
if (scrollData == null || scrollData.getScrollId() == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
long count = 0L;
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
Assertions.assertThat(scrollData.getTotal()).isEqualTo(total);
|
||||
count += scrollData.getContent().size();
|
||||
|
||||
String scrollId = scrollData.getScrollId();
|
||||
while (CollectionUtil.isNotEmpty(scrollData.getContent())) {
|
||||
scrollData = mapper.pojoScroll(scrollId, searchSourceBuilder);
|
||||
if (scrollData != null && StrUtil.isNotBlank(scrollData.getScrollId())) {
|
||||
scrollId = scrollData.getScrollId();
|
||||
}
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
count += scrollData.getContent().size();
|
||||
}
|
||||
mapper.pojoScrollEnd(scrollId);
|
||||
Assertions.assertThat(count).isEqualTo(total);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DisplayName("滚动翻页指定日期数据")
|
||||
public void pojoScrollInDay() throws IOException {
|
||||
|
||||
final int size = 100;
|
||||
|
||||
BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
|
||||
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
|
||||
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
|
||||
searchSourceBuilder.size(size).sort("docId", SortOrder.ASC).query(queryBuilder).trackScores(false);
|
||||
|
||||
long total = mapper.countInDay(day, searchSourceBuilder);
|
||||
log.info("total: {}", total);
|
||||
|
||||
ScrollData<User> scrollData = mapper.pojoScrollBeginInDay(day, searchSourceBuilder);
|
||||
if (scrollData == null || scrollData.getScrollId() == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
long count = 0L;
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
Assertions.assertThat(scrollData.getTotal()).isEqualTo(total);
|
||||
count += scrollData.getContent().size();
|
||||
|
||||
String scrollId = scrollData.getScrollId();
|
||||
while (CollectionUtil.isNotEmpty(scrollData.getContent())) {
|
||||
scrollData = mapper.pojoScroll(scrollId, searchSourceBuilder);
|
||||
if (scrollData != null && StrUtil.isNotBlank(scrollData.getScrollId())) {
|
||||
scrollId = scrollData.getScrollId();
|
||||
}
|
||||
scrollData.getContent().forEach(data -> {
|
||||
log.info("docId: {}", data.getDocId());
|
||||
});
|
||||
count += scrollData.getContent().size();
|
||||
}
|
||||
mapper.pojoScrollEnd(scrollId);
|
||||
Assertions.assertThat(count).isEqualTo(total);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public User getOneMockData(String id) {
|
||||
return User.builder()
|
||||
.id(id)
|
||||
.name("测试数据" + id)
|
||||
.age(RandomUtil.randomInt(1, 100))
|
||||
.build();
|
||||
}
|
||||
|
||||
public List<User> getMockList(int num) {
|
||||
List<User> list = new LinkedList<>();
|
||||
for (int i = 1; i <= num; i++) {
|
||||
User entity = getOneMockData(String.valueOf(i));
|
||||
list.add(entity);
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue