Storm中实现了一个容器,它里面的元素存一定时间,超过该时间的元素便可能被删除,以保证容器可以加入更多新容器。
实现思想:初始化N个容器,N个容器组成链表,元素的最短存储时间为T,每次将新元素插入到链表头部的容器内,每隔T/(N-1)时间,将链表尾部的容器删除,同时在链表头部插入新容器,每次删除在N个容器中分别执行删除操作。可以每个元素的存储时间为:[T, T + T/(N-1)]
在具体实现中作者采用一个线程进行容器队列的头部插入和尾部删除工作,同时使用锁机制避免常规容器操作和容器队列的删除与添加操作的访问冲突。
Code:
privatefinalObject _lock= newObject();
privateThread _cleaner;
private ExpiredCallback _callback;
……
final long expirationMillis = expirationSecs * 1000L;
final long sleepTime = expirationMillis / (numBuckets-1);
_cleaner = new Thread(new Runnable() {
public void run() {
try {
while(true) {
Map<K, V> dead = null;
Time.sleep(sleepTime);
synchronized(_lock) {
dead = _buckets.removeLast();
_buckets.addFirst(new HashMap<K, V>());
}
if(_callback!=null) {
for(Entry<K, V>entry: dead.entrySet()){
_callback.expire((),());
}
}
}
} catch (InterruptedException ex) {
}
}
});
_cleaner.setDaemon(true);
_cleaner.start();