From 25378c657c07a42c6dd5bd4252166f750d0ceb8f Mon Sep 17 00:00:00 2001 From: Zhang Peng Date: Sun, 24 Jun 2018 23:16:39 +0800 Subject: [PATCH] =?UTF-8?q?:bookmark:=20Redis=20In=20Action=20=E6=BA=90?= =?UTF-8?q?=E7=A0=81=20Java=20=E7=89=88=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- codes/redis/redis-in-action/pom.xml | 65 ++ .../src/main/java/Chapter01.java | 180 ++++ .../src/main/java/Chapter02.java | 464 +++++++++ .../src/main/java/Chapter04.java | 214 ++++ .../src/main/java/Chapter05.java | 636 ++++++++++++ .../src/main/java/Chapter06.java | 928 +++++++++++++++++ .../src/main/java/Chapter07.java | 955 ++++++++++++++++++ .../src/main/java/Chapter08.java | 555 ++++++++++ .../src/main/java/Chapter09.java | 480 +++++++++ 9 files changed, 4477 insertions(+) create mode 100644 codes/redis/redis-in-action/pom.xml create mode 100644 codes/redis/redis-in-action/src/main/java/Chapter01.java create mode 100644 codes/redis/redis-in-action/src/main/java/Chapter02.java create mode 100644 codes/redis/redis-in-action/src/main/java/Chapter04.java create mode 100644 codes/redis/redis-in-action/src/main/java/Chapter05.java create mode 100644 codes/redis/redis-in-action/src/main/java/Chapter06.java create mode 100644 codes/redis/redis-in-action/src/main/java/Chapter07.java create mode 100644 codes/redis/redis-in-action/src/main/java/Chapter08.java create mode 100644 codes/redis/redis-in-action/src/main/java/Chapter09.java diff --git a/codes/redis/redis-in-action/pom.xml b/codes/redis/redis-in-action/pom.xml new file mode 100644 index 0000000..7a6849d --- /dev/null +++ b/codes/redis/redis-in-action/pom.xml @@ -0,0 +1,65 @@ + + + 4.0.0 + io.github.dunwu + redis-in-action + 1.0.0 + jar + + + UTF-8 + 1.8 + ${java.version} + ${java.version} + + 1.2.3 + 2.9.0 + 4.12 + + + + + + redis.clients + jedis + ${jedis.version} + + + + + + ch.qos.logback + logback-parent + ${logback.version} + pom + import + + + + + + junit + junit + ${junit.version} + test + + + + + com.google.code.gson + gson + 2.8.5 + + + org.apache.commons + commons-csv + 1.5 + + + org.javatuples + javatuples + 1.1 + + + diff --git a/codes/redis/redis-in-action/src/main/java/Chapter01.java b/codes/redis/redis-in-action/src/main/java/Chapter01.java new file mode 100644 index 0000000..06c0c73 --- /dev/null +++ b/codes/redis/redis-in-action/src/main/java/Chapter01.java @@ -0,0 +1,180 @@ +import redis.clients.jedis.Jedis; +import redis.clients.jedis.ZParams; + +import java.util.*; + +/** + * Redis In Action Chapter01 - 你好 Redis + */ +public class Chapter01 { + + private static final int ONE_WEEK_IN_SECONDS = 7 * 86400; + private static final int VOTE_SCORE = 432; + private static final int ARTICLES_PER_PAGE = 25; + + public static final void main(String[] args) { + new Chapter01().run(); + } + + public void run() { + Jedis conn = new Jedis("localhost"); + conn.select(15); + + String articleId = postArticle(conn, "username", "A title", "http://www.google.com"); + System.out.println("We posted a new article with id: " + articleId); + System.out.println("Its HASH looks like:"); + Map articleData = conn.hgetAll("article:" + articleId); + for (Map.Entry entry : articleData.entrySet()) { + System.out.println(" " + entry.getKey() + ": " + entry.getValue()); + } + + System.out.println(); + + articleVote(conn, "other_user", "article:" + articleId); + String votes = conn.hget("article:" + articleId, "votes"); + System.out.println("We voted for the article, it now has votes: " + votes); + assert Integer.parseInt(votes) > 1; + + System.out.println("The currently highest-scoring articles are:"); + List> articles = getArticles(conn, 1); + printArticles(articles); + assert articles.size() >= 1; + + addRemoveGroups(conn, articleId, new String[] {"new-group"}, new String[] {}); + System.out.println("We added the article to a new group, other articles include:"); + articles = getGroupArticles(conn, "new-group", 1); + printArticles(articles); + assert articles.size() >= 1; + } + + /** + * 代码清单 1-6 对文章进行投票 + */ + public void articleVote(Jedis conn, String user, String article) { + // 计算文章的投票截止时间。 + long cutoff = (System.currentTimeMillis() / 1000) - ONE_WEEK_IN_SECONDS; + + // 检查是否还可以对文章进行投票 + //(虽然使用散列也可以获取文章的发布时间, + // 但有序集合返回的文章发布时间为浮点数, + // 可以不进行转换直接使用)。 + if (conn.zscore("time:", article) < cutoff) { + return; + } + + // 从article:id标识符(identifier)里面取出文章的ID。 + String articleId = article.substring(article.indexOf(':') + 1); + + // 如果用户是第一次为这篇文章投票,那么增加这篇文章的投票数量和评分。 + if (conn.sadd("voted:" + articleId, user) == 1) { + conn.zincrby("score:", VOTE_SCORE, article); + conn.hincrBy(article, "votes", 1); + } + } + + /** + * 代码清单 1-7 发布文章 + */ + public String postArticle(Jedis conn, String user, String title, String link) { + // 生成一个新的文章ID。 + String articleId = String.valueOf(conn.incr("article:")); + + String voted = "voted:" + articleId; + // 将发布文章的用户添加到文章的已投票用户名单里面, + conn.sadd(voted, user); + // 然后将这个名单的过期时间设置为一周(第3章将对过期时间作更详细的介绍)。 + conn.expire(voted, ONE_WEEK_IN_SECONDS); + + long now = System.currentTimeMillis() / 1000; + String article = "article:" + articleId; + // 将文章信息存储到一个散列里面。 + HashMap articleData = new HashMap(); + articleData.put("title", title); + articleData.put("link", link); + articleData.put("user", user); + articleData.put("now", String.valueOf(now)); + articleData.put("votes", "1"); + conn.hmset(article, articleData); + + // 将文章添加到根据发布时间排序的有序集合和根据评分排序的有序集合里面。 + conn.zadd("score:", now + VOTE_SCORE, article); + conn.zadd("time:", now, article); + + return articleId; + } + + public List> getArticles(Jedis conn, int page) { + return getArticles(conn, page, "score:"); + } + + /** + * 代码清单 1-8 获取文章 + */ + public List> getArticles(Jedis conn, int page, String order) { + // 设置获取文章的起始索引和结束索引。 + int start = (page - 1) * ARTICLES_PER_PAGE; + int end = start + ARTICLES_PER_PAGE - 1; + + // 获取多个文章ID。 + Set ids = conn.zrevrange(order, start, end); + List> articles = new ArrayList>(); + // 根据文章ID获取文章的详细信息。 + for (String id : ids) { + Map articleData = conn.hgetAll(id); + articleData.put("id", id); + articles.add(articleData); + } + + return articles; + } + + /** + * 代码清单 1-9 + */ + public void addRemoveGroups(Jedis conn, String articleId, String[] toAdd, String[] toRemove) { + // 构建存储文章信息的键名。 + String article = "article:" + articleId; + // 将文章添加到它所属的群组里面。 + for (String group : toAdd) { + conn.sadd("group:" + group, article); + } + // 从群组里面移除文章。 + for (String group : toRemove) { + conn.srem("group:" + group, article); + } + } + + public List> getGroupArticles(Jedis conn, String group, int page) { + return getGroupArticles(conn, group, page, "score:"); + } + + /** + * 代码清单 1-10 取出群组里的文章 + */ + public List> getGroupArticles(Jedis conn, String group, int page, String order) { + // 为每个群组的每种排列顺序都创建一个键。 + String key = order + group; + // 检查是否有已缓存的排序结果,如果没有的话就现在进行排序。 + if (!conn.exists(key)) { + // 根据评分或者发布时间,对群组文章进行排序。 + ZParams params = new ZParams().aggregate(ZParams.Aggregate.MAX); + conn.zinterstore(key, params, "group:" + group, order); + // 让Redis在60秒钟之后自动删除这个有序集合。 + conn.expire(key, 60); + } + // 调用之前定义的getArticles函数来进行分页并获取文章数据。 + return getArticles(conn, page, key); + } + + private void printArticles(List> articles) { + for (Map article : articles) { + System.out.println(" id: " + article.get("id")); + for (Map.Entry entry : article.entrySet()) { + if (entry.getKey().equals("id")) { + continue; + } + System.out.println(" " + entry.getKey() + ": " + entry.getValue()); + } + } + } +} diff --git a/codes/redis/redis-in-action/src/main/java/Chapter02.java b/codes/redis/redis-in-action/src/main/java/Chapter02.java new file mode 100644 index 0000000..c236c52 --- /dev/null +++ b/codes/redis/redis-in-action/src/main/java/Chapter02.java @@ -0,0 +1,464 @@ +import com.google.gson.Gson; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Tuple; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.*; + +public class Chapter02 { + + public static final void main(String[] args) throws InterruptedException { + new Chapter02().run(); + } + + public void run() throws InterruptedException { + Jedis conn = new Jedis("localhost"); + conn.select(15); + + testLoginCookies(conn); + testShopppingCartCookies(conn); + testCacheRows(conn); + testCacheRequest(conn); + } + + public void testLoginCookies(Jedis conn) throws InterruptedException { + System.out.println("\n----- testLoginCookies -----"); + String token = UUID.randomUUID().toString(); + + updateToken(conn, token, "username", "itemX"); + System.out.println("We just logged-in/updated token: " + token); + System.out.println("For user: 'username'"); + System.out.println(); + + System.out.println("What username do we get when we look-up that token?"); + String r = checkToken(conn, token); + System.out.println(r); + System.out.println(); + assert r != null; + + System.out.println("Let's drop the maximum number of cookies to 0 to clean them out"); + System.out.println("We will start a thread to do the cleaning, while we stop it later"); + + CleanSessionsThread thread = new CleanSessionsThread(0); + thread.start(); + Thread.sleep(1000); + thread.quit(); + Thread.sleep(2000); + if (thread.isAlive()) { + throw new RuntimeException("The clean sessions thread is still alive?!?"); + } + + long s = conn.hlen("login:"); + System.out.println("The current number of sessions still available is: " + s); + assert s == 0; + } + + public void testShopppingCartCookies(Jedis conn) throws InterruptedException { + System.out.println("\n----- testShopppingCartCookies -----"); + String token = UUID.randomUUID().toString(); + + System.out.println("We'll refresh our session..."); + updateToken(conn, token, "username", "itemX"); + System.out.println("And add an item to the shopping cart"); + addToCart(conn, token, "itemY", 3); + Map r = conn.hgetAll("cart:" + token); + System.out.println("Our shopping cart currently has:"); + for (Map.Entry entry : r.entrySet()) { + System.out.println(" " + entry.getKey() + ": " + entry.getValue()); + } + System.out.println(); + + assert r.size() >= 1; + + System.out.println("Let's clean out our sessions and carts"); + CleanFullSessionsThread thread = new CleanFullSessionsThread(0); + thread.start(); + Thread.sleep(1000); + thread.quit(); + Thread.sleep(2000); + if (thread.isAlive()) { + throw new RuntimeException("The clean sessions thread is still alive?!?"); + } + + r = conn.hgetAll("cart:" + token); + System.out.println("Our shopping cart now contains:"); + for (Map.Entry entry : r.entrySet()) { + System.out.println(" " + entry.getKey() + ": " + entry.getValue()); + } + assert r.size() == 0; + } + + public void testCacheRows(Jedis conn) throws InterruptedException { + System.out.println("\n----- testCacheRows -----"); + System.out.println("First, let's schedule caching of itemX every 5 seconds"); + scheduleRowCache(conn, "itemX", 5); + System.out.println("Our schedule looks like:"); + Set s = conn.zrangeWithScores("schedule:", 0, -1); + for (Tuple tuple : s) { + System.out.println(" " + tuple.getElement() + ", " + tuple.getScore()); + } + assert s.size() != 0; + + System.out.println("We'll start a caching thread that will cache the data..."); + + CacheRowsThread thread = new CacheRowsThread(); + thread.start(); + + Thread.sleep(1000); + System.out.println("Our cached data looks like:"); + String r = conn.get("inv:itemX"); + System.out.println(r); + assert r != null; + System.out.println(); + + System.out.println("We'll check again in 5 seconds..."); + Thread.sleep(5000); + System.out.println("Notice that the data has changed..."); + String r2 = conn.get("inv:itemX"); + System.out.println(r2); + System.out.println(); + assert r2 != null; + assert !r.equals(r2); + + System.out.println("Let's force un-caching"); + scheduleRowCache(conn, "itemX", -1); + Thread.sleep(1000); + r = conn.get("inv:itemX"); + System.out.println("The cache was cleared? " + (r == null)); + assert r == null; + + thread.quit(); + Thread.sleep(2000); + if (thread.isAlive()) { + throw new RuntimeException("The database caching thread is still alive?!?"); + } + } + + public void testCacheRequest(Jedis conn) { + System.out.println("\n----- testCacheRequest -----"); + String token = UUID.randomUUID().toString(); + + Callback callback = request -> "content for " + request; + + updateToken(conn, token, "username", "itemX"); + String url = "http://test.com/?item=itemX"; + System.out.println("We are going to cache a simple request against " + url); + String result = cacheRequest(conn, url, callback); + System.out.println("We got initial content:\n" + result); + System.out.println(); + + assert result != null; + + System.out.println("To test that we've cached the request, we'll pass a bad callback"); + String result2 = cacheRequest(conn, url, null); + System.out.println("We ended up getting the same response!\n" + result2); + + assert result.equals(result2); + + assert !canCache(conn, "http://test.com/"); + assert !canCache(conn, "http://test.com/?item=itemX&_=1234536"); + } + + /** + * 代码清单 2-1 + */ + public String checkToken(Jedis conn, String token) { + // 尝试获取并返回令牌对应的用户。 + return conn.hget("login:", token); + } + + /** + * 代码清单 2-2 + * 代码清单 2-9 + */ + public void updateToken(Jedis conn, String token, String user, String item) { + // 获取当前时间戳。 + long timestamp = System.currentTimeMillis() / 1000; + // 维持令牌与已登录用户之间的映射。 + conn.hset("login:", token, user); + // 记录令牌最后一次出现的时间。 + conn.zadd("recent:", timestamp, token); + if (item != null) { + // 记录用户浏览过的商品。 + conn.zadd("viewed:" + token, timestamp, item); + // 移除旧的记录,只保留用户最近浏览过的25个商品。 + conn.zremrangeByRank("viewed:" + token, 0, -26); + conn.zincrby("viewed:", -1, item); + } + } + + /** + * 代码清单 2-4 + */ + public void addToCart(Jedis conn, String session, String item, int count) { + if (count <= 0) { + // 从购物车里面移除指定的商品。 + conn.hdel("cart:" + session, item); + } else { + // 将指定的商品添加到购物车。 + conn.hset("cart:" + session, item, String.valueOf(count)); + } + } + + /** + * 代码清单 2-7 + */ + public void scheduleRowCache(Jedis conn, String rowId, int delay) { + // 先设置数据行的延迟值。 + conn.zadd("delay:", delay, rowId); + // 立即缓存数据行。 + conn.zadd("schedule:", System.currentTimeMillis() / 1000, rowId); + } + + /** + * 代码清单 2-6 + */ + public String cacheRequest(Jedis conn, String request, Callback callback) { + // 对于不能被缓存的请求,直接调用回调函数。 + if (!canCache(conn, request)) { + return callback != null ? callback.call(request) : null; + } + + // 将请求转换成一个简单的字符串键,方便之后进行查找。 + String pageKey = "cache:" + hashRequest(request); + // 尝试查找被缓存的页面。 + String content = conn.get(pageKey); + + if (content == null && callback != null) { + // 如果页面还没有被缓存,那么生成页面。 + content = callback.call(request); + // 将新生成的页面放到缓存里面。 + conn.setex(pageKey, 300, content); + } + + // 返回页面。 + return content; + } + + /** + * 代码清单 2-11 + */ + public boolean canCache(Jedis conn, String request) { + try { + URL url = new URL(request); + HashMap params = new HashMap(); + if (url.getQuery() != null) { + for (String param : url.getQuery().split("&")) { + String[] pair = param.split("=", 2); + params.put(pair[0], pair.length == 2 ? pair[1] : null); + } + } + + // 尝试从页面里面取出商品ID。 + String itemId = extractItemId(params); + // 检查这个页面能否被缓存以及这个页面是否为商品页面。 + if (itemId == null || isDynamic(params)) { + return false; + } + // 取得商品的浏览次数排名。 + Long rank = conn.zrank("viewed:", itemId); + // 根据商品的浏览次数排名来判断是否需要缓存这个页面。 + return rank != null && rank < 10000; + } catch (MalformedURLException mue) { + return false; + } + } + + public boolean isDynamic(Map params) { + return params.containsKey("_"); + } + + public String extractItemId(Map params) { + return params.get("item"); + } + + public String hashRequest(String request) { + return String.valueOf(request.hashCode()); + } + + public interface Callback { + + String call(String request); + } + + + /** + * 代码清单 2-3 + */ + public class CleanSessionsThread extends Thread { + + private Jedis conn; + private int limit; + private boolean quit; + + public CleanSessionsThread(int limit) { + this.conn = new Jedis("localhost"); + this.conn.select(15); + this.limit = limit; + } + + public void quit() { + quit = true; + } + + @Override + public void run() { + while (!quit) { + // 找出目前已有令牌的数量。 + long size = conn.zcard("recent:"); + // 令牌数量未超过限制,休眠并在之后重新检查。 + if (size <= limit) { + try { + sleep(1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + continue; + } + + // 获取需要移除的令牌ID。 + long endIndex = Math.min(size - limit, 100); + Set tokenSet = conn.zrange("recent:", 0, endIndex - 1); + String[] tokens = tokenSet.toArray(new String[tokenSet.size()]); + + // 为那些将要被删除的令牌构建键名。 + ArrayList sessionKeys = new ArrayList(); + for (String token : tokens) { + sessionKeys.add("viewed:" + token); + } + + // 移除最旧的那些令牌。 + conn.del(sessionKeys.toArray(new String[sessionKeys.size()])); + conn.hdel("login:", tokens); + conn.zrem("recent:", tokens); + } + } + } + + + /** + * 代码清单 2-5 + */ + public class CleanFullSessionsThread extends Thread { + + private Jedis conn; + private int limit; + private boolean quit; + + public CleanFullSessionsThread(int limit) { + this.conn = new Jedis("localhost"); + this.conn.select(15); + this.limit = limit; + } + + public void quit() { + quit = true; + } + + @Override + public void run() { + while (!quit) { + long size = conn.zcard("recent:"); + if (size <= limit) { + try { + sleep(1000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + continue; + } + + long endIndex = Math.min(size - limit, 100); + Set sessionSet = conn.zrange("recent:", 0, endIndex - 1); + String[] sessions = sessionSet.toArray(new String[sessionSet.size()]); + + ArrayList sessionKeys = new ArrayList(); + for (String sess : sessions) { + sessionKeys.add("viewed:" + sess); + // 新增加的这行代码用于删除旧会话对应用户的购物车。 + sessionKeys.add("cart:" + sess); + } + + conn.del(sessionKeys.toArray(new String[sessionKeys.size()])); + conn.hdel("login:", sessions); + conn.zrem("recent:", sessions); + } + } + } + + + /** + * 代码清单 2-8 + */ + public class CacheRowsThread extends Thread { + + private Jedis conn; + private boolean quit; + + public CacheRowsThread() { + this.conn = new Jedis("localhost"); + this.conn.select(15); + } + + public void quit() { + quit = true; + } + + @Override + public void run() { + Gson gson = new Gson(); + while (!quit) { + // 尝试获取下一个需要被缓存的数据行以及该行的调度时间戳, + // 命令会返回一个包含零个或一个元组(tuple)的列表。 + Set range = conn.zrangeWithScores("schedule:", 0, 0); + Tuple next = range.size() > 0 ? range.iterator().next() : null; + long now = System.currentTimeMillis() / 1000; + if (next == null || next.getScore() > now) { + try { + // 暂时没有行需要被缓存,休眠50毫秒后重试。 + sleep(50); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + continue; + } + + String rowId = next.getElement(); + // 获取下一次调度前的延迟时间。 + double delay = conn.zscore("delay:", rowId); + if (delay <= 0) { + // 不必再缓存这个行,将它从缓存中移除。 + conn.zrem("delay:", rowId); + conn.zrem("schedule:", rowId); + conn.del("inv:" + rowId); + continue; + } + + // 读取数据行。 + Inventory row = Inventory.get(rowId); + // 更新调度时间并设置缓存值。 + conn.zadd("schedule:", now + delay, rowId); + conn.set("inv:" + rowId, gson.toJson(row)); + } + } + } + + + public static class Inventory { + + private String id; + private String data; + private long time; + + private Inventory(String id) { + this.id = id; + this.data = "data to cache..."; + this.time = System.currentTimeMillis() / 1000; + } + + public static Inventory get(String id) { + return new Inventory(id); + } + } +} diff --git a/codes/redis/redis-in-action/src/main/java/Chapter04.java b/codes/redis/redis-in-action/src/main/java/Chapter04.java new file mode 100644 index 0000000..2e6d38d --- /dev/null +++ b/codes/redis/redis-in-action/src/main/java/Chapter04.java @@ -0,0 +1,214 @@ +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Transaction; +import redis.clients.jedis.Tuple; + +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public class Chapter04 { + public static final void main(String[] args) { + new Chapter04().run(); + } + + public void run() { + Jedis conn = new Jedis("localhost"); + conn.select(15); + + testListItem(conn, false); + testPurchaseItem(conn); + testBenchmarkUpdateToken(conn); + } + + public void testListItem(Jedis conn, boolean nested) { + if (!nested){ + System.out.println("\n----- testListItem -----"); + } + + System.out.println("We need to set up just enough state so that a user can list an item"); + String seller = "userX"; + String item = "itemX"; + conn.sadd("inventory:" + seller, item); + Set i = conn.smembers("inventory:" + seller); + + System.out.println("The user's inventory has:"); + for (String member : i){ + System.out.println(" " + member); + } + assert i.size() > 0; + System.out.println(); + + System.out.println("Listing the item..."); + boolean l = listItem(conn, item, seller, 10); + System.out.println("Listing the item succeeded? " + l); + assert l; + Set r = conn.zrangeWithScores("market:", 0, -1); + System.out.println("The market contains:"); + for (Tuple tuple : r){ + System.out.println(" " + tuple.getElement() + ", " + tuple.getScore()); + } + assert r.size() > 0; + } + + public void testPurchaseItem(Jedis conn) { + System.out.println("\n----- testPurchaseItem -----"); + testListItem(conn, true); + + System.out.println("We need to set up just enough state so a user can buy an item"); + conn.hset("users:userY", "funds", "125"); + Map r = conn.hgetAll("users:userY"); + System.out.println("The user has some money:"); + for (Map.Entry entry : r.entrySet()){ + System.out.println(" " + entry.getKey() + ": " + entry.getValue()); + } + assert r.size() > 0; + assert r.get("funds") != null; + System.out.println(); + + System.out.println("Let's purchase an item"); + boolean p = purchaseItem(conn, "userY", "itemX", "userX", 10); + System.out.println("Purchasing an item succeeded? " + p); + assert p; + r = conn.hgetAll("users:userY"); + System.out.println("Their money is now:"); + for (Map.Entry entry : r.entrySet()){ + System.out.println(" " + entry.getKey() + ": " + entry.getValue()); + } + assert r.size() > 0; + + String buyer = "userY"; + Set i = conn.smembers("inventory:" + buyer); + System.out.println("Their inventory is now:"); + for (String member : i){ + System.out.println(" " + member); + } + assert i.size() > 0; + assert i.contains("itemX"); + assert conn.zscore("market:", "itemX.userX") == null; + } + + public void testBenchmarkUpdateToken(Jedis conn) { + System.out.println("\n----- testBenchmarkUpdate -----"); + benchmarkUpdateToken(conn, 5); + } + + public boolean listItem( + Jedis conn, String itemId, String sellerId, double price) { + + String inventory = "inventory:" + sellerId; + String item = itemId + '.' + sellerId; + long end = System.currentTimeMillis() + 5000; + + while (System.currentTimeMillis() < end) { + conn.watch(inventory); + if (!conn.sismember(inventory, itemId)){ + conn.unwatch(); + return false; + } + + Transaction trans = conn.multi(); + trans.zadd("market:", price, item); + trans.srem(inventory, itemId); + List results = trans.exec(); + // null response indicates that the transaction was aborted due to + // the watched key changing. + if (results == null){ + continue; + } + return true; + } + return false; + } + + public boolean purchaseItem( + Jedis conn, String buyerId, String itemId, String sellerId, double lprice) { + + String buyer = "users:" + buyerId; + String seller = "users:" + sellerId; + String item = itemId + '.' + sellerId; + String inventory = "inventory:" + buyerId; + long end = System.currentTimeMillis() + 10000; + + while (System.currentTimeMillis() < end){ + conn.watch("market:", buyer); + + double price = conn.zscore("market:", item); + double funds = Double.parseDouble(conn.hget(buyer, "funds")); + if (price != lprice || price > funds){ + conn.unwatch(); + return false; + } + + Transaction trans = conn.multi(); + trans.hincrBy(seller, "funds", (int)price); + trans.hincrBy(buyer, "funds", (int)-price); + trans.sadd(inventory, itemId); + trans.zrem("market:", item); + List results = trans.exec(); + // null response indicates that the transaction was aborted due to + // the watched key changing. + if (results == null){ + continue; + } + return true; + } + + return false; + } + + public void benchmarkUpdateToken(Jedis conn, int duration) { + try{ + @SuppressWarnings("rawtypes") + Class[] args = new Class[]{ + Jedis.class, String.class, String.class, String.class}; + Method[] methods = new Method[]{ + this.getClass().getDeclaredMethod("updateToken", args), + this.getClass().getDeclaredMethod("updateTokenPipeline", args), + }; + for (Method method : methods){ + int count = 0; + long start = System.currentTimeMillis(); + long end = start + (duration * 1000); + while (System.currentTimeMillis() < end){ + count++; + method.invoke(this, conn, "token", "user", "item"); + } + long delta = System.currentTimeMillis() - start; + System.out.println( + method.getName() + ' ' + + count + ' ' + + (delta / 1000) + ' ' + + (count / (delta / 1000))); + } + }catch(Exception e){ + throw new RuntimeException(e); + } + } + + public void updateToken(Jedis conn, String token, String user, String item) { + long timestamp = System.currentTimeMillis() / 1000; + conn.hset("login:", token, user); + conn.zadd("recent:", timestamp, token); + if (item != null) { + conn.zadd("viewed:" + token, timestamp, item); + conn.zremrangeByRank("viewed:" + token, 0, -26); + conn.zincrby("viewed:", -1, item); + } + } + + public void updateTokenPipeline(Jedis conn, String token, String user, String item) { + long timestamp = System.currentTimeMillis() / 1000; + Pipeline pipe = conn.pipelined(); + pipe.multi(); + pipe.hset("login:", token, user); + pipe.zadd("recent:", timestamp, token); + if (item != null){ + pipe.zadd("viewed:" + token, timestamp, item); + pipe.zremrangeByRank("viewed:" + token, 0, -26); + pipe.zincrby("viewed:", -1, item); + } + pipe.exec(); + } +} diff --git a/codes/redis/redis-in-action/src/main/java/Chapter05.java b/codes/redis/redis-in-action/src/main/java/Chapter05.java new file mode 100644 index 0000000..59461cc --- /dev/null +++ b/codes/redis/redis-in-action/src/main/java/Chapter05.java @@ -0,0 +1,636 @@ +//import com.google.gson.Gson; +//import com.google.gson.reflect.TypeToken; +//import org.apache.commons.csv.CSVParser; +//import org.javatuples.Pair; +//import redis.clients.jedis.*; +// +//import java.io.File; +//import java.io.FileReader; +//import java.text.Collator; +//import java.text.SimpleDateFormat; +//import java.util.*; +// +//public class Chapter05 { +// public static final String DEBUG = "debug"; +// public static final String INFO = "info"; +// public static final String WARNING = "warning"; +// public static final String ERROR = "error"; +// public static final String CRITICAL = "critical"; +// +// public static final Collator COLLATOR = Collator.getInstance(); +// +// public static final SimpleDateFormat TIMESTAMP = +// new SimpleDateFormat("EEE MMM dd HH:00:00 yyyy"); +// private static final SimpleDateFormat ISO_FORMAT = +// new SimpleDateFormat("yyyy-MM-dd'T'HH:00:00"); +// static{ +// ISO_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC")); +// } +// +// public static final void main(String[] args) +// throws InterruptedException +// { +// new Chapter05().run(); +// } +// +// public void run() +// throws InterruptedException +// { +// Jedis conn = new Jedis("localhost"); +// conn.select(15); +// +// testLogRecent(conn); +// testLogCommon(conn); +// testCounters(conn); +// testStats(conn); +// testAccessTime(conn); +// testIpLookup(conn); +// testIsUnderMaintenance(conn); +// testConfig(conn); +// } +// +// public void testLogRecent(Jedis conn) { +// System.out.println("\n----- testLogRecent -----"); +// System.out.println("Let's write a few logs to the recent log"); +// for (int i = 0; i < 5; i++) { +// logRecent(conn, "test", "this is message " + i); +// } +// List recent = conn.lrange("recent:test:info", 0, -1); +// System.out.println( +// "The current recent message log has this many messages: " + +// recent.size()); +// System.out.println("Those messages include:"); +// for (String message : recent){ +// System.out.println(message); +// } +// assert recent.size() >= 5; +// } +// +// public void testLogCommon(Jedis conn) { +// System.out.println("\n----- testLogCommon -----"); +// System.out.println("Let's write some items to the common log"); +// for (int count = 1; count < 6; count++) { +// for (int i = 0; i < count; i ++) { +// logCommon(conn, "test", "message-" + count); +// } +// } +// Set common = conn.zrevrangeWithScores("common:test:info", 0, -1); +// System.out.println("The current number of common messages is: " + common.size()); +// System.out.println("Those common messages are:"); +// for (Tuple tuple : common){ +// System.out.println(" " + tuple.getElement() + ", " + tuple.getScore()); +// } +// assert common.size() >= 5; +// } +// +// public void testCounters(Jedis conn) +// throws InterruptedException +// { +// System.out.println("\n----- testCounters -----"); +// System.out.println("Let's update some counters for now and a little in the future"); +// long now = System.currentTimeMillis() / 1000; +// for (int i = 0; i < 10; i++) { +// int count = (int)(Math.random() * 5) + 1; +// updateCounter(conn, "test", count, now + i); +// } +// +// List> counter = getCounter(conn, "test", 1); +// System.out.println("We have some per-second counters: " + counter.size()); +// System.out.println("These counters include:"); +// for (Pair count : counter){ +// System.out.println(" " + count); +// } +// assert counter.size() >= 10; +// +// counter = getCounter(conn, "test", 5); +// System.out.println("We have some per-5-second counters: " + counter.size()); +// System.out.println("These counters include:"); +// for (Pair count : counter){ +// System.out.println(" " + count); +// } +// assert counter.size() >= 2; +// System.out.println(); +// +// System.out.println("Let's clean out some counters by setting our sample count to 0"); +// CleanCountersThread thread = new CleanCountersThread(0, 2 * 86400000); +// thread.start(); +// Thread.sleep(1000); +// thread.quit(); +// thread.interrupt(); +// counter = getCounter(conn, "test", 86400); +// System.out.println("Did we clean out all of the counters? " + (counter.size() == 0)); +// assert counter.size() == 0; +// } +// +// public void testStats(Jedis conn) { +// System.out.println("\n----- testStats -----"); +// System.out.println("Let's add some data for our statistics!"); +// List r = null; +// for (int i = 0; i < 5; i++){ +// double value = (Math.random() * 11) + 5; +// r = updateStats(conn, "temp", "example", value); +// } +// System.out.println("We have some aggregate statistics: " + r); +// Map stats = getStats(conn, "temp", "example"); +// System.out.println("Which we can also fetch manually:"); +// System.out.println(stats); +// assert stats.get("count") >= 5; +// } +// +// public void testAccessTime(Jedis conn) +// throws InterruptedException +// { +// System.out.println("\n----- testAccessTime -----"); +// System.out.println("Let's calculate some access times..."); +// AccessTimer timer = new AccessTimer(conn); +// for (int i = 0; i < 10; i++){ +// timer.start(); +// Thread.sleep((int)((.5 + Math.random()) * 1000)); +// timer.stop("req-" + i); +// } +// System.out.println("The slowest access times are:"); +// Set atimes = conn.zrevrangeWithScores("slowest:AccessTime", 0, -1); +// for (Tuple tuple : atimes){ +// System.out.println(" " + tuple.getElement() + ", " + tuple.getScore()); +// } +// assert atimes.size() >= 10; +// System.out.println(); +// } +// +// public void testIpLookup(Jedis conn) { +// System.out.println("\n----- testIpLookup -----"); +// String cwd = System.getProperty("user.dir"); +// File blocks = new File(cwd + "/GeoLiteCity-Blocks.csv"); +// File locations = new File(cwd + "/GeoLiteCity-Location.csv"); +// if (!blocks.exists()){ +// System.out.println("********"); +// System.out.println("GeoLiteCity-Blocks.csv not found at: " + blocks); +// System.out.println("********"); +// return; +// } +// if (!locations.exists()){ +// System.out.println("********"); +// System.out.println("GeoLiteCity-Location.csv not found at: " + locations); +// System.out.println("********"); +// return; +// } +// +// System.out.println("Importing IP addresses to Redis... (this may take a while)"); +// importIpsToRedis(conn, blocks); +// long ranges = conn.zcard("ip2cityid:"); +// System.out.println("Loaded ranges into Redis: " + ranges); +// assert ranges > 1000; +// System.out.println(); +// +// System.out.println("Importing Location lookups to Redis... (this may take a while)"); +// importCitiesToRedis(conn, locations); +// long cities = conn.hlen("cityid2city:"); +// System.out.println("Loaded city lookups into Redis:" + cities); +// assert cities > 1000; +// System.out.println(); +// +// System.out.println("Let's lookup some locations!"); +// for (int i = 0; i < 5; i++){ +// String ip = +// randomOctet(255) + '.' + +// randomOctet(256) + '.' + +// randomOctet(256) + '.' + +// randomOctet(256); +// System.out.println(Arrays.toString(findCityByIp(conn, ip))); +// } +// } +// +// public void testIsUnderMaintenance(Jedis conn) +// throws InterruptedException +// { +// System.out.println("\n----- testIsUnderMaintenance -----"); +// System.out.println("Are we under maintenance (we shouldn't be)? " + isUnderMaintenance(conn)); +// conn.set("is-under-maintenance", "yes"); +// System.out.println("We cached this, so it should be the same: " + isUnderMaintenance(conn)); +// Thread.sleep(1000); +// System.out.println("But after a sleep, it should change: " + isUnderMaintenance(conn)); +// System.out.println("Cleaning up..."); +// conn.del("is-under-maintenance"); +// Thread.sleep(1000); +// System.out.println("Should be False again: " + isUnderMaintenance(conn)); +// } +// +// public void testConfig(Jedis conn) { +// System.out.println("\n----- testConfig -----"); +// System.out.println("Let's set a config and then get a connection from that config..."); +// Map config = new HashMap(); +// config.put("db", 15); +// setConfig(conn, "redis", "test", config); +// +// Jedis conn2 = redisConnection("test"); +// System.out.println( +// "We can run commands from the configured connection: " + (conn2.info() != null)); +// } +// +// public void logRecent(Jedis conn, String name, String message) { +// logRecent(conn, name, message, INFO); +// } +// +// public void logRecent(Jedis conn, String name, String message, String severity) { +// String destination = "recent:" + name + ':' + severity; +// Pipeline pipe = conn.pipelined(); +// pipe.lpush(destination, TIMESTAMP.format(new Date()) + ' ' + message); +// pipe.ltrim(destination, 0, 99); +// pipe.sync(); +// } +// +// public void logCommon(Jedis conn, String name, String message) { +// logCommon(conn, name, message, INFO, 5000); +// } +// +// public void logCommon( +// Jedis conn, String name, String message, String severity, int timeout) { +// String commonDest = "common:" + name + ':' + severity; +// String startKey = commonDest + ":start"; +// long end = System.currentTimeMillis() + timeout; +// while (System.currentTimeMillis() < end){ +// conn.watch(startKey); +// String hourStart = ISO_FORMAT.format(new Date()); +// String existing = conn.get(startKey); +// +// Transaction trans = conn.multi(); +// if (existing != null && COLLATOR.compare(existing, hourStart) < 0){ +// trans.rename(commonDest, commonDest + ":last"); +// trans.rename(startKey, commonDest + ":pstart"); +// trans.set(startKey, hourStart); +// } +// +// trans.zincrby(commonDest, 1, message); +// +// String recentDest = "recent:" + name + ':' + severity; +// trans.lpush(recentDest, TIMESTAMP.format(new Date()) + ' ' + message); +// trans.ltrim(recentDest, 0, 99); +// List results = trans.exec(); +// // null response indicates that the transaction was aborted due to +// // the watched key changing. +// if (results == null){ +// continue; +// } +// return; +// } +// } +// +// public void updateCounter(Jedis conn, String name, int count) { +// updateCounter(conn, name, count, System.currentTimeMillis() / 1000); +// } +// +// public static final int[] PRECISION = new int[]{1, 5, 60, 300, 3600, 18000, 86400}; +// public void updateCounter(Jedis conn, String name, int count, long now){ +// Transaction trans = conn.multi(); +// for (int prec : PRECISION) { +// long pnow = (now / prec) * prec; +// String hash = String.valueOf(prec) + ':' + name; +// trans.zadd("known:", 0, hash); +// trans.hincrBy("count:" + hash, String.valueOf(pnow), count); +// } +// trans.exec(); +// } +// +// public List> getCounter( +// Jedis conn, String name, int precision) +// { +// String hash = String.valueOf(precision) + ':' + name; +// Map data = conn.hgetAll("count:" + hash); +// ArrayList> results = +// new ArrayList>(); +// for (Map.Entry entry : data.entrySet()) { +// results.add(new Pair( +// Integer.parseInt(entry.getKey()), +// Integer.parseInt(entry.getValue()))); +// } +// Collections.sort(results); +// return results; +// } +// +// public List updateStats(Jedis conn, String context, String type, double value){ +// int timeout = 5000; +// String destination = "stats:" + context + ':' + type; +// String startKey = destination + ":start"; +// long end = System.currentTimeMillis() + timeout; +// while (System.currentTimeMillis() < end){ +// conn.watch(startKey); +// String hourStart = ISO_FORMAT.format(new Date()); +// +// String existing = conn.get(startKey); +// Transaction trans = conn.multi(); +// if (existing != null && COLLATOR.compare(existing, hourStart) < 0){ +// trans.rename(destination, destination + ":last"); +// trans.rename(startKey, destination + ":pstart"); +// trans.set(startKey, hourStart); +// } +// +// String tkey1 = UUID.randomUUID().toString(); +// String tkey2 = UUID.randomUUID().toString(); +// trans.zadd(tkey1, value, "min"); +// trans.zadd(tkey2, value, "max"); +// +// trans.zunionstore( +// destination, +// new ZParams().aggregate(ZParams.Aggregate.MIN), +// destination, tkey1); +// trans.zunionstore( +// destination, +// new ZParams().aggregate(ZParams.Aggregate.MAX), +// destination, tkey2); +// +// trans.del(tkey1, tkey2); +// trans.zincrby(destination, 1, "count"); +// trans.zincrby(destination, value, "sum"); +// trans.zincrby(destination, value * value, "sumsq"); +// +// List results = trans.exec(); +// if (results == null){ +// continue; +// } +// return results.subList(results.size() - 3, results.size()); +// } +// return null; +// } +// +// public Map getStats(Jedis conn, String context, String type){ +// String key = "stats:" + context + ':' + type; +// Map stats = new HashMap(); +// Set data = conn.zrangeWithScores(key, 0, -1); +// for (Tuple tuple : data){ +// stats.put(tuple.getElement(), tuple.getScore()); +// } +// stats.put("average", stats.get("sum") / stats.get("count")); +// double numerator = stats.get("sumsq") - Math.pow(stats.get("sum"), 2) / stats.get("count"); +// double count = stats.get("count"); +// stats.put("stddev", Math.pow(numerator / (count > 1 ? count - 1 : 1), .5)); +// return stats; +// } +// +// private long lastChecked; +// private boolean underMaintenance; +// public boolean isUnderMaintenance(Jedis conn) { +// if (lastChecked < System.currentTimeMillis() - 1000){ +// lastChecked = System.currentTimeMillis(); +// String flag = conn.get("is-under-maintenance"); +// underMaintenance = "yes".equals(flag); +// } +// +// return underMaintenance; +// } +// +// public void setConfig( +// Jedis conn, String type, String component, Map config) { +// Gson gson = new Gson(); +// conn.set("config:" + type + ':' + component, gson.toJson(config)); +// } +// +// private static final Map> CONFIGS = +// new HashMap>(); +// private static final Map CHECKED = new HashMap(); +// +// @SuppressWarnings("unchecked") +// public Map getConfig(Jedis conn, String type, String component) { +// int wait = 1000; +// String key = "config:" + type + ':' + component; +// +// Long lastChecked = CHECKED.get(key); +// if (lastChecked == null || lastChecked < System.currentTimeMillis() - wait){ +// CHECKED.put(key, System.currentTimeMillis()); +// +// String value = conn.get(key); +// Map config = null; +// if (value != null){ +// Gson gson = new Gson(); +// config = (Map)gson.fromJson( +// value, new TypeToken>(){}.getType()); +// }else{ +// config = new HashMap(); +// } +// +// CONFIGS.put(key, config); +// } +// +// return CONFIGS.get(key); +// } +// +// public static final Map REDIS_CONNECTIONS = +// new HashMap(); +// public Jedis redisConnection(String component){ +// Jedis configConn = REDIS_CONNECTIONS.get("config"); +// if (configConn == null){ +// configConn = new Jedis("localhost"); +// configConn.select(15); +// REDIS_CONNECTIONS.put("config", configConn); +// } +// +// String key = "config:redis:" + component; +// Map oldConfig = CONFIGS.get(key); +// Map config = getConfig(configConn, "redis", component); +// +// if (!config.equals(oldConfig)){ +// Jedis conn = new Jedis("localhost"); +// if (config.containsKey("db")){ +// conn.select(((Double)config.get("db")).intValue()); +// } +// REDIS_CONNECTIONS.put(key, conn); +// } +// +// return REDIS_CONNECTIONS.get(key); +// } +// +// public void importIpsToRedis(Jedis conn, File file) { +// FileReader reader = null; +// try{ +// reader = new FileReader(file); +// CSVParser parser = new CSVParser(reader); +// int count = 0; +// String[] line = null; +// while ((line = parser.getLine()) != null){ +// String startIp = line.length > 1 ? line[0] : ""; +// if (startIp.toLowerCase().indexOf('i') != -1){ +// continue; +// } +// int score = 0; +// if (startIp.indexOf('.') != -1){ +// score = ipToScore(startIp); +// }else{ +// try{ +// score = Integer.parseInt(startIp, 10); +// }catch(NumberFormatException nfe){ +// continue; +// } +// } +// +// String cityId = line[2] + '_' + count; +// conn.zadd("ip2cityid:", score, cityId); +// count++; +// } +// }catch(Exception e){ +// throw new RuntimeException(e); +// }finally{ +// try{ +// reader.close(); +// }catch(Exception e){ +// // ignore +// } +// } +// } +// +// public void importCitiesToRedis(Jedis conn, File file) { +// Gson gson = new Gson(); +// FileReader reader = null; +// try{ +// reader = new FileReader(file); +// CSVParser parser = new CSVParser(reader); +// String[] line = null; +// while ((line = parser.getLine()) != null){ +// if (line.length < 4 || !Character.isDigit(line[0].charAt(0))){ +// continue; +// } +// String cityId = line[0]; +// String country = line[1]; +// String region = line[2]; +// String city = line[3]; +// String json = gson.toJson(new String[]{city, region, country}); +// conn.hset("cityid2city:", cityId, json); +// } +// }catch(Exception e){ +// throw new RuntimeException(e); +// }finally{ +// try{ +// reader.close(); +// }catch(Exception e){ +// // ignore +// } +// } +// } +// +// public int ipToScore(String ipAddress) { +// int score = 0; +// for (String v : ipAddress.split("\\.")){ +// score = score * 256 + Integer.parseInt(v, 10); +// } +// return score; +// } +// +// public String randomOctet(int max) { +// return String.valueOf((int)(Math.random() * max)); +// } +// +// public String[] findCityByIp(Jedis conn, String ipAddress) { +// int score = ipToScore(ipAddress); +// Set results = conn.zrevrangeByScore("ip2cityid:", score, 0, 0, 1); +// if (results.size() == 0) { +// return null; +// } +// +// String cityId = results.iterator().next(); +// cityId = cityId.substring(0, cityId.indexOf('_')); +// return new Gson().fromJson(conn.hget("cityid2city:", cityId), String[].class); +// } +// +// public class CleanCountersThread +// extends Thread +// { +// private Jedis conn; +// private int sampleCount = 100; +// private boolean quit; +// private long timeOffset; // used to mimic a time in the future. +// +// public CleanCountersThread(int sampleCount, long timeOffset){ +// this.conn = new Jedis("localhost"); +// this.conn.select(15); +// this.sampleCount = sampleCount; +// this.timeOffset = timeOffset; +// } +// +// public void quit(){ +// quit = true; +// } +// +// public void run(){ +// int passes = 0; +// while (!quit){ +// long start = System.currentTimeMillis() + timeOffset; +// int index = 0; +// while (index < conn.zcard("known:")){ +// Set hashSet = conn.zrange("known:", index, index); +// index++; +// if (hashSet.size() == 0) { +// break; +// } +// String hash = hashSet.iterator().next(); +// int prec = Integer.parseInt(hash.substring(0, hash.indexOf(':'))); +// int bprec = (int)Math.floor(prec / 60); +// if (bprec == 0){ +// bprec = 1; +// } +// if ((passes % bprec) != 0){ +// continue; +// } +// +// String hkey = "count:" + hash; +// String cutoff = String.valueOf( +// ((System.currentTimeMillis() + timeOffset) / 1000) - sampleCount * prec); +// ArrayList samples = new ArrayList(conn.hkeys(hkey)); +// Collections.sort(samples); +// int remove = bisectRight(samples, cutoff); +// +// if (remove != 0){ +// conn.hdel(hkey, samples.subList(0, remove).toArray(new String[0])); +// if (remove == samples.size()){ +// conn.watch(hkey); +// if (conn.hlen(hkey) == 0) { +// Transaction trans = conn.multi(); +// trans.zrem("known:", hash); +// trans.exec(); +// index--; +// }else{ +// conn.unwatch(); +// } +// } +// } +// } +// +// passes++; +// long duration = Math.min( +// (System.currentTimeMillis() + timeOffset) - start + 1000, 60000); +// try { +// sleep(Math.max(60000 - duration, 1000)); +// }catch(InterruptedException ie){ +// Thread.currentThread().interrupt(); +// } +// } +// } +// +// // mimic python's bisect.bisect_right +// public int bisectRight(List values, String key) { +// int index = Collections.binarySearch(values, key); +// return index < 0 ? Math.abs(index) - 1 : index + 1; +// } +// } +// +// public class AccessTimer { +// private Jedis conn; +// private long start; +// +// public AccessTimer(Jedis conn){ +// this.conn = conn; +// } +// +// public void start(){ +// start = System.currentTimeMillis(); +// } +// +// public void stop(String context){ +// long delta = System.currentTimeMillis() - start; +// List stats = updateStats(conn, context, "AccessTime", delta / 1000.0); +// double average = (Double)stats.get(1) / (Double)stats.get(0); +// +// Transaction trans = conn.multi(); +// trans.zadd("slowest:AccessTime", average, context); +// trans.zremrangeByRank("slowest:AccessTime", 0, -101); +// trans.exec(); +// } +// } +//} diff --git a/codes/redis/redis-in-action/src/main/java/Chapter06.java b/codes/redis/redis-in-action/src/main/java/Chapter06.java new file mode 100644 index 0000000..c50dbd7 --- /dev/null +++ b/codes/redis/redis-in-action/src/main/java/Chapter06.java @@ -0,0 +1,928 @@ +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Transaction; +import redis.clients.jedis.Tuple; +import redis.clients.jedis.ZParams; + +import java.io.*; +import java.util.*; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +public class Chapter06 { + public static final void main(String[] args) + throws Exception + { + new Chapter06().run(); + } + + public void run() + throws InterruptedException, IOException + { + Jedis conn = new Jedis("localhost"); + conn.select(15); + + testAddUpdateContact(conn); + testAddressBookAutocomplete(conn); + testDistributedLocking(conn); + testCountingSemaphore(conn); + testDelayedTasks(conn); + testMultiRecipientMessaging(conn); + testFileDistribution(conn); + } + + public void testAddUpdateContact(Jedis conn) { + System.out.println("\n----- testAddUpdateContact -----"); + conn.del("recent:user"); + + System.out.println("Let's add a few contacts..."); + for (int i = 0; i < 10; i++){ + addUpdateContact(conn, "user", "contact-" + ((int)Math.floor(i / 3)) + '-' + i); + } + System.out.println("Current recently contacted contacts"); + List contacts = conn.lrange("recent:user", 0, -1); + for(String contact : contacts){ + System.out.println(" " + contact); + } + assert contacts.size() >= 10; + System.out.println(); + + System.out.println("Let's pull one of the older ones up to the front"); + addUpdateContact(conn, "user", "contact-1-4"); + contacts = conn.lrange("recent:user", 0, 2); + System.out.println("New top-3 contacts:"); + for(String contact : contacts){ + System.out.println(" " + contact); + } + assert "contact-1-4".equals(contacts.get(0)); + System.out.println(); + + System.out.println("Let's remove a contact..."); + removeContact(conn, "user", "contact-2-6"); + contacts = conn.lrange("recent:user", 0, -1); + System.out.println("New contacts:"); + for(String contact : contacts){ + System.out.println(" " + contact); + } + assert contacts.size() >= 9; + System.out.println(); + + System.out.println("And let's finally autocomplete on "); + List all = conn.lrange("recent:user", 0, -1); + contacts = fetchAutocompleteList(conn, "user", "c"); + assert all.equals(contacts); + List equiv = new ArrayList(); + for (String contact : all){ + if (contact.startsWith("contact-2-")){ + equiv.add(contact); + } + } + contacts = fetchAutocompleteList(conn, "user", "contact-2-"); + Collections.sort(equiv); + Collections.sort(contacts); + assert equiv.equals(contacts); + conn.del("recent:user"); + } + + public void testAddressBookAutocomplete(Jedis conn) { + System.out.println("\n----- testAddressBookAutocomplete -----"); + conn.del("members:test"); + System.out.println("the start/end range of 'abc' is: " + + Arrays.toString(findPrefixRange("abc"))); + System.out.println(); + + System.out.println("Let's add a few people to the guild"); + for (String name : new String[]{"jeff", "jenny", "jack", "jennifer"}){ + joinGuild(conn, "test", name); + } + System.out.println(); + System.out.println("now let's try to find users with names starting with 'je':"); + Set r = autocompleteOnPrefix(conn, "test", "je"); + System.out.println(r); + assert r.size() == 3; + + System.out.println("jeff just left to join a different guild..."); + leaveGuild(conn, "test", "jeff"); + r = autocompleteOnPrefix(conn, "test", "je"); + System.out.println(r); + assert r.size() == 2; + conn.del("members:test"); + } + + public void testDistributedLocking(Jedis conn) + throws InterruptedException + { + System.out.println("\n----- testDistributedLocking -----"); + conn.del("lock:testlock"); + System.out.println("Getting an initial lock..."); + assert acquireLockWithTimeout(conn, "testlock", 1000, 1000) != null; + System.out.println("Got it!"); + System.out.println("Trying to get it again without releasing the first one..."); + assert acquireLockWithTimeout(conn, "testlock", 10, 1000) == null; + System.out.println("Failed to get it!"); + System.out.println(); + + System.out.println("Waiting for the lock to timeout..."); + Thread.sleep(2000); + System.out.println("Getting the lock again..."); + String lockId = acquireLockWithTimeout(conn, "testlock", 1000, 1000); + assert lockId != null; + System.out.println("Got it!"); + System.out.println("Releasing the lock..."); + assert releaseLock(conn, "testlock", lockId); + System.out.println("Released it..."); + System.out.println(); + + System.out.println("Acquiring it again..."); + assert acquireLockWithTimeout(conn, "testlock", 1000, 1000) != null; + System.out.println("Got it!"); + conn.del("lock:testlock"); + } + + public void testCountingSemaphore(Jedis conn) + throws InterruptedException + { + System.out.println("\n----- testCountingSemaphore -----"); + conn.del("testsem", "testsem:owner", "testsem:counter"); + System.out.println("Getting 3 initial semaphores with a limit of 3..."); + for (int i = 0; i < 3; i++) { + assert acquireFairSemaphore(conn, "testsem", 3, 1000) != null; + } + System.out.println("Done!"); + System.out.println("Getting one more that should fail..."); + assert acquireFairSemaphore(conn, "testsem", 3, 1000) == null; + System.out.println("Couldn't get it!"); + System.out.println(); + + System.out.println("Lets's wait for some of them to time out"); + Thread.sleep(2000); + System.out.println("Can we get one?"); + String id = acquireFairSemaphore(conn, "testsem", 3, 1000); + assert id != null; + System.out.println("Got one!"); + System.out.println("Let's release it..."); + assert releaseFairSemaphore(conn, "testsem", id); + System.out.println("Released!"); + System.out.println(); + System.out.println("And let's make sure we can get 3 more!"); + for (int i = 0; i < 3; i++) { + assert acquireFairSemaphore(conn, "testsem", 3, 1000) != null; + } + System.out.println("We got them!"); + conn.del("testsem", "testsem:owner", "testsem:counter"); + } + + public void testDelayedTasks(Jedis conn) + throws InterruptedException + { + System.out.println("\n----- testDelayedTasks -----"); + conn.del("queue:tqueue", "delayed:"); + System.out.println("Let's start some regular and delayed tasks..."); + for (long delay : new long[]{0, 500, 0, 1500}){ + assert executeLater(conn, "tqueue", "testfn", new ArrayList(), delay) != null; + } + long r = conn.llen("queue:tqueue"); + System.out.println("How many non-delayed tasks are there (should be 2)? " + r); + assert r == 2; + System.out.println(); + + System.out.println("Let's start up a thread to bring those delayed tasks back..."); + PollQueueThread thread = new PollQueueThread(); + thread.start(); + System.out.println("Started."); + System.out.println("Let's wait for those tasks to be prepared..."); + Thread.sleep(2000); + thread.quit(); + thread.join(); + r = conn.llen("queue:tqueue"); + System.out.println("Waiting is over, how many tasks do we have (should be 4)? " + r); + assert r == 4; + conn.del("queue:tqueue", "delayed:"); + } + + public void testMultiRecipientMessaging(Jedis conn) { + System.out.println("\n----- testMultiRecipientMessaging -----"); + conn.del("ids:chat:", "msgs:1", "ids:1", "seen:joe", "seen:jeff", "seen:jenny"); + + System.out.println("Let's create a new chat session with some recipients..."); + Set recipients = new HashSet(); + recipients.add("jeff"); + recipients.add("jenny"); + String chatId = createChat(conn, "joe", recipients, "message 1"); + System.out.println("Now let's send a few messages..."); + for (int i = 2; i < 5; i++){ + sendMessage(conn, chatId, "joe", "message " + i); + } + System.out.println(); + + System.out.println("And let's get the messages that are waiting for jeff and jenny..."); + List r1 = fetchPendingMessages(conn, "jeff"); + List r2 = fetchPendingMessages(conn, "jenny"); + System.out.println("They are the same? " + r1.equals(r2)); + assert r1.equals(r2); + System.out.println("Those messages are:"); + for(ChatMessages chat : r1){ + System.out.println(" chatId: " + chat.chatId); + System.out.println(" messages:"); + for(Map message : chat.messages){ + System.out.println(" " + message); + } + } + + conn.del("ids:chat:", "msgs:1", "ids:1", "seen:joe", "seen:jeff", "seen:jenny"); + } + + public void testFileDistribution(Jedis conn) + throws InterruptedException, IOException + { + System.out.println("\n----- testFileDistribution -----"); + String[] keys = conn.keys("test:*").toArray(new String[0]); + if (keys.length > 0){ + conn.del(keys); + } + conn.del( + "msgs:test:", + "seen:0", + "seen:source", + "ids:test:", + "chat:test:"); + + System.out.println("Creating some temporary 'log' files..."); + File f1 = File.createTempFile("temp_redis_1_", ".txt"); + f1.deleteOnExit(); + Writer writer = new FileWriter(f1); + writer.write("one line\n"); + writer.close(); + + File f2 = File.createTempFile("temp_redis_2_", ".txt"); + f2.deleteOnExit(); + writer = new FileWriter(f2); + for (int i = 0; i < 100; i++){ + writer.write("many lines " + i + '\n'); + } + writer.close(); + + File f3 = File.createTempFile("temp_redis_3_", ".txt.gz"); + f3.deleteOnExit(); + writer = new OutputStreamWriter( + new GZIPOutputStream( + new FileOutputStream(f3))); + Random random = new Random(); + for (int i = 0; i < 1000; i++){ + writer.write("random line " + Long.toHexString(random.nextLong()) + '\n'); + } + writer.close(); + + long size = f3.length(); + System.out.println("Done."); + System.out.println(); + System.out.println("Starting up a thread to copy logs to redis..."); + File path = f1.getParentFile(); + CopyLogsThread thread = new CopyLogsThread(path, "test:", 1, size); + thread.start(); + + System.out.println("Let's pause to let some logs get copied to Redis..."); + Thread.sleep(250); + System.out.println(); + System.out.println("Okay, the logs should be ready. Let's process them!"); + + System.out.println("Files should have 1, 100, and 1000 lines"); + TestCallback callback = new TestCallback(); + processLogsFromRedis(conn, "0", callback); + System.out.println(Arrays.toString(callback.counts.toArray(new Integer[0]))); + assert callback.counts.get(0) == 1; + assert callback.counts.get(1) == 100; + assert callback.counts.get(2) == 1000; + + System.out.println(); + System.out.println("Let's wait for the copy thread to finish cleaning up..."); + thread.join(); + System.out.println("Done cleaning out Redis!"); + + keys = conn.keys("test:*").toArray(new String[0]); + if (keys.length > 0){ + conn.del(keys); + } + conn.del( + "msgs:test:", + "seen:0", + "seen:source", + "ids:test:", + "chat:test:"); + } + + public class TestCallback + implements Callback + { + private int index; + public List counts = new ArrayList(); + + public void callback(String line){ + if (line == null){ + index++; + return; + } + while (counts.size() == index){ + counts.add(0); + } + counts.set(index, counts.get(index) + 1); + } + } + + public void addUpdateContact(Jedis conn, String user, String contact) { + String acList = "recent:" + user; + Transaction trans = conn.multi(); + trans.lrem(acList, 0, contact); + trans.lpush(acList, contact); + trans.ltrim(acList, 0, 99); + trans.exec(); + } + + public void removeContact(Jedis conn, String user, String contact) { + conn.lrem("recent:" + user, 0, contact); + } + + public List fetchAutocompleteList(Jedis conn, String user, String prefix) { + List candidates = conn.lrange("recent:" + user, 0, -1); + List matches = new ArrayList(); + for (String candidate : candidates) { + if (candidate.toLowerCase().startsWith(prefix)){ + matches.add(candidate); + } + } + return matches; + } + + private static final String VALID_CHARACTERS = "`abcdefghijklmnopqrstuvwxyz{"; + public String[] findPrefixRange(String prefix) { + int posn = VALID_CHARACTERS.indexOf(prefix.charAt(prefix.length() - 1)); + char suffix = VALID_CHARACTERS.charAt(posn > 0 ? posn - 1 : 0); + String start = prefix.substring(0, prefix.length() - 1) + suffix + '{'; + String end = prefix + '{'; + return new String[]{start, end}; + } + + public void joinGuild(Jedis conn, String guild, String user) { + conn.zadd("members:" + guild, 0, user); + } + + public void leaveGuild(Jedis conn, String guild, String user) { + conn.zrem("members:" + guild, user); + } + + @SuppressWarnings("unchecked") + public Set autocompleteOnPrefix(Jedis conn, String guild, String prefix) { + String[] range = findPrefixRange(prefix); + String start = range[0]; + String end = range[1]; + String identifier = UUID.randomUUID().toString(); + start += identifier; + end += identifier; + String zsetName = "members:" + guild; + + conn.zadd(zsetName, 0, start); + conn.zadd(zsetName, 0, end); + + Set items = null; + while (true){ + conn.watch(zsetName); + int sindex = conn.zrank(zsetName, start).intValue(); + int eindex = conn.zrank(zsetName, end).intValue(); + int erange = Math.min(sindex + 9, eindex - 2); + + Transaction trans = conn.multi(); + trans.zrem(zsetName, start); + trans.zrem(zsetName, end); + trans.zrange(zsetName, sindex, erange); + List results = trans.exec(); + if (results != null){ + items = (Set)results.get(results.size() - 1); + break; + } + } + + for (Iterator iterator = items.iterator(); iterator.hasNext(); ){ + if (iterator.next().indexOf('{') != -1){ + iterator.remove(); + } + } + return items; + } + + public String acquireLock(Jedis conn, String lockName) { + return acquireLock(conn, lockName, 10000); + } + public String acquireLock(Jedis conn, String lockName, long acquireTimeout){ + String identifier = UUID.randomUUID().toString(); + + long end = System.currentTimeMillis() + acquireTimeout; + while (System.currentTimeMillis() < end){ + if (conn.setnx("lock:" + lockName, identifier) == 1){ + return identifier; + } + + try { + Thread.sleep(1); + }catch(InterruptedException ie){ + Thread.currentThread().interrupt(); + } + } + + return null; + } + + public String acquireLockWithTimeout( + Jedis conn, String lockName, long acquireTimeout, long lockTimeout) + { + String identifier = UUID.randomUUID().toString(); + String lockKey = "lock:" + lockName; + int lockExpire = (int)(lockTimeout / 1000); + + long end = System.currentTimeMillis() + acquireTimeout; + while (System.currentTimeMillis() < end) { + if (conn.setnx(lockKey, identifier) == 1){ + conn.expire(lockKey, lockExpire); + return identifier; + } + if (conn.ttl(lockKey) == -1) { + conn.expire(lockKey, lockExpire); + } + + try { + Thread.sleep(1); + }catch(InterruptedException ie){ + Thread.currentThread().interrupt(); + } + } + + // null indicates that the lock was not acquired + return null; + } + + public boolean releaseLock(Jedis conn, String lockName, String identifier) { + String lockKey = "lock:" + lockName; + + while (true){ + conn.watch(lockKey); + if (identifier.equals(conn.get(lockKey))){ + Transaction trans = conn.multi(); + trans.del(lockKey); + List results = trans.exec(); + if (results == null){ + continue; + } + return true; + } + + conn.unwatch(); + break; + } + + return false; + } + + public String acquireFairSemaphore( + Jedis conn, String semname, int limit, long timeout) + { + String identifier = UUID.randomUUID().toString(); + String czset = semname + ":owner"; + String ctr = semname + ":counter"; + + long now = System.currentTimeMillis(); + Transaction trans = conn.multi(); + trans.zremrangeByScore( + semname.getBytes(), + "-inf".getBytes(), + String.valueOf(now - timeout).getBytes()); + ZParams params = new ZParams(); + params.weights(1, 0); + trans.zinterstore(czset, params, czset, semname); + trans.incr(ctr); + List results = trans.exec(); + int counter = ((Long)results.get(results.size() - 1)).intValue(); + + trans = conn.multi(); + trans.zadd(semname, now, identifier); + trans.zadd(czset, counter, identifier); + trans.zrank(czset, identifier); + results = trans.exec(); + int result = ((Long)results.get(results.size() - 1)).intValue(); + if (result < limit){ + return identifier; + } + + trans = conn.multi(); + trans.zrem(semname, identifier); + trans.zrem(czset, identifier); + trans.exec(); + return null; + } + + public boolean releaseFairSemaphore( + Jedis conn, String semname, String identifier) + { + Transaction trans = conn.multi(); + trans.zrem(semname, identifier); + trans.zrem(semname + ":owner", identifier); + List results = trans.exec(); + return (Long)results.get(results.size() - 1) == 1; + } + + public String executeLater( + Jedis conn, String queue, String name, List args, long delay) + { + Gson gson = new Gson(); + String identifier = UUID.randomUUID().toString(); + String itemArgs = gson.toJson(args); + String item = gson.toJson(new String[]{identifier, queue, name, itemArgs}); + if (delay > 0){ + conn.zadd("delayed:", System.currentTimeMillis() + delay, item); + } else { + conn.rpush("queue:" + queue, item); + } + return identifier; + } + + public String createChat(Jedis conn, String sender, Set recipients, String message) { + String chatId = String.valueOf(conn.incr("ids:chat:")); + return createChat(conn, sender, recipients, message, chatId); + } + + public String createChat( + Jedis conn, String sender, Set recipients, String message, String chatId) + { + recipients.add(sender); + + Transaction trans = conn.multi(); + for (String recipient : recipients){ + trans.zadd("chat:" + chatId, 0, recipient); + trans.zadd("seen:" + recipient, 0, chatId); + } + trans.exec(); + + return sendMessage(conn, chatId, sender, message); + } + + public String sendMessage(Jedis conn, String chatId, String sender, String message) { + String identifier = acquireLock(conn, "chat:" + chatId); + if (identifier == null){ + throw new RuntimeException("Couldn't get the lock"); + } + try { + long messageId = conn.incr("ids:" + chatId); + HashMap values = new HashMap(); + values.put("id", messageId); + values.put("ts", System.currentTimeMillis()); + values.put("sender", sender); + values.put("message", message); + String packed = new Gson().toJson(values); + conn.zadd("msgs:" + chatId, messageId, packed); + }finally{ + releaseLock(conn, "chat:" + chatId, identifier); + } + return chatId; + } + + @SuppressWarnings("unchecked") + public List fetchPendingMessages(Jedis conn, String recipient) { + Set seenSet = conn.zrangeWithScores("seen:" + recipient, 0, -1); + List seenList = new ArrayList(seenSet); + + Transaction trans = conn.multi(); + for (Tuple tuple : seenList){ + String chatId = tuple.getElement(); + int seenId = (int)tuple.getScore(); + trans.zrangeByScore("msgs:" + chatId, String.valueOf(seenId + 1), "inf"); + } + List results = trans.exec(); + + Gson gson = new Gson(); + Iterator seenIterator = seenList.iterator(); + Iterator resultsIterator = results.iterator(); + + List chatMessages = new ArrayList(); + List seenUpdates = new ArrayList(); + List msgRemoves = new ArrayList(); + while (seenIterator.hasNext()){ + Tuple seen = seenIterator.next(); + Set messageStrings = (Set)resultsIterator.next(); + if (messageStrings.size() == 0){ + continue; + } + + int seenId = 0; + String chatId = seen.getElement(); + List> messages = new ArrayList>(); + for (String messageJson : messageStrings){ + Map message = (Map)gson.fromJson( + messageJson, new TypeToken>(){}.getType()); + int messageId = ((Double)message.get("id")).intValue(); + if (messageId > seenId){ + seenId = messageId; + } + message.put("id", messageId); + messages.add(message); + } + + conn.zadd("chat:" + chatId, seenId, recipient); + seenUpdates.add(new Object[]{"seen:" + recipient, seenId, chatId}); + + Set minIdSet = conn.zrangeWithScores("chat:" + chatId, 0, 0); + if (minIdSet.size() > 0){ + msgRemoves.add(new Object[]{ + "msgs:" + chatId, minIdSet.iterator().next().getScore()}); + } + chatMessages.add(new ChatMessages(chatId, messages)); + } + + trans = conn.multi(); + for (Object[] seenUpdate : seenUpdates){ + trans.zadd( + (String)seenUpdate[0], + (Integer)seenUpdate[1], + (String)seenUpdate[2]); + } + for (Object[] msgRemove : msgRemoves){ + trans.zremrangeByScore( + (String)msgRemove[0], 0, ((Double)msgRemove[1]).intValue()); + } + trans.exec(); + + return chatMessages; + } + + public void processLogsFromRedis(Jedis conn, String id, Callback callback) + throws InterruptedException, IOException + { + while (true){ + List fdata = fetchPendingMessages(conn, id); + + for (ChatMessages messages : fdata){ + for (Map message : messages.messages){ + String logFile = (String)message.get("message"); + + if (":done".equals(logFile)){ + return; + } + if (logFile == null || logFile.length() == 0){ + continue; + } + + InputStream in = new RedisInputStream( + conn, messages.chatId + logFile); + if (logFile.endsWith(".gz")){ + in = new GZIPInputStream(in); + } + + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + try{ + String line = null; + while ((line = reader.readLine()) != null){ + callback.callback(line); + } + callback.callback(null); + }finally{ + reader.close(); + } + + conn.incr(messages.chatId + logFile + ":done"); + } + } + + if (fdata.size() == 0){ + Thread.sleep(100); + } + } + } + + public class RedisInputStream + extends InputStream + { + private Jedis conn; + private String key; + private int pos; + + public RedisInputStream(Jedis conn, String key){ + this.conn = conn; + this.key = key; + } + + @Override + public int available() + throws IOException + { + long len = conn.strlen(key); + return (int)(len - pos); + } + + @Override + public int read() + throws IOException + { + byte[] block = conn.substr(key.getBytes(), pos, pos); + if (block == null || block.length == 0){ + return -1; + } + pos++; + return (int)(block[0] & 0xff); + } + + @Override + public int read(byte[] buf, int off, int len) + throws IOException + { + byte[] block = conn.substr(key.getBytes(), pos, pos + (len - off - 1)); + if (block == null || block.length == 0){ + return -1; + } + System.arraycopy(block, 0, buf, off, block.length); + pos += block.length; + return block.length; + } + + @Override + public void close() { + // no-op + } + } + + public interface Callback { + void callback(String line); + } + + public class ChatMessages + { + public String chatId; + public List> messages; + + public ChatMessages(String chatId, List> messages){ + this.chatId = chatId; + this.messages = messages; + } + + public boolean equals(Object other){ + if (!(other instanceof ChatMessages)){ + return false; + } + ChatMessages otherCm = (ChatMessages)other; + return chatId.equals(otherCm.chatId) && + messages.equals(otherCm.messages); + } + } + + public class PollQueueThread + extends Thread + { + private Jedis conn; + private boolean quit; + private Gson gson = new Gson(); + + public PollQueueThread(){ + this.conn = new Jedis("localhost"); + this.conn.select(15); + } + + public void quit() { + quit = true; + } + + public void run() { + while (!quit){ + Set items = conn.zrangeWithScores("delayed:", 0, 0); + Tuple item = items.size() > 0 ? items.iterator().next() : null; + if (item == null || item.getScore() > System.currentTimeMillis()) { + try{ + sleep(10); + }catch(InterruptedException ie){ + Thread.interrupted(); + } + continue; + } + + String json = item.getElement(); + String[] values = gson.fromJson(json, String[].class); + String identifier = values[0]; + String queue = values[1]; + + String locked = acquireLock(conn, identifier); + if (locked == null){ + continue; + } + + if (conn.zrem("delayed:", json) == 1){ + conn.rpush("queue:" + queue, json); + } + + releaseLock(conn, identifier, locked); + } + } + } + + public class CopyLogsThread + extends Thread + { + private Jedis conn; + private File path; + private String channel; + private int count; + private long limit; + + public CopyLogsThread(File path, String channel, int count, long limit) { + this.conn = new Jedis("localhost"); + this.conn.select(15); + this.path = path; + this.channel = channel; + this.count = count; + this.limit = limit; + } + + public void run() { + Deque waiting = new ArrayDeque(); + long bytesInRedis = 0; + + Set recipients= new HashSet(); + for (int i = 0; i < count; i++){ + recipients.add(String.valueOf(i)); + } + createChat(conn, "source", recipients, "", channel); + File[] logFiles = path.listFiles(new FilenameFilter(){ + public boolean accept(File dir, String name){ + return name.startsWith("temp_redis"); + } + }); + Arrays.sort(logFiles); + for (File logFile : logFiles){ + long fsize = logFile.length(); + while ((bytesInRedis + fsize) > limit){ + long cleaned = clean(waiting, count); + if (cleaned != 0){ + bytesInRedis -= cleaned; + }else{ + try{ + sleep(250); + }catch(InterruptedException ie){ + Thread.interrupted(); + } + } + } + + BufferedInputStream in = null; + try{ + in = new BufferedInputStream(new FileInputStream(logFile)); + int read = 0; + byte[] buffer = new byte[8192]; + while ((read = in.read(buffer, 0, buffer.length)) != -1){ + if (buffer.length != read){ + byte[] bytes = new byte[read]; + System.arraycopy(buffer, 0, bytes, 0, read); + conn.append((channel + logFile).getBytes(), bytes); + }else{ + conn.append((channel + logFile).getBytes(), buffer); + } + } + }catch(IOException ioe){ + ioe.printStackTrace(); + throw new RuntimeException(ioe); + }finally{ + try{ + in.close(); + }catch(Exception ignore){ + } + } + + sendMessage(conn, channel, "source", logFile.toString()); + + bytesInRedis += fsize; + waiting.addLast(logFile); + } + + sendMessage(conn, channel, "source", ":done"); + + while (waiting.size() > 0){ + long cleaned = clean(waiting, count); + if (cleaned != 0){ + bytesInRedis -= cleaned; + }else{ + try{ + sleep(250); + }catch(InterruptedException ie){ + Thread.interrupted(); + } + } + } + } + + private long clean(Deque waiting, int count) { + if (waiting.size() == 0){ + return 0; + } + File w0 = waiting.getFirst(); + if (String.valueOf(count).equals(conn.get(channel + w0 + ":done"))){ + conn.del(channel + w0, channel + w0 + ":done"); + return waiting.removeFirst().length(); + } + return 0; + } + } +} diff --git a/codes/redis/redis-in-action/src/main/java/Chapter07.java b/codes/redis/redis-in-action/src/main/java/Chapter07.java new file mode 100644 index 0000000..fcf30fd --- /dev/null +++ b/codes/redis/redis-in-action/src/main/java/Chapter07.java @@ -0,0 +1,955 @@ +//import org.javatuples.Pair; +//import redis.clients.jedis.*; +// +//import java.util.*; +//import java.util.regex.Matcher; +//import java.util.regex.Pattern; +// +//public class Chapter07 { +// private static final Pattern QUERY_RE = Pattern.compile("[+-]?[a-z']{2,}"); +// private static final Pattern WORDS_RE = Pattern.compile("[a-z']{2,}"); +// private static final Set STOP_WORDS = new HashSet(); +// static { +// for (String word : +// ("able about across after all almost also am among " + +// "an and any are as at be because been but by can " + +// "cannot could dear did do does either else ever " + +// "every for from get got had has have he her hers " + +// "him his how however if in into is it its just " + +// "least let like likely may me might most must my " + +// "neither no nor not of off often on only or other " + +// "our own rather said say says she should since so " + +// "some than that the their them then there these " + +// "they this tis to too twas us wants was we were " + +// "what when where which while who whom why will " + +// "with would yet you your").split(" ")) +// { +// STOP_WORDS.add(word); +// } +// } +// +// +// private static String CONTENT = +// "this is some random content, look at how it is indexed."; +// +// +// public static final void main(String[] args) { +// new Chapter07().run(); +// } +// +// public void run(){ +// Jedis conn = new Jedis("localhost"); +// conn.select(15); +// conn.flushDB(); +// +// testIndexDocument(conn); +// testSetOperations(conn); +// testParseQuery(conn); +// testParseAndSearch(conn); +// testSearchWithSort(conn); +// testSearchWithZsort(conn); +// conn.flushDB(); +// +// testStringToScore(conn); +// testIndexAndTargetAds(conn); +// testIsQualifiedForJob(conn); +// testIndexAndFindJobs(conn); +// } +// +// public void testIndexDocument(Jedis conn) { +// System.out.println("\n----- testIndexDocument -----"); +// +// System.out.println("We're tokenizing some content..."); +// Set tokens = tokenize(CONTENT); +// System.out.println("Those tokens are: " + +// Arrays.toString(tokens.toArray())); +// assert tokens.size() > 0; +// +// System.out.println("And now we are indexing that content..."); +// int count = indexDocument(conn, "test", CONTENT); +// assert count == tokens.size(); +// Set test = new HashSet(); +// test.add("test"); +// for (String t : tokens){ +// Set members = conn.smembers("idx:" + t); +// assert test.equals(members); +// } +// } +// +// public void testSetOperations(Jedis conn) { +// System.out.println("\n----- testSetOperations -----"); +// indexDocument(conn, "test", CONTENT); +// +// Set test = new HashSet(); +// test.add("test"); +// +// Transaction trans = conn.multi(); +// String id = intersect(trans, 30, "content", "indexed"); +// trans.exec(); +// assert test.equals(conn.smembers("idx:" + id)); +// +// trans = conn.multi(); +// id = intersect(trans, 30, "content", "ignored"); +// trans.exec(); +// assert conn.smembers("idx:" + id).isEmpty(); +// +// trans = conn.multi(); +// id = union(trans, 30, "content", "ignored"); +// trans.exec(); +// assert test.equals(conn.smembers("idx:" + id)); +// +// trans = conn.multi(); +// id = difference(trans, 30, "content", "ignored"); +// trans.exec(); +// assert test.equals(conn.smembers("idx:" + id)); +// +// trans = conn.multi(); +// id = difference(trans, 30, "content", "indexed"); +// trans.exec(); +// assert conn.smembers("idx:" + id).isEmpty(); +// } +// +// public void testParseQuery(Jedis conn) { +// System.out.println("\n----- testParseQuery -----"); +// String queryString = "test query without stopwords"; +// Query query = parse(queryString); +// String[] words = queryString.split(" "); +// for (int i = 0; i < words.length; i++){ +// List word = new ArrayList(); +// word.add(words[i]); +// assert word.equals(query.all.get(i)); +// } +// assert query.unwanted.isEmpty(); +// +// queryString = "test +query without -stopwords"; +// query = parse(queryString); +// assert "test".equals(query.all.get(0).get(0)); +// assert "query".equals(query.all.get(0).get(1)); +// assert "without".equals(query.all.get(1).get(0)); +// assert "stopwords".equals(query.unwanted.toArray()[0]); +// } +// +// public void testParseAndSearch(Jedis conn) { +// System.out.println("\n----- testParseAndSearch -----"); +// System.out.println("And now we are testing search..."); +// indexDocument(conn, "test", CONTENT); +// +// Set test = new HashSet(); +// test.add("test"); +// +// String id = parseAndSearch(conn, "content", 30); +// assert test.equals(conn.smembers("idx:" + id)); +// +// id = parseAndSearch(conn, "content indexed random", 30); +// assert test.equals(conn.smembers("idx:" + id)); +// +// id = parseAndSearch(conn, "content +indexed random", 30); +// assert test.equals(conn.smembers("idx:" + id)); +// +// id = parseAndSearch(conn, "content indexed +random", 30); +// assert test.equals(conn.smembers("idx:" + id)); +// +// id = parseAndSearch(conn, "content indexed -random", 30); +// assert conn.smembers("idx:" + id).isEmpty(); +// +// id = parseAndSearch(conn, "content indexed +random", 30); +// assert test.equals(conn.smembers("idx:" + id)); +// +// System.out.println("Which passed!"); +// } +// +// public void testSearchWithSort(Jedis conn) { +// System.out.println("\n----- testSearchWithSort -----"); +// System.out.println("And now let's test searching with sorting..."); +// +// indexDocument(conn, "test", CONTENT); +// indexDocument(conn, "test2", CONTENT); +// +// HashMap values = new HashMap(); +// values.put("updated", "12345"); +// values.put("id", "10"); +// conn.hmset("kb:doc:test", values); +// +// values.put("updated", "54321"); +// values.put("id", "1"); +// conn.hmset("kb:doc:test2", values); +// +// SearchResult result = searchAndSort(conn, "content", "-updated"); +// assert "test2".equals(result.results.get(0)); +// assert "test".equals(result.results.get(1)); +// +// result = searchAndSort(conn, "content", "-id"); +// assert "test".equals(result.results.get(0)); +// assert "test2".equals(result.results.get(1)); +// +// System.out.println("Which passed!"); +// } +// +// public void testSearchWithZsort(Jedis conn) { +// System.out.println("\n----- testSearchWithZsort -----"); +// System.out.println("And now let's test searching with sorting via zset..."); +// +// indexDocument(conn, "test", CONTENT); +// indexDocument(conn, "test2", CONTENT); +// +// conn.zadd("idx:sort:update", 12345, "test"); +// conn.zadd("idx:sort:update", 54321, "test2"); +// conn.zadd("idx:sort:votes", 10, "test"); +// conn.zadd("idx:sort:votes", 1, "test2"); +// +// Map weights = new HashMap(); +// weights.put("update", 1); +// weights.put("vote", 0); +// SearchResult result = searchAndZsort(conn, "content", false, weights); +// assert "test".equals(result.results.get(0)); +// assert "test2".equals(result.results.get(1)); +// +// weights.put("update", 0); +// weights.put("vote", 1); +// result = searchAndZsort(conn, "content", false, weights); +// assert "test2".equals(result.results.get(0)); +// assert "test".equals(result.results.get(1)); +// System.out.println("Which passed!"); +// } +// +// public void testStringToScore(Jedis conn) { +// System.out.println("\n----- testStringToScore -----"); +// +// String[] words = "these are some words that will be sorted".split(" "); +// +// List pairs = new ArrayList(); +// for (String word : words) { +// pairs.add(new WordScore(word, stringToScore(word))); +// } +// List pairs2 = new ArrayList(pairs); +// Collections.sort(pairs); +// Collections.sort(pairs2, new Comparator(){ +// public int compare(WordScore o1, WordScore o2){ +// long diff = o1.score - o2.score; +// return diff < 0 ? -1 : diff > 0 ? 1 : 0; +// } +// }); +// assert pairs.equals(pairs2); +// +// Map lower = new HashMap(); +// lower.put(-1, -1); +// int start = (int)'a'; +// int end = (int)'z'; +// for (int i = start ; i <= end; i++){ +// lower.put(i, i - start); +// } +// +// words = "these are some words that will be sorted".split(" "); +// pairs = new ArrayList(); +// for (String word : words) { +// pairs.add(new WordScore(word, stringToScoreGeneric(word, lower))); +// } +// pairs2 = new ArrayList(pairs); +// Collections.sort(pairs); +// Collections.sort(pairs2, new Comparator(){ +// public int compare(WordScore o1, WordScore o2){ +// long diff = o1.score - o2.score; +// return diff < 0 ? -1 : diff > 0 ? 1 : 0; +// } +// }); +// assert pairs.equals(pairs2); +// +// Map values = new HashMap(); +// values.put("test", "value"); +// values.put("test2", "other"); +// zaddString(conn, "key", values); +// assert conn.zscore("key", "test") == stringToScore("value"); +// assert conn.zscore("key", "test2") == stringToScore("other"); +// } +// +// public void testIndexAndTargetAds(Jedis conn) { +// System.out.println("\n----- testIndexAndTargetAds -----"); +// indexAd(conn, "1", new String[]{"USA", "CA"}, CONTENT, Ecpm.CPC, .25); +// indexAd(conn, "2", new String[]{"USA", "VA"}, CONTENT + " wooooo", Ecpm.CPC, .125); +// +// String[] usa = new String[]{"USA"}; +// for (int i = 0; i < 100; i++) { +// targetAds(conn, usa, CONTENT); +// } +// Pair result = targetAds(conn, usa, CONTENT); +// long targetId = result.getValue0(); +// String adId = result.getValue1(); +// assert "1".equals(result.getValue1()); +// +// result = targetAds(conn, new String[]{"VA"}, "wooooo"); +// assert "2".equals(result.getValue1()); +// +// Iterator range = conn.zrangeWithScores("idx:ad:value:", 0, -1).iterator(); +// assert new Tuple("2", 0.125).equals(range.next()); +// assert new Tuple("1", 0.25).equals(range.next()); +// +// range = conn.zrangeWithScores("ad:base_value:", 0, -1).iterator(); +// assert new Tuple("2", 0.125).equals(range.next()); +// assert new Tuple("1", 0.25).equals(range.next()); +// +// recordClick(conn, targetId, adId, false); +// +// range = conn.zrangeWithScores("idx:ad:value:", 0, -1).iterator(); +// assert new Tuple("2", 0.125).equals(range.next()); +// assert new Tuple("1", 2.5).equals(range.next()); +// +// range = conn.zrangeWithScores("ad:base_value:", 0, -1).iterator(); +// assert new Tuple("2", 0.125).equals(range.next()); +// assert new Tuple("1", 0.25).equals(range.next()); +// } +// +// public void testIsQualifiedForJob(Jedis conn) { +// System.out.println("\n----- testIsQualifiedForJob -----"); +// addJob(conn, "test", "q1", "q2", "q3"); +// assert isQualified(conn, "test", "q1", "q3", "q2"); +// assert !isQualified(conn, "test", "q1", "q2"); +// } +// +// public void testIndexAndFindJobs(Jedis conn) { +// System.out.println("\n----- testIndexAndFindJobs -----"); +// indexJob(conn, "test1", "q1", "q2", "q3"); +// indexJob(conn, "test2", "q1", "q3", "q4"); +// indexJob(conn, "test3", "q1", "q3", "q5"); +// +// assert findJobs(conn, "q1").size() == 0; +// +// Iterator result = findJobs(conn, "q1", "q3", "q4").iterator(); +// assert "test2".equals(result.next()); +// +// result = findJobs(conn, "q1", "q3", "q5").iterator(); +// assert "test3".equals(result.next()); +// +// result = findJobs(conn, "q1", "q2", "q3", "q4", "q5").iterator(); +// assert "test1".equals(result.next()); +// assert "test2".equals(result.next()); +// assert "test3".equals(result.next()); +// } +// +// public Set tokenize(String content) { +// Set words = new HashSet(); +// Matcher matcher = WORDS_RE.matcher(content); +// while (matcher.find()){ +// String word = matcher.group().trim(); +// if (word.length() > 2 && !STOP_WORDS.contains(word)){ +// words.add(word); +// } +// } +// return words; +// } +// +// public int indexDocument(Jedis conn, String docid, String content) { +// Set words = tokenize(content); +// Transaction trans = conn.multi(); +// for (String word : words) { +// trans.sadd("idx:" + word, docid); +// } +// return trans.exec().size(); +// } +// +// private String setCommon( +// Transaction trans, String method, int ttl, String... items) +// { +// String[] keys = new String[items.length]; +// for (int i = 0; i < items.length; i++){ +// keys[i] = "idx:" + items[i]; +// } +// +// String id = UUID.randomUUID().toString(); +// try{ +// trans.getClass() +// .getDeclaredMethod(method, String.class, String[].class) +// .invoke(trans, "idx:" + id, keys); +// }catch(Exception e){ +// throw new RuntimeException(e); +// } +// trans.expire("idx:" + id, ttl); +// return id; +// } +// +// public String intersect(Transaction trans, int ttl, String... items) { +// return setCommon(trans, "sinterstore", ttl, items); +// } +// +// public String union(Transaction trans, int ttl, String... items) { +// return setCommon(trans, "sunionstore", ttl, items); +// } +// +// public String difference(Transaction trans, int ttl, String... items) { +// return setCommon(trans, "sdiffstore", ttl, items); +// } +// +// private String zsetCommon( +// Transaction trans, String method, int ttl, ZParams params, String... sets) +// { +// String[] keys = new String[sets.length]; +// for (int i = 0; i < sets.length; i++) { +// keys[i] = "idx:" + sets[i]; +// } +// +// String id = UUID.randomUUID().toString(); +// try{ +// trans.getClass() +// .getDeclaredMethod(method, String.class, ZParams.class, String[].class) +// .invoke(trans, "idx:" + id, params, keys); +// }catch(Exception e){ +// throw new RuntimeException(e); +// } +// trans.expire("idx:" + id, ttl); +// return id; +// } +// +// public String zintersect( +// Transaction trans, int ttl, ZParams params, String... sets) +// { +// return zsetCommon(trans, "zinterstore", ttl, params, sets); +// } +// +// public String zunion( +// Transaction trans, int ttl, ZParams params, String... sets) +// { +// return zsetCommon(trans, "zunionstore", ttl, params, sets); +// } +// +// public Query parse(String queryString) { +// Query query = new Query(); +// Set current = new HashSet(); +// Matcher matcher = QUERY_RE.matcher(queryString.toLowerCase()); +// while (matcher.find()){ +// String word = matcher.group().trim(); +// char prefix = word.charAt(0); +// if (prefix == '+' || prefix == '-') { +// word = word.substring(1); +// } +// +// if (word.length() < 2 || STOP_WORDS.contains(word)) { +// continue; +// } +// +// if (prefix == '-') { +// query.unwanted.add(word); +// continue; +// } +// +// if (!current.isEmpty() && prefix != '+') { +// query.all.add(new ArrayList(current)); +// current.clear(); +// } +// current.add(word); +// } +// +// if (!current.isEmpty()){ +// query.all.add(new ArrayList(current)); +// } +// return query; +// } +// +// public String parseAndSearch(Jedis conn, String queryString, int ttl) { +// Query query = parse(queryString); +// if (query.all.isEmpty()){ +// return null; +// } +// +// List toIntersect = new ArrayList(); +// for (List syn : query.all) { +// if (syn.size() > 1) { +// Transaction trans = conn.multi(); +// toIntersect.add(union(trans, ttl, syn.toArray(new String[syn.size()]))); +// trans.exec(); +// }else{ +// toIntersect.add(syn.get(0)); +// } +// } +// +// String intersectResult = null; +// if (toIntersect.size() > 1) { +// Transaction trans = conn.multi(); +// intersectResult = intersect( +// trans, ttl, toIntersect.toArray(new String[toIntersect.size()])); +// trans.exec(); +// }else{ +// intersectResult = toIntersect.get(0); +// } +// +// if (!query.unwanted.isEmpty()) { +// String[] keys = query.unwanted +// .toArray(new String[query.unwanted.size() + 1]); +// keys[keys.length - 1] = intersectResult; +// Transaction trans = conn.multi(); +// intersectResult = difference(trans, ttl, keys); +// trans.exec(); +// } +// +// return intersectResult; +// } +// +// @SuppressWarnings("unchecked") +// public SearchResult searchAndSort(Jedis conn, String queryString, String sort) +// { +// boolean desc = sort.startsWith("-"); +// if (desc){ +// sort = sort.substring(1); +// } +// boolean alpha = !"updated".equals(sort) && !"id".equals(sort); +// String by = "kb:doc:*->" + sort; +// +// String id = parseAndSearch(conn, queryString, 300); +// +// Transaction trans = conn.multi(); +// trans.scard("idx:" + id); +// SortingParams params = new SortingParams(); +// if (desc) { +// params.desc(); +// } +// if (alpha){ +// params.alpha(); +// } +// params.by(by); +// params.limit(0, 20); +// trans.sort("idx:" + id, params); +// List results = trans.exec(); +// +// return new SearchResult( +// id, +// ((Long)results.get(0)).longValue(), +// (List)results.get(1)); +// } +// +// @SuppressWarnings("unchecked") +// public SearchResult searchAndZsort( +// Jedis conn, String queryString, boolean desc, Map weights) +// { +// int ttl = 300; +// int start = 0; +// int num = 20; +// String id = parseAndSearch(conn, queryString, ttl); +// +// int updateWeight = weights.containsKey("update") ? weights.get("update") : 1; +// int voteWeight = weights.containsKey("vote") ? weights.get("vote") : 0; +// +// String[] keys = new String[]{id, "sort:update", "sort:votes"}; +// Transaction trans = conn.multi(); +// id = zintersect( +// trans, ttl, new ZParams().weights(0, updateWeight, voteWeight), keys); +// +// trans.zcard("idx:" + id); +// if (desc) { +// trans.zrevrange("idx:" + id, start, start + num - 1); +// }else{ +// trans.zrange("idx:" + id, start, start + num - 1); +// } +// List results = trans.exec(); +// +// return new SearchResult( +// id, +// ((Long)results.get(results.size() - 2)).longValue(), +// // Note: it's a LinkedHashSet, so it's ordered +// new ArrayList((Set)results.get(results.size() - 1))); +// } +// +// public long stringToScore(String string) { +// return stringToScore(string, false); +// } +// +// public long stringToScore(String string, boolean ignoreCase) { +// if (ignoreCase){ +// string = string.toLowerCase(); +// } +// +// List pieces = new ArrayList(); +// for (int i = 0; i < Math.min(string.length(), 6); i++) { +// pieces.add((int)string.charAt(i)); +// } +// while (pieces.size() < 6){ +// pieces.add(-1); +// } +// +// long score = 0; +// for (int piece : pieces) { +// score = score * 257 + piece + 1; +// } +// +// return score * 2 + (string.length() > 6 ? 1 : 0); +// } +// +// public long stringToScoreGeneric(String string, Map mapping) { +// int length = (int)(52 / (Math.log(mapping.size()) / Math.log(2))); +// +// List pieces = new ArrayList(); +// for (int i = 0; i < Math.min(string.length(), length); i++) { +// pieces.add((int)string.charAt(i)); +// } +// while (pieces.size() < 6){ +// pieces.add(-1); +// } +// +// long score = 0; +// for (int piece : pieces) { +// int value = mapping.get(piece); +// score = score * mapping.size() + value + 1; +// } +// +// return score * 2 + (string.length() > 6 ? 1 : 0); +// } +// +// public long zaddString(Jedis conn, String name, Map values) { +// Map pieces = new HashMap(values.size()); +// for (Map.Entry entry : values.entrySet()) { +// pieces.put((double)stringToScore(entry.getValue()), entry.getKey()); +// } +// +// return conn.zadd(name, pieces); +// } +// +// private Map AVERAGE_PER_1K = new HashMap(); +// public void indexAd( +// Jedis conn, String id, String[] locations, +// String content, Ecpm type, double value) +// { +// Transaction trans = conn.multi(); +// +// for (String location : locations) { +// trans.sadd("idx:req:" + location, id); +// } +// +// Set words = tokenize(content); +// for (String word : tokenize(content)) { +// trans.zadd("idx:" + word, 0, id); +// } +// +// +// double avg = AVERAGE_PER_1K.containsKey(type) ? AVERAGE_PER_1K.get(type) : 1; +// double rvalue = toEcpm(type, 1000, avg, value); +// +// trans.hset("type:", id, type.name().toLowerCase()); +// trans.zadd("idx:ad:value:", rvalue, id); +// trans.zadd("ad:base_value:", value, id); +// for (String word : words){ +// trans.sadd("terms:" + id, word); +// } +// trans.exec(); +// } +// +// public double toEcpm(Ecpm type, double views, double avg, double value) { +// switch(type){ +// case CPC: +// case CPA: +// return 1000. * value * avg / views; +// case CPM: +// return value; +// } +// return value; +// } +// +// @SuppressWarnings("unchecked") +// public Pair targetAds( +// Jedis conn, String[] locations, String content) +// { +// Transaction trans = conn.multi(); +// +// String matchedAds = matchLocation(trans, locations); +// +// String baseEcpm = zintersect( +// trans, 30, new ZParams().weights(0, 1), matchedAds, "ad:value:"); +// +// Pair,String> result = finishScoring( +// trans, matchedAds, baseEcpm, content); +// +// trans.incr("ads:served:"); +// trans.zrevrange("idx:" + result.getValue1(), 0, 0); +// +// List response = trans.exec(); +// long targetId = (Long)response.get(response.size() - 2); +// Set targetedAds = (Set)response.get(response.size() - 1); +// +// if (targetedAds.size() == 0){ +// return new Pair(null, null); +// } +// +// String adId = targetedAds.iterator().next(); +// recordTargetingResult(conn, targetId, adId, result.getValue0()); +// +// return new Pair(targetId, adId); +// } +// +// public String matchLocation(Transaction trans, String[] locations) { +// String[] required = new String[locations.length]; +// for(int i = 0; i < locations.length; i++){ +// required[i] = "req:" + locations[i]; +// } +// return union(trans, 300, required); +// } +// +// public Pair,String> finishScoring( +// Transaction trans, String matched, String base, String content) +// { +// Map bonusEcpm = new HashMap(); +// Set words = tokenize(content); +// for (String word : words){ +// String wordBonus = zintersect( +// trans, 30, new ZParams().weights(0, 1), matched, word); +// bonusEcpm.put(wordBonus, 1); +// } +// +// if (bonusEcpm.size() > 0){ +// +// String[] keys = new String[bonusEcpm.size()]; +// int[] weights = new int[bonusEcpm.size()]; +// int index = 0; +// for (Map.Entry bonus : bonusEcpm.entrySet()){ +// keys[index] = bonus.getKey(); +// weights[index] = bonus.getValue(); +// index++; +// } +// +// ZParams minParams = new ZParams().aggregate(ZParams.Aggregate.MIN).weights(weights); +// String minimum = zunion(trans, 30, minParams, keys); +// +// ZParams maxParams = new ZParams().aggregate(ZParams.Aggregate.MAX).weights(weights); +// String maximum = zunion(trans, 30, maxParams, keys); +// +// String result = zunion( +// trans, 30, new ZParams().weights(2, 1, 1), base, minimum, maximum); +// return new Pair,String>(words, result); +// } +// return new Pair,String>(words, base); +// } +// +// public void recordTargetingResult( +// Jedis conn, long targetId, String adId, Set words) +// { +// Set terms = conn.smembers("terms:" + adId); +// String type = conn.hget("type:", adId); +// +// Transaction trans = conn.multi(); +// terms.addAll(words); +// if (terms.size() > 0) { +// String matchedKey = "terms:matched:" + targetId; +// for (String term : terms) { +// trans.sadd(matchedKey, term); +// } +// trans.expire(matchedKey, 900); +// } +// +// trans.incr("type:" + type + ":views:"); +// for (String term : terms) { +// trans.zincrby("views:" + adId, 1, term); +// } +// trans.zincrby("views:" + adId, 1, ""); +// +// List response = trans.exec(); +// double views = (Double)response.get(response.size() - 1); +// if ((views % 100) == 0){ +// updateCpms(conn, adId); +// } +// } +// +// @SuppressWarnings("unchecked") +// public void updateCpms(Jedis conn, String adId) { +// Transaction trans = conn.multi(); +// trans.hget("type:", adId); +// trans.zscore("ad:base_value:", adId); +// trans.smembers("terms:" + adId); +// List response = trans.exec(); +// String type = (String)response.get(0); +// Double baseValue = (Double)response.get(1); +// Set words = (Set)response.get(2); +// +// String which = "clicks"; +// Ecpm ecpm = Enum.valueOf(Ecpm.class, type.toUpperCase()); +// if (Ecpm.CPA.equals(ecpm)) { +// which = "actions"; +// } +// +// trans = conn.multi(); +// trans.get("type:" + type + ":views:"); +// trans.get("type:" + type + ':' + which); +// response = trans.exec(); +// String typeViews = (String)response.get(0); +// String typeClicks = (String)response.get(1); +// +// AVERAGE_PER_1K.put(ecpm, +// 1000. * +// Integer.valueOf(typeClicks != null ? typeClicks : "1") / +// Integer.valueOf(typeViews != null ? typeViews : "1")); +// +// if (Ecpm.CPM.equals(ecpm)) { +// return; +// } +// +// String viewKey = "views:" + adId; +// String clickKey = which + ':' + adId; +// +// trans = conn.multi(); +// trans.zscore(viewKey, ""); +// trans.zscore(clickKey, ""); +// response = trans.exec(); +// Double adViews = (Double)response.get(0); +// Double adClicks = (Double)response.get(1); +// +// double adEcpm = 0; +// if (adClicks == null || adClicks < 1){ +// Double score = conn.zscore("idx:ad:value:", adId); +// adEcpm = score != null ? score.doubleValue() : 0; +// }else{ +// adEcpm = toEcpm( +// ecpm, +// adViews != null ? adViews.doubleValue() : 1, +// adClicks != null ? adClicks.doubleValue() : 0, +// baseValue); +// conn.zadd("idx:ad:value:", adEcpm, adId); +// } +// for (String word : words) { +// trans = conn.multi(); +// trans.zscore(viewKey, word); +// trans.zscore(clickKey, word); +// response = trans.exec(); +// Double views = (Double)response.get(0); +// Double clicks = (Double)response.get(1); +// +// if (clicks == null || clicks < 1){ +// continue; +// } +// +// double wordEcpm = toEcpm( +// ecpm, +// views != null ? views.doubleValue() : 1, +// clicks != null ? clicks.doubleValue() : 0, +// baseValue); +// double bonus = wordEcpm - adEcpm; +// conn.zadd("idx:" + word, bonus, adId); +// } +// } +// +// public void recordClick(Jedis conn, long targetId, String adId, boolean action) { +// String type = conn.hget("type:", adId); +// Ecpm ecpm = Enum.valueOf(Ecpm.class, type.toUpperCase()); +// +// String clickKey = "clicks:" + adId; +// String matchKey = "terms:matched:" + targetId; +// Set matched = conn.smembers(matchKey); +// matched.add(""); +// +// Transaction trans = conn.multi(); +// if (Ecpm.CPA.equals(ecpm)) { +// trans.expire(matchKey, 900); +// if (action) { +// clickKey = "actions:" + adId; +// } +// } +// +// if (action && Ecpm.CPA.equals(ecpm)) { +// trans.incr("type:" + type + ":actions:"); +// }else{ +// trans.incr("type:" + type + ":clicks:"); +// } +// +// for (String word : matched) { +// trans.zincrby(clickKey, 1, word); +// } +// trans.exec(); +// +// updateCpms(conn, adId); +// } +// +// public void addJob(Jedis conn, String jobId, String... requiredSkills) { +// conn.sadd("job:" + jobId, requiredSkills); +// } +// +// @SuppressWarnings("unchecked") +// public boolean isQualified(Jedis conn, String jobId, String... candidateSkills) { +// String temp = UUID.randomUUID().toString(); +// Transaction trans = conn.multi(); +// for(String skill : candidateSkills) { +// trans.sadd(temp, skill); +// } +// trans.expire(temp, 5); +// trans.sdiff("job:" + jobId, temp); +// +// List response = trans.exec(); +// Set diff = (Set)response.get(response.size() - 1); +// return diff.size() == 0; +// } +// +// public void indexJob(Jedis conn, String jobId, String... skills) { +// Transaction trans = conn.multi(); +// Set unique = new HashSet(); +// for (String skill : skills) { +// trans.sadd("idx:skill:" + skill, jobId); +// unique.add(skill); +// } +// trans.zadd("idx:jobs:req", unique.size(), jobId); +// trans.exec(); +// } +// +// public Set findJobs(Jedis conn, String... candidateSkills) { +// String[] keys = new String[candidateSkills.length]; +// int[] weights = new int[candidateSkills.length]; +// for (int i = 0; i < candidateSkills.length; i++) { +// keys[i] = "skill:" + candidateSkills[i]; +// weights[i] = 1; +// } +// +// Transaction trans = conn.multi(); +// String jobScores = zunion( +// trans, 30, new ZParams().weights(weights), keys); +// String finalResult = zintersect( +// trans, 30, new ZParams().weights(-1, 1), jobScores, "jobs:req"); +// trans.exec(); +// +// return conn.zrangeByScore("idx:" + finalResult, 0, 0); +// } +// +// public class Query { +// public final List> all = new ArrayList>(); +// public final Set unwanted = new HashSet(); +// } +// +// public class SearchResult { +// public final String id; +// public final long total; +// public final List results; +// +// public SearchResult(String id, long total, List results) { +// this.id = id; +// this.total = total; +// this.results = results; +// } +// } +// +// public class WordScore +// implements Comparable +// { +// public final String word; +// public final long score; +// +// public WordScore(String word, long score) { +// this.word = word; +// this.score = score; +// } +// +// public boolean equals(Object other) { +// if (!(other instanceof WordScore)){ +// return false; +// } +// WordScore t2 = (WordScore)other; +// return this.word.equals(t2.word) && this.score == t2.score; +// } +// +// @Override +// public int compareTo(WordScore other) { +// if (this.word.equals(other.word)) { +// long diff = this.score - other.score; +// return diff < 0 ? -1 : diff > 0 ? 1 : 0; +// } +// return this.word.compareTo(other.word); +// } +// +// public String toString(){ +// return word + '=' + score; +// } +// } +// +// public enum Ecpm { +// CPC, CPA, CPM +// } +//} diff --git a/codes/redis/redis-in-action/src/main/java/Chapter08.java b/codes/redis/redis-in-action/src/main/java/Chapter08.java new file mode 100644 index 0000000..f3c64ce --- /dev/null +++ b/codes/redis/redis-in-action/src/main/java/Chapter08.java @@ -0,0 +1,555 @@ +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.Transaction; +import redis.clients.jedis.Tuple; + +import java.lang.reflect.Method; +import java.util.*; + +public class Chapter08 { + private static int HOME_TIMELINE_SIZE = 1000; + private static int POSTS_PER_PASS = 1000; + private static int REFILL_USERS_STEP = 50; + + public static final void main(String[] args) + throws InterruptedException + { + new Chapter08().run(); + } + + public void run() + throws InterruptedException + { + Jedis conn = new Jedis("localhost"); + conn.select(15); + conn.flushDB(); + + testCreateUserAndStatus(conn); + conn.flushDB(); + + testFollowUnfollowUser(conn); + conn.flushDB(); + + testSyndicateStatus(conn); + conn.flushDB(); + + testRefillTimeline(conn); + } + + public void testCreateUserAndStatus(Jedis conn) { + System.out.println("\n----- testCreateUserAndStatus -----"); + + assert createUser(conn, "TestUser", "Test User") == 1; + assert createUser(conn, "TestUser", "Test User2") == -1; + + assert createStatus(conn, 1, "This is a new status message") == 1; + assert "1".equals(conn.hget("user:1", "posts")); + } + + public void testFollowUnfollowUser(Jedis conn) { + System.out.println("\n----- testFollowUnfollowUser -----"); + + assert createUser(conn, "TestUser", "Test User") == 1; + assert createUser(conn, "TestUser2", "Test User2") == 2; + + assert followUser(conn, 1, 2); + assert conn.zcard("followers:2") == 1; + assert conn.zcard("followers:1") == 0; + assert conn.zcard("following:1") == 1; + assert conn.zcard("following:2") == 0; + assert "1".equals(conn.hget("user:1", "following")); + assert "0".equals(conn.hget("user:2", "following")); + assert "0".equals(conn.hget("user:1", "followers")); + assert "1".equals(conn.hget("user:2", "followers")); + + assert !unfollowUser(conn, 2, 1); + assert unfollowUser(conn, 1, 2); + assert conn.zcard("followers:2") == 0; + assert conn.zcard("followers:1") == 0; + assert conn.zcard("following:1") == 0; + assert conn.zcard("following:2") == 0; + assert "0".equals(conn.hget("user:1", "following")); + assert "0".equals(conn.hget("user:2", "following")); + assert "0".equals(conn.hget("user:1", "followers")); + assert "0".equals(conn.hget("user:2", "followers")); + } + + public void testSyndicateStatus(Jedis conn) + throws InterruptedException + { + System.out.println("\n----- testSyndicateStatus -----"); + + assert createUser(conn, "TestUser", "Test User") == 1; + assert createUser(conn, "TestUser2", "Test User2") == 2; + + assert followUser(conn, 1, 2); + assert conn.zcard("followers:2") == 1; + assert "1".equals(conn.hget("user:1", "following")); + assert postStatus(conn, 2, "this is some message content") == 1; + assert getStatusMessages(conn, 1).size() == 1; + + for(int i = 3; i < 11; i++) { + assert createUser(conn, "TestUser" + i, "Test User" + i) == i; + followUser(conn, i, 2); + } + + POSTS_PER_PASS = 5; + + assert postStatus(conn, 2, "this is some other message content") == 2; + Thread.sleep(100); + assert getStatusMessages(conn, 9).size() == 2; + + assert unfollowUser(conn, 1, 2); + assert getStatusMessages(conn, 1).size() == 0; + } + + public void testRefillTimeline(Jedis conn) + throws InterruptedException + { + System.out.println("\n----- testRefillTimeline -----"); + + assert createUser(conn, "TestUser", "Test User") == 1; + assert createUser(conn, "TestUser2", "Test User2") == 2; + assert createUser(conn, "TestUser3", "Test User3") == 3; + + assert followUser(conn, 1, 2); + assert followUser(conn, 1, 3); + + HOME_TIMELINE_SIZE = 5; + + for (int i = 0; i < 10; i++) { + assert postStatus(conn, 2, "message") != -1; + assert postStatus(conn, 3, "message") != -1; + Thread.sleep(50); + } + + assert getStatusMessages(conn, 1).size() == 5; + assert unfollowUser(conn, 1, 2); + assert getStatusMessages(conn, 1).size() < 5; + + refillTimeline(conn, "following:1", "home:1"); + List> messages = getStatusMessages(conn, 1); + assert messages.size() == 5; + for (Map message : messages) { + assert "3".equals(message.get("uid")); + } + + long statusId = Long.valueOf(messages.get(messages.size() -1).get("id")); + assert deleteStatus(conn, 3, statusId); + assert getStatusMessages(conn, 1).size() == 4; + assert conn.zcard("home:1") == 5; + cleanTimelines(conn, 3, statusId); + assert conn.zcard("home:1") == 4; + } + + public String acquireLockWithTimeout( + Jedis conn, String lockName, int acquireTimeout, int lockTimeout) + { + String id = UUID.randomUUID().toString(); + lockName = "lock:" + lockName; + + long end = System.currentTimeMillis() + (acquireTimeout * 1000); + while (System.currentTimeMillis() < end) { + if (conn.setnx(lockName, id) >= 1) { + conn.expire(lockName, lockTimeout); + return id; + }else if (conn.ttl(lockName) <= 0){ + conn.expire(lockName, lockTimeout); + } + + try{ + Thread.sleep(1); + }catch(InterruptedException ie){ + Thread.interrupted(); + } + } + + return null; + } + + public boolean releaseLock(Jedis conn, String lockName, String identifier) { + lockName = "lock:" + lockName; + while (true) { + conn.watch(lockName); + if (identifier.equals(conn.get(lockName))) { + Transaction trans = conn.multi(); + trans.del(lockName); + List result = trans.exec(); + // null response indicates that the transaction was aborted due + // to the watched key changing. + if (result == null){ + continue; + } + return true; + } + + conn.unwatch(); + break; + } + + return false; + } + + public long createUser(Jedis conn, String login, String name) { + String llogin = login.toLowerCase(); + String lock = acquireLockWithTimeout(conn, "user:" + llogin, 10, 1); + if (lock == null){ + return -1; + } + + if (conn.hget("users:", llogin) != null) { + return -1; + } + + long id = conn.incr("user:id:"); + Transaction trans = conn.multi(); + trans.hset("users:", llogin, String.valueOf(id)); + Map values = new HashMap(); + values.put("login", login); + values.put("id", String.valueOf(id)); + values.put("name", name); + values.put("followers", "0"); + values.put("following", "0"); + values.put("posts", "0"); + values.put("signup", String.valueOf(System.currentTimeMillis())); + trans.hmset("user:" + id, values); + trans.exec(); + releaseLock(conn, "user:" + llogin, lock); + return id; + } + + @SuppressWarnings("unchecked") + public boolean followUser(Jedis conn, long uid, long otherUid) { + String fkey1 = "following:" + uid; + String fkey2 = "followers:" + otherUid; + + if (conn.zscore(fkey1, String.valueOf(otherUid)) != null) { + return false; + } + + long now = System.currentTimeMillis(); + + Transaction trans = conn.multi(); + trans.zadd(fkey1, now, String.valueOf(otherUid)); + trans.zadd(fkey2, now, String.valueOf(uid)); + trans.zcard(fkey1); + trans.zcard(fkey2); + trans.zrevrangeWithScores("profile:" + otherUid, 0, HOME_TIMELINE_SIZE - 1); + + List response = trans.exec(); + long following = (Long)response.get(response.size() - 3); + long followers = (Long)response.get(response.size() - 2); + Set statuses = (Set)response.get(response.size() - 1); + + trans = conn.multi(); + trans.hset("user:" + uid, "following", String.valueOf(following)); + trans.hset("user:" + otherUid, "followers", String.valueOf(followers)); + if (statuses.size() > 0) { + for (Tuple status : statuses){ + trans.zadd("home:" + uid, status.getScore(), status.getElement()); + } + } + trans.zremrangeByRank("home:" + uid, 0, 0 - HOME_TIMELINE_SIZE - 1); + trans.exec(); + + return true; + } + + @SuppressWarnings("unchecked") + public boolean unfollowUser(Jedis conn, long uid, long otherUid) { + String fkey1 = "following:" + uid; + String fkey2 = "followers:" + otherUid; + + if (conn.zscore(fkey1, String.valueOf(otherUid)) == null) { + return false; + } + + Transaction trans = conn.multi(); + trans.zrem(fkey1, String.valueOf(otherUid)); + trans.zrem(fkey2, String.valueOf(uid)); + trans.zcard(fkey1); + trans.zcard(fkey2); + trans.zrevrange("profile:" + otherUid, 0, HOME_TIMELINE_SIZE - 1); + + List response = trans.exec(); + long following = (Long)response.get(response.size() - 3); + long followers = (Long)response.get(response.size() - 2); + Set statuses = (Set)response.get(response.size() - 1); + + trans = conn.multi(); + trans.hset("user:" + uid, "following", String.valueOf(following)); + trans.hset("user:" + otherUid, "followers", String.valueOf(followers)); + if (statuses.size() > 0){ + for (String status : statuses) { + trans.zrem("home:" + uid, status); + } + } + + trans.exec(); + return true; + } + + public long createStatus(Jedis conn, long uid, String message) { + return createStatus(conn, uid, message, null); + } + public long createStatus( + Jedis conn, long uid, String message, Map data) + { + Transaction trans = conn.multi(); + trans.hget("user:" + uid, "login"); + trans.incr("status:id:"); + + List response = trans.exec(); + String login = (String)response.get(0); + long id = (Long)response.get(1); + + if (login == null) { + return -1; + } + + if (data == null){ + data = new HashMap(); + } + data.put("message", message); + data.put("posted", String.valueOf(System.currentTimeMillis())); + data.put("id", String.valueOf(id)); + data.put("uid", String.valueOf(uid)); + data.put("login", login); + + trans = conn.multi(); + trans.hmset("status:" + id, data); + trans.hincrBy("user:" + uid, "posts", 1); + trans.exec(); + return id; + } + + public long postStatus(Jedis conn, long uid, String message) { + return postStatus(conn, uid, message, null); + } + public long postStatus( + Jedis conn, long uid, String message, Map data) + { + long id = createStatus(conn, uid, message, data); + if (id == -1){ + return -1; + } + + String postedString = conn.hget("status:" + id, "posted"); + if (postedString == null) { + return -1; + } + + long posted = Long.parseLong(postedString); + conn.zadd("profile:" + uid, posted, String.valueOf(id)); + + syndicateStatus(conn, uid, id, posted, 0); + return id; + } + + public void syndicateStatus( + Jedis conn, long uid, long postId, long postTime, double start) + { + Set followers = conn.zrangeByScoreWithScores( + "followers:" + uid, + String.valueOf(start), "inf", + 0, POSTS_PER_PASS); + + Transaction trans = conn.multi(); + for (Tuple tuple : followers){ + String follower = tuple.getElement(); + start = tuple.getScore(); + trans.zadd("home:" + follower, postTime, String.valueOf(postId)); + trans.zrange("home:" + follower, 0, -1); + trans.zremrangeByRank( + "home:" + follower, 0, 0 - HOME_TIMELINE_SIZE - 1); + } + trans.exec(); + + if (followers.size() >= POSTS_PER_PASS) { + try{ + Method method = getClass().getDeclaredMethod( + "syndicateStatus", Jedis.class, Long.TYPE, Long.TYPE, Long.TYPE, Double.TYPE); + executeLater("default", method, uid, postId, postTime, start); + }catch(Exception e){ + throw new RuntimeException(e); + } + } + } + + public boolean deleteStatus(Jedis conn, long uid, long statusId) { + String key = "status:" + statusId; + String lock = acquireLockWithTimeout(conn, key, 1, 10); + if (lock == null) { + return false; + } + + try{ + if (!String.valueOf(uid).equals(conn.hget(key, "uid"))) { + return false; + } + + Transaction trans = conn.multi(); + trans.del(key); + trans.zrem("profile:" + uid, String.valueOf(statusId)); + trans.zrem("home:" + uid, String.valueOf(statusId)); + trans.hincrBy("user:" + uid, "posts", -1); + trans.exec(); + + return true; + }finally{ + releaseLock(conn, key, lock); + } + } + + public List> getStatusMessages(Jedis conn, long uid) { + return getStatusMessages(conn, uid, 1, 30); + } + + @SuppressWarnings("unchecked") + public List> getStatusMessages( + Jedis conn, long uid, int page, int count) + { + Set statusIds = conn.zrevrange( + "home:" + uid, (page - 1) * count, page * count - 1); + + Transaction trans = conn.multi(); + for (String id : statusIds) { + trans.hgetAll("status:" + id); + } + + List> statuses = new ArrayList>(); + for (Object result : trans.exec()) { + Map status = (Map)result; + if (status != null && status.size() > 0){ + statuses.add(status); + } + } + return statuses; + } + + public void refillTimeline(Jedis conn, String incoming, String timeline) { + refillTimeline(conn, incoming, timeline, 0); + } + + @SuppressWarnings("unchecked") + public void refillTimeline( + Jedis conn, String incoming, String timeline, double start) + { + if (start == 0 && conn.zcard(timeline) >= 750) { + return; + } + + Set users = conn.zrangeByScoreWithScores( + incoming, String.valueOf(start), "inf", 0, REFILL_USERS_STEP); + + Pipeline pipeline = conn.pipelined(); + for (Tuple tuple : users){ + String uid = tuple.getElement(); + start = tuple.getScore(); + pipeline.zrevrangeWithScores( + "profile:" + uid, 0, HOME_TIMELINE_SIZE - 1); + } + + List response = pipeline.syncAndReturnAll(); + List messages = new ArrayList(); + for (Object results : response) { + messages.addAll((Set)results); + } + + Collections.sort(messages); + messages = messages.subList(0, HOME_TIMELINE_SIZE); + + Transaction trans = conn.multi(); + if (messages.size() > 0) { + for (Tuple tuple : messages) { + trans.zadd(timeline, tuple.getScore(), tuple.getElement()); + } + } + trans.zremrangeByRank(timeline, 0, 0 - HOME_TIMELINE_SIZE - 1); + trans.exec(); + + if (users.size() >= REFILL_USERS_STEP) { + try{ + Method method = getClass().getDeclaredMethod( + "refillTimeline", Jedis.class, String.class, String.class, Double.TYPE); + executeLater("default", method, incoming, timeline, start); + }catch(Exception e){ + throw new RuntimeException(e); + } + } + } + + public void cleanTimelines(Jedis conn, long uid, long statusId) { + cleanTimelines(conn, uid, statusId, 0, false); + } + public void cleanTimelines( + Jedis conn, long uid, long statusId, double start, boolean onLists) + { + String key = "followers:" + uid; + String base = "home:"; + if (onLists) { + key = "list:out:" + uid; + base = "list:statuses:"; + } + Set followers = conn.zrangeByScoreWithScores( + key, String.valueOf(start), "inf", 0, POSTS_PER_PASS); + + Transaction trans = conn.multi(); + for (Tuple tuple : followers) { + start = tuple.getScore(); + String follower = tuple.getElement(); + trans.zrem(base + follower, String.valueOf(statusId)); + } + trans.exec(); + + Method method = null; + try{ + method = getClass().getDeclaredMethod( + "cleanTimelines", Jedis.class, + Long.TYPE, Long.TYPE, Double.TYPE, Boolean.TYPE); + }catch(Exception e){ + throw new RuntimeException(e); + } + + if (followers.size() >= POSTS_PER_PASS) { + executeLater("default", method, uid, statusId, start, onLists); + + }else if (!onLists) { + executeLater("default", method, uid, statusId, 0, true); + } + } + + public void executeLater(String queue, Method method, Object... args) { + MethodThread thread = new MethodThread(this, method, args); + thread.start(); + } + + public class MethodThread + extends Thread + { + private Object instance; + private Method method; + private Object[] args; + + public MethodThread(Object instance, Method method, Object... args) { + this.instance = instance; + this.method = method; + this.args = args; + } + + public void run() { + Jedis conn = new Jedis("localhost"); + conn.select(15); + + Object[] args = new Object[this.args.length + 1]; + System.arraycopy(this.args, 0, args, 1, this.args.length); + args[0] = conn; + + try{ + method.invoke(instance, args); + }catch(Exception e){ + throw new RuntimeException(e); + } + } + } +} diff --git a/codes/redis/redis-in-action/src/main/java/Chapter09.java b/codes/redis/redis-in-action/src/main/java/Chapter09.java new file mode 100644 index 0000000..08e6740 --- /dev/null +++ b/codes/redis/redis-in-action/src/main/java/Chapter09.java @@ -0,0 +1,480 @@ +import org.javatuples.Pair; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.Pipeline; +import redis.clients.jedis.ZParams; + +import java.io.IOException; +import java.io.InputStream; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.zip.CRC32; + +public class Chapter09 { + private static final String[] COUNTRIES = ( + "ABW AFG AGO AIA ALA ALB AND ARE ARG ARM ASM ATA ATF ATG AUS AUT AZE BDI " + + "BEL BEN BES BFA BGD BGR BHR BHS BIH BLM BLR BLZ BMU BOL BRA BRB BRN BTN " + + "BVT BWA CAF CAN CCK CHE CHL CHN CIV CMR COD COG COK COL COM CPV CRI CUB " + + "CUW CXR CYM CYP CZE DEU DJI DMA DNK DOM DZA ECU EGY ERI ESH ESP EST ETH " + + "FIN FJI FLK FRA FRO FSM GAB GBR GEO GGY GHA GIB GIN GLP GMB GNB GNQ GRC " + + "GRD GRL GTM GUF GUM GUY HKG HMD HND HRV HTI HUN IDN IMN IND IOT IRL IRN " + + "IRQ ISL ISR ITA JAM JEY JOR JPN KAZ KEN KGZ KHM KIR KNA KOR KWT LAO LBN " + + "LBR LBY LCA LIE LKA LSO LTU LUX LVA MAC MAF MAR MCO MDA MDG MDV MEX MHL " + + "MKD MLI MLT MMR MNE MNG MNP MOZ MRT MSR MTQ MUS MWI MYS MYT NAM NCL NER " + + "NFK NGA NIC NIU NLD NOR NPL NRU NZL OMN PAK PAN PCN PER PHL PLW PNG POL " + + "PRI PRK PRT PRY PSE PYF QAT REU ROU RUS RWA SAU SDN SEN SGP SGS SHN SJM " + + "SLB SLE SLV SMR SOM SPM SRB SSD STP SUR SVK SVN SWE SWZ SXM SYC SYR TCA " + + "TCD TGO THA TJK TKL TKM TLS TON TTO TUN TUR TUV TWN TZA UGA UKR UMI URY " + + "USA UZB VAT VCT VEN VGB VIR VNM VUT WLF WSM YEM ZAF ZMB ZWE").split(" "); + + private static final Map STATES = new HashMap(); + static { + STATES.put("CAN", "AB BC MB NB NL NS NT NU ON PE QC SK YT".split(" ")); + STATES.put("USA", ( + "AA AE AK AL AP AR AS AZ CA CO CT DC DE FL FM GA GU HI IA ID IL IN " + + "KS KY LA MA MD ME MH MI MN MO MP MS MT NC ND NE NH NJ NM NV NY OH " + + "OK OR PA PR PW RI SC SD TN TX UT VA VI VT WA WI WV WY").split(" ")); + } + + private static final SimpleDateFormat ISO_FORMAT = + new SimpleDateFormat("yyyy-MM-dd'T'HH:00:00"); + static{ + ISO_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + public static final void main(String[] args) { + new Chapter09().run(); + } + + public void run() { + Jedis conn = new Jedis("localhost"); + conn.select(15); + conn.flushDB(); + + testLongZiplistPerformance(conn); + testShardKey(conn); + testShardedHash(conn); + testShardedSadd(conn); + testUniqueVisitors(conn); + testUserLocation(conn); + } + + public void testLongZiplistPerformance(Jedis conn) { + System.out.println("\n----- testLongZiplistPerformance -----"); + + longZiplistPerformance(conn, "test", 5, 10, 10); + assert conn.llen("test") == 5; + } + + public void testShardKey(Jedis conn) { + System.out.println("\n----- testShardKey -----"); + + String base = "test"; + assert "test:0".equals(shardKey(base, "1", 2, 2)); + assert "test:1".equals(shardKey(base, "125", 1000, 100)); + + for (int i = 0; i < 50; i++) { + String key = shardKey(base, "hello:" + i, 1000, 100); + String[] parts = key.split(":"); + assert Integer.parseInt(parts[parts.length - 1]) < 20; + + key = shardKey(base, String.valueOf(i), 1000, 100); + parts = key.split(":"); + assert Integer.parseInt(parts[parts.length - 1]) < 10; + } + } + + public void testShardedHash(Jedis conn) { + System.out.println("\n----- testShardedHash -----"); + + for (int i = 0; i < 50; i++) { + String istr = String.valueOf(i); + shardHset(conn, "test", "keyname:" + i, istr, 1000, 100); + assert istr.equals(shardHget(conn, "test", "keyname:" + i, 1000, 100)); + shardHset(conn, "test2", istr, istr, 1000, 100); + assert istr.equals(shardHget(conn, "test2", istr, 1000, 100)); + } + } + + public void testShardedSadd(Jedis conn) { + System.out.println("\n----- testShardedSadd -----"); + + for (int i = 0; i < 50; i++) { + shardSadd(conn, "testx", String.valueOf(i), 50, 50); + } + assert conn.scard("testx:0") + conn.scard("testx:1") == 50; + } + + public void testUniqueVisitors(Jedis conn) { + System.out.println("\n----- testUniqueVisitors -----"); + + DAILY_EXPECTED = 10000; + + for (int i = 0; i < 179; i++) { + countVisit(conn, UUID.randomUUID().toString()); + } + assert "179".equals(conn.get("unique:" + ISO_FORMAT.format(new Date()))); + + conn.flushDB(); + Calendar yesterday = Calendar.getInstance(); + yesterday.add(Calendar.DATE, -1); + conn.set("unique:" + ISO_FORMAT.format(yesterday.getTime()), "1000"); + for (int i = 0; i < 183; i++) { + countVisit(conn, UUID.randomUUID().toString()); + } + assert "183".equals(conn.get("unique:" + ISO_FORMAT.format(new Date()))); + } + + public void testUserLocation(Jedis conn) { + System.out.println("\n----- testUserLocation -----"); + + int i = 0; + for (String country : COUNTRIES) { + if (STATES.containsKey(country)){ + for (String state : STATES.get(country)) { + setLocation(conn, i, country, state); + i++; + } + }else{ + setLocation(conn, i, country, ""); + i++; + } + } + + Pair,Map>> _aggs = + aggregateLocation(conn); + + long[] userIds = new long[i + 1]; + for (int j = 0; j <= i; j++) { + userIds[j] = j; + } + Pair,Map>> aggs = + aggregateLocationList(conn, userIds); + + assert _aggs.equals(aggs); + + Map countries = aggs.getValue0(); + Map> states = aggs.getValue1(); + for (String country : aggs.getValue0().keySet()){ + if (STATES.containsKey(country)) { + assert STATES.get(country).length == countries.get(country); + for (String state : STATES.get(country)){ + assert states.get(country).get(state) == 1; + } + }else{ + assert countries.get(country) == 1; + } + } + } + + public double longZiplistPerformance( + Jedis conn, String key, int length, int passes, int psize) + { + conn.del(key); + for (int i = 0; i < length; i++) { + conn.rpush(key, String.valueOf(i)); + } + + Pipeline pipeline = conn.pipelined(); + long time = System.currentTimeMillis(); + for (int p = 0; p < passes; p++) { + for (int pi = 0; pi < psize; pi++) { + pipeline.rpoplpush(key, key); + } + pipeline.sync(); + } + + return (passes * psize) / (System.currentTimeMillis() - time); + } + + public String shardKey(String base, String key, long totalElements, int shardSize) { + long shardId = 0; + if (isDigit(key)) { + shardId = Integer.parseInt(key, 10) / shardSize; + }else{ + CRC32 crc = new CRC32(); + crc.update(key.getBytes()); + long shards = 2 * totalElements / shardSize; + shardId = Math.abs(((int)crc.getValue()) % shards); + } + return base + ':' + shardId; + } + + public Long shardHset( + Jedis conn, String base, String key, String value, long totalElements, int shardSize) + { + String shard = shardKey(base, key, totalElements, shardSize); + return conn.hset(shard, key, value); + } + + public String shardHget( + Jedis conn, String base, String key, int totalElements, int shardSize) + { + String shard = shardKey(base, key, totalElements, shardSize); + return conn.hget(shard, key); + } + + public Long shardSadd( + Jedis conn, String base, String member, long totalElements, int shardSize) + { + String shard = shardKey(base, "x" + member, totalElements, shardSize); + return conn.sadd(shard, member); + } + + private int SHARD_SIZE = 512; + public void countVisit(Jedis conn, String sessionId) { + Calendar today = Calendar.getInstance(); + String key = "unique:" + ISO_FORMAT.format(today.getTime()); + long expected = getExpected(conn, key, today); + long id = Long.parseLong(sessionId.replace("-", "").substring(0, 15), 16); + if (shardSadd(conn, key, String.valueOf(id), expected, SHARD_SIZE) != 0) { + conn.incr(key); + } + } + + private long DAILY_EXPECTED = 1000000; + private Map EXPECTED = new HashMap(); + + public long getExpected(Jedis conn, String key, Calendar today) { + if (!EXPECTED.containsKey(key)) { + String exkey = key + ":expected"; + String expectedStr = conn.get(exkey); + + long expected = 0; + if (expectedStr == null) { + Calendar yesterday = (Calendar)today.clone(); + yesterday.add(Calendar.DATE, -1); + expectedStr = conn.get( + "unique:" + ISO_FORMAT.format(yesterday.getTime())); + expected = expectedStr != null ? Long.parseLong(expectedStr) : DAILY_EXPECTED; + + expected = (long)Math.pow(2, (long)(Math.ceil(Math.log(expected * 1.5) / Math.log(2)))); + if (conn.setnx(exkey, String.valueOf(expected)) == 0) { + expectedStr = conn.get(exkey); + expected = Integer.parseInt(expectedStr); + } + }else{ + expected = Long.parseLong(expectedStr); + } + + EXPECTED.put(key, expected); + } + + return EXPECTED.get(key); + } + + private long USERS_PER_SHARD = (long)Math.pow(2, 20); + + public void setLocation( + Jedis conn, long userId, String country, String state) + { + String code = getCode(country, state); + + long shardId = userId / USERS_PER_SHARD; + int position = (int)(userId % USERS_PER_SHARD); + int offset = position * 2; + + Pipeline pipe = conn.pipelined(); + pipe.setrange("location:" + shardId, offset, code); + + String tkey = UUID.randomUUID().toString(); + pipe.zadd(tkey, userId, "max"); + pipe.zunionstore( + "location:max", + new ZParams().aggregate(ZParams.Aggregate.MAX), + tkey, + "location:max"); + pipe.del(tkey); + pipe.sync(); + } + + public Pair,Map>> aggregateLocation(Jedis conn) { + Map countries = new HashMap(); + Map> states = new HashMap>(); + + long maxId = conn.zscore("location:max", "max").longValue(); + long maxBlock = maxId; + + byte[] buffer = new byte[(int)Math.pow(2, 17)]; + for (int shardId = 0; shardId <= maxBlock; shardId++) { + InputStream in = new RedisInputStream(conn, "location:" + shardId); + try{ + int read = 0; + while ((read = in.read(buffer, 0, buffer.length)) != -1){ + for (int offset = 0; offset < read - 1; offset += 2) { + String code = new String(buffer, offset, 2); + updateAggregates(countries, states, code); + } + } + }catch(IOException ioe) { + throw new RuntimeException(ioe); + }finally{ + try{ + in.close(); + }catch(Exception e){ + // ignore + } + } + } + + return new Pair,Map>>(countries, states); + } + + public Pair,Map>> aggregateLocationList( + Jedis conn, long[] userIds) + { + Map countries = new HashMap(); + Map> states = new HashMap>(); + + Pipeline pipe = conn.pipelined(); + for (int i = 0; i < userIds.length; i++) { + long userId = userIds[i]; + long shardId = userId / USERS_PER_SHARD; + int position = (int)(userId % USERS_PER_SHARD); + int offset = position * 2; + + pipe.substr("location:" + shardId, offset, offset + 1); + + if ((i + 1) % 1000 == 0) { + updateAggregates(countries, states, pipe.syncAndReturnAll()); + } + } + + updateAggregates(countries, states, pipe.syncAndReturnAll()); + + return new Pair,Map>>(countries, states); + } + + public void updateAggregates( + Map countries, Map> states, List codes) + { + for (Object code : codes) { + updateAggregates(countries, states, (String)code); + } + } + + public void updateAggregates( + Map countries, Map> states, String code) + { + if (code.length() != 2) { + return; + } + + int countryIdx = (int)code.charAt(0) - 1; + int stateIdx = (int)code.charAt(1) - 1; + + if (countryIdx < 0 || countryIdx >= COUNTRIES.length) { + return; + } + + String country = COUNTRIES[countryIdx]; + Long countryAgg = countries.get(country); + if (countryAgg == null){ + countryAgg = Long.valueOf(0); + } + countries.put(country, countryAgg + 1); + + if (!STATES.containsKey(country)) { + return; + } + if (stateIdx < 0 || stateIdx >= STATES.get(country).length){ + return; + } + + String state = STATES.get(country)[stateIdx]; + Map stateAggs = states.get(country); + if (stateAggs == null){ + stateAggs = new HashMap(); + states.put(country, stateAggs); + } + Long stateAgg = stateAggs.get(state); + if (stateAgg == null){ + stateAgg = Long.valueOf(0); + } + stateAggs.put(state, stateAgg + 1); + } + + public String getCode(String country, String state) { + int cindex = bisectLeft(COUNTRIES, country); + if (cindex > COUNTRIES.length || !country.equals(COUNTRIES[cindex])) { + cindex = -1; + } + cindex++; + + int sindex = -1; + if (state != null && STATES.containsKey(country)) { + String[] states = STATES.get(country); + sindex = bisectLeft(states, state); + if (sindex > states.length || !state.equals(states[sindex])) { + sindex--; + } + } + sindex++; + + return new String(new char[]{(char)cindex, (char)sindex}); + } + + private int bisectLeft(String[] values, String key) { + int index = Arrays.binarySearch(values, key); + return index < 0 ? Math.abs(index) - 1 : index; + } + + private boolean isDigit(String string) { + for(char c : string.toCharArray()) { + if (!Character.isDigit(c)){ + return false; + } + } + return true; + } + + public class RedisInputStream + extends InputStream + { + private Jedis conn; + private String key; + private int pos; + + public RedisInputStream(Jedis conn, String key){ + this.conn = conn; + this.key = key; + } + + @Override + public int available() + throws IOException + { + long len = conn.strlen(key); + return (int)(len - pos); + } + + @Override + public int read() + throws IOException + { + byte[] block = conn.substr(key.getBytes(), pos, pos); + if (block == null || block.length == 0){ + return -1; + } + pos++; + return (int)(block[0] & 0xff); + } + + @Override + public int read(byte[] buf, int off, int len) + throws IOException + { + byte[] block = conn.substr(key.getBytes(), pos, pos + (len - off - 1)); + if (block == null || block.length == 0){ + return -1; + } + System.arraycopy(block, 0, buf, off, block.length); + pos += block.length; + return block.length; + } + + @Override + public void close() { + // no-op + } + } +}