Java简单实现MQ架构和思路01

发布时间 2023-03-30 12:58:18作者: 小飞fei

实现一个 MQ(消息队列)架构可以涉及到很多方面,包括消息的生产和消费、消息的存储和传输、消息的格式和协议等等。下面是一个简单的 MQ 架构的实现示例,仅供参考:

  1. 定义消息格式和协议:我们可以定义一个简单的消息格式,比如 JSON 格式,包含消息的 ID、内容、发送时间等信息。同时,我们可以定义一个简单的协议,比如 TCP 协议,用来传输消息。

  2. 实现消息的存储:我们可以使用数据库或者文件系统等方式来存储消息。对于消息队列来说,可以考虑使用类似于链表的数据结构来存储消息,每个节点包含一个消息和指向下一个节点的指针。

  3. 实现消息的生产和消费:我们可以使用多线程来实现消息的生产和消费。具体地,我们可以定义一个生产者线程和多个消费者线程,生产者线程负责从外部系统中获取消息并存储到消息队列中,消费者线程负责从消息队列中获取消息并进行处理。

  4. 实现消息的传输:我们可以使用 TCP 协议来传输消息。具体地,我们可以定义一个服务器程序,负责监听指定的端口号,并接受客户端的连接请求。一旦连接建立,服务器程序就可以和客户端进行数据传输。

综上所述,一个简单的 MQ 架构的实现可以包含以上几个部分。实际上,MQ 的实现还涉及到很多其他方面,比如消息的确认和重试、消息的过期和删除、消息的优先级和分区等等。因此,在实际项目中,需要根据具体的需求和场景来进行设计和实现。

以下是一个简单的 Java 消息队列示例代码,使用了多线程和 TCP 协议:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.LinkedList;

public class MessageQueue {

    private LinkedList<Message> queue = new LinkedList<Message>();

    public synchronized void putMessage(Message message) {
        queue.add(message);
        notifyAll();
    }

    public synchronized Message getMessage() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        return queue.removeFirst();
    }

    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue();
        Producer producer = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        producer.start();
        consumer1.start();
        consumer2.start();
    }

}

class Message implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private String id;
    private String content;

    public Message(String id, String content) {
        this.id = id;
        this.content = content;
    }

    public String getId() {
        return id;
    }

    public String getContent() {
        return content;
    }

}

class Producer extends Thread {

    private MessageQueue queue;

    public Producer(MessageQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            ServerSocket serverSocket = new ServerSocket(9999);
            while (true) {
                Socket socket = serverSocket.accept();
                ObjectInputStream ois = new ObjectInputStream(socket.getInputStream());
                Message message = (Message) ois.readObject();
                queue.putMessage(message);
                ois.close();
                socket.close();
            }
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

}

class Consumer extends Thread {

    private MessageQueue queue;

    public Consumer(MessageQueue queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            while (true) {
                Message message = queue.getMessage();
                System.out.println("Consumer " + getId() + " got message: " + message.getContent());
                Socket socket = new Socket("localhost", 9998);
                ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
                oos.writeObject(message);
                oos.close();
                socket.close();
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

}

在这个示例中,我们定义了一个简单的消息队列,使用了一个链表来存储消息。然后,我们定义了一个生产者线程和两个消费者线程,生产者线程负责从外部系统中获取消息并存储到消息队列中,消费者线程负责从消息队列中获取消息并进行处理。我们使用 TCP 协议来传输消息,生产者线程监听端口号 9999,消费者线程通过连接 localhost 的 9998 端口来发送消息。在实际应用中,需要根据具体需求进行修改和扩展。

并在此代码基础上增加 协议类和注释

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.LinkedList;

// 消息队列类
public class MessageQueue {

    private LinkedList<Message> queue = new LinkedList<Message>();

    // 往队列中添加消息
    public synchronized void putMessage(Message message) {
        queue.add(message);
        notifyAll();
    }

    // 从队列中取出消息
    public synchronized Message getMessage() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        return queue.removeFirst();
    }

    // 主函数
    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue();
        Producer producer = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        producer.start();
        consumer1.start();
        consumer2.start();
    }

}

// 消息类
class Message implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private String type; // 消息类型,例如:ORDER, PAYMENT, SHIPMENT, 等等
    private String content; // 消息内容
    private String sender; // 消息发送者
    private String receiver; // 消息接收者

    // 构造方法
    public Message(String type, String content, String sender, String receiver) {
        super();
        this.type = type;
        this.content = content;
        this.sender = sender;
        this.receiver = receiver;
    }

    // 返回消息类型
    public String getType() {
        return type;
    }

    // 返回消息内容
    public String getContent() {
        return content;
    }

    // 返回消息发送者
    public String getSender() {
        return sender;
    }

    // 返回消息接收者
    public String getReceiver() {
        return receiver;
    }

}

// 编解码协议类
class Protocol {

    public static final String HEADER = "MESSAGE"; // 消息头
    public static final String DELIMITER = ":"; // 分隔符

