🔖 redis-in-action

pull/1/head
Zhang Peng 2018-06-27 17:32:03 +08:00
parent 2a323f6137
commit ab6531833b
1 changed files with 217 additions and 160 deletions

View File

@ -1,25 +1,45 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.Tuple;
import redis.clients.jedis.ZParams;
import java.io.*;
import java.util.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class Chapter06 {
public static final void main(String[] args)
throws Exception
{
throws Exception {
new Chapter06().run();
}
public void run()
throws InterruptedException, IOException
{
throws InterruptedException, IOException {
Jedis conn = new Jedis("localhost");
conn.select(15);
@ -37,12 +57,12 @@ public class Chapter06 {
conn.del("recent:user");
System.out.println("Let's add a few contacts...");
for (int i = 0; i < 10; i++){
addUpdateContact(conn, "user", "contact-" + ((int)Math.floor(i / 3)) + '-' + i);
for (int i = 0; i < 10; i++) {
addUpdateContact(conn, "user", "contact-" + ((int) Math.floor(i / 3)) + '-' + i);
}
System.out.println("Current recently contacted contacts");
List<String> contacts = conn.lrange("recent:user", 0, -1);
for(String contact : contacts){
for (String contact : contacts) {
System.out.println(" " + contact);
}
assert contacts.size() >= 10;
@ -52,7 +72,7 @@ public class Chapter06 {
addUpdateContact(conn, "user", "contact-1-4");
contacts = conn.lrange("recent:user", 0, 2);
System.out.println("New top-3 contacts:");
for(String contact : contacts){
for (String contact : contacts) {
System.out.println(" " + contact);
}
assert "contact-1-4".equals(contacts.get(0));
@ -62,7 +82,7 @@ public class Chapter06 {
removeContact(conn, "user", "contact-2-6");
contacts = conn.lrange("recent:user", 0, -1);
System.out.println("New contacts:");
for(String contact : contacts){
for (String contact : contacts) {
System.out.println(" " + contact);
}
assert contacts.size() >= 9;
@ -73,8 +93,8 @@ public class Chapter06 {
contacts = fetchAutocompleteList(conn, "user", "c");
assert all.equals(contacts);
List<String> equiv = new ArrayList<String>();
for (String contact : all){
if (contact.startsWith("contact-2-")){
for (String contact : all) {
if (contact.startsWith("contact-2-")) {
equiv.add(contact);
}
}
@ -93,7 +113,7 @@ public class Chapter06 {
System.out.println();
System.out.println("Let's add a few people to the guild");
for (String name : new String[]{"jeff", "jenny", "jack", "jennifer"}){
for (String name : new String[]{"jeff", "jenny", "jack", "jennifer"}) {
joinGuild(conn, "test", name);
}
System.out.println();
@ -111,8 +131,7 @@ public class Chapter06 {
}
public void testDistributedLocking(Jedis conn)
throws InterruptedException
{
throws InterruptedException {
System.out.println("\n----- testDistributedLocking -----");
conn.del("lock:testlock");
System.out.println("Getting an initial lock...");
@ -141,8 +160,7 @@ public class Chapter06 {
}
public void testCountingSemaphore(Jedis conn)
throws InterruptedException
{
throws InterruptedException {
System.out.println("\n----- testCountingSemaphore -----");
conn.del("testsem", "testsem:owner", "testsem:counter");
System.out.println("Getting 3 initial semaphores with a limit of 3...");
@ -174,12 +192,11 @@ public class Chapter06 {
}
public void testDelayedTasks(Jedis conn)
throws InterruptedException
{
throws InterruptedException {
System.out.println("\n----- testDelayedTasks -----");
conn.del("queue:tqueue", "delayed:");
System.out.println("Let's start some regular and delayed tasks...");
for (long delay : new long[]{0, 500, 0, 1500}){
for (long delay : new long[]{0, 500, 0, 1500}) {
assert executeLater(conn, "tqueue", "testfn", new ArrayList<String>(), delay) != null;
}
long r = conn.llen("queue:tqueue");
@ -211,7 +228,7 @@ public class Chapter06 {
recipients.add("jenny");
String chatId = createChat(conn, "joe", recipients, "message 1");
System.out.println("Now let's send a few messages...");
for (int i = 2; i < 5; i++){
for (int i = 2; i < 5; i++) {
sendMessage(conn, chatId, "joe", "message " + i);
}
System.out.println();
@ -222,10 +239,10 @@ public class Chapter06 {
System.out.println("They are the same? " + r1.equals(r2));
assert r1.equals(r2);
System.out.println("Those messages are:");
for(ChatMessages chat : r1){
for (ChatMessages chat : r1) {
System.out.println(" chatId: " + chat.chatId);
System.out.println(" messages:");
for(Map<String,Object> message : chat.messages){
for (Map<String, Object> message : chat.messages) {
System.out.println(" " + message);
}
}
@ -234,11 +251,10 @@ public class Chapter06 {
}
public void testFileDistribution(Jedis conn)
throws InterruptedException, IOException
{
throws InterruptedException, IOException {
System.out.println("\n----- testFileDistribution -----");
String[] keys = conn.keys("test:*").toArray(new String[0]);
if (keys.length > 0){
if (keys.length > 0) {
conn.del(keys);
}
conn.del(
@ -258,7 +274,7 @@ public class Chapter06 {
File f2 = File.createTempFile("temp_redis_2_", ".txt");
f2.deleteOnExit();
writer = new FileWriter(f2);
for (int i = 0; i < 100; i++){
for (int i = 0; i < 100; i++) {
writer.write("many lines " + i + '\n');
}
writer.close();
@ -269,7 +285,7 @@ public class Chapter06 {
new GZIPOutputStream(
new FileOutputStream(f3)));
Random random = new Random();
for (int i = 0; i < 1000; i++){
for (int i = 0; i < 1000; i++) {
writer.write("random line " + Long.toHexString(random.nextLong()) + '\n');
}
writer.close();
@ -301,7 +317,7 @@ public class Chapter06 {
System.out.println("Done cleaning out Redis!");
keys = conn.keys("test:*").toArray(new String[0]);
if (keys.length > 0){
if (keys.length > 0) {
conn.del(keys);
}
conn.del(
@ -313,29 +329,36 @@ public class Chapter06 {
}
public class TestCallback
implements Callback
{
implements Callback {
private int index;
public List<Integer> counts = new ArrayList<Integer>();
public void callback(String line){
if (line == null){
public void callback(String line) {
if (line == null) {
index++;
return;
}
while (counts.size() == index){
while (counts.size() == index) {
counts.add(0);
}
counts.set(index, counts.get(index) + 1);
}
}
/**
* 6-1
*/
public void addUpdateContact(Jedis conn, String user, String contact) {
String acList = "recent:" + user;
// 准备执行事务
Transaction trans = conn.multi();
// 如果联系人已经存在,那么移除他。
trans.lrem(acList, 0, contact);
// 将联系人推入到列表的最前端。
trans.lpush(acList, contact);
// 只保留列表里面的前100个联系人。
trans.ltrim(acList, 0, 99);
// 实际地执行以上操作。
trans.exec();
}
@ -343,23 +366,38 @@ public class Chapter06 {
conn.lrem("recent:" + user, 0, contact);
}
/**
* 6-2
*/
public List<String> fetchAutocompleteList(Jedis conn, String user, String prefix) {
// 获取自动补完列表。
List<String> candidates = conn.lrange("recent:" + user, 0, -1);
List<String> matches = new ArrayList<String>();
// 检查每个候选联系人。
for (String candidate : candidates) {
if (candidate.toLowerCase().startsWith(prefix)){
if (candidate.toLowerCase().startsWith(prefix)) {
// 发现一个匹配的联系人。
matches.add(candidate);
}
}
// 返回所有匹配的联系人。
return matches;
}
// 准备一个由已知字符组成的列表。
private static final String VALID_CHARACTERS = "`abcdefghijklmnopqrstuvwxyz{";
/**
* 6-3
*/
public String[] findPrefixRange(String prefix) {
// 在字符列表中查找前缀字符所处的位置。
int posn = VALID_CHARACTERS.indexOf(prefix.charAt(prefix.length() - 1));
// 找到前驱字符。
char suffix = VALID_CHARACTERS.charAt(posn > 0 ? posn - 1 : 0);
String start = prefix.substring(0, prefix.length() - 1) + suffix + '{';
String end = prefix + '{';
// 返回范围。
return new String[]{start, end};
}
@ -371,8 +409,12 @@ public class Chapter06 {
conn.zrem("members:" + guild, user);
}
/**
* 6-4
*/
@SuppressWarnings("unchecked")
public Set<String> autocompleteOnPrefix(Jedis conn, String guild, String prefix) {
// 根据给定的前缀计算出查找范围的起点和终点。
String[] range = findPrefixRange(prefix);
String start = range[0];
String end = range[1];
@ -381,29 +423,36 @@ public class Chapter06 {
end += identifier;
String zsetName = "members:" + guild;
// 将范围的起始元素和结束元素添加到有序集合里面。
conn.zadd(zsetName, 0, start);
conn.zadd(zsetName, 0, end);
Set<String> items = null;
while (true){
while (true) {
conn.watch(zsetName);
// 找到两个被插入元素在有序集合中的排名。
int sindex = conn.zrank(zsetName, start).intValue();
int eindex = conn.zrank(zsetName, end).intValue();
int erange = Math.min(sindex + 9, eindex - 2);
Transaction trans = conn.multi();
// 获取范围内的值,然后删除之前插入的起始元素和结束元素。
trans.zrem(zsetName, start);
trans.zrem(zsetName, end);
trans.zrange(zsetName, sindex, erange);
List<Object> results = trans.exec();
if (results != null){
items = (Set<String>)results.get(results.size() - 1);
// 如果自动补完有序集合已经被其他客户端修改过了,那么进行重试。
if (results != null) {
items = (Set<String>) results.get(results.size() - 1);
break;
}
}
for (Iterator<String> iterator = items.iterator(); iterator.hasNext(); ){
if (iterator.next().indexOf('{') != -1){
// 如果有其他自动补完操作正在执行,
// 那么从获取到的元素里面移除起始元素和终结元素。
for (Iterator<String> iterator = items.iterator(); iterator.hasNext(); ) {
if (iterator.next().indexOf('{') != -1) {
iterator.remove();
}
}
@ -413,18 +462,24 @@ public class Chapter06 {
public String acquireLock(Jedis conn, String lockName) {
return acquireLock(conn, lockName, 10000);
}
public String acquireLock(Jedis conn, String lockName, long acquireTimeout){
/**
* 6-8
*/
public String acquireLock(Jedis conn, String lockName, long acquireTimeout) {
// 128位随机标识符。
String identifier = UUID.randomUUID().toString();
long end = System.currentTimeMillis() + acquireTimeout;
while (System.currentTimeMillis() < end){
if (conn.setnx("lock:" + lockName, identifier) == 1){
while (System.currentTimeMillis() < end) {
// 尝试取得锁。
if (conn.setnx("lock:" + lockName, identifier) == 1) {
return identifier;
}
try {
Thread.sleep(1);
}catch(InterruptedException ie){
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
@ -433,25 +488,28 @@ public class Chapter06 {
}
public String acquireLockWithTimeout(
Jedis conn, String lockName, long acquireTimeout, long lockTimeout)
{
Jedis conn, String lockName, long acquireTimeout, long lockTimeout) {
// 128位随机标识符。
String identifier = UUID.randomUUID().toString();
String lockKey = "lock:" + lockName;
int lockExpire = (int)(lockTimeout / 1000);
// 确保传给EXPIRE的都是整数。
int lockExpire = (int) (lockTimeout / 1000);
long end = System.currentTimeMillis() + acquireTimeout;
while (System.currentTimeMillis() < end) {
if (conn.setnx(lockKey, identifier) == 1){
// 获取锁并设置过期时间。
if (conn.setnx(lockKey, identifier) == 1) {
conn.expire(lockKey, lockExpire);
return identifier;
}
// 检查过期时间,并在有需要时对其进行更新。
if (conn.ttl(lockKey) == -1) {
conn.expire(lockKey, lockExpire);
}
try {
Thread.sleep(1);
}catch(InterruptedException ie){
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
@ -460,37 +518,47 @@ public class Chapter06 {
return null;
}
/**
* 6-10
*/
public boolean releaseLock(Jedis conn, String lockName, String identifier) {
String lockKey = "lock:" + lockName;
while (true){
while (true) {
// 检查并确认进程还持有着锁。
conn.watch(lockKey);
if (identifier.equals(conn.get(lockKey))){
if (identifier.equals(conn.get(lockKey))) {
// 释放锁。
Transaction trans = conn.multi();
trans.del(lockKey);
List<Object> results = trans.exec();
if (results == null){
if (results == null) {
continue;
}
return true;
}
// 有其他客户端修改了锁;重试。
conn.unwatch();
break;
}
// 进程已经失去了锁。
return false;
}
public String acquireFairSemaphore(
Jedis conn, String semname, int limit, long timeout)
{
/**
* 6-12
*/
public String acquireFairSemaphore(Jedis conn, String semname, int limit, long timeout) {
// 128位随机标识符。
String identifier = UUID.randomUUID().toString();
String czset = semname + ":owner";
String ctr = semname + ":counter";
long now = System.currentTimeMillis();
Transaction trans = conn.multi();
// 清理过期的信号量持有者。
trans.zremrangeByScore(
semname.getBytes(),
"-inf".getBytes(),
@ -500,15 +568,15 @@ public class Chapter06 {
trans.zinterstore(czset, params, czset, semname);
trans.incr(ctr);
List<Object> results = trans.exec();
int counter = ((Long)results.get(results.size() - 1)).intValue();
int counter = ((Long) results.get(results.size() - 1)).intValue();
trans = conn.multi();
trans.zadd(semname, now, identifier);
trans.zadd(czset, counter, identifier);
trans.zrank(czset, identifier);
results = trans.exec();
int result = ((Long)results.get(results.size() - 1)).intValue();
if (result < limit){
int result = ((Long) results.get(results.size() - 1)).intValue();
if (result < limit) {
return identifier;
}
@ -520,23 +588,21 @@ public class Chapter06 {
}
public boolean releaseFairSemaphore(
Jedis conn, String semname, String identifier)
{
Jedis conn, String semname, String identifier) {
Transaction trans = conn.multi();
trans.zrem(semname, identifier);
trans.zrem(semname + ":owner", identifier);
List<Object> results = trans.exec();
return (Long)results.get(results.size() - 1) == 1;
return (Long) results.get(results.size() - 1) == 1;
}
public String executeLater(
Jedis conn, String queue, String name, List<String> args, long delay)
{
Jedis conn, String queue, String name, List<String> args, long delay) {
Gson gson = new Gson();
String identifier = UUID.randomUUID().toString();
String itemArgs = gson.toJson(args);
String item = gson.toJson(new String[]{identifier, queue, name, itemArgs});
if (delay > 0){
if (delay > 0) {
conn.zadd("delayed:", System.currentTimeMillis() + delay, item);
} else {
conn.rpush("queue:" + queue, item);
@ -550,12 +616,11 @@ public class Chapter06 {
}
public String createChat(
Jedis conn, String sender, Set<String> recipients, String message, String chatId)
{
Jedis conn, String sender, Set<String> recipients, String message, String chatId) {
recipients.add(sender);
Transaction trans = conn.multi();
for (String recipient : recipients){
for (String recipient : recipients) {
trans.zadd("chat:" + chatId, 0, recipient);
trans.zadd("seen:" + recipient, 0, chatId);
}
@ -566,19 +631,19 @@ public class Chapter06 {
public String sendMessage(Jedis conn, String chatId, String sender, String message) {
String identifier = acquireLock(conn, "chat:" + chatId);
if (identifier == null){
if (identifier == null) {
throw new RuntimeException("Couldn't get the lock");
}
try {
long messageId = conn.incr("ids:" + chatId);
HashMap<String,Object> values = new HashMap<String,Object>();
HashMap<String, Object> values = new HashMap<String, Object>();
values.put("id", messageId);
values.put("ts", System.currentTimeMillis());
values.put("sender", sender);
values.put("message", message);
String packed = new Gson().toJson(values);
conn.zadd("msgs:" + chatId, messageId, packed);
}finally{
} finally {
releaseLock(conn, "chat:" + chatId, identifier);
}
return chatId;
@ -590,9 +655,9 @@ public class Chapter06 {
List<Tuple> seenList = new ArrayList<Tuple>(seenSet);
Transaction trans = conn.multi();
for (Tuple tuple : seenList){
for (Tuple tuple : seenList) {
String chatId = tuple.getElement();
int seenId = (int)tuple.getScore();
int seenId = (int) tuple.getScore();
trans.zrangeByScore("msgs:" + chatId, String.valueOf(seenId + 1), "inf");
}
List<Object> results = trans.exec();
@ -604,21 +669,21 @@ public class Chapter06 {
List<ChatMessages> chatMessages = new ArrayList<ChatMessages>();
List<Object[]> seenUpdates = new ArrayList<Object[]>();
List<Object[]> msgRemoves = new ArrayList<Object[]>();
while (seenIterator.hasNext()){
while (seenIterator.hasNext()) {
Tuple seen = seenIterator.next();
Set<String> messageStrings = (Set<String>)resultsIterator.next();
if (messageStrings.size() == 0){
Set<String> messageStrings = (Set<String>) resultsIterator.next();
if (messageStrings.size() == 0) {
continue;
}
int seenId = 0;
String chatId = seen.getElement();
List<Map<String,Object>> messages = new ArrayList<Map<String,Object>>();
for (String messageJson : messageStrings){
Map<String,Object> message = (Map<String,Object>)gson.fromJson(
messageJson, new TypeToken<Map<String,Object>>(){}.getType());
int messageId = ((Double)message.get("id")).intValue();
if (messageId > seenId){
List<Map<String, Object>> messages = new ArrayList<Map<String, Object>>();
for (String messageJson : messageStrings) {
Map<String, Object> message = (Map<String, Object>) gson.fromJson(
messageJson, new TypeToken<Map<String, Object>>() {}.getType());
int messageId = ((Double) message.get("id")).intValue();
if (messageId > seenId) {
seenId = messageId;
}
message.put("id", messageId);
@ -629,7 +694,7 @@ public class Chapter06 {
seenUpdates.add(new Object[]{"seen:" + recipient, seenId, chatId});
Set<Tuple> minIdSet = conn.zrangeWithScores("chat:" + chatId, 0, 0);
if (minIdSet.size() > 0){
if (minIdSet.size() > 0) {
msgRemoves.add(new Object[]{
"msgs:" + chatId, minIdSet.iterator().next().getScore()});
}
@ -637,15 +702,15 @@ public class Chapter06 {
}
trans = conn.multi();
for (Object[] seenUpdate : seenUpdates){
for (Object[] seenUpdate : seenUpdates) {
trans.zadd(
(String)seenUpdate[0],
(Integer)seenUpdate[1],
(String)seenUpdate[2]);
(String) seenUpdate[0],
(Integer) seenUpdate[1],
(String) seenUpdate[2]);
}
for (Object[] msgRemove : msgRemoves){
for (Object[] msgRemove : msgRemoves) {
trans.zremrangeByScore(
(String)msgRemove[0], 0, ((Double)msgRemove[1]).intValue());
(String) msgRemove[0], 0, ((Double) msgRemove[1]).intValue());
}
trans.exec();
@ -653,36 +718,35 @@ public class Chapter06 {
}
public void processLogsFromRedis(Jedis conn, String id, Callback callback)
throws InterruptedException, IOException
{
while (true){
throws InterruptedException, IOException {
while (true) {
List<ChatMessages> fdata = fetchPendingMessages(conn, id);
for (ChatMessages messages : fdata){
for (Map<String,Object> message : messages.messages){
String logFile = (String)message.get("message");
for (ChatMessages messages : fdata) {
for (Map<String, Object> message : messages.messages) {
String logFile = (String) message.get("message");
if (":done".equals(logFile)){
if (":done".equals(logFile)) {
return;
}
if (logFile == null || logFile.length() == 0){
if (logFile == null || logFile.length() == 0) {
continue;
}
InputStream in = new RedisInputStream(
conn, messages.chatId + logFile);
if (logFile.endsWith(".gz")){
if (logFile.endsWith(".gz")) {
in = new GZIPInputStream(in);
}
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
try{
try {
String line = null;
while ((line = reader.readLine()) != null){
while ((line = reader.readLine()) != null) {
callback.callback(line);
}
callback.callback(null);
}finally{
} finally {
reader.close();
}
@ -690,50 +754,46 @@ public class Chapter06 {
}
}
if (fdata.size() == 0){
if (fdata.size() == 0) {
Thread.sleep(100);
}
}
}
public class RedisInputStream
extends InputStream
{
extends InputStream {
private Jedis conn;
private String key;
private int pos;
public RedisInputStream(Jedis conn, String key){
public RedisInputStream(Jedis conn, String key) {
this.conn = conn;
this.key = key;
}
@Override
public int available()
throws IOException
{
throws IOException {
long len = conn.strlen(key);
return (int)(len - pos);
return (int) (len - pos);
}
@Override
public int read()
throws IOException
{
throws IOException {
byte[] block = conn.substr(key.getBytes(), pos, pos);
if (block == null || block.length == 0){
if (block == null || block.length == 0) {
return -1;
}
pos++;
return (int)(block[0] & 0xff);
return (int) (block[0] & 0xff);
}
@Override
public int read(byte[] buf, int off, int len)
throws IOException
{
throws IOException {
byte[] block = conn.substr(key.getBytes(), pos, pos + (len - off - 1));
if (block == null || block.length == 0){
if (block == null || block.length == 0) {
return -1;
}
System.arraycopy(block, 0, buf, off, block.length);
@ -751,34 +811,32 @@ public class Chapter06 {
void callback(String line);
}
public class ChatMessages
{
public class ChatMessages {
public String chatId;
public List<Map<String,Object>> messages;
public List<Map<String, Object>> messages;
public ChatMessages(String chatId, List<Map<String,Object>> messages){
public ChatMessages(String chatId, List<Map<String, Object>> messages) {
this.chatId = chatId;
this.messages = messages;
}
public boolean equals(Object other){
if (!(other instanceof ChatMessages)){
public boolean equals(Object other) {
if (!(other instanceof ChatMessages)) {
return false;
}
ChatMessages otherCm = (ChatMessages)other;
ChatMessages otherCm = (ChatMessages) other;
return chatId.equals(otherCm.chatId) &&
messages.equals(otherCm.messages);
}
}
public class PollQueueThread
extends Thread
{
extends Thread {
private Jedis conn;
private boolean quit;
private Gson gson = new Gson();
public PollQueueThread(){
public PollQueueThread() {
this.conn = new Jedis("localhost");
this.conn.select(15);
}
@ -788,13 +846,13 @@ public class Chapter06 {
}
public void run() {
while (!quit){
while (!quit) {
Set<Tuple> items = conn.zrangeWithScores("delayed:", 0, 0);
Tuple item = items.size() > 0 ? items.iterator().next() : null;
if (item == null || item.getScore() > System.currentTimeMillis()) {
try{
try {
sleep(10);
}catch(InterruptedException ie){
} catch (InterruptedException ie) {
Thread.interrupted();
}
continue;
@ -806,11 +864,11 @@ public class Chapter06 {
String queue = values[1];
String locked = acquireLock(conn, identifier);
if (locked == null){
if (locked == null) {
continue;
}
if (conn.zrem("delayed:", json) == 1){
if (conn.zrem("delayed:", json) == 1) {
conn.rpush("queue:" + queue, json);
}
@ -820,8 +878,7 @@ public class Chapter06 {
}
public class CopyLogsThread
extends Thread
{
extends Thread {
private Jedis conn;
private File path;
private String channel;
@ -841,53 +898,53 @@ public class Chapter06 {
Deque<File> waiting = new ArrayDeque<File>();
long bytesInRedis = 0;
Set<String> recipients= new HashSet<String>();
for (int i = 0; i < count; i++){
Set<String> recipients = new HashSet<String>();
for (int i = 0; i < count; i++) {
recipients.add(String.valueOf(i));
}
createChat(conn, "source", recipients, "", channel);
File[] logFiles = path.listFiles(new FilenameFilter(){
public boolean accept(File dir, String name){
File[] logFiles = path.listFiles(new FilenameFilter() {
public boolean accept(File dir, String name) {
return name.startsWith("temp_redis");
}
});
Arrays.sort(logFiles);
for (File logFile : logFiles){
for (File logFile : logFiles) {
long fsize = logFile.length();
while ((bytesInRedis + fsize) > limit){
while ((bytesInRedis + fsize) > limit) {
long cleaned = clean(waiting, count);
if (cleaned != 0){
if (cleaned != 0) {
bytesInRedis -= cleaned;
}else{
try{
} else {
try {
sleep(250);
}catch(InterruptedException ie){
} catch (InterruptedException ie) {
Thread.interrupted();
}
}
}
BufferedInputStream in = null;
try{
try {
in = new BufferedInputStream(new FileInputStream(logFile));
int read = 0;
byte[] buffer = new byte[8192];
while ((read = in.read(buffer, 0, buffer.length)) != -1){
if (buffer.length != read){
while ((read = in.read(buffer, 0, buffer.length)) != -1) {
if (buffer.length != read) {
byte[] bytes = new byte[read];
System.arraycopy(buffer, 0, bytes, 0, read);
conn.append((channel + logFile).getBytes(), bytes);
}else{
} else {
conn.append((channel + logFile).getBytes(), buffer);
}
}
}catch(IOException ioe){
} catch (IOException ioe) {
ioe.printStackTrace();
throw new RuntimeException(ioe);
}finally{
try{
} finally {
try {
in.close();
}catch(Exception ignore){
} catch (Exception ignore) {
}
}
@ -899,14 +956,14 @@ public class Chapter06 {
sendMessage(conn, channel, "source", ":done");
while (waiting.size() > 0){
while (waiting.size() > 0) {
long cleaned = clean(waiting, count);
if (cleaned != 0){
if (cleaned != 0) {
bytesInRedis -= cleaned;
}else{
try{
} else {
try {
sleep(250);
}catch(InterruptedException ie){
} catch (InterruptedException ie) {
Thread.interrupted();
}
}
@ -914,11 +971,11 @@ public class Chapter06 {
}
private long clean(Deque<File> waiting, int count) {
if (waiting.size() == 0){
if (waiting.size() == 0) {
return 0;
}
File w0 = waiting.getFirst();
if (String.valueOf(count).equals(conn.get(channel + w0 + ":done"))){
if (String.valueOf(count).equals(conn.get(channel + w0 + ":done"))) {
conn.del(channel + w0, channel + w0 + ":done");
return waiting.removeFirst().length();
}