Bloomfilter 原理与应用

前言: 本文只讲解原理, 不讲解BloomFilter中各项指标的公式的推算, 能让你知道有这么一个东西(what), 他能达到什么效果(why), 是如何做到的(how).
(What)

Bloom Filter 是用一个 位数组(数组的每个元素不是1就是0) 来表示一个大的元素集合, 而且通过这个数组就可以判断某个元素是不是属于这个集合

大大节省了空间, 但是有代价的, 就是有一定的错误率(为什么会有, 后面举个例说明一下)
假定我们有一个已知的集合,  S = {a1, a2, … , an} (这里的元素不仅仅是数字, 而是泛指所有对象), 我们要判断一个元素 x 是否属于这个集合, 普通的做法有两种形式:
     1. 遍历 比较,  性能和S的大小成反比     (时间问题)
     2. hash+链表(传说中的hashMap), 速度较快, 但是要把集合的所有数据都存进内存   (空间问题)
在性能要求高, 而且空间不足的情况下, BloomFilter就派上用场了
(Why)
BloomFilter能解决什么问题?
     以少量的内存空间判断一个 元素 是否属于这个集合, 代价是有一定的错误率
(How)
工作原理
     1. 初始化一个数组, 所有位标为0,  A={x1, x2, x3,…,xm}  (x1, x2, x3,…,xm 初始为0)
     2. 将已知集合S中的每一个数组, 按以下方式映射到A中
          2.0  选取n个互相独立的hash函数 h1, h2, … hk
          2.1  将元素通过以上hash函数得到一组索引值 h1(xi), h2(xi),…,hk(xi)
          2.2  将集合A中的上述索引值标记为1(如果不同元素有重复, 则重复覆盖为1, 这是一个觅等操作)
     3.  对于一个元素x, 将其根据2.0中选取的hash函数, 进行hash, 得到一组索引值 h1(x), h2(x), …,hk(x)
          如果集合A中的这些索引位置上的值都是1, 表示这个元素属于集合S, 否则则不属于S
几个前提
     1. hash函数的计算不能性能太差, 否则得不偿失
     2. 任意两个hash函数之间必须是独立的.
          即任意两个hash函数不存在单一相关性, 否则hash到其中一个索引上的元素也必定会hash到另一个相关的索引上, 这样多个hash没有意义
错误率
     工作原理的第3步, 的出来的结论, 一个是绝对靠谱的, 一个是不能100%靠谱的.
     如果集合A中的这些索引位置上的值都是1, 表示这个元素属于集合S, 否则则不属于S     标红的这句话是绝对靠谱的.
     至于错误率有多大, 我这里不想去推算, 后面会给出参考文章, 不在本文的讨论范围, 很简单就能举个例子说明错误率是存在的
         当A长度不是很大时, 很容易出现一种情况, 使得A上的元素全部被标记为1了, 这时所有的元素都会被认为是S里的元素, 所以, 错误率是存在的!
          (可以看出, 错误的大小跟A的长度以及hash函数的个数有关)
(In Action)
应用
1. 假如你有一个很大的商品库(亿级别), 然后你要做一个浏览型的网站, 这时候, 你不可能把所有的商品都丢给用户去浏览, 而是从商品中挑选出部分属于
     “精品”的商品来给用户浏览, 提高用户体验和转化率, 你对你的精品库建立一套搜索引擎
2. 由于互动需要, 你需要对你维护的精品库的商品数据实时更新动态, 比如XXX在某时间给了一个”好评”,  “购买了一笔”, “赞”,”喜欢”等等
3. 为了实现这种实时更新, 你通过MQ订阅了商品相关的消息(notify)(交易, 评价, SNS), 只要商品发生动态就会发送给你的系统.
4. 这时候, 由于全网的商品很多, 发生动态的消息很多情况下是跟你的精品库没有关系的, 这时候你需要挡掉这些消息, 不进行处理.
5. 此时, 不可能每来一个商品数据你先通过搜索引擎判断一下商品不是在你的精品库内(效率问题, 压力问题), 这时候, bloomFilter派送用场了.
6. 从上面错误率的点, 我们可以看到, 如果一个元素被BloomFilter判断为不属于原有的集合, 那么这个元素是肯定不属于这个集合的(被排除的准确率是100%的)
     通过bloomfilter的几项指标, 就可以挡掉大多数没有相关的数据, 而只处理有关系(虽然有部分无关)的数据了.
(Graphics)附图一张
参考文章:
     http://blog.csdn.net/jiaomeng/article/details/1495500
     http://en.wikipedia.org/wiki/Bloom_filter

mongodb 修改器学习

1. 执行update相关操作都可以使用修改器.
2. 修改器是为了不整个大文档来替换修改, 而是修改局部
3. 详情:
3.1 $inc (自增)
为增加某个字段的数值
例子:
原文档如下:

{
 "_id" : ObjectId("4b253b067525f35f94b60a31"),
 "url" : "www.example.com",
 "pageviews" : 52
}

执行修改器 $inc
db.test.update({url:’www.example.com’},{$inc:{pageviews: 1}})
表示为pageviews这个 key 的值 加上1. 后面是对应要加上的值.
注意, $inc中, 字段对应的值只能是 数字. 但是可以是 正数也可以是负数. 如果是负数, 就是执行一个减法操作.
3.2 $set (设置)

修改某个字段的值为指定的值. 这里的修改是与字段类型无关的. 你可以把一个string修改成long或者一个对象都行.
例子:
原文档如下:

