feat: 示例更新

master
dunwu 2024-10-09 07:16:02 +08:00
parent bd6b17cda7
commit 47d7d1552f
7 changed files with 176 additions and 80 deletions

View File

@ -280,7 +280,7 @@ public class ElasticsearchTemplate implements Closeable {
|| response.getResult() == DocWriteResponse.Result.UPDATED) {
return entity;
} else {
log.warn("【ES】save 响应结果无效result: {}", response.getResult());
log.warn("【ES】save 失败result: {}", response.getResult());
return null;
}
}
@ -292,45 +292,25 @@ public class ElasticsearchTemplate implements Closeable {
return true;
}
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (T entity : list) {
Map<String, Object> map = toMap(entity);
if (MapUtil.isEmpty(map)) {
continue;
}
IndexRequest request = new IndexRequest(index, type).source(map);
if (entity.getDocId() != null) {
request.id(entity.getDocId());
}
bulkRequest.add(request);
}
BulkRequest bulkRequest = toBulkIndexRequest(index, type, list);
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
return response != null && !response.hasFailures();
if (response == null) {
log.warn("【ES】saveBatch 失败result 为空list: {}", JsonUtil.toString(list));
return false;
}
if (response.hasFailures()) {
log.warn("【ES】saveBatch 失败result: {}", response.buildFailureMessage());
return false;
}
return true;
}
public <T extends BaseEsEntity> void asyncSaveBatch(String index, String type, Collection<T> list,
ActionListener<BulkResponse> listener) {
if (CollectionUtil.isEmpty(list)) {
return;
}
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (T entity : list) {
Map<String, Object> map = toMap(entity);
if (MapUtil.isEmpty(map)) {
continue;
}
IndexRequest request = new IndexRequest(index, type).source(map);
if (entity.getDocId() != null) {
request.id(entity.getDocId());
}
bulkRequest.add(request);
}
BulkRequest bulkRequest = toBulkIndexRequest(index, type, list);
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
}
@ -362,7 +342,7 @@ public class ElasticsearchTemplate implements Closeable {
if (response.getResult() == DocWriteResponse.Result.UPDATED) {
return entity;
} else {
log.warn("【ES】updateById 响应结果无效result: {}", response.getResult());
log.warn("【ES】updateById 响应结果无效result: {}", response.getResult());
return null;
}
}
@ -374,23 +354,49 @@ public class ElasticsearchTemplate implements Closeable {
return true;
}
BulkRequest bulkRequest = toUpdateBulkRequest(index, type, list);
BulkRequest bulkRequest = toBulkUpdateRequest(index, type, list);
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
return response != null && !response.hasFailures();
if (response == null) {
log.warn("【ES】updateBatchIds 失败result 为空list: {}", JsonUtil.toString(list));
return false;
}
if (response.hasFailures()) {
log.warn("【ES】updateBatchIds 失败result: {}", response.buildFailureMessage());
return false;
}
return true;
}
public <T extends BaseEsEntity> void asyncUpdateBatchIds(String index, String type, Collection<T> list,
ActionListener<BulkResponse> listener) {
if (CollectionUtil.isEmpty(list)) {
return;
}
BulkRequest bulkRequest = toUpdateBulkRequest(index, type, list);
BulkRequest bulkRequest = toBulkUpdateRequest(index, type, list);
client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener);
}
private <T extends BaseEsEntity> BulkRequest toUpdateBulkRequest(String index, String type, Collection<T> list) {
private <T extends BaseEsEntity> BulkRequest toBulkIndexRequest(String index, String type, Collection<T> list) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (T entity : list) {
if (entity == null) {
continue;
}
Map<String, Object> map = toMap(entity);
if (MapUtil.isEmpty(map)) {
continue;
}
IndexRequest request = new IndexRequest(index, type).source(map);
if (entity.getDocId() != null) {
request.id(entity.getDocId());
}
bulkRequest.add(request);
}
return bulkRequest;
}
private <T extends BaseEsEntity> BulkRequest toBulkUpdateRequest(String index, String type, Collection<T> list) {
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (T entity : list) {
@ -426,11 +432,14 @@ public class ElasticsearchTemplate implements Closeable {
BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (response == null) {
log.warn("【ES】batchDeleteById 响应结果为空!");
log.warn("【ES】deleteBatchIds 失败result 为空ids: {}", JsonUtil.toString(ids));
return false;
}
return !response.hasFailures();
if (response.hasFailures()) {
log.warn("【ES】deleteBatchIds 失败result: {}", response.buildFailureMessage());
return false;
}
return true;
}
public void asyncDeleteBatchIds(String index, String type, Collection<String> ids,
@ -568,7 +577,8 @@ public class ElasticsearchTemplate implements Closeable {
/**
* search after
*/
public <T extends BaseEsEntity> ScrollData<T> pojoPageByScrollId(String index, String type, String scrollId, int size,
public <T extends BaseEsEntity> ScrollData<T> pojoPageByScrollId(String index, String type, String scrollId,
int size,
QueryBuilder queryBuilder, Class<T> clazz) throws IOException {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();

View File

@ -8,6 +8,7 @@ import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData;
import io.github.dunwu.javadb.elasticsearch.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
@ -59,12 +60,18 @@ public abstract class BaseElasticsearchTemplateTest<T extends BaseEsEntity> {
protected abstract List<T> getMockList(int num);
protected void deleteIndex() throws IOException {
boolean exists = TEMPLATE.isIndexExists(getIndex());
if (!exists) {
return;
try {
Set<String> set = TEMPLATE.getIndexSet(getAlias());
if (CollectionUtil.isNotEmpty(set)) {
for (String index : set) {
log.info("删除 alias: {}, index: {}", getAlias(), index);
TEMPLATE.deleteIndex(index);
}
}
} catch (IOException | ElasticsearchException e) {
log.error("删除索引失败!", e);
}
TEMPLATE.deleteIndex(getIndex());
exists = TEMPLATE.isIndexExists(getIndex());
boolean exists = TEMPLATE.isIndexExists(getIndex());
Assertions.assertThat(exists).isFalse();
}
@ -98,7 +105,7 @@ public abstract class BaseElasticsearchTemplateTest<T extends BaseEsEntity> {
int total = 5000;
List<List<T>> listGroup = CollectionUtil.split(getMockList(total), 1000);
for (List<T> list : listGroup) {
TEMPLATE.saveBatch(getIndex(), getType(), list);
Assertions.assertThat(TEMPLATE.saveBatch(getIndex(), getType(), list)).isTrue();
}
long count = TEMPLATE.count(getIndex(), getType(), new SearchSourceBuilder());
log.info("批量更新记录数: {}", count);

View File

@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<version>2.7.18</version>
</parent>
<groupId>io.github.dunwu</groupId>
@ -36,7 +36,7 @@
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.5.9</version>
<version>5.8.27</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
@ -51,27 +51,11 @@
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.8</version>
<version>3.29.0</version>
</dependency>
<!-- database end -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>${redisson.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>

View File

@ -1,14 +1,24 @@
package io.github.dunwu.javadb.redis.springboot;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SetOperations;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@ -22,6 +32,19 @@ public class RedisAutoConfiguration {
@Autowired
private ObjectMapper objectMapper;
@Value("${spring.redis.host:localhost}")
private String host;
@Value("${spring.redis.port:6379}")
private String port;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress(StrUtil.format("redis://{}:{}", host, port));
return Redisson.create(config);
}
@Bean
public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForHash();
@ -44,7 +67,6 @@ public class RedisAutoConfiguration {
// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值默认使用JDK的序列化方式
Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class);
serializer.setObjectMapper(objectMapper);
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂
template.setConnectionFactory(factory);

View File

@ -1,26 +1,104 @@
package io.github.dunwu.javadb.redis;
import cn.hutool.core.thread.ThreadUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
* @since 2018/6/19
*/
@Slf4j
public class RedissonStandaloneTest {
@Test
public void testRedissonConnect() {
private static RedissonClient redissonClient;
static {
ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:redisson-standalone.xml");
RedissonClient redisson = (RedissonClient) applicationContext.getBean("standalone");
redissonClient = (RedissonClient) applicationContext.getBean("standalone");
}
@Test
@DisplayName("测试连接")
public void testRedissonConnect() {
// 首先获取redis中的key-value对象key不存在没关系
RBucket<String> keyObject = redisson.getBucket("key");
RBucket<String> keyObject = redissonClient.getBucket("key");
// 如果key存在就设置key的值为新值value
// 如果key不存在就设置key的值为value
keyObject.set("value");
String value = keyObject.get();
System.out.println("value=" + value);
}
@Test
@DisplayName("分布式锁测试")
public void testLock() {
// 两个线程任务都是不断再尝试获取或,直到成功获取锁后才推出任务
// 第一个线程获取到锁后,第二个线程需要等待 5 秒超时后才能获取到锁
CountDownLatch latch = new CountDownLatch(2);
ExecutorService executorService = ThreadUtil.newFixedExecutor(2, "获取锁", true);
executorService.submit(new Task(latch));
executorService.submit(new Task(latch));
try {
latch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 输出:
// 17:59:25.896 [获取锁1] [INFO ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁成功
// 17:59:26.888 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁失败
// 17:59:27.889 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁失败
// 17:59:28.891 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁失败
// 17:59:29.892 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁失败
// 17:59:30.895 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁失败
// 17:59:30.896 [获取锁0] [INFO ] i.g.d.j.redis.RedissonStandaloneTest.run -
// 获取分布式锁成功
static class Task implements Runnable {
private CountDownLatch latch;
public Task(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void run() {
while (true) {
RLock lock = redissonClient.getLock("test_lock");
try {
boolean isLock = lock.tryLock(1, 5, TimeUnit.SECONDS);
if (isLock) {
log.info("获取分布式锁成功");
break;
} else {
log.warn("获取分布式锁失败");
}
} catch (Exception e) {
log.error("获取分布式锁异常", e);
}
}
latch.countDown();
}
}
}

View File

@ -1,13 +1,11 @@
package io.github.dunwu.javadb.redis.jedis;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
@ -18,7 +16,6 @@ import java.util.Set;
/**
* @author <a href="mailto:forbreak@163.com">Zhang Peng</a>
*/
@RunWith(SpringJUnit4ClassRunner.class)
@ActiveProfiles("dev")
@ContextConfiguration(locations = {"classpath:/applicationContext.xml"})
public class JedisPoolDemoTest {

View File

@ -14,8 +14,6 @@
idle-connection-timeout="10000"
connect-timeout="10000"
timeout="3000"
ping-timeout="30000"
reconnection-timeout="30000"
database="0"/>
</redisson:client>
</beans>