feat: elasticsearch 6.x 示例

pull/21/merge
dunwu 2024-04-15 07:43:19 +08:00
parent 72e3c1142e
commit bd6b17cda7
7 changed files with 658 additions and 278 deletions

View File

@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.indices.alias.Alias; import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest; import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
@ -40,8 +41,10 @@ import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.GetAliasesResponse;
import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
@ -65,6 +68,7 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer; import java.util.function.BiConsumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -194,6 +198,20 @@ public class ElasticsearchTemplate implements Closeable {
return client.indices().exists(request.indices(index), RequestOptions.DEFAULT); return client.indices().exists(request.indices(index), RequestOptions.DEFAULT);
} }
public Set<String> getIndexSet(String alias) throws IOException {
GetAliasesRequest request = new GetAliasesRequest(alias);
GetAliasesResponse response = client.indices().getAlias(request, RequestOptions.DEFAULT);
if (StrUtil.isNotBlank(response.getError())) {
String msg = StrUtil.format("【ES】获取索引失败alias: {}, error: {}", alias, response.getError());
throw new ElasticsearchException(msg);
}
if (response.getException() != null) {
throw response.getException();
}
Map<String, Set<AliasMetaData>> aliasMap = response.getAliases();
return aliasMap.keySet();
}
public void setMapping(String index, String type, Map<String, String> propertiesMap) throws IOException { public void setMapping(String index, String type, Map<String, String> propertiesMap) throws IOException {
if (MapUtil.isEmpty(propertiesMap)) { if (MapUtil.isEmpty(propertiesMap)) {
@ -459,7 +477,7 @@ public class ElasticsearchTemplate implements Closeable {
throws IOException { throws IOException {
if (CollectionUtil.isEmpty(ids)) { if (CollectionUtil.isEmpty(ids)) {
return null; return new ArrayList<>(0);
} }
MultiGetRequest request = new MultiGetRequest(); MultiGetRequest request = new MultiGetRequest();
@ -471,7 +489,7 @@ public class ElasticsearchTemplate implements Closeable {
if (null == multiGetResponse if (null == multiGetResponse
|| multiGetResponse.getResponses() == null || multiGetResponse.getResponses() == null
|| multiGetResponse.getResponses().length <= 0) { || multiGetResponse.getResponses().length <= 0) {
return new ArrayList<>(); return new ArrayList<>(0);
} }
List<T> list = new ArrayList<>(); List<T> list = new ArrayList<>();
@ -491,7 +509,7 @@ public class ElasticsearchTemplate implements Closeable {
public long count(String index, String type, SearchSourceBuilder builder) throws IOException { public long count(String index, String type, SearchSourceBuilder builder) throws IOException {
SearchResponse response = query(index, type, builder); SearchResponse response = query(index, type, builder);
if (response == null || response.status() != RestStatus.OK) { if (response == null || response.status() != RestStatus.OK) {
return -1L; return 0L;
} }
SearchHits searchHits = response.getHits(); SearchHits searchHits = response.getHits();
return searchHits.getTotalHits(); return searchHits.getTotalHits();
@ -550,15 +568,15 @@ public class ElasticsearchTemplate implements Closeable {
/** /**
* search after * search after
*/ */
public <T extends BaseEsEntity> ScrollData<T> pojoPageByLastId(String index, String type, String lastId, int size, public <T extends BaseEsEntity> ScrollData<T> pojoPageByScrollId(String index, String type, String scrollId, int size,
QueryBuilder queryBuilder, Class<T> clazz) throws IOException { QueryBuilder queryBuilder, Class<T> clazz) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.size(size); searchSourceBuilder.size(size);
searchSourceBuilder.sort(BaseEsEntity.DOC_ID, SortOrder.ASC); searchSourceBuilder.sort(BaseEsEntity.DOC_ID, SortOrder.ASC);
if (StrUtil.isNotBlank(lastId)) { if (StrUtil.isNotBlank(scrollId)) {
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(queryBuilder).must(QueryBuilders.rangeQuery(BaseEsEntity.DOC_ID).gt(lastId)); boolQueryBuilder.must(queryBuilder).must(QueryBuilders.rangeQuery(BaseEsEntity.DOC_ID).gt(scrollId));
searchSourceBuilder.query(boolQueryBuilder); searchSourceBuilder.query(boolQueryBuilder);
} else { } else {
searchSourceBuilder.query(queryBuilder); searchSourceBuilder.query(queryBuilder);
@ -639,9 +657,7 @@ public class ElasticsearchTemplate implements Closeable {
} }
public <T> T toPojo(GetResponse response, Class<T> clazz) { public <T> T toPojo(GetResponse response, Class<T> clazz) {
if (null == response) { if (null == response || StrUtil.isBlank(response.getSourceAsString())) {
return null;
} else if (StrUtil.isBlank(response.getSourceAsString())) {
return null; return null;
} else { } else {
return JsonUtil.toBean(response.getSourceAsString(), clazz); return JsonUtil.toBean(response.getSourceAsString(), clazz);
@ -649,15 +665,12 @@ public class ElasticsearchTemplate implements Closeable {
} }
public <T> List<T> toPojoList(SearchResponse response, Class<T> clazz) { public <T> List<T> toPojoList(SearchResponse response, Class<T> clazz) {
if (response == null || response.status() != RestStatus.OK) { if (response == null || response.status() != RestStatus.OK) {
return new ArrayList<>(); return new ArrayList<>(0);
} }
if (ArrayUtil.isEmpty(response.getHits().getHits())) { if (ArrayUtil.isEmpty(response.getHits().getHits())) {
return new ArrayList<>(); return new ArrayList<>(0);
} }
return Stream.of(response.getHits().getHits()) return Stream.of(response.getHits().getHits())
.map(hit -> JsonUtil.toBean(hit.getSourceAsString(), clazz)) .map(hit -> JsonUtil.toBean(hit.getSourceAsString(), clazz))
.collect(Collectors.toList()); .collect(Collectors.toList());

View File

@ -1,10 +1,11 @@
package io.github.dunwu.javadb.elasticsearch.mapper; package io.github.dunwu.javadb.elasticsearch.mapper;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.date.DatePattern; import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate; import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate;
import io.github.dunwu.javadb.elasticsearch.constant.ResultCode; import io.github.dunwu.javadb.elasticsearch.constant.ResultCode;
import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity; import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity;
@ -20,9 +21,9 @@ import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* ES Mapper index * ES Mapper index
@ -39,6 +40,11 @@ public abstract class BaseDynamicEsMapper<T extends BaseEsEntity> extends BaseEs
super(elasticsearchTemplate); super(elasticsearchTemplate);
} }
@Override
public boolean enableAutoCreateIndex() {
return true;
}
// ==================================================================== // ====================================================================
// 索引管理操作 // 索引管理操作
// ==================================================================== // ====================================================================
@ -64,68 +70,139 @@ public abstract class BaseDynamicEsMapper<T extends BaseEsEntity> extends BaseEs
return alias + "_" + formatDate; return alias + "_" + formatDate;
} }
public boolean isIndexExistsInDay(String day) throws IOException { public boolean isIndexExistsInDay(String day) {
return elasticsearchTemplate.isIndexExists(getIndex(day)); if (StrUtil.isBlank(day)) {
} return false;
}
public String createIndexInDay(String day) throws IOException, DefaultException {
String index = getIndex(day); String index = getIndex(day);
boolean indexExists = isIndexExistsInDay(day); try {
if (indexExists) { return elasticsearchTemplate.isIndexExists(getIndex(day));
return index; } catch (Exception e) {
log.error("【ES】判断索引是否存在异常index: {}", index, e);
return false;
} }
elasticsearchTemplate.createIndex(index, getType(), getAlias(), getShard(), getReplica());
Map<String, String> map = getPropertiesMap();
if (MapUtil.isNotEmpty(map)) {
elasticsearchTemplate.setMapping(index, getType(), map);
}
return index;
} }
public void deleteIndexInDay(String day) throws IOException { public String createIndexIfNotExistsInDay(String day) {
elasticsearchTemplate.deleteIndex(getIndex(day)); String index = getIndex(day);
String type = getType();
String alias = getAlias();
int shard = getShard();
int replica = getReplica();
return createIndex(index, type, alias, shard, replica);
} }
public void updateAliasInDay(String day) throws IOException { public void deleteIndexInDay(String day) {
elasticsearchTemplate.updateAlias(getIndex(day), getAlias()); String index = getIndex(day);
try {
log.info("【ES】删除索引成功index: {}", index);
elasticsearchTemplate.deleteIndex(index);
} catch (Exception e) {
log.error("【ES】删除索引异常index: {}", index, e);
}
}
public void updateAliasInDay(String day) {
String index = getIndex(day);
String alias = getAlias();
try {
log.info("【ES】更新别名成功alias: {} -> index: {}", alias, index);
elasticsearchTemplate.updateAlias(index, alias);
} catch (IOException e) {
log.error("【ES】更新别名异常alias: {} -> index: {}", alias, index, e);
}
} }
// ==================================================================== // ====================================================================
// CRUD 操作 // CRUD 操作
// ==================================================================== // ====================================================================
public GetResponse getByIdInDay(String day, String id) throws IOException { public GetResponse getByIdInDay(String day, String id) {
return elasticsearchTemplate.getById(getIndex(day), getType(), id, null); String index = getIndex(day);
String type = getType();
try {
return elasticsearchTemplate.getById(index, type, id, null);
} catch (IOException e) {
log.error("【ES】根据ID查询异常index: {}, type: {}, id: {}", index, type, id, e);
return null;
}
} }
public T pojoByIdInDay(String day, String id) throws IOException { public T pojoByIdInDay(String day, String id) {
return elasticsearchTemplate.pojoById(getIndex(day), getType(), id, null, getEntityClass()); String index = getIndex(day);
String type = getType();
try {
return elasticsearchTemplate.pojoById(index, type, id, null, getEntityClass());
} catch (IOException e) {
log.error("【ES】根据ID查询POJO异常index: {}, type: {}, id: {}", index, type, id, e);
return null;
}
} }
public List<T> pojoListByIdsInDay(String day, Collection<String> ids) throws IOException { public List<T> pojoListByIdsInDay(String day, Collection<String> ids) {
return elasticsearchTemplate.pojoListByIds(getIndex(day), getType(), ids, getEntityClass()); String index = getIndex(day);
String type = getType();
try {
return elasticsearchTemplate.pojoListByIds(index, type, ids, getEntityClass());
} catch (IOException e) {
log.error("【ES】根据ID查询POJO列表异常index: {}, type: {}, ids: {}", index, type, ids, e);
return new ArrayList<>(0);
}
} }
public long countInDay(String day, SearchSourceBuilder builder) throws IOException { public long countInDay(String day, SearchSourceBuilder builder) {
return elasticsearchTemplate.count(getIndex(day), getType(), builder); String index = getIndex(day);
String type = getType();
try {
return elasticsearchTemplate.count(index, type, builder);
} catch (IOException e) {
log.error("【ES】获取匹配记录数异常index: {}, type: {}", index, type, e);
return 0L;
}
} }
public SearchResponse queryInDay(String day, SearchSourceBuilder builder) throws IOException { public SearchResponse queryInDay(String day, SearchSourceBuilder builder) {
return elasticsearchTemplate.query(getIndex(day), getType(), builder); String index = getIndex(day);
String type = getType();
try {
return elasticsearchTemplate.query(index, type, builder);
} catch (IOException e) {
log.error("【ES】条件查询异常index: {}, type: {}", index, type, e);
return null;
}
} }
public PageData<T> pojoPageInDay(String day, SearchSourceBuilder builder) throws IOException { public PageData<T> pojoPageInDay(String day, SearchSourceBuilder builder) {
return elasticsearchTemplate.pojoPage(getIndex(day), getType(), builder, getEntityClass()); String index = getIndex(day);
String type = getType();
try {
return elasticsearchTemplate.pojoPage(index, type, builder, getEntityClass());
} catch (IOException e) {
log.error("【ES】from + size 分页条件查询异常index: {}, type: {}", index, type, e);
return null;
}
} }
public ScrollData<T> pojoPageByLastIdInDay(String day, String lastId, int size, QueryBuilder queryBuilder) public ScrollData<T> pojoPageByLastIdInDay(String day, String scrollId, int size, QueryBuilder queryBuilder) {
throws IOException { String index = getIndex(day);
return elasticsearchTemplate.pojoPageByLastId(getIndex(day), getType(), lastId, size, String type = getType();
queryBuilder, getEntityClass()); try {
return elasticsearchTemplate.pojoPageByScrollId(index, type, scrollId, size, queryBuilder, getEntityClass());
} catch (IOException e) {
log.error("【ES】search after 分页条件查询异常index: {}, type: {}", index, type, e);
return null;
}
} }
public ScrollData<T> pojoScrollBeginInDay(String day, SearchSourceBuilder builder) throws IOException { public ScrollData<T> pojoScrollBeginInDay(String day, SearchSourceBuilder builder) {
return elasticsearchTemplate.pojoScrollBegin(getIndex(day), getType(), builder, getEntityClass()); String index = getIndex(day);
String type = getType();
try {
return elasticsearchTemplate.pojoScrollBegin(index, type, builder, getEntityClass());
} catch (IOException e) {
log.error("【ES】开启滚动分页条件查询异常index: {}, type: {}", index, type, e);
return null;
}
} }
/** /**
@ -135,11 +212,20 @@ public abstract class BaseDynamicEsMapper<T extends BaseEsEntity> extends BaseEs
* @param entity * @param entity
* @return / * @return /
*/ */
public boolean saveInDay(String day, T entity) throws IOException, DefaultException { public T saveInDay(String day, T entity) {
String index = checkIndex(day); if (StrUtil.isBlank(day) || entity == null) {
checkData(entity); return null;
elasticsearchTemplate.save(index, getType(), entity); }
return true; String index = getIndex(day);
String type = getType();
try {
checkIndex(day);
checkData(entity);
return elasticsearchTemplate.save(index, getType(), entity);
} catch (IOException e) {
log.error("【ES】添加数据异常index: {}, type: {}, entity: {}", index, type, JSONUtil.toJsonStr(entity), e);
return null;
}
} }
/** /**
@ -149,59 +235,94 @@ public abstract class BaseDynamicEsMapper<T extends BaseEsEntity> extends BaseEs
* @param list * @param list
* @return / * @return /
*/ */
public boolean saveBatchInDay(String day, Collection<T> list) throws IOException, DefaultException { public boolean saveBatchInDay(String day, Collection<T> list) {
String index = checkIndex(day); if (StrUtil.isBlank(day) || CollectionUtil.isEmpty(list)) {
checkData(list); return false;
elasticsearchTemplate.saveBatch(index, getType(), list); }
return true; String index = getIndex(day);
String type = getType();
try {
checkIndex(day);
checkData(list);
return elasticsearchTemplate.saveBatch(index, type, list);
} catch (IOException e) {
log.error("【ES】批量添加数据异常index: {}, type: {}, size: {}", index, type, list.size(), e);
return false;
}
} }
public void asyncSaveBatchInDay(String day, Collection<T> list) throws IOException { public void asyncSaveBatchInDay(String day, Collection<T> list) {
String index = checkIndex(day); asyncSaveBatchInDay(day, list, DEFAULT_BULK_LISTENER);
checkData(list);
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
if (response != null && !response.hasFailures()) {
String msg = StrUtil.format("【ES】按日期异步批量保存 {} 成功!", index);
log.info(msg);
} else {
String msg = StrUtil.format("【ES】按日期异步批量保存 {} 失败!", index);
log.warn(msg);
}
}
@Override
public void onFailure(Exception e) {
String msg = StrUtil.format("【ES】按日期异步批量保存 {} 异常!", index);
log.error(msg, e);
}
};
asyncSaveBatchInDay(day, list, listener);
} }
public void asyncSaveBatchInDay(String day, Collection<T> list, ActionListener<BulkResponse> listener) public void asyncSaveBatchInDay(String day, Collection<T> list, ActionListener<BulkResponse> listener) {
throws IOException { if (StrUtil.isBlank(day) || CollectionUtil.isEmpty(list)) {
String index = checkIndex(day); return;
checkData(list); }
elasticsearchTemplate.asyncSaveBatch(getIndex(day), getType(), list, listener); String index = getIndex(day);
String type = getType();
try {
checkIndex(day);
checkData(list);
elasticsearchTemplate.asyncSaveBatch(index, type, list, listener);
} catch (Exception e) {
log.error("【ES】异步批量添加数据异常index: {}, type: {}, size: {}", index, type, list.size(), e);
}
} }
public boolean deleteByIdInDay(String day, String id) throws IOException { public void asyncUpdateBatchIdsInDay(String day, Collection<T> list) {
return elasticsearchTemplate.deleteById(getIndex(day), getType(), id); asyncUpdateBatchIdsInDay(day, list, DEFAULT_BULK_LISTENER);
} }
public boolean deleteBatchIdsInDay(String day, Collection<String> ids) throws IOException { public void asyncUpdateBatchIdsInDay(String day, Collection<T> list, ActionListener<BulkResponse> listener) {
return elasticsearchTemplate.deleteBatchIds(getIndex(day), getType(), ids); if (StrUtil.isBlank(day) || CollectionUtil.isEmpty(list)) {
return;
}
String index = getIndex(day);
String type = getType();
try {
checkData(list);
elasticsearchTemplate.asyncUpdateBatchIds(index, type, list, listener);
} catch (Exception e) {
log.error("【ES】异步批量更新数据异常index: {}, type: {}, size: {}", index, type, list.size(), e);
}
} }
protected String checkIndex(String day) throws IOException { public boolean deleteByIdInDay(String day, String id) {
if (StrUtil.isBlank(day) || StrUtil.isBlank(id)) {
return false;
}
String index = getIndex(day);
String type = getType();
try {
return elasticsearchTemplate.deleteById(index, type, id);
} catch (IOException e) {
log.error("【ES】根据ID删除数据异常index: {}, type: {}, id: {}", index, type, id, e);
return false;
}
}
public boolean deleteBatchIdsInDay(String day, Collection<String> ids) {
if (StrUtil.isBlank(day) || CollectionUtil.isEmpty(ids)) {
return false;
}
String index = getIndex(day);
String type = getType();
try {
return elasticsearchTemplate.deleteBatchIds(index, type, ids);
} catch (IOException e) {
log.error("【ES】根据ID批量删除数据异常index: {}, type: {}, ids: {}", index, type, ids, e);
return false;
}
}
protected String checkIndex(String day) {
if (!enableAutoCreateIndex()) { if (!enableAutoCreateIndex()) {
return getIndex(day); return getIndex(day);
} }
String index = createIndexInDay(day); String index = createIndexIfNotExistsInDay(day);
if (StrUtil.isBlank(index)) { if (StrUtil.isBlank(index)) {
String msg = StrUtil.format("【ES】按日期批量保存 {} 失败!索引找不到且创建失败!", index); String msg = StrUtil.format("【ES】索引 {}_{} 找不到且创建失败!", getAlias(), day);
throw new DefaultException(ResultCode.ERROR, msg); throw new DefaultException(ResultCode.ERROR, msg);
} }
return index; return index;

View File

@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ReflectUtil; import cn.hutool.core.util.ReflectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate; import io.github.dunwu.javadb.elasticsearch.ElasticsearchTemplate;
import io.github.dunwu.javadb.elasticsearch.constant.ResultCode; import io.github.dunwu.javadb.elasticsearch.constant.ResultCode;
import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity; import io.github.dunwu.javadb.elasticsearch.entity.BaseEsEntity;
@ -20,12 +21,14 @@ import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
* ES Mapper * ES Mapper
@ -52,13 +55,6 @@ public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T
return 1; return 1;
} }
/**
* ES
*/
public boolean enableAutoCreateIndex() {
return true;
}
@Override @Override
public RestHighLevelClient getClient() { public RestHighLevelClient getClient() {
if (elasticsearchTemplate == null) { if (elasticsearchTemplate == null) {
@ -77,14 +73,13 @@ public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Map<String, String> getPropertiesMap() { public Map<String, String> getPropertiesMap() {
Class<T> clazz = getEntityClass(); Class<T> clazz = getEntityClass();
Method method; Method method;
try { try {
method = clazz.getMethod("getPropertiesMap"); method = clazz.getMethod("getPropertiesMap");
} catch (NoSuchMethodException e) { } catch (NoSuchMethodException e) {
String msg = StrUtil.format("【ES】检查并创建 {} 索引失败day 不能为空!", getAlias()); log.error("【ES】{} 中不存在 getPropertiesMap 方法!", clazz.getCanonicalName());
throw new DefaultException(e, ResultCode.ERROR, msg); return new HashMap<>(0);
} }
Object result = ReflectUtil.invokeStatic(method); Object result = ReflectUtil.invokeStatic(method);
@ -99,33 +94,80 @@ public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T
// ==================================================================== // ====================================================================
@Override @Override
public boolean isIndexExists() throws IOException { public boolean isIndexExists() {
return elasticsearchTemplate.isIndexExists(getIndex());
}
@Override
public String createIndexIfNotExists() throws IOException {
String index = getIndex(); String index = getIndex();
boolean exists = elasticsearchTemplate.isIndexExists(index); try {
if (exists) { return elasticsearchTemplate.isIndexExists(index);
} catch (Exception e) {
log.error("【ES】判断索引是否存在异常index: {}", index, e);
return false;
}
}
@Override
public String createIndexIfNotExists() {
String index = getIndex();
String type = getType();
String alias = getAlias();
int shard = getShard();
int replica = getReplica();
return createIndex(index, type, alias, shard, replica);
}
protected String createIndex(String index, String type, String alias, int shard, int replica) {
try {
if (elasticsearchTemplate.isIndexExists(index)) {
return index;
}
elasticsearchTemplate.createIndex(index, type, alias, shard, replica);
log.info("【ES】创建索引成功index: {}, type: {}, alias: {}, shard: {}, replica: {}",
index, type, alias, shard, replica);
Map<String, String> propertiesMap = getPropertiesMap();
if (MapUtil.isNotEmpty(propertiesMap)) {
elasticsearchTemplate.setMapping(index, type, propertiesMap);
log.error("【ES】设置索引 mapping 成功index: {}, type: {}, propertiesMap: {}",
index, type, JSONUtil.toJsonStr(propertiesMap));
}
return index; return index;
} catch (Exception e) {
log.error("【ES】创建索引异常index: {}, type: {}, alias: {}, shard: {}, replica: {}",
index, type, alias, shard, replica, e);
return null;
} }
elasticsearchTemplate.createIndex(index, getType(), getAlias(), getShard(), getReplica());
Map<String, String> propertiesMap = getPropertiesMap();
if (MapUtil.isNotEmpty(propertiesMap)) {
elasticsearchTemplate.setMapping(index, getType(), propertiesMap);
}
return index;
} }
@Override @Override
public void deleteIndex() throws IOException { public void deleteIndex() {
elasticsearchTemplate.deleteIndex(getIndex()); String index = getIndex();
try {
log.info("【ES】删除索引成功index: {}", index);
elasticsearchTemplate.deleteIndex(index);
} catch (Exception e) {
log.error("【ES】删除索引异常index: {}", index, e);
}
} }
@Override @Override
public void updateAlias() throws IOException { public void updateAlias() {
elasticsearchTemplate.updateAlias(getIndex(), getAlias()); String index = getIndex();
String alias = getAlias();
try {
log.info("【ES】更新别名成功alias: {} -> index: {}", alias, index);
elasticsearchTemplate.updateAlias(index, alias);
} catch (Exception e) {
log.error("【ES】更新别名异常alias: {} -> index: {}", alias, index, e);
}
}
@Override
public Set<String> getIndexSet() {
String alias = getAlias();
try {
return elasticsearchTemplate.getIndexSet(alias);
} catch (Exception e) {
log.error("【ES】获取别名的所有索引异常alias: {}", alias, e);
return new HashSet<>(0);
}
} }
// ==================================================================== // ====================================================================
@ -133,172 +175,295 @@ public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T
// ==================================================================== // ====================================================================
@Override @Override
public GetResponse getById(String id) throws IOException { public GetResponse getById(String id) {
return getById(id, null); return getById(id, null);
} }
@Override @Override
public GetResponse getById(String id, Long version) throws IOException { public GetResponse getById(String id, Long version) {
return elasticsearchTemplate.getById(getIndex(), getType(), id, version); String index = getIndex();
String type = getType();
try {
return elasticsearchTemplate.getById(index, type, id, version);
} catch (Exception e) {
log.error("【ES】根据ID查询异常index: {}, type: {}, id: {}, version: {}", index, type, id, version, e);
return null;
}
} }
@Override @Override
public T pojoById(String id) throws IOException { public T pojoById(String id) {
return pojoById(id, null); return pojoById(id, null);
} }
@Override @Override
public T pojoById(String id, Long version) throws IOException { public T pojoById(String id, Long version) {
return elasticsearchTemplate.pojoById(getIndex(), getType(), id, version, getEntityClass()); String index = getIndex();
String type = getType();
try {
return elasticsearchTemplate.pojoById(index, type, id, version, getEntityClass());
} catch (Exception e) {
log.error("【ES】根据ID查询POJO异常index: {}, type: {}, id: {}, version: {}", index, type, id, version, e);
return null;
}
} }
@Override @Override
public List<T> pojoListByIds(Collection<String> ids) throws IOException { public List<T> pojoListByIds(Collection<String> ids) {
return elasticsearchTemplate.pojoListByIds(getIndex(), getType(), ids, getEntityClass()); String index = getIndex();
String type = getType();
try {
return elasticsearchTemplate.pojoListByIds(index, type, ids, getEntityClass());
} catch (Exception e) {
log.error("【ES】根据ID查询POJO列表异常index: {}, type: {}, ids: {}", index, type, ids, e);
return new ArrayList<>(0);
}
} }
@Override @Override
public long count(SearchSourceBuilder builder) throws IOException { public long count(SearchSourceBuilder builder) {
return elasticsearchTemplate.count(getIndex(), getType(), builder); String index = getIndex();
String type = getType();
try {
return elasticsearchTemplate.count(index, type, builder);
} catch (Exception e) {
log.error("【ES】获取匹配记录数异常index: {}, type: {}", index, type, e);
return 0L;
}
} }
@Override @Override
public SearchResponse query(SearchSourceBuilder builder) throws IOException { public SearchResponse query(SearchSourceBuilder builder) {
return elasticsearchTemplate.query(getIndex(), getType(), builder); String index = getIndex();
} String type = getType();
@Override try {
public PageData<T> pojoPage(SearchSourceBuilder builder) throws IOException { return elasticsearchTemplate.query(index, type, builder);
return elasticsearchTemplate.pojoPage(getIndex(), getType(), builder, getEntityClass()); } catch (Exception e) {
log.error("【ES】条件查询异常index: {}, type: {}", index, type, e);
return null;
}
} }
@Override @Override
public ScrollData<T> pojoPageByLastId(String lastId, int size, QueryBuilder queryBuilder) throws IOException { public PageData<T> pojoPage(SearchSourceBuilder builder) {
return elasticsearchTemplate.pojoPageByLastId(getIndex(), getType(), lastId, size, String index = getIndex();
queryBuilder, getEntityClass()); String type = getType();
try {
return elasticsearchTemplate.pojoPage(index, type, builder, getEntityClass());
} catch (Exception e) {
log.error("【ES】from + size 分页条件查询异常index: {}, type: {}", index, type, e);
return null;
}
} }
@Override @Override
public ScrollData<T> pojoScrollBegin(SearchSourceBuilder builder) throws IOException { public ScrollData<T> pojoPageByLastId(String scrollId, int size, QueryBuilder queryBuilder) {
return elasticsearchTemplate.pojoScrollBegin(getIndex(), getType(), builder, getEntityClass()); String index = getIndex();
String type = getType();
try {
return elasticsearchTemplate.pojoPageByScrollId(index, type, scrollId, size, queryBuilder,
getEntityClass());
} catch (Exception e) {
log.error("【ES】search after 分页条件查询异常index: {}, type: {}", index, type, e);
return null;
}
} }
@Override @Override
public ScrollData<T> pojoScroll(String scrollId, SearchSourceBuilder builder) throws IOException { public ScrollData<T> pojoScrollBegin(SearchSourceBuilder builder) {
return elasticsearchTemplate.pojoScroll(scrollId, builder, getEntityClass()); String index = getIndex();
String type = getType();
try {
return elasticsearchTemplate.pojoScrollBegin(index, type, builder, getEntityClass());
} catch (Exception e) {
log.error("【ES】开启滚动分页条件查询异常index: {}, type: {}", index, type, e);
return null;
}
} }
@Override @Override
public boolean pojoScrollEnd(String scrollId) throws IOException { public ScrollData<T> pojoScroll(String scrollId, SearchSourceBuilder builder) {
return elasticsearchTemplate.pojoScrollEnd(scrollId); try {
} return elasticsearchTemplate.pojoScroll(scrollId, builder, getEntityClass());
} catch (Exception e) {
log.error("【ES】滚动分页条件查询异常scrollId: {}", scrollId, e);
@Override return null;
public T save(T entity) throws IOException { }
String index = checkIndex();
checkData(entity);
return elasticsearchTemplate.save(index, getType(), entity);
} }
@Override @Override
public boolean saveBatch(Collection<T> list) throws IOException { public boolean pojoScrollEnd(String scrollId) {
String index = checkIndex(); try {
checkData(list); return elasticsearchTemplate.pojoScrollEnd(scrollId);
return elasticsearchTemplate.saveBatch(index, getType(), list); } catch (Exception e) {
log.error("【ES】关闭滚动分页条件查询异常scrollId: {}", scrollId, e);
return false;
}
} }
@Override @Override
public void asyncSaveBatch(Collection<T> list) throws IOException { public T save(T entity) {
String index = checkIndex(); if (entity == null) {
checkData(list); return null;
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() { }
@Override String index = getIndex();
public void onResponse(BulkResponse response) { String type = getType();
if (response != null && !response.hasFailures()) { try {
String msg = StrUtil.format("【ES】异步批量保存 {} 成功!", index); checkIndex();
log.info(msg); checkData(entity);
} else { return elasticsearchTemplate.save(index, type, entity);
String msg = StrUtil.format("【ES】异步批量保存 {} 失败!", index); } catch (Exception e) {
log.warn(msg); log.error("【ES】添加数据异常index: {}, type: {}, entity: {}", index, type, JSONUtil.toJsonStr(entity), e);
} return null;
} }
@Override
public void onFailure(Exception e) {
String msg = StrUtil.format("【ES】异步批量保存 {} 异常!", index);
log.error(msg, e);
}
};
asyncSaveBatch(list, listener);
} }
@Override @Override
public void asyncSaveBatch(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException { public boolean saveBatch(Collection<T> list) {
String index = checkIndex(); if (CollectionUtil.isEmpty(list)) {
checkData(list); return false;
elasticsearchTemplate.asyncSaveBatch(index, getType(), list, listener); }
String index = getIndex();
String type = getType();
try {
checkIndex();
checkData(list);
return elasticsearchTemplate.saveBatch(index, type, list);
} catch (Exception e) {
log.error("【ES】批量添加数据异常index: {}, type: {}, size: {}", index, type, list.size(), e);
return false;
}
} }
@Override @Override
public T updateById(T entity) throws IOException { public void asyncSaveBatch(Collection<T> list) {
checkData(entity); asyncSaveBatch(list, DEFAULT_BULK_LISTENER);
return elasticsearchTemplate.updateById(getIndex(), getType(), entity);
} }
@Override @Override
public boolean updateBatchIds(Collection<T> list) throws IOException { public void asyncSaveBatch(Collection<T> list, ActionListener<BulkResponse> listener) {
checkData(list); if (CollectionUtil.isEmpty(list)) {
return elasticsearchTemplate.updateBatchIds(getIndex(), getType(), list); return;
}
String index = getIndex();
String type = getType();
try {
checkIndex();
checkData(list);
elasticsearchTemplate.asyncSaveBatch(index, getType(), list, listener);
} catch (Exception e) {
log.error("【ES】异步批量添加数据异常index: {}, type: {}, size: {}", index, type, list.size(), e);
}
}
@Override
public T updateById(T entity) {
if (entity == null) {
return null;
}
String index = getIndex();
String type = getType();
try {
checkData(entity);
return elasticsearchTemplate.updateById(index, type, entity);
} catch (Exception e) {
log.error("【ES】更新数据异常index: {}, type: {}", index, type, e);
return null;
}
}
@Override
public boolean updateBatchIds(Collection<T> list) {
if (CollectionUtil.isEmpty(list)) {
return false;
}
String index = getIndex();
String type = getType();
try {
checkData(list);
return elasticsearchTemplate.updateBatchIds(index, type, list);
} catch (Exception e) {
log.error("【ES】批量更新数据异常index: {}, type: {}, size: {}", index, type, list.size(), e);
return false;
}
}
@Override
public void asyncUpdateBatchIds(Collection<T> list) {
asyncUpdateBatchIds(list, DEFAULT_BULK_LISTENER);
} }
@Override @Override
public void asyncUpdateBatchIds(Collection<T> list, ActionListener<BulkResponse> listener) { public void asyncUpdateBatchIds(Collection<T> list, ActionListener<BulkResponse> listener) {
checkData(list); if (CollectionUtil.isEmpty(list)) {
elasticsearchTemplate.asyncUpdateBatchIds(getIndex(), getType(), list, listener); return;
}
String index = getIndex();
String type = getType();
try {
checkData(list);
elasticsearchTemplate.asyncUpdateBatchIds(index, type, list, listener);
} catch (Exception e) {
log.error("【ES】异步批量更新数据异常index: {}, type: {}, size: {}", index, type, list.size(), e);
}
} }
@Override @Override
public boolean deleteById(String id) throws IOException { public boolean deleteById(String id) {
return elasticsearchTemplate.deleteById(getIndex(), getType(), id); if (StrUtil.isBlank(id)) {
return false;
}
String index = getIndex();
String type = getType();
try {
return elasticsearchTemplate.deleteById(index, type, id);
} catch (Exception e) {
log.error("【ES】根据ID删除数据异常index: {}, type: {}, id: {}", index, type, id, e);
return false;
}
} }
@Override @Override
public boolean deleteBatchIds(Collection<String> ids) throws IOException { public boolean deleteBatchIds(Collection<String> ids) {
return elasticsearchTemplate.deleteBatchIds(getIndex(), getType(), ids); if (CollectionUtil.isEmpty(ids)) {
return false;
}
String index = getIndex();
String type = getType();
try {
return elasticsearchTemplate.deleteBatchIds(index, type, ids);
} catch (Exception e) {
log.error("【ES】根据ID批量删除数据异常index: {}, type: {}, ids: {}", index, type, ids, e);
return false;
}
} }
@Override @Override
public void asyncDeleteBatchIds(Collection<String> ids) throws IOException { public void asyncDeleteBatchIds(Collection<String> ids) {
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() { asyncDeleteBatchIds(ids, DEFAULT_BULK_LISTENER);
@Override
public void onResponse(BulkResponse response) {
if (response != null && !response.hasFailures()) {
log.info("【ES】异步批量删除成功");
} else {
log.warn("【ES】异步批量删除失败ids: {}", ids);
}
}
@Override
public void onFailure(Exception e) {
log.error("【ES】异步批量删除异常ids: {}", ids, e);
}
};
asyncDeleteBatchIds(ids, listener);
} }
@Override @Override
public void asyncDeleteBatchIds(Collection<String> ids, ActionListener<BulkResponse> listener) throws IOException { public void asyncDeleteBatchIds(Collection<String> ids, ActionListener<BulkResponse> listener) {
elasticsearchTemplate.asyncDeleteBatchIds(getIndex(), getType(), ids, listener); if (CollectionUtil.isEmpty(ids)) {
return;
}
String index = getIndex();
String type = getType();
try {
elasticsearchTemplate.asyncDeleteBatchIds(index, type, ids, listener);
} catch (Exception e) {
log.error("【ES】异步根据ID批量删除数据异常index: {}, type: {}, ids: {}", index, type, ids, e);
}
} }
protected String checkIndex() throws IOException { protected String checkIndex() {
if (!enableAutoCreateIndex()) { if (!enableAutoCreateIndex()) {
return getIndex(); return getIndex();
} }
String index = createIndexIfNotExists(); String index = createIndexIfNotExists();
if (StrUtil.isBlank(index)) { if (StrUtil.isBlank(index)) {
String msg = StrUtil.format("【ES】索引找不到且创建失败", index); String msg = StrUtil.format("【ES】索引 {} 找不到且创建失败!", index);
throw new DefaultException(ResultCode.ERROR, msg); throw new DefaultException(ResultCode.ERROR, msg);
} }
return index; return index;
@ -318,4 +483,20 @@ public abstract class BaseEsMapper<T extends BaseEsEntity> implements EsMapper<T
} }
} }
protected final ActionListener<BulkResponse> DEFAULT_BULK_LISTENER = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
if (response != null && !response.hasFailures()) {
log.info("【ES】异步批量写数据成功index: {}, type: {}", getIndex(), getType());
} else {
log.warn("【ES】异步批量写数据失败index: {}, type: {}", getIndex(), getType());
}
}
@Override
public void onFailure(Exception e) {
log.error("【ES】异步批量写数据异常index: {}, type: {}", getIndex(), getType());
}
};
} }

View File

@ -13,11 +13,11 @@ import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
* ES Mapper * ES Mapper
@ -57,29 +57,38 @@ public interface EsMapper<T extends BaseEsEntity> {
*/ */
Class<T> getEntityClass(); Class<T> getEntityClass();
RestHighLevelClient getClient() throws IOException; /**
* ES
*/
default boolean enableAutoCreateIndex() {
return false;
}
BulkProcessor getBulkProcessor() throws IOException; RestHighLevelClient getClient();
boolean isIndexExists() throws IOException; BulkProcessor getBulkProcessor();
String createIndexIfNotExists() throws IOException; boolean isIndexExists();
void deleteIndex() throws IOException; String createIndexIfNotExists();
void updateAlias() throws IOException; void deleteIndex();
GetResponse getById(String id) throws IOException; void updateAlias();
GetResponse getById(String id, Long version) throws IOException; Set<String> getIndexSet();
T pojoById(String id) throws IOException; GetResponse getById(String id);
T pojoById(String id, Long version) throws IOException; GetResponse getById(String id, Long version);
List<T> pojoListByIds(Collection<String> ids) throws IOException; T pojoById(String id);
default Map<String, T> pojoMapByIds(Collection<String> ids) throws IOException { T pojoById(String id, Long version);
List<T> pojoListByIds(Collection<String> ids);
default Map<String, T> pojoMapByIds(Collection<String> ids) {
List<T> list = pojoListByIds(ids); List<T> list = pojoListByIds(ids);
if (CollectionUtil.isEmpty(list)) { if (CollectionUtil.isEmpty(list)) {
return new HashMap<>(0); return new HashMap<>(0);
@ -92,40 +101,42 @@ public interface EsMapper<T extends BaseEsEntity> {
return map; return map;
} }
long count(SearchSourceBuilder builder) throws IOException; long count(SearchSourceBuilder builder);
SearchResponse query(SearchSourceBuilder builder) throws IOException; SearchResponse query(SearchSourceBuilder builder);
PageData<T> pojoPage(SearchSourceBuilder builder) throws IOException; PageData<T> pojoPage(SearchSourceBuilder builder);
ScrollData<T> pojoPageByLastId(String lastId, int size, QueryBuilder queryBuilder) throws IOException; ScrollData<T> pojoPageByLastId(String scrollId, int size, QueryBuilder queryBuilder);
ScrollData<T> pojoScrollBegin(SearchSourceBuilder builder) throws IOException; ScrollData<T> pojoScrollBegin(SearchSourceBuilder builder);
ScrollData<T> pojoScroll(String scrollId, SearchSourceBuilder builder) throws IOException; ScrollData<T> pojoScroll(String scrollId, SearchSourceBuilder builder);
boolean pojoScrollEnd(String scrollId) throws IOException; boolean pojoScrollEnd(String scrollId);
T save(T entity) throws IOException; T save(T entity);
boolean saveBatch(Collection<T> list) throws IOException; boolean saveBatch(Collection<T> list);
void asyncSaveBatch(Collection<T> list) throws IOException; void asyncSaveBatch(Collection<T> list);
void asyncSaveBatch(Collection<T> list, ActionListener<BulkResponse> listener) throws IOException; void asyncSaveBatch(Collection<T> list, ActionListener<BulkResponse> listener);
T updateById(T entity) throws IOException; T updateById(T entity);
boolean updateBatchIds(Collection<T> list) throws IOException; boolean updateBatchIds(Collection<T> list);
void asyncUpdateBatchIds(Collection<T> list);
void asyncUpdateBatchIds(Collection<T> list, ActionListener<BulkResponse> listener); void asyncUpdateBatchIds(Collection<T> list, ActionListener<BulkResponse> listener);
boolean deleteById(String id) throws IOException; boolean deleteById(String id);
boolean deleteBatchIds(Collection<String> ids) throws IOException; boolean deleteBatchIds(Collection<String> ids);
void asyncDeleteBatchIds(Collection<String> ids) throws IOException; void asyncDeleteBatchIds(Collection<String> ids);
void asyncDeleteBatchIds(Collection<String> ids, ActionListener<BulkResponse> listener) throws IOException; void asyncDeleteBatchIds(Collection<String> ids, ActionListener<BulkResponse> listener);
} }

View File

@ -8,6 +8,8 @@ import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData;
import io.github.dunwu.javadb.elasticsearch.util.JsonUtil; import io.github.dunwu.javadb.elasticsearch.util.JsonUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.Assertions; import org.assertj.core.api.Assertions;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.BoolQueryBuilder;
@ -19,6 +21,8 @@ import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
/** /**
* ElasticsearchTemplate * ElasticsearchTemplate
@ -75,8 +79,34 @@ public abstract class BaseElasticsearchTemplateTest<T extends BaseEsEntity> {
Assertions.assertThat(exists).isTrue(); Assertions.assertThat(exists).isTrue();
} }
public void getIndexList() throws IOException {
Set<String> set = TEMPLATE.getIndexSet(getAlias());
log.info("alias: {}, indexList: {}", getAlias(), set);
Assertions.assertThat(set).isNotEmpty();
}
protected void save() throws IOException { protected void save() throws IOException {
String id = "1"; String id = "1";
T oldEntity = getOneMockData(id);
TEMPLATE.save(getIndex(), getType(), oldEntity);
T newEntity = TEMPLATE.pojoById(getIndex(), getType(), id, getEntityClass());
log.info("记录:{}", JsonUtil.toString(newEntity));
Assertions.assertThat(newEntity).isNotNull();
}
protected void saveBatch() throws IOException {
int total = 5000;
List<List<T>> listGroup = CollectionUtil.split(getMockList(total), 1000);
for (List<T> list : listGroup) {
TEMPLATE.saveBatch(getIndex(), getType(), list);
}
long count = TEMPLATE.count(getIndex(), getType(), new SearchSourceBuilder());
log.info("批量更新记录数: {}", count);
Assertions.assertThat(count).isEqualTo(total);
}
protected void asyncSave() throws IOException {
String id = "10000";
T entity = getOneMockData(id); T entity = getOneMockData(id);
TEMPLATE.save(getIndex(), getType(), entity); TEMPLATE.save(getIndex(), getType(), entity);
T newEntity = TEMPLATE.pojoById(getIndex(), getType(), id, getEntityClass()); T newEntity = TEMPLATE.pojoById(getIndex(), getType(), id, getEntityClass());
@ -84,12 +114,13 @@ public abstract class BaseElasticsearchTemplateTest<T extends BaseEsEntity> {
Assertions.assertThat(newEntity).isNotNull(); Assertions.assertThat(newEntity).isNotNull();
} }
protected void saveBatch() throws IOException { protected void asyncSaveBatch() throws IOException, InterruptedException {
int total = 10000; int total = 10000;
List<List<T>> listGroup = CollectionUtil.split(getMockList(total), 1000); List<List<T>> listGroup = CollectionUtil.split(getMockList(total), 1000);
for (List<T> list : listGroup) { for (List<T> list : listGroup) {
TEMPLATE.saveBatch(getIndex(), getType(), list); TEMPLATE.asyncSaveBatch(getIndex(), getType(), list, DEFAULT_BULK_LISTENER);
} }
TimeUnit.SECONDS.sleep(20);
long count = TEMPLATE.count(getIndex(), getType(), new SearchSourceBuilder()); long count = TEMPLATE.count(getIndex(), getType(), new SearchSourceBuilder());
log.info("批量更新记录数: {}", count); log.info("批量更新记录数: {}", count);
Assertions.assertThat(count).isEqualTo(total); Assertions.assertThat(count).isEqualTo(total);
@ -167,7 +198,7 @@ public abstract class BaseElasticsearchTemplateTest<T extends BaseEsEntity> {
long total = TEMPLATE.count(getIndex(), getType(), queryBuilder); long total = TEMPLATE.count(getIndex(), getType(), queryBuilder);
ScrollData<T> scrollData = ScrollData<T> scrollData =
TEMPLATE.pojoPageByLastId(getIndex(), getType(), null, SIZE, queryBuilder, getEntityClass()); TEMPLATE.pojoPageByScrollId(getIndex(), getType(), null, SIZE, queryBuilder, getEntityClass());
if (scrollData == null || scrollData.getScrollId() == null) { if (scrollData == null || scrollData.getScrollId() == null) {
return; return;
} }
@ -181,8 +212,8 @@ public abstract class BaseElasticsearchTemplateTest<T extends BaseEsEntity> {
String scrollId = scrollData.getScrollId(); String scrollId = scrollData.getScrollId();
while (CollectionUtil.isNotEmpty(scrollData.getContent())) { while (CollectionUtil.isNotEmpty(scrollData.getContent())) {
scrollData = scrollData = TEMPLATE.pojoPageByScrollId(getIndex(), getType(), scrollId, SIZE,
TEMPLATE.pojoPageByLastId(getIndex(), getType(), scrollId, SIZE, queryBuilder, getEntityClass()); queryBuilder, getEntityClass());
if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) { if (scrollData == null || CollectionUtil.isEmpty(scrollData.getContent())) {
break; break;
} }
@ -238,4 +269,20 @@ public abstract class BaseElasticsearchTemplateTest<T extends BaseEsEntity> {
Assertions.assertThat(count).isEqualTo(total); Assertions.assertThat(count).isEqualTo(total);
} }
final ActionListener<BulkResponse> DEFAULT_BULK_LISTENER = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
if (response != null && !response.hasFailures()) {
log.info("【ES】异步批量写数据成功index: {}, type: {}", getIndex(), getType());
} else {
log.warn("【ES】异步批量写数据失败index: {}, type: {}", getIndex(), getType());
}
}
@Override
public void onFailure(Exception e) {
log.error("【ES】异步批量写数据异常index: {}, type: {}", getIndex(), getType());
}
};
} }

View File

@ -83,6 +83,7 @@ public class UserElasticsearchTemplateTest extends BaseElasticsearchTemplateTest
public void indexTest() throws IOException { public void indexTest() throws IOException {
super.deleteIndex(); super.deleteIndex();
super.createIndex(); super.createIndex();
super.getIndexList();
} }
@Test @Test
@ -92,6 +93,13 @@ public class UserElasticsearchTemplateTest extends BaseElasticsearchTemplateTest
super.saveBatch(); super.saveBatch();
} }
@Test
@DisplayName("异步写数据测试")
public void asyncWriteTest() throws IOException, InterruptedException {
super.asyncSave();
super.asyncSaveBatch();
}
@Test @Test
@DisplayName("读数据测试") @DisplayName("读数据测试")
public void readTest() throws IOException { public void readTest() throws IOException {

View File

@ -21,7 +21,6 @@ import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -49,7 +48,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("删除当天索引") @DisplayName("删除当天索引")
public void deleteIndex() throws IOException { public void deleteIndex() {
String index = mapper.getIndex(); String index = mapper.getIndex();
boolean indexExists = mapper.isIndexExists(); boolean indexExists = mapper.isIndexExists();
if (!indexExists) { if (!indexExists) {
@ -63,7 +62,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("根据日期删除索引") @DisplayName("根据日期删除索引")
public void deleteIndexInDay() throws IOException { public void deleteIndexInDay() {
String index = mapper.getIndex(day); String index = mapper.getIndex(day);
boolean indexExists = mapper.isIndexExistsInDay(day); boolean indexExists = mapper.isIndexExistsInDay(day);
if (!indexExists) { if (!indexExists) {
@ -83,7 +82,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("创建当天索引") @DisplayName("创建当天索引")
public void createIndex() throws IOException { public void createIndex() {
String index = mapper.getIndex(); String index = mapper.getIndex();
boolean indexExists = mapper.isIndexExists(); boolean indexExists = mapper.isIndexExists();
@ -99,7 +98,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("根据日期创建索引") @DisplayName("根据日期创建索引")
public void createIndexInDay() throws IOException { public void createIndexInDay() {
String index = mapper.getIndex(day); String index = mapper.getIndex(day);
boolean indexExists = mapper.isIndexExistsInDay(day); boolean indexExists = mapper.isIndexExistsInDay(day);
@ -108,7 +107,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
return; return;
} }
mapper.createIndexInDay(day); mapper.createIndexIfNotExistsInDay(day);
indexExists = mapper.isIndexExistsInDay(day); indexExists = mapper.isIndexExistsInDay(day);
Assertions.assertThat(indexExists).isTrue(); Assertions.assertThat(indexExists).isTrue();
} }
@ -121,7 +120,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("保存当天数据") @DisplayName("保存当天数据")
public void save() throws IOException { public void save() {
String id = "1"; String id = "1";
User entity = getOneMockData(id); User entity = getOneMockData(id);
mapper.save(entity); mapper.save(entity);
@ -132,7 +131,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("保存指定日期数据") @DisplayName("保存指定日期数据")
public void saveInDay() throws IOException { public void saveInDay() {
String id = "1"; String id = "1";
User entity = getOneMockData(id); User entity = getOneMockData(id);
mapper.saveInDay(day, entity); mapper.saveInDay(day, entity);
@ -143,7 +142,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("批量保存当天数据") @DisplayName("批量保存当天数据")
public void batchSave() throws IOException, InterruptedException { public void batchSave() throws InterruptedException {
int total = 10000; int total = 10000;
List<List<User>> listGroup = CollectionUtil.split(getMockList(total), 1000); List<List<User>> listGroup = CollectionUtil.split(getMockList(total), 1000);
for (List<User> list : listGroup) { for (List<User> list : listGroup) {
@ -157,7 +156,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("批量保存指定日期数据") @DisplayName("批量保存指定日期数据")
public void batchSaveInDay() throws IOException, InterruptedException { public void batchSaveInDay() throws InterruptedException {
int total = 10000; int total = 10000;
List<List<User>> listGroup = CollectionUtil.split(getMockList(total), 1000); List<List<User>> listGroup = CollectionUtil.split(getMockList(total), 1000);
for (List<User> list : listGroup) { for (List<User> list : listGroup) {
@ -177,7 +176,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("根据ID查找当日数据") @DisplayName("根据ID查找当日数据")
public void pojoById() throws IOException { public void pojoById() {
String id = "1"; String id = "1";
User newEntity = mapper.pojoById(id); User newEntity = mapper.pojoById(id);
log.info("entity: {}", JsonUtil.toString(newEntity)); log.info("entity: {}", JsonUtil.toString(newEntity));
@ -186,7 +185,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("根据ID查找指定日期数据") @DisplayName("根据ID查找指定日期数据")
public void pojoByIdInDay() throws IOException { public void pojoByIdInDay() {
String id = "1"; String id = "1";
User newEntity = mapper.pojoByIdInDay(day, id); User newEntity = mapper.pojoByIdInDay(day, id);
log.info("entity: {}", JsonUtil.toString(newEntity)); log.info("entity: {}", JsonUtil.toString(newEntity));
@ -195,7 +194,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("获取匹配条件的记录数") @DisplayName("获取匹配条件的记录数")
public void count() throws IOException { public void count() {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
@ -207,7 +206,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("获取匹配条件的指定日期记录数") @DisplayName("获取匹配条件的指定日期记录数")
public void countInDay() throws IOException { public void countInDay() {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
@ -219,7 +218,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("获取匹配条件的记录") @DisplayName("获取匹配条件的记录")
public void query() throws IOException { public void query() {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
@ -239,7 +238,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("获取匹配条件的指定日期记录") @DisplayName("获取匹配条件的指定日期记录")
public void queryInDay() throws IOException { public void queryInDay() {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
@ -259,7 +258,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("from + size 分页查询当日数据") @DisplayName("from + size 分页查询当日数据")
public void pojoPage() throws IOException { public void pojoPage() {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
@ -276,7 +275,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("from + size 分页查询指定日期数据") @DisplayName("from + size 分页查询指定日期数据")
public void pojoPageInDay() throws IOException { public void pojoPageInDay() {
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
@ -293,7 +292,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("search after 分页查询当日数据") @DisplayName("search after 分页查询当日数据")
protected void pojoPageByLastId() throws IOException { protected void pojoPageByLastId() {
BoolQueryBuilder queryBuilder = new BoolQueryBuilder(); BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
@ -332,7 +331,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("search after 分页查询指定日期数据") @DisplayName("search after 分页查询指定日期数据")
protected void pojoPageByLastIdInDay() throws IOException { protected void pojoPageByLastIdInDay() {
BoolQueryBuilder queryBuilder = new BoolQueryBuilder(); BoolQueryBuilder queryBuilder = new BoolQueryBuilder();
queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100")); queryBuilder.must(QueryBuilders.rangeQuery("docId").lt("100"));
@ -371,7 +370,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("滚动翻页当日数据") @DisplayName("滚动翻页当日数据")
public void pojoScroll() throws IOException { public void pojoScroll() {
final int size = 100; final int size = 100;
@ -412,7 +411,7 @@ public class UserEsMapperTest extends BaseApplicationTests {
@Test @Test
@DisplayName("滚动翻页指定日期数据") @DisplayName("滚动翻页指定日期数据")
public void pojoScrollInDay() throws IOException { public void pojoScrollInDay() {
final int size = 100; final int size = 100;