{
"_id" : ObjectId("4b253b067525f35f94b60a31"),
"name" : "joe",
"age" : 30,
"sex" : "male",
"location" : "Wisconsin"
}

修改性别, 变成false (String -> Boolean)
执行修改器 $set

db.test.update({name:'joe'}, {$set:{sex:false}})

表示将 name为joe的这个对象的 sex修改为false
3.3 $unset (删除字段)
这个修改器是为了删除某个指定的字段及其值.
例子:
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"), 
 "howlong" : 6, 
 "lovePerson" : "zhanying", 
 "person" : "jiacheo"
}

把howlong删除
执行修改器 $unset

db.test.update({person:'jiacheo'},{$unset:{howlong:1}})

后面的1没具体意义, 相当于确认
3.4 $push (把一个数据放到数组里面)
例子:
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"), 
 "lovePerson" : "zhanying", 
 "person" : "jiacheo", 
 "supporters" : [ "gongjin" ]
}

执行修改器, 增加一个人到supporters里面

db.test.update({person:'jiacheo'},{$push:{supporters:'ziming'}})

这时候变成:

{ 
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [
   "gongjin", "ziming" 
 ]
}

若果再执行一遍上面的代码, 那么数组还是会加上同样的名称, 如果你需要排同, 可以用后面这个修改器
3.5 $addToSet (将一个对象放到集合里面. 集合里面不会出现两个重复的一模一样的对象)
例子:
原文档如下:

{
"_id" : ObjectId("4f9105377522000000006be0"),
"lovePerson" : "zhanying",
"person" : "jiacheo",
"supporters" : [
"gongjin",
"ziming"
]
}

执行修改器 $addToSet

db.test.update({person:'jiacheo'},{$addToSet:{supporters:'ziming'}})

文档内容没有改变..
以上两个修改器还可以配合 $each, 把一个数据里的数据append到另一个数组上去(后者会排重)
比如:

db.test.update({person:'jiacheo'}, {$addToSet:{supporters:{$each: ["gongjin","ziming","liuxun","feidu"] }}})

执行后, 结果如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : ["gongjin", "ziming", "feidu", "liuxun" 
 ]
}

3.6 $pop 队列出列 (按数组索引顺序出列(正序或者倒序))
例子:
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"), 
 "lovePerson" : "zhanying", 
 "person" : "jiacheo",
 "supporters" : ["gongjin", "ziming", "feidu", "liuxun" 
 ]
}

执行修改器 $pop

db.test.update({person:'jiacheo'},{$pop:{supporters:1}})

修改后变为

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [ "gongjin", "ziming", "feidu" ]
}

value 值为1 表示数组索引靠后的先出列, 也就是后进先出, 相当于出栈的概念. value值为-1表示数组索引考前的先出列, 也就是先进先出, 相当于一个FIFO的队列
3.7 $pull (符合条件的元素出列)
与上一个相比, 这个修改器会把符合条件的元素从数组中删除, 而不是简单的按照索引值的大小来处理.
原文档如下:

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [ "gongjin", "ziming", "feidu" ]
}

执行修改器 $pull

db.test.update({person:'jiacheo'},{$pull:{supporters: 'gongjin'}})

修改后:

{
 "_id" : ObjectId("4f9105377522000000006be0"),
 "lovePerson" : "zhanying",
 "person" : "jiacheo",
 "supporters" : [ "ziming", "feidu" ]
}

可以见, gongjin从supporters中被remove掉了.
这里所有被匹配的元素都会被出列

tomcat thread dump 分析

  1. 前言

Java Thread Dump 是一个非常有用的应用诊断工具, 通过thread dump出来的信息, 可以定位到你需要了解的线程, 以及这个线程的调用栈. 如果配合linux的top命令, 可以找到你的系统中的最耗CPU的线程代码段, 这样才能有针对性地进行优化.

  1. 场景和实践

    2.1. 后台系统一直是在黑盒运行, 除了能暂停一部分任务的执行, 根本无法知道哪些任务耗CPU过多。所以一直以为是业务代码的问题, 经过各种优化(删减没必要的逻辑, 合并写操作)等等优化, 系统负载还是很高. 没什么访问量, 后台任务处理也就是每天几百万的级别, load还是达到了15以上. CPU只有4核,天天收到load告警却无从下手, 于是乎就被迫来分析一把线程.

   2.2 系统跑的是java tomcat, 要触发tomcat thread dump很简单, 先找到tomcat对应的进程id, 我们设置为PID
   【linux 命令】:  ps -ef | grep tomcat
   可以找到, 然后给这个进程发送一个QUIT的信号量, 让其触发线程的dump,  下面的操作先别急着动手, 等到看完2.3再动手不迟
    【linux 命令】: kill -3 $PID   /  kill -QUIT $PID
tomcat会把thread dump的内容输出到控制台
     【linux 命令】:cd $tomcathome/logs/
查看 catalina.out 文件, 把最后的跟thread相关的内容获取出来.
大致内容如下:
2012-04-13 16:30:41
Full thread dump OpenJDK 64-Bit Server VM (1.6.0-b09 mixed mode):
"TP-Processor12" daemon prio=10 tid=0x00000000045acc00 nid=0x7f19 in Object.wait() [0x00000000483d0000..0x00000000483d0a90]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00002aaab5bfce70> (a org.apache.tomcat.util.threads.ThreadPool$ControlRunnable)
at java.lang.Object.wait(Object.java:502)
at org.apache.tomcat.util.threads.ThreadPool$ControlRunnable.run(ThreadPool.java:662)
- locked <0x00002aaab5bfce70> (a org.apache.tomcat.util.threads.ThreadPool$ControlRunnable)
at java.lang.Thread.run(Thread.java:636)

