用过消息队列?Kafka?能否手写一个消息队列?懵

时间:2024-01-24 09:02:12

是否有同样的经历?面试官问你做过啥项目,我一顿胡侃,项目利用到了消息队列,kafka,rocketMQ等等。

好的,那请开始你的表演,面试官递过一支笔:给我手写一个消息队列!!WHAT?

为了大家遇到这种场景还能愉快的zhuangbi,所以写一篇文章,凑合用一下。

想要实现一个消息队列,我们需要关组以下几点:

1.首先有一个队列(FIFO)来存放消息

2.消息队列容量有限

3.需要入队,出队方法

4.需要考虑多线程并发情况

 

 

<1>.简单版:用LinkedList实现一个简单的消息队列

这里用LinkedList来实现队列,然后通过synchronized关键字来实现多线程的互斥,用LinkedList的addLast方法实现队列的push,用LinkedList的removeFirst实现队列的remove方法

//实现FIFO队列
public class MyList<T> {
    private LinkedList<T> storage = new LinkedList<T>();
    private int statckSize = 2000;
    public synchronized void push(T e) {//需要加上同步
        storage.addLast(e);
    }

    public synchronized T peek() {
        if(storage!=null&&storage.size()>0){
            return storage.peekFirst();
        }
        return null;

    }

    public void remove() {
        storage.removeFirst();
    }

    public boolean empty() {
        return storage.isEmpty();
    }
}
View Code

测试类:

public class ListTest {
    public static void main(String[] args) {
        MyList<String> myList = new MyList<String>();
        for(String s : "the prefect code".split(" ")){//LIFO
            myList.push(s);
        }
        while(!myList.empty()){
            System.out.print(myList.peek()+" ");
            myList.remove();
        }
    }

}
View Code

 

<2>.进阶版,仍然用LinkedList来实现队列,给出仓库的概念(消息队列仓库),生产者和消费者分别在独立线程中实现,使用object的wait(),notify()和synchronized()实现线程操作的同步与互斥(Obj.wait(),与Obj.notify()必须要与synchronized(Obj)一起使用,也就是wait,与notify是针对已经获取了Obj锁进行操作,从语法角度来说就是Obj.wait(),Obj.notify必须在synchronized(Obj){...}语句块内。)

抽象仓库类:

public interface AbstractStorage {
    void consumer(int num);
    void producer(int num);
}
View Code

生产者线程:

class Producer extends Thread {
    //生产数量
    private int num;
    //仓库
    private AbstractStorage abstractStorage;

    public Producer(AbstractStorage abstractStorage,int num){
        this.abstractStorage=abstractStorage;
        this.num=num;
    }
    // 调用仓库Storage的生产函数
    public void produce(int num){
        abstractStorage.producer(num);
    }
    // 线程run函数
    @Override
    public void run(){
        produce(num);
    }

}
View Code

消费者线程:

class Consumer extends Thread {
     //消费数量
     private int num;
     //仓库
     private AbstractStorage abstractStorage;

     public Consumer(AbstractStorage abstractStorage,int num){
        this.abstractStorage=abstractStorage;
        this.num=num;
     }

     public void consume(int num){
         abstractStorage.consumer(num);
     }

     @Override
    public void run(){
         consume(num);
     }

}
View Code

消息队列(仓库)实现类:

public class Storage1 implements AbstractStorage {
    //最大容量
    private final int MAX_SIZE = 100;
    //存储载体
    private LinkedList list =new LinkedList();

    @Override
    public void consumer(int num) {
        synchronized (list) {
            while (num > list.size()) {
                try {
                    list.wait();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("【阻塞】当前要消费的数量:" + num + ",当前库存量:" + list.size() + "当前消费阻塞");
            }
            for (int i = 0; i < num; i++) {
                list.removeFirst();
            }
            System.out.println("【consumer】 "+Thread.currentThread().getName()+" 已消费产品数:" + num + ",现库存数:" + list.size());
            list.notifyAll();
        }
    }

    //生产
    @Override
    public void producer(int num) {
        synchronized (list) {
            while (list.size() + num > MAX_SIZE) {
                try {
                    list.wait();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("【阻塞】当前队列已满,生产阻塞");
            }
            for (int i = 0; i < num; i++) {
                list.addLast(new Object());
            }
            System.out.println("【producer】 "+Thread.currentThread().getName()+ " 已生产产品数:" + num + ",现库存数:" + list.size());
            list.notifyAll();
        }
    }


}
View Code

测试类:

public class Test {
    public static void main(String[] args) {
        AbstractStorage abstractStorage =new Storage1();

        //生产者对象
        Producer p1 = new Producer(abstractStorage,10);
        Producer p2 = new Producer(abstractStorage,10);
        Producer p3 = new Producer(abstractStorage,10);
        Producer p4 = new Producer(abstractStorage,10);
        Producer p5 = new Producer(abstractStorage,10);
        Producer p6 = new Producer(abstractStorage,10);
        Producer p7 = new Producer(abstractStorage,10);
        Producer p8 = new Producer(abstractStorage,50);
        //消费者对象
        Consumer c1 = new Consumer(abstractStorage,20);
        Consumer c2 = new Consumer(abstractStorage,30);
        Consumer c3 = new Consumer(abstractStorage,50);

        c1.start();
        c2.start();
        c3.start();

        p1.start();
        p2.start();
        p3.start();
        p4.start();
        p5.start();
        p6.start();
        p7.start();
        p8.start();

    }



}
View Code

最终结果显示,我们能实现简单的生产消费,并且是线程同步的。