    // 将消息编码为字符串
    public static String encode(Message message) {
        return HEADER + DELIMITER + message.getType() + DELIMITER + message.getContent() + DELIMITER
                + message.getSender() + DELIMITER + message.getReceiver();
    }

    // 将字符串解码为消息
    public static Message decode(String messageString) {
        String[] parts = messageString.split(DELIMITER);
        return new Message(parts[1], parts[2], parts[3], parts[4]);
    }

}

// 生产者线程类
class Producer extends Thread {

    private MessageQueue queue;
    private ServerSocket serverSocket;

    // 构造方法
    public Producer(MessageQueue queue) {
        this.queue = queue;
    }

    // 启动线程
    public void run() {
        try {
            serverSocket = new ServerSocket(9999); // 监听端口号9999
            while (true) {
                Socket socket = serverSocket.accept(); // 接收连接
                ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); // 创建输入流
                String messageString = (String) in.readObject(); // 读取消息字符串
                Message message = Protocol.decode(messageString); // 解码消息
                queue.putMessage(message); // 添加消息到队列
                in.close(); // 关闭输入流
                socket.close(); // 关闭连接
            }
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

}

// 消费者线程类
class Consumer extends Thread {

    private MessageQueue queue;

    // 构造方法
    public Consumer(MessageQueue queue) {
        this.queue = queue;
    }

    // 启动线程
    public void run() {
        while (true) {
            try {
                Message message = queue.getMessage(); // 从队列中取出消息
                System.out.println("Received message: " + message.getContent()); // 输出消息内容
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

以下是在上面代码基础上增加消息确认和重试功能:

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.LinkedList;

// 消息队列类
public class MessageQueue {

    private LinkedList<Message> queue = new LinkedList<Message>();

    // 往队列中添加消息
    public synchronized void putMessage(Message message) {
        queue.add(message);
        notifyAll();
    }

    // 从队列中取出消息
    public synchronized Message getMessage() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        return queue.removeFirst();
    }

    // 主函数
    public static void main(String[] args) {
        MessageQueue queue = new MessageQueue();
        Producer producer = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);
        producer.start();
        consumer1.start();
        consumer2.start();
    }

}

// 消息类
class Message implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    private String type; // 消息类型,例如:ORDER, PAYMENT, SHIPMENT, 等等
    private String content; // 消息内容
    private String sender; // 消息发送者
    private String receiver; // 消息接收者
    private boolean confirmed; // 消息是否已确认
    private int retryCount; // 消息重试次数

    // 构造方法
    public Message(String type, String content, String sender, String receiver) {
        super();
        this.type = type;
        this.content = content;
        this.sender = sender;
        this.receiver = receiver;
        confirmed = false;
        retryCount = 0;
    }

    // 返回消息类型
    public String getType() {
        return type;
    }

    // 返回消息内容
    public String getContent() {
        return content;
    }

    // 返回消息发送者
    public String getSender() {
        return sender;
    }

    // 返回消息接收者
    public String getReceiver() {
        return receiver;
    }

    // 返回消息是否已确认
    public boolean isConfirmed() {
        return confirmed;
    }

    // 设置消息已确认
    public void setConfirmed() {
        confirmed = true;
    }

    // 返回消息重试次数
    public int getRetryCount() {
        return retryCount;
    }

    // 增加消息重试次数
    public void incrementRetryCount() {
        retryCount++;
    }

}

// 编解码协议类
class Protocol {

    public static final String HEADER = "MESSAGE"; // 消息头
    public static final String DELIMITER = ":"; // 分隔符

    // 将消息编码为字符串
    public static String encode(Message message) {
        return HEADER + DELIMITER + message.getType() + DELIMITER + message.getContent() + DELIMITER
                + message.getSender() + DELIMITER + message.getReceiver() + DELIMITER + message.isConfirmed()
                + DELIMITER + message.getRetryCount();
    }

    // 将字符串解码为消息
    public static Message decode(String messageString) {
        String[] parts = messageString.split(DELIMITER);
        Message message = new Message(parts[1], parts[2], parts[3], parts[4]);
        message.setConfirmed(Boolean.parseBoolean(parts[5]));
        message.incrementRetryCount();
        return message;
    }

}

// 生产者线程类
class Producer extends Thread {

    private MessageQueue queue;
    private ServerSocket serverSocket;

    // 构造方法
    public Producer(MessageQueue queue) {
        this.queue = queue;
    }

   // 启动线程
    public void run() {
        try {
            serverSocket = new ServerSocket(9999); // 监听端口号9999
            while (true) {
                Socket socket = serverSocket.accept(); // 接收连接
                ObjectInputStream in = new ObjectInputStream(socket.getInputStream()); // 创建输入流
                ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream()); // 创建输出流
                String messageString = (String) in.readObject(); // 读取消息字符串
                Message message = Protocol.decode(messageString); // 解码为消息对象
                if (!message.isConfirmed()) { // 如果消息未确认
                    queue.putMessage(message); // 添加到队列中
                }
                out.writeObject("ACK"); // 发送确认消息
                out.flush();
                in.close();
                out.close();
                socket.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace
        }
    }
}