您现在的位置是:主页 > news > 上海广告公司网站制作/免费招收手游代理

上海广告公司网站制作/免费招收手游代理

admin2025/4/19 19:00:57news

简介上海广告公司网站制作,免费招收手游代理,注册公司线上的网址,有合作社做网站得不这是本人学习的总结,主要学习资料如下 马士兵教育 目录1、消息发送模式2、消息消费模式3、顺序消息的消费和发送3.1、全局顺序3.2、部分顺序3.3、部分顺序代码样例3.3.1、依赖3.3.2、发送信息3.3.3、接受信息4、延时消息的消费和发送4.1、代码样例5、批量发送消息5…

上海广告公司网站制作,免费招收手游代理,注册公司线上的网址,有合作社做网站得不这是本人学习的总结,主要学习资料如下 马士兵教育 目录1、消息发送模式2、消息消费模式3、顺序消息的消费和发送3.1、全局顺序3.2、部分顺序3.3、部分顺序代码样例3.3.1、依赖3.3.2、发送信息3.3.3、接受信息4、延时消息的消费和发送4.1、代码样例5、批量发送消息5…

这是本人学习的总结,主要学习资料如下

  • 马士兵教育

目录

  • 1、消息发送模式
  • 2、消息消费模式
  • 3、顺序消息的消费和发送
    • 3.1、全局顺序
    • 3.2、部分顺序
    • 3.3、部分顺序代码样例
      • 3.3.1、依赖
      • 3.3.2、发送信息
      • 3.3.3、接受信息
  • 4、延时消息的消费和发送
    • 4.1、代码样例
  • 5、批量发送消息
    • 5.1、代码样例


1、消息发送模式

三种发送方式,同步,异步和单向。

同步机会简单的一问一答,我发过去消息,MQ会立刻给我回复收到与否;没收到回复前会阻塞当前线程直到收到回复。

异步和通不一样也是一问一答,你需要回复收到,但是我不会等,我会做自己的事。

单向则是我发送消息,你收不收到我不关心。
请添加图片描述

2、消息消费模式

我们有两种消费模式,一种是集群消费,另一种是广播消费。默认情况和大部分情况是使用集群消费

  • 集群消费:消费消息往往比生产消息要复杂,所以一个porducer往往对应多个consumer。一个topic+tag里有多条消息等待着多个consumer消费。比如下面的图例,group 1里有三个consumer,他们各自分别从一个message queue里拿消息。

    一条消息只会被送到某一个message queue中,被某一个consumer消费,绝不会出现同一条消息被多个consumer消费的情况。

  • 广播消费:和集群模式相反,每一条消息都会发送给所有的consumer,也就是说一个消息会被消费多次,被不同的消费者各自消费一次。

请添加图片描述



3、顺序消息的消费和发送

顺序消息的消费和发送指的是,消费者消费的消息的顺序和生产者生产消息的顺序是一样的。

比如生产者生产消息是1,2,3,4,消费者消费的顺序也是1,2,3,4

顺序消息又分为全局顺序和部分顺序。


3.1、全局顺序

全局顺序出现在topic里只有一个message queue的情况。

因为message queue是队列的结构,消息先进先出,自然保证了消息消费的顺序。
请添加图片描述


3.2、部分顺序

部分顺序出现在topic有多个message queue的情况。

因为有多个message queue对应多个consumer,而不同的consumer消费的效率有所不同,所以每个消息被消费的顺序不严格与生产者生成的相同。

它只能保证消息每个message queue中的消息顺序消费。

如果业务上需要控制顺序,生产者在生产消息时可以将消息送到指定的message queue。比如下图的M1,M4指定送到queue1,保证M1M4先消费。

同时消费者也只从指定的message queue拿消息即可
请添加图片描述

3.3、部分顺序代码样例

3.3.1、依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.8.0</version>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.24</version><scope>provided</scope>
</dependency>



3.3.2、发送信息

代码的场景是发送订单,有多个消息用来通知订单状态变化。订单生命周期created -> payed -> completed

其中有两个订单,id分别为1和2。订单1的状态消息都发送到message queue 1,订单2 的状态消息都发送到message queue 2