"TP-Processor11" daemon prio=10 tid=0x00000000048e3c00 nid=0x7f18 in Object.wait() [0x00000000482cf000..0x00000000482cfd10]
java.lang.Thread.State: WAITING (on object monitor)
....
"VM Thread" prio=10 tid=0x00000000042ff400 nid=0x77de runnable"GC task thread#0 (ParallelGC)" prio=10 tid=0x000000000429c400 nid=0x77d9 runnable

"GC task thread#1 (ParallelGC)" prio=10 tid=0x000000000429d800 nid=0x77da runnable

"GC task thread#2 (ParallelGC)" prio=10 tid=0x000000000429ec00 nid=0x77db runnable

"GC task thread#3 (ParallelGC)" prio=10 tid=0x00000000042a0000 nid=0x77dc runnable

"VM Periodic Task Thread" prio=10 tid=0x0000000004348400 nid=0x77e5 waiting on condition

JNI global references: 815

Heap
PSYoungGen      total 320192K, used 178216K [0x00002aaadce00000, 0x00002aaaf1800000, 0x00002aaaf1800000)
eden space 303744K, 55% used [0x00002aaadce00000,0x00002aaae718e048,0x00002aaaef6a0000)
from space 16448K, 65% used [0x00002aaaf0690000,0x00002aaaf110c1b0,0x00002aaaf16a0000)
to   space 16320K, 0% used [0x00002aaaef6a0000,0x00002aaaef6a0000,0x00002aaaf0690000)
PSOldGen        total 460992K, used 425946K [0x00002aaab3a00000, 0x00002aaacfc30000, 0x00002aaadce00000)
object space 460992K, 92% used [0x00002aaab3a00000,0x00002aaacd9f6a30,0x00002aaacfc30000)
PSPermGen       total 56192K, used 55353K [0x00002aaaae600000, 0x00002aaab1ce0000, 0x00002aaab3a00000)
object space 56192K, 98% used [0x00002aaaae600000,0x00002aaab1c0e520,0x00002aaab1ce0000)
最后一段是系统的对内存的使用情况.
2.3. 要知道thread dump是不会告诉你每个线程的负载情况的, 需要知道每个线程的负载情况, 还得靠top命令来查看.
    【linux 命令】:top -H -p $PID
这时候, 可以看到java进程下各个线程的负载和内存等使用情况. 也不用全部搞下来, 只要top几个负载过高的记录即可(最好按下SHIFT+T 按CPU耗时总时间倒序排序,这样找到的top几个是最耗CPU时间的,而且系统启动时间应该持续15分钟以上,这样容易看出哪个线程耗时多。)
     大致内容如下:
Tasks: 118 total,   2 running, 116 sleeping,   0 stopped,   0 zombie
Cpu(s): 92.6%us,  2.3%sy,  0.0%ni,  3.8%id,  0.7%wa,  0.1%hi,  0.7%si,  0.0%st
Mem:   4054168k total,  3892212k used,   161956k free,   115816k buffers
Swap:  4192956k total,   294448k used,  3898508k free,  2156024k cachedPID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
8091 admin     16   0 1522m 814m 9660 R 22.3 20.6   4:05.61 java
8038 admin     16   0 1522m 814m 9660 R 10.3 20.6   2:46.31 java
8043 admin     15   0 1522m 814m 9660 S  3.7 20.6   1:52.04 java
8039 admin     15   0 1522m 814m 9660 S  0.7 20.6   2:10.98 java
8041 admin     15   0 1522m 814m 9660 S  0.7 20.6   1:39.66 java
8009 admin     15   0 1522m 814m 9660 S  0.3 20.6   0:27.05 java
8040 admin     15   0 1522m 814m 9660 S  0.3 20.6   0:51.46 java
7978 admin     25   0 1522m 814m 9660 S  0.0 20.6   0:00.00 java
7980 admin     19   0 1522m 814m 9660 S  0.0 20.6   0:05.05 java
7981 admin     16   0 1522m 814m 9660 S  0.0 20.6   0:06.31 java
7982 admin     15   0 1522m 814m 9660 S  0.0 20.6   0:06.50 java
7983 admin     15   0 1522m 814m 9660 S  0.0 20.6   0:06.66 java
7984 admin     15   0 1522m 814m 9660 S  0.0 20.6   0:06.87 java
7985 admin     15   0 1522m 814m 9660 S  0.0 20.6   0:33.82 java
几个字段跟top的字段意思是一致的, 就是这里的 PID是 线程在系统里面的ID, 也就是进程每创建一个线程, 不仅进程自己会分配ID, 系统也会的. 接下来的问题排查就是主要根据这个PID来走的.
看到上面的部分数据, 当前正在跑的任务中, CPU占用最高的几个线程ID
2.4. 如果不借助工具, 自己分析的话, 可以把PID字段从10进制数改为 16进制, 然后到threaddump日志中去查找一把, 找对对应的线程上下文信息, 就可以知道哪段代码耗CPU最多了.
比如 8091  的16进制是 1F9B, 查找 thread dump 日志中, nid=0x1F9B 的线程( 这里的nid意思是nativeid, 也就是上面讲的系统为线程分配的ID), 然后找到相关的代码段, 进行优化即可.
比如
"链路检测" prio=10 tid=0x00002aaafa498000 nid=0x1F9B runnable [0x0000000045fac000..0x0000000045facd10]</div>

