diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplate.java b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplate.java index 749956e..5e627cb 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplate.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/main/java/io/github/dunwu/javadb/elasticsearch/ElasticsearchTemplate.java @@ -280,7 +280,7 @@ public class ElasticsearchTemplate implements Closeable { || response.getResult() == DocWriteResponse.Result.UPDATED) { return entity; } else { - log.warn("【ES】save 响应结果无效!result: {}", response.getResult()); + log.warn("【ES】save 失败,result: {}!", response.getResult()); return null; } } @@ -292,45 +292,25 @@ public class ElasticsearchTemplate implements Closeable { return true; } - BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (T entity : list) { - Map map = toMap(entity); - if (MapUtil.isEmpty(map)) { - continue; - } - IndexRequest request = new IndexRequest(index, type).source(map); - if (entity.getDocId() != null) { - request.id(entity.getDocId()); - } - bulkRequest.add(request); - } - + BulkRequest bulkRequest = toBulkIndexRequest(index, type, list); BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); - return response != null && !response.hasFailures(); + if (response == null) { + log.warn("【ES】saveBatch 失败,result 为空!list: {}", JsonUtil.toString(list)); + return false; + } + if (response.hasFailures()) { + log.warn("【ES】saveBatch 失败,result: {}!", response.buildFailureMessage()); + return false; + } + return true; } public void asyncSaveBatch(String index, String type, Collection list, ActionListener listener) { - if (CollectionUtil.isEmpty(list)) { return; } - - BulkRequest bulkRequest = new BulkRequest(); - bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - for (T entity : list) { - Map map = toMap(entity); - if (MapUtil.isEmpty(map)) { - continue; - } - IndexRequest request = new IndexRequest(index, type).source(map); - if (entity.getDocId() != null) { - request.id(entity.getDocId()); - } - bulkRequest.add(request); - } - + BulkRequest bulkRequest = toBulkIndexRequest(index, type, list); client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener); } @@ -362,7 +342,7 @@ public class ElasticsearchTemplate implements Closeable { if (response.getResult() == DocWriteResponse.Result.UPDATED) { return entity; } else { - log.warn("【ES】updateById 响应结果无效!result: {}", response.getResult()); + log.warn("【ES】updateById 响应结果无效,result: {}!", response.getResult()); return null; } } @@ -374,23 +354,49 @@ public class ElasticsearchTemplate implements Closeable { return true; } - BulkRequest bulkRequest = toUpdateBulkRequest(index, type, list); + BulkRequest bulkRequest = toBulkUpdateRequest(index, type, list); BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); - return response != null && !response.hasFailures(); + if (response == null) { + log.warn("【ES】updateBatchIds 失败,result 为空!list: {}", JsonUtil.toString(list)); + return false; + } + if (response.hasFailures()) { + log.warn("【ES】updateBatchIds 失败,result: {}!", response.buildFailureMessage()); + return false; + } + return true; } public void asyncUpdateBatchIds(String index, String type, Collection list, ActionListener listener) { - if (CollectionUtil.isEmpty(list)) { return; } - - BulkRequest bulkRequest = toUpdateBulkRequest(index, type, list); + BulkRequest bulkRequest = toBulkUpdateRequest(index, type, list); client.bulkAsync(bulkRequest, RequestOptions.DEFAULT, listener); } - private BulkRequest toUpdateBulkRequest(String index, String type, Collection list) { + private BulkRequest toBulkIndexRequest(String index, String type, Collection list) { + BulkRequest bulkRequest = new BulkRequest(); + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + for (T entity : list) { + if (entity == null) { + continue; + } + Map map = toMap(entity); + if (MapUtil.isEmpty(map)) { + continue; + } + IndexRequest request = new IndexRequest(index, type).source(map); + if (entity.getDocId() != null) { + request.id(entity.getDocId()); + } + bulkRequest.add(request); + } + return bulkRequest; + } + + private BulkRequest toBulkUpdateRequest(String index, String type, Collection list) { BulkRequest bulkRequest = new BulkRequest(); bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); for (T entity : list) { @@ -426,11 +432,14 @@ public class ElasticsearchTemplate implements Closeable { BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT); if (response == null) { - log.warn("【ES】batchDeleteById 响应结果为空!"); + log.warn("【ES】deleteBatchIds 失败,result 为空!ids: {}", JsonUtil.toString(ids)); return false; } - - return !response.hasFailures(); + if (response.hasFailures()) { + log.warn("【ES】deleteBatchIds 失败,result: {}!", response.buildFailureMessage()); + return false; + } + return true; } public void asyncDeleteBatchIds(String index, String type, Collection ids, @@ -568,7 +577,8 @@ public class ElasticsearchTemplate implements Closeable { /** * search after 分页 */ - public ScrollData pojoPageByScrollId(String index, String type, String scrollId, int size, + public ScrollData pojoPageByScrollId(String index, String type, String scrollId, + int size, QueryBuilder queryBuilder, Class clazz) throws IOException { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); diff --git a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseElasticsearchTemplateTest.java b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseElasticsearchTemplateTest.java index 6727795..4ec530a 100644 --- a/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseElasticsearchTemplateTest.java +++ b/codes/javadb/elasticsearch/elasticsearch6/src/test/java/io/github/dunwu/javadb/elasticsearch/BaseElasticsearchTemplateTest.java @@ -8,6 +8,7 @@ import io.github.dunwu.javadb.elasticsearch.entity.common.ScrollData; import io.github.dunwu.javadb.elasticsearch.util.JsonUtil; import lombok.extern.slf4j.Slf4j; import org.assertj.core.api.Assertions; +import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.get.GetResponse; @@ -59,12 +60,18 @@ public abstract class BaseElasticsearchTemplateTest { protected abstract List getMockList(int num); protected void deleteIndex() throws IOException { - boolean exists = TEMPLATE.isIndexExists(getIndex()); - if (!exists) { - return; + try { + Set set = TEMPLATE.getIndexSet(getAlias()); + if (CollectionUtil.isNotEmpty(set)) { + for (String index : set) { + log.info("删除 alias: {}, index: {}", getAlias(), index); + TEMPLATE.deleteIndex(index); + } + } + } catch (IOException | ElasticsearchException e) { + log.error("删除索引失败!", e); } - TEMPLATE.deleteIndex(getIndex()); - exists = TEMPLATE.isIndexExists(getIndex()); + boolean exists = TEMPLATE.isIndexExists(getIndex()); Assertions.assertThat(exists).isFalse(); } @@ -98,7 +105,7 @@ public abstract class BaseElasticsearchTemplateTest { int total = 5000; List> listGroup = CollectionUtil.split(getMockList(total), 1000); for (List list : listGroup) { - TEMPLATE.saveBatch(getIndex(), getType(), list); + Assertions.assertThat(TEMPLATE.saveBatch(getIndex(), getType(), list)).isTrue(); } long count = TEMPLATE.count(getIndex(), getType(), new SearchSourceBuilder()); log.info("批量更新记录数: {}", count); diff --git a/codes/javadb/redis/pom.xml b/codes/javadb/redis/pom.xml index e847e9c..3150ffb 100644 --- a/codes/javadb/redis/pom.xml +++ b/codes/javadb/redis/pom.xml @@ -6,7 +6,7 @@ org.springframework.boot spring-boot-starter-parent - 2.6.3 + 2.7.18 io.github.dunwu @@ -36,7 +36,7 @@ cn.hutool hutool-all - 5.5.9 + 5.8.27 org.projectlombok @@ -51,27 +51,11 @@ org.redisson redisson - 3.16.8 + 3.29.0 - - - junit - junit - test - - - - - org.redisson - redisson - ${redisson.version} - - - - diff --git a/codes/javadb/redis/src/main/java/io/github/dunwu/javadb/redis/springboot/RedisAutoConfiguration.java b/codes/javadb/redis/src/main/java/io/github/dunwu/javadb/redis/springboot/RedisAutoConfiguration.java index 6ea387b..eb7c40e 100644 --- a/codes/javadb/redis/src/main/java/io/github/dunwu/javadb/redis/springboot/RedisAutoConfiguration.java +++ b/codes/javadb/redis/src/main/java/io/github/dunwu/javadb/redis/springboot/RedisAutoConfiguration.java @@ -1,14 +1,24 @@ package io.github.dunwu.javadb.redis.springboot; +import cn.hutool.core.util.StrUtil; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; +import org.redisson.Redisson; +import org.redisson.api.RedissonClient; +import org.redisson.config.Config; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import org.springframework.data.redis.connection.RedisConnectionFactory; -import org.springframework.data.redis.core.*; +import org.springframework.data.redis.core.HashOperations; +import org.springframework.data.redis.core.ListOperations; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.SetOperations; +import org.springframework.data.redis.core.ValueOperations; +import org.springframework.data.redis.core.ZSetOperations; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @@ -22,6 +32,19 @@ public class RedisAutoConfiguration { @Autowired private ObjectMapper objectMapper; + @Value("${spring.redis.host:localhost}") + private String host; + + @Value("${spring.redis.port:6379}") + private String port; + + @Bean + public RedissonClient redissonClient() { + Config config = new Config(); + config.useSingleServer().setAddress(StrUtil.format("redis://{}:{}", host, port)); + return Redisson.create(config); + } + @Bean public HashOperations hashOperations(RedisTemplate redisTemplate) { return redisTemplate.opsForHash(); @@ -44,7 +67,6 @@ public class RedisAutoConfiguration { // 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式) Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class); serializer.setObjectMapper(objectMapper); - RedisTemplate template = new RedisTemplate<>(); // 配置连接工厂 template.setConnectionFactory(factory); diff --git a/codes/javadb/redis/src/test/java/io/github/dunwu/javadb/redis/RedissonStandaloneTest.java b/codes/javadb/redis/src/test/java/io/github/dunwu/javadb/redis/RedissonStandaloneTest.java index f483c9a..ed3c13a 100644 --- a/codes/javadb/redis/src/test/java/io/github/dunwu/javadb/redis/RedissonStandaloneTest.java +++ b/codes/javadb/redis/src/test/java/io/github/dunwu/javadb/redis/RedissonStandaloneTest.java @@ -1,26 +1,104 @@ package io.github.dunwu.javadb.redis; +import cn.hutool.core.thread.ThreadUtil; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.redisson.api.RBucket; +import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + /** * @author Zhang Peng * @since 2018/6/19 */ +@Slf4j public class RedissonStandaloneTest { - @Test - public void testRedissonConnect() { + private static RedissonClient redissonClient; + + static { ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:redisson-standalone.xml"); - RedissonClient redisson = (RedissonClient) applicationContext.getBean("standalone"); + redissonClient = (RedissonClient) applicationContext.getBean("standalone"); + } + + @Test + @DisplayName("测试连接") + public void testRedissonConnect() { // 首先获取redis中的key-value对象,key不存在没关系 - RBucket keyObject = redisson.getBucket("key"); + RBucket keyObject = redissonClient.getBucket("key"); // 如果key存在,就设置key的值为新值value // 如果key不存在,就设置key的值为value keyObject.set("value"); + String value = keyObject.get(); + System.out.println("value=" + value); + } + + @Test + @DisplayName("分布式锁测试") + public void testLock() { + // 两个线程任务都是不断再尝试获取或,直到成功获取锁后才推出任务 + // 第一个线程获取到锁后,第二个线程需要等待 5 秒超时后才能获取到锁 + CountDownLatch latch = new CountDownLatch(2); + ExecutorService executorService = ThreadUtil.newFixedExecutor(2, "获取锁", true); + executorService.submit(new Task(latch)); + executorService.submit(new Task(latch)); + + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + // 输出: + // 17:59:25.896 [获取锁1] [INFO ] i.g.d.j.redis.RedissonStandaloneTest.run - + // 获取分布式锁成功 + // 17:59:26.888 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run - + // 获取分布式锁失败 + // 17:59:27.889 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run - + // 获取分布式锁失败 + // 17:59:28.891 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run - + // 获取分布式锁失败 + // 17:59:29.892 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run - + // 获取分布式锁失败 + // 17:59:30.895 [获取锁0] [WARN ] i.g.d.j.redis.RedissonStandaloneTest.run - + // 获取分布式锁失败 + // 17:59:30.896 [获取锁0] [INFO ] i.g.d.j.redis.RedissonStandaloneTest.run - + // 获取分布式锁成功 + + static class Task implements Runnable { + + private CountDownLatch latch; + + public Task(CountDownLatch latch) { + this.latch = latch; + } + + @Override + public void run() { + while (true) { + RLock lock = redissonClient.getLock("test_lock"); + try { + boolean isLock = lock.tryLock(1, 5, TimeUnit.SECONDS); + if (isLock) { + log.info("获取分布式锁成功"); + break; + } else { + log.warn("获取分布式锁失败"); + } + } catch (Exception e) { + log.error("获取分布式锁异常", e); + } + } + latch.countDown(); + } + } } diff --git a/codes/javadb/redis/src/test/java/io/github/dunwu/javadb/redis/jedis/JedisPoolDemoTest.java b/codes/javadb/redis/src/test/java/io/github/dunwu/javadb/redis/jedis/JedisPoolDemoTest.java index 9b693ba..e52d65b 100644 --- a/codes/javadb/redis/src/test/java/io/github/dunwu/javadb/redis/jedis/JedisPoolDemoTest.java +++ b/codes/javadb/redis/src/test/java/io/github/dunwu/javadb/redis/jedis/JedisPoolDemoTest.java @@ -1,13 +1,11 @@ package io.github.dunwu.javadb.redis.jedis; -import org.junit.Test; -import org.junit.runner.RunWith; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ActiveProfiles; import org.springframework.test.context.ContextConfiguration; -import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; @@ -18,7 +16,6 @@ import java.util.Set; /** * @author Zhang Peng */ -@RunWith(SpringJUnit4ClassRunner.class) @ActiveProfiles("dev") @ContextConfiguration(locations = {"classpath:/applicationContext.xml"}) public class JedisPoolDemoTest { diff --git a/codes/javadb/redis/src/test/resources/redisson-standalone.xml b/codes/javadb/redis/src/test/resources/redisson-standalone.xml index 462abd6..4760737 100644 --- a/codes/javadb/redis/src/test/resources/redisson-standalone.xml +++ b/codes/javadb/redis/src/test/resources/redisson-standalone.xml @@ -14,8 +14,6 @@ idle-connection-timeout="10000" connect-timeout="10000" timeout="3000" - ping-timeout="30000" - reconnection-timeout="30000" database="0"/>