public class ScheduleProducerExample {public static void main(String[] args) throws Exception{DefaultMQProducer producer = getProducer();List<Order> orderList = getOrderList();for(int i = 0; i < orderList.size(); i++) {String body = orderList.get(i).toString();Message msg = new Message("PartOrder", null, "KEY" + i, body.getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> queueList, Message message, Object arg) {Integer id = (Integer) arg;int index = id % queueList.size();return queueList.get(index);}}, orderList.get(i).getId());System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",sendResult.getSendStatus(),sendResult.getMessageQueue().getQueueId(),body));}producer.shutdown();}private static List<Order> getOrderList() {List<Order> list = new ArrayList<>();list.add(new Order(1, "created"));list.add(new Order(2, "created"));list.add(new Order(1, "payed"));list.add(new Order(2, "payed"));list.add(new Order(1, "completed"));list.add(new Order(2, "completed"));return list;}private static DefaultMQProducer getProducer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("group1");producer.setNamesrvAddr("localhost:9876");producer.start();return producer;}}

输出结果,可以看到订单1都发送到了queueId:1,订单而都发送到了queueId:2
请添加图片描述


3.3.3、接受信息

注意在consumer.registerMessageListener中会定义如何消费信息。返回值用于定义消费的后续处理,比如返回SUSPEND_CURRENT_QUEUE_A_MOMENT则表示这次消息没有消费成功,下次还会继续消费该消息;

SUCCESS则表示消息消费成功,下次会消费下一条消息

public class PartialOrderConsumerExample {public static void main(String[] args) throws Exception{DefaultMQPushConsumer consumer = getConsumer();consumer.registerMessageListener(new MessageListenerOrderly() {Random random = new Random();@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {context.setAutoCommit(true);for(MessageExt msg: list) {System.out.println("consumeThread=" + Thread.currentThread().getName()+ ", queueId=" + msg.getQueueId() + ", context:" + new String(msg.getBody()));}try{// mock business processTimeUnit.MILLISECONDS.sleep(random.nextInt(300));} catch (Exception e) {e.printStackTrace();// means that failed to consume this msg. In next time will still consume this msg.return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}// means that success to consume this msg. In the next time will consume next msg.return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();while(true){}//        consumer.shutdown();}private static DefaultMQPushConsumer getConsumer() throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer1");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("PartOrder", "*");return consumer;}
}

返回结果消费成功,保证了消息部分有序。
请添加图片描述



4、延时消息的消费和发送

顾名思义,就是延时发送消息。下面看代码示例

4.1、代码样例

就普通地发送消息即可,只需要设置消息的属性就可达到延时的效果。比如msg.setDelayTimeLevel(1);

延时有18个等级,用数字1~18表示,分别代表1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

public class DelayProducerExample {public static void main(String[] args) throws Exception{DefaultMQProducer producer = getProducer();Message msg = getMessage();SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);producer.shutdown();}private static DefaultMQProducer getProducer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("group1");producer.setNamesrvAddr("localhost:9876");producer.start();return producer;}private static Message getMessage() {Message msg = new Message();msg.setTopic("topic1");msg.setTags("tag1");msg.setBody("hello world".getBytes());// delayTimeLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.setDelayTimeLevel(1);return msg;}
}



5、批量发送消息

就是一次性发送多个消息,大批量消息发送中可提高效率。

需要注意的是,批量发送的消息默认都是发送到同一个message queue

当然,批量发送的消息有上限。一次不能超过4M。超过的话我们需要拆分消息。

5.1、代码样例

其实比较简单,就是普通的send方法,不过参数换成了List,我们只需要往List填信息就可以批量发送了。

public class BatchProducerExample {public static void main(String[] args) throws Exception{DefaultMQProducer producer = getProducer();List<Message> messageList = getMessageList();SendResult sendResult = producer.send(messageList);System.out.printf("%s%n", sendResult);producer.shutdown();}private static List<Message> getMessageList() {List<Message> messageList = new ArrayList<>();messageList.add(new Message("topic1", "tag1", "hello world1".getBytes()));messageList.add(new Message("topic1", "tag1", "hello world2".getBytes()));messageList.add(new Message("topic1", "tag1", "hello world3".getBytes()));messageList.add(new Message("topic1", "tag1", "hello world4".getBytes()));messageList.add(new Message("topic1", "tag1", "hello world5".getBytes()));messageList.add(new Message("topic1", "tag1", "hello world6".getBytes()));return messageList;}private static DefaultMQProducer getProducer() throws MQClientException {DefaultMQProducer producer = new DefaultMQProducer("group1");producer.setNamesrvAddr("localhost:9876");producer.start();return producer;}
}

下面是接受的结果,所有消息都在同一个message queue
请添加图片描述