java.lang.Thread.State: RUNNABLE
at cn.emay.sdk.communication.socket.AsynSocket$CheckConnection.run(AsynSocket.java:112)
at java.lang.Thread.run(Thread.java:636)
可以看出, 这是一个 发短信的客户端的链路检测引擎的系统负载飙升. (实际上这个线程引起的负载绝不止这么一点.)
2.5 第三方的jar包, 我感到顿时泪奔. 接下来是反编译, 看详细的代码… 果然是有一段死循环监听的… 目前是像他们要一份SDK的源代码, 或者要他们进行优化。
2.6 使用工具的话, 可以看到更多一点的信息, java的tda工具就是专门分析thread dump的.
具体功能自己去挖掘啦.

Java如何等待子线程执行结束

工作中往往会遇到异步去执行某段逻辑, 然后先处理其他事情, 处理完后再把那段逻辑的处理结果进行汇总的产景, 这时候就需要使用线程了。
一个线程启动之后, 是异步的去执行需要执行的内容的, 不会影响主线程的流程,  往往需要让主线程指定后, 等待子线程的完成. 这里有几种方式.
站在 主线程的角度, 我们可以分为主动式和被动式.
主动式指主线主动去检测某个标志位, 判断子线程是否已经完成. 被动式指主线程被动的等待子线程的结束, 很明显, 比较符合人们的胃口. 就是你事情做完了, 你告诉我, 我汇总一下, 哈哈.
那么主线程如何等待子线程工作完成呢. 很简单, Thread 类给我们提供了join 系列的方法, 这些方法的目的就是等待当前线程的die. 举个例子:

public class Threads {
    public static void main(String[] args) {
        SubThread thread = new SubThread();
        thread.start();
        //主线程处理其他工作,让子线程异步去执行.
        mainThreadOtherWork();
        System.out.println("now waiting sub thread done.");
        //主线程其他工作完毕,等待子线程的结束, 调用join系列的方法即可(可以设置超时时间)
        try {
            thread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("now all done.");
    }

    private static void mainThreadOtherWork() {
        System.out.println("main thread work start");
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread work done.");
    }

    public static class SubThread extends Thread {
        @Override
        public void run() {
            working();
        }

        private void working() {
            System.out.println("sub thread start working.");
            busy();
            System.out.println("sub thread stop working.");
        }

        private void busy() {
            try {
                sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }
}

本程序的数据有可能是如下:

main thread work start
sub thread start working.
main thread work done.
now waiting sub thread done.
sub thread stop working.
now all done.

忽略标号, 当然输出也有可能是1和2调换位置了. 这个我们是无法控制的. 我们看下线程的join操作, 究竟干了什么.

public final void join() throws InterruptedException {
   join(0);
}

这里是调用了

public final synchronized void join(long millis)
    throws InterruptedException

方法, 参数为0, 表示没有超时时间, 等到线程结束为止. join(millis)方法里面有这么一段代码:

        while (isAlive()) {
                wait(0);
        }

说明, 当线程处于活跃状态的时候, 会一直等待, 直到这里的isAlive方法返回false, 才会结束.isAlive方法是一个本地方法, 他的作用是判断线程是否已经执行结束. 注释是这么写的:

Tests if this thread is alive. A thread is alive if it has been started and has not yet died.

可见, join系列方法可以帮助我们等待一个子线程的结束.
那么要问, 有没有另外一种方法可以等待子线程结束? 当然有的, 我们可以使用并发包下面的Future模式.
Future是一个任务执行的结果, 他是一个将来时, 即一个任务执行, 立即异步返回一个Future对象, 等到任务结束的时候, 会把值返回给这个future对象里面. 我们可以使用ExecutorService接口来提交一个线程.

public class Threads {

    static ExecutorService executorService = Executors.newFixedThreadPool(1);

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        SubThread thread = new SubThread();
        // thread.start();
        Future future = executorService.submit(thread);
        mainThreadOtherWork();
        System.out.println("now waiting sub thread done.");
        future.get();
        // try {
        // thread.join();
        // } catch (InterruptedException e) {
        // e.printStackTrace();
        // }
        System.out.println("now all done.");
        executorService.shutdown();
    }

    private static void mainThreadOtherWork() {
        System.out.println("main thread work start");
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread work done.");
    }

    public static class SubThread extends Thread {
        @Override
        public void run() {
            working();
        }

        private void working() {
            System.out.println("sub thread start working.");
            busy();
            System.out.println("sub thread stop working.");
        }

        private void busy() {
            try {
                sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

}

这里, ThreadPoolExecutor 是实现了 ExecutorService的方法, sumbit的过程就是把一个Runnable接口对象包装成一个 Callable接口对象, 然后放到 workQueue里等待调度执行. 当然, 执行的启动也是调用了thread的start来做到的, 只不过这里被包装掉了. 另外, 这里的thread是会被重复利用的, 所以这里要退出主线程, 需要执行以下shutdown方法以示退出使用线程池. 扯远了. 
这种方法是得益于Callable接口和Future模式, 调用future接口的get方法, 会同步等待该future执行结束, 然后获取到结果. Callbale接口的接口方法是 V call(); 是可以有返回结果的, 而Runnable的 void run(), 是没有返回结果的. 所以, 这里即使被包装成Callbale接口, future.get返回的结果也是null的.如果需要得到返回结果, 建议使用Callable接口.
通过队列来控制线程的进度, 是很好的一个理念. 我们完全可以自己搞个队列, 自己控制. 这样也可以实现. 不信看代码:

public class Threads {

    // static ExecutorService executorService = Executors.newFixedThreadPool(1);
    static final BlockingQueue queue = new ArrayBlockingQueue(1);

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        SubThread thread = new SubThread(queue);
        thread.start();
        // Future future = executorService.submit(thread);
        mainThreadOtherWork();
        System.out.println("now waiting sub thread done.");
        // future.get();
        queue.take();
        // try {
        // thread.join();
        // } catch (InterruptedException e) {
        // e.printStackTrace();
        // }
        System.out.println("now all done.");
        // executorService.shutdown();
    }

    private static void mainThreadOtherWork() {
        System.out.println("main thread work start");
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread work done.");
    }

    public static class SubThread extends Thread {

        private BlockingQueue queue;

        /**
         * @param queue
         */
        public SubThread(BlockingQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                working();
            } finally {
                try {
                    queue.put(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }

        private void working() {
            System.out.println("sub thread start working.");
            busy();
            System.out.println("sub thread stop working.");
        }

        private void busy() {
            try {
                sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

}

这里是得益于我们用了一个阻塞队列, 他的put操作和take操作都会阻塞(同步), 在满足条件的情况下.当我们调用take()方法时, 由于子线程还没结束, 队列是空的, 所以这里的take操作会阻塞, 直到子线程结束的时候, 往队列里面put了个元素, 表明自己结束了. 这时候主线程的take()就会返回他拿到的数据. 当然, 他拿到什么我们是不必去关心的.
以上几种情况都是针对子线程只有1个的时候. 当子线程有多个的时候, 情况就不妙了.
第一种方法, 你要调用很多个线程的join, 特别是当你的线程不是for循环创建的, 而是一个一个创建的时候.
第二种方法, 要调用很多的future的get方法, 同第一种方法.
第三种方法, 比较方便一些, 只需要每个线程都在queue里面 put一个元素就好了.但是, 第三种方法, 这个队列里的对象, 对我们是毫无用处, 我们为了使用队列, 而要不明不白浪费一些内存, 那有没有更好的办法呢?
有的, concurrency包里面提供了好多有用的东东, 其中, CountDownLanch就是我们要用的.
CountDownLanch 是一个倒数计数器, 给一个初始值(>=0), 然后每countDown一次就会减1, 这很符合等待多个子线程结束的场景: 一个线程结束的时候, countDown一次, 直到所有都countDown了 , 那么所有子线程就都结束了.
先看看CountDownLanch有哪些方法:

CountDownLatch

await: 会阻塞等待计数器减少到0位置. 带参数的await是多了等待时间.
countDown: 将当前的技术减1
getCount(): 返回当前的计数
显而易见, 我们只需要在子线程执行之前, 赋予初始化countDownLanch, 并赋予线程数量为初始值.
每个线程执行完毕的时候, 就countDown一下.主线程只需要调用await方法, 可以等待所有子线程执行结束, 看代码:

public class Threads {
// static ExecutorService executorService = Executors.newFixedThreadPool(1);
    static final BlockingQueue queue = new ArrayBlockingQueue(1);
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int threads = 5;
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        for(int i=0;i < threads;i++){
            SubThread thread = new SubThread(2000*(i+1), countDownLatch);
            thread.start();
        }
        mainThreadOtherWork();
        System.out.println("now waiting sub thread done.");
        countDownLatch.await();
        System.out.println("now all done.");
    }

    private static void mainThreadOtherWork() {
        System.out.println("main thread work start");
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("main thread work done.");
    }

    public static class SubThread extends Thread{

        // private BlockingQueue queue;
        private CountDownLatch countDownLatch;
        private long work;


        public SubThread(long work, CountDownLatch countDownLatch) {
            // this.queue = queue;
            this.countDownLatch = countDownLatch;
            this.work = work;
        }

        @Override
        public void run() {
            try{
                working();
            }finally{
                countDownLatch.countDown();
            }
        }

        private void working() {
            System.out.println(getName()+" sub thread start working.");
            busy();
            System.out.println(getName()+" sub thread stop working.");
        }

        private void busy() {
            try {
                sleep(work);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

此种方法也适用于使用 ExecutorService summit 的任务的执行.
另外还有一个并发包的类CyclicBarrier, 这个是(子)线程之间的互相等待的利器. 栅栏, 就是把大家都在一个地方堵住, 就像水闸, 等大家都完成了之前的操作, 在一起继续下面的操作. 不过就不再本篇的讨论范围内了.

MapReduce编程(入门篇)

 一. MapReduce 编程模型

还是以一个经典的图片来说明问题.

map reduce thought

1. 首先, 我们能确定我们有一份输入, 而且他的数据量会很大

2. 通过split之后, 他变成了若干的分片, 每个分片交给一个Map处理

3. map处理完后, tasktracker会把数据进行复制和排序, 然后通过输出的key 和value进行 partition的划分, 并把partition相同的map输出, 合并为相同的reduce的输入.
4. ruducer通过处理, 把数据输出, 每个相同的key, 一定在一个reduce中处理完, 每一个reduce至少对应一份输出(可以通过扩展MultipleOutputFormat来得到多分输出)
5. 来看一个例子, 如下图:(来自 《hadoop权威指南》 一书)
实例
  说明几点:
  5.1 输入的数据可能就是一堆文本
  5.2 mapper会解析每行数据, 然后提取有效的数据, 作为输出. 这里的例子是 从日志文件中提取每一年每天的气温, 最后会计算每年的最高气温
  5.3 map的输出就是一条一条的 key-value
  5.4 通过shuffle之后, 变成reduce的输入, 这是相同的key对应的value被组合成了一个迭代器
  5.5 reduce的任务是提取每一年的最高气温, 然后输出

二. Mapper

1. mapper可以选择性地继承 MapreduceBase这个基类, 他只是把一些方法实现了而已, 即使方法体是空的.
2. mapper必须实现 Mapper 接口(0.20以前的版本), 这是一个泛型接口, 需要执行输入和输出的key-value的类型, 这些类型通常都是Wriable接口的实现类
3. 实现map方法, 方法有四个参数, 前面两个就是输入的 Key 和 value, 第三个参数是 OuputCollector, 用于收集输出的, 第四个是reporter,用来报告一些状态的,可以用于debug
  3.1 input 默认是一行一条记录, 每天记录都放在value里边
  3.2 output  每次搜集一条 K-V记录, 一个K可以对应多个value, 在reduce 里面体现为一个 iterator
4. 覆盖 configure方法可以得到JobConf的实例, 这个JobConf是在Job运行时传递过来的, 可以跟外部资源进行数据交互

三. Reducer

1. reduce也可以选择继承 MapreduceBase这个基类, 功能跟mapper一样.
2. reducer必须实现Reducer接口, 这个接口同样是泛型接口, 意义跟Mapper的类似
3. 实现reduce方法, 这个方法也有四个参数, 第一个是输入的key, 第二个是输入的 value的迭代器, 可以遍历所有的value,相当于一个列表, outputCollector跟map的一样, 是输出的搜集器, 每次搜集都是key-value的形式, report的作用跟map的相同.
4. 在新版本中, hadoop已经将后面两个参数合并到一个context对象里边了, 当然还会兼容就版本的 接口. >0.19.x
5. 覆盖configure方法, 作用跟map的相同
6. 覆盖close 方法,可以做一些reduce结束后的处理工作.(clean up)

四. Combiner

1. combiner的作用是, 将map的输出,先计算一遍,得到初步的合并结果, 减少reduce的计算压力.
2. combiner的编写方法跟reduce是一样的, 他本来就是一个Reducer的实现类
3. 当reducer符合函数  F(a,b) = F(F(a), F(b)) 时, combinner可以与reduce相同. 比如 sum(a,b,c,d,e,f,g) = sum(sum(a,b) ,sum(c,d,e,f) , sum(g)) 还有max, min等等.
4. 编写正确的combiner可以优化整个mapreduce程序的性能.(特别是当reduce是性能瓶颈的时候.)
5. combiner可以跟reducer不同.

五. Configuration

1. 后加的属性的值会覆盖前面定义的相同名称的属性的值.
2. 被定义为 final的属性(在属性定义中加上<final>true</final>标签)不会被后面的同名属性定义的值给覆盖.
3. 系统属性比通过资源定义的属性优先级高, 也就是通过System.setProperty()方法会覆盖在资源文件中定义的属性的值.
4. 系统属性定义必须在资源文件中有相应的定义才会生效.
5. 通过 -D 选项定义的属性, 比在资源文件中定义的属性优先级要高.

六. Run Jobs

1. 设置 inputs & output
    1.1 先判断输入是否存在 (不存在会导致出错,最好利用程序来判断.)
    1.2 判断输出是否已经存在(存在也会导致出错)
    1.3 养成一种好的习惯(先判断,再执行)
2. 设置 mapper、reducer、combiner. 各个实现类的class对象.  XXXX.class
3. 设置 inputformat & outputformat & types
    3.1 input和output format都有两种, 一种是 textfile, 一种是sequencefile. 简单理解, textfile是文本组织的形式,sequence file是 二进制组织的形式.
    3.2 Types的设置, 根据输入和输出的数据类型, 设置各种Writable接口的实现类的class对象.
4. 设置reduce count
    4.1 reduce count可以为0, 当你的数据无需reduce的时候.
    4.2 reduce数量最好稍微少于当前可用的slots的数量, 这样reduce就能在一波计算中算好. (一个slot可以理解为一个计算单元(资源).)

七. 其他的一些细节.

1. ChainMapper可以实现链式执行mapper 他本身就是一个Mapper的实现类. 提供了一个addMapper的方法.
2. ChainReducer 跟ChainMapper类似, 可以实现链式执行reducer, 他是Reducer的实现类.
3. 多个job先后运行, 可以通过先后执行 JobClient.runJob方法来实现先后顺序
4. 扩展MultipleOutputFormat接口, 可以实现一个reduce对应多份输出 (而且可以指定文件名哦)
5. Partitioner 接口用于将 Map的输出结果进行分区, 分区相同的key对应的数据会被同一个reducer处理
    5.1 提供了一个接口方法: public int getPartition(K2 key, V2 value, int numReduceTasks)
    5.2 可以自己定义, 根据key的某些特指来划分, 也可以根据value的某些特质来划分.
    5.3 numReduceTasks就是设置的reduce的个数.一般返回的partition的值应该都小于这个值.(%)
6. reporter的作用
    6.1 reporter.incrCounter(key, amount). 比如对数据计算是, 一些不合规范的脏数据, 我们可以通过counter来记录有多少
    6.2 reporter.setStatus(status); 方法可以设置一条状态消息, 当我们发现job运行出现这条消息是, 说明出现了我们预期的(错误或者正确)的情况, 用于debug.
    6.3 reporter.progress(), 像mapreduce框架报告当前运行进度. 这个progress可以起到心跳的作用. 一个task要是超过10分钟没有想mapreduce框架报告情况, 这个reduce会被kill掉. 当你的任务处理会比较旧是, 最好定时向mapreduce汇报你的状态.
7. 通过实现Wriable接口, 我们可以自定义key和value的类型, 使用起来就像pojo, 不需要每次都进行parse. 如果你的自定义类型是Key的类型, 则需要同时实现Comparable 接口, 用于排序. 比如MapWritable就是一个例子.

八. 实战.(简单篇)

简单篇:

1. 需求: 统计某个站点每天的PV

2. 数据输入: 以天为分区存放着的日志数据, 一条日志代表一个PV

3. 数据输出: 日期   PV

4. Mapper编写

主要的工作很简单, split每一条日志, 取出日期, 并对该日期的PV搜集一条记录, 记录的value为ONE(1, 一条记录代表一个PV)

5. Reducer编写

reduce的任务是将每天(key相同的为同一天) 的日志进行汇总(sum), 最后以天为key输出汇总结果.

6. 设置环境, 指定job(Run)

6.1 设置输入路径.

 

 

 

 

6.2 设置输出路径

6.3 设置Mapper/Reducer 和 输入数据的数据格式和数据类型

6.4  执行命令:

hadoop jar site-pv-job.jar org.jiacheo.SitePVSumSampleJob

6.5 查看hadoop的web 工具, 显示当前job进度.

可以看出, 此次输入产生了14292个map,和29个reduce. reduce数这么少是因为我的reduce的slots数只有30, 所以设置为29, 以防一个挂了, 还能在一波reduce中算好.

6.6 计算结果.

上面部分是hadoop cli客户端显示的进度, 中间是web工具显示的输入输出的一些数据的统计.可以看出, 此次输入数据总共有1.6TB大小, 设计的总记录数为69.6亿. 也就是这份数据记录了该站点的69.6亿的PV. 左下角可以看出, 执行时间比较长, 用了18分钟+46秒.这里慢的原因不在于reduce, 而是我的map的slots太少, 只有300个, 总共一万多个map, 那要分好几百波才能算完map, 所以瓶颈在map这里.右下角是统计的结果数据, 可以看出, 该站点的整体的PV是呈现上升趋势的.

至此, 一个简单的map/reduce程序就写好并运行了.

下面介绍复杂一点的实践. 当然, 还是等有时间再来介绍吧. 碎觉先.

 

 

linux screen初体验

由于公司的线上线下环境(除了本机)都是linux环境, 所以工作的时候, securecrt少不了, 往往还需要同时登陆多台机器进行操作. 在操作的时候, 有时候还需要等待一个命令执行完才能搞其他东西, 有点浪费时间. 当然, 开多个securecrt来连接到远程机器可以解决这个问题, 但是另一个问题, 这种做法是解决不了的, 就是如果我要保存上一次的工作状态, 等下次登陆之后可以立刻进入上一次的状态继续工作, 这个时候, 需要我们对这种状态做一下保存, 而securecrt终端是不维护这种状态的, 每次断开重新登陆后都要重新敲一遍命令回到上次的状态. 这时候, screen命令就非常有用了. screen提供了类似于linux中的桌面的功能, 而且可以同时有多个桌面一起工作, 称之为window(窗口). 通过一些快捷键, 可以很方便地从一个窗口切换到另一个窗口, 而且这些窗口的状态都是保存着的. 下面介绍screen命令如何使用, 以及使用效果.

1. screen有哪些选项, man一把就知道了, 自己看看.

2. screen配置文件, 这个可有可无, 有这个配置文件可以让screen看起来比较容易理解, 我们只需要新建一个文本文件, 在里面键入以下配置信息即可:


caption always "%{wb} %-w%{+b bw}%n %t%{-}%+w"      #蓝底白字

#caption always "%{=b kR} %-w%{-b bg}%n %t%{-}%+w"   #红色的,这两句选一句就可以了

defutf8 on    #如果有中文乱码就加上,否则可以不用

defencoding utf-8

encoding utf-8 utf-8

defscrollback 8192     #可以解决闪屏的问题

保存文件, 比如文件名为.screenrc

3. 接下来是启动screen

敲入命令


screen -D -RR youscrid -c ${yourdir}/.screenrc

这里, yousrcid是由你自己定义的, 就是一个screen的sessionid, 下次再回到这个screen时, 需要用到这个id, 可以取个容易记住的名称, 比如linux的用户名之类的.

-c后面的选项值就是第2步保存的配置文件.

4. 经过前面3步, screen以及建立了, 这时候你在这个screen中干啥都行了.

4.1 新建一个新的窗口:  ctrl+a c 这里是快捷键, 先按下ctrl+a两个键, 然后再敲一下c (create)键就可以创建一个新的窗口了.

4.2 为窗口命名, 默认窗口的名称都叫 bash, 为了工作方便, 你可以给每个窗口命名, 快捷键是 ctrl+a A  注意这里后面是大写的A,(你也可以按下shift+a(小写a), 就不用开启caps lock了)

4.3 切换窗口,  切到下一个窗口的快捷键是:  ctrl+a n (next)  , 切换到上一个窗口快捷键是:  ctrl+a p (pre) . 另外还可以  ctrl+a  num  num值数字, 每新建一个screen都会有一个数字作为他的tab下标的(默认从0开始, 这个你肉眼是看的到的, 有了上面的配置的话)

4.4 退出窗口,  直接 exit 就可以退出当前的一个window了.

4.5  退出到shell控制台, 而不退出窗口,  ctrl+a d  这时候会回到你创建这个screen的那个控制台, 当前的screen状态都会被保持着. 这时候即使你在screen里面远程连接到其他机器上, 还是可以保持的, 而且下次恢复到screen的适合, 这种连接不会断开.

4.6  回到原有的screen, 在linux控制台下敲入命令:


screen -r yourscrid

这里的yoursrcid就是你在第3步的时候创建screen时候指定的id, 这个时候就会很神奇地回到上一次的工作状态中了!

5. 其他选项有待各位看官自己挖掘了.

6. 使用截图

 

参考文档

http://www.ibm.com/developerworks/cn/linux/l-cn-screen/

http://www.ibm.com/developerworks/cn/aix/library/au-gnu_screen/

http://blog.chinaunix.net/u2/82938/showart_1821389.html

 

在linux的alias中使用awk遇到的问题

场景很简单, 通过一个命令别名(alias)直接输出当前的机器的ip地址.

不需要使用ifconfig命令, 用ping就可以了. (更简单用 hostname -i 命令即可.)

ping -c 1 命令用来指定只ping一次, 后面是机器名或者域名.

查看机器名的命令是 hostname, 所以很简单, 直接

ping -c 1 `hostname`

就可以得到ip地址的信息了.

PING jiacheo (192.168.0.22) 56(84) bytes of data.
64 bytes from jiacheo (192.168.0.22): icmp_seq=0 ttl=64 time=0.034 ms
--- jiacheo ping statistics ---
1 packets transmitted, 1 received, 0% packet loss, time 0m
rtt min/avg/max/mdev = 0.034/0.034/0.034/0.000 ms, pipe 2

然后我只需要显示第一行的 (192.168.0.22)就可以了. 这时需要用到awk

ping -c 1 `hostname` | awk '{if(NR==1) pirnt $3}'

执行结果:

(192.168.0.22)

貌似没什么问题.

使用alias:

编辑~/.bashrc 文件, 最后加入

alias ip="ping -c 1 `hostname` | awk '{if(NR==1) pirnt $3}'

保存后, source ~/.bashrc 一下(或者重新登录终端)

在命令控制台敲入: ip  回车后显示

PING jiacheo (192.168.0.22) 56(84) bytes of data.

奇怪了, 开始怀疑是awk没有识别出分隔符.
后来想想, 在.bashrc里面定义的 环境变量都是可以用$var 来引用的, 这里的$是不是需要转义
在英明伟大神武牛叉轰轰的 莹莹的指导下, 果然是$符号的问题, 要转义一下就可以了. 转义的方法就是 在前面加个\

alias ip="ping -c 1 `hostname` | awk '{if(NR==1) pirnt \$3}'"

输出结果:

(192.168.0.22)

蛋疼扯了这么多, 就说明一个道理, 在定义alias的时候, 要注意特殊符号需要转义. 嗯.

高可用性系统经验分享之总结

昨天听了毕玄的课程, 在此做一下简短的小结.高可用性系统经验分享总结

1. 要做好监控监控好系统里面那些关键的点:哪些点影响全局, 哪些点对主流程没影响?

2. 要做好隔离不要将所有的东西都串起来搞, 一个系统挂了导致其他都挂. 隔离的手段: 系统拆分, 分级(P1,P2,P3,P4)

3.要理清并减少依赖特别是核心系统, 应该去除太多的依赖, 最好就似乎直接连接数据库, 这样稳定性只有数据库和系统本身, 而不用担心依赖系统的稳健程度.

4.缓存读操作一律走缓存, 当然缓存的时间限度要估量好.

5.优雅降级首先应该是将系统拆分成N个独立的功能点, 当遇到瓶颈的时候可以考虑将不影响主流程的功能去掉, 做到优雅降级.简单的实现就是一大堆的系统开关, 然后在每个功能点都使用开关. 当出现需要降级的情况的时候, 就可以在后台使用开关来降级了.

一致性哈希

学习分布式, 一致性哈希是最最基础的知识, 所以要理解好.

那什么是一致性哈希呢?(what)

百度百科 上的解释很专业术语. 要一句话定义貌似也有难度: 一致性哈希算法是在哈希算法基础上,提出的在动态变化的分布式环境中,哈希算法应该满足的几个条件: 平衡性, 单调性和分散性.

1.平衡性是指 hash的结果应该平均分配到各个节点, 这样从算法上就解决了负载均衡问题.

2.单调性是指 在新增或者删减节点时, 同一个key访问到的值总是一样的.

3.分散性是指 数据应该分散的存放在 分布式集群中的各个节点(节点自己可以有备份), 不必要每个节点都存储所有的数据.

 

为什么要一致性哈希?(why)

这个问题问得很好…首先我们要看看不使用一致性hash, 我们的分布式集群如何工作.

1. 普通集群, 把固定的key映射到固定的节点上, 节点只存放各自key的数据, 如图:

网站PV流量统计工具的实现

一, 概述

像CNZZ那样提供一段js代码然后帮助站长们统计网页的PV流量是如何实现的呢? 这里浅谈了一下实现方法, 其实很多网站内部的PV统计也是如此的, 只不过需要记录的数据比较多而已.

二, 需求

实现一个网站访问量统计工具, 能够得到被访问页面的URL, 被访问页面的前导URL, 以及访客所在地理位置.

三, 设计

分为两部分, 一是前端, 一是后端.