"यदि कोई कर्मचारी अपना काम अच्छी तरह से करना चाहता है, तो उसे पहले अपने औजारों को तेज करना होगा।" - कन्फ्यूशियस, "द एनालेक्ट्स ऑफ कन्फ्यूशियस। लू लिंगगोंग"
मुखपृष्ठ > प्रोग्रामिंग > स्प्रिंग बूट में RocketMQ के साथ बैच संदेश उपभोग को कैसे कार्यान्वित करें

स्प्रिंग बूट में RocketMQ के साथ बैच संदेश उपभोग को कैसे कार्यान्वित करें

2024-11-07 को प्रकाशित
ब्राउज़ करें:721

How to Implement Batch Message Consumption with RocketMQ in Spring Boot

1. निर्भरताएँ जोड़ना

सबसे पहले, अपनी pom.xml फ़ाइल में आवश्यक निर्भरताएँ जोड़ें:



    org.apache.rocketmq
    rocketmq-spring-boot-starter
    2.3.1
    
        
            org.apache.rocketmq
            rocketmq-client
        
    




    org.apache.rocketmq
    rocketmq-client
    5.3.0

2. कॉन्फ़िगरेशन फ़ाइल Bootstrap.yaml

अपनी RocketMQ सेटिंग्स को Bootstrap.yaml फ़ाइल में कॉन्फ़िगर करें:

rocketmq:
  name-server: 192.168.1.1:9876;192.168.1.2:9876;192.168.1.3:9876 # Replace with actual NameServer addresses
  consumer:
    group: consume-group-test
    access-key: access # Configure if ACL is used
    secret-key: secret
    consume-message-batch-max-size: 50  # Max messages per batch
    pull-batch-size: 100  # Max messages pulled from Broker
  topics:
    project: "group-topic-1"
  groups:
    project: "consume-group-1"  # Use different groups for different business processes

3. कॉन्फ़िगरेशन क्लास MqConfigProperties

कॉन्फ़िगरेशन क्लास MqConfigProperties बनाएं:

import org.apache.rocketmq.spring.autoconfigure.RocketMQProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import lombok.Data;

import java.io.Serializable;

/**
 * RocketMQ Configuration Class
 */
@Data
@Component
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfigProperties implements Serializable {

    private static final long serialVersionUID = 1L;

    @Autowired
    private RocketMQProperties rocketMQProperties;

    private TopicProperties topics;
    private GroupProperties groups;

    /**
     * Topic Configuration Class
     */
    @Data
    public static class TopicProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }

    /**
     * Consumer Group Configuration Class
     */
    @Data
    public static class GroupProperties implements Serializable {
        private static final long serialVersionUID = 1L;
        private String project;
    }
}

4. उपभोक्ता संहिता लागू करना

उपभोक्ता वर्ग उपयोगकर्ता उपभोक्ता बनाएं:

import com.alibaba.fastjson2.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.spring.support.RocketMQUtil;
import org.springframework.context.ApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Resource;
import java.util.List;

/**
 * Batch Consumer Implementation
 */
@Component
@Slf4j
public class UserConsumer implements SmartLifecycle {

    @Resource
    private MqConfigProperties mqConfigProperties;

    @Resource
    private ApplicationContext applicationContext;

    private volatile boolean running;
    private DefaultMQPushConsumer consumer;

    @Override
    public void start() {
        if (isRunning()) {
            throw new IllegalStateException("Consumer is already running");
        }
        initConsumer();
        setRunning(true);
        log.info("UserConsumer started successfully.");
    }

    @Override
    public void stop() {
        if (isRunning() && consumer != null) {
            consumer.shutdown();
            setRunning(false);
            log.info("UserConsumer stopped.");
        }
    }

    @Override
    public boolean isRunning() {
        return running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    private void initConsumer() {
        String topic = mqConfigProperties.getTopics().getProject();
        String group = mqConfigProperties.getGroups().getProject();
        String nameServer = mqConfigProperties.getRocketMQProperties().getNameServer();
        String accessKey = mqConfigProperties.getRocketMQProperties().getConsumer().getAccessKey();
        String secretKey = mqConfigProperties.getRocketMQProperties().getConsumer().getSecretKey();

        RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), accessKey, secretKey);
        consumer = rpcHook != null
                ? new DefaultMQPushConsumer(group, rpcHook, new AllocateMessageQueueAveragely())
                : new DefaultMQPushConsumer(group);

        consumer.setNamesrvAddr(nameServer);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.setConsumeMessageBatchMaxSize(100);  // Set the batch size for consumption
        consumer.subscribe(topic, "*");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                log.info("Received {} messages", msgs.size());
                for (MessageExt message : msgs) {
                    String body = new String(message.getBody());
                    log.info("Processing message: {}", body);
                    User user = JSONObject.parseObject(body, User.class);
                    processUser(user);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        log.info("UserConsumer initialized with topic [{}] and group [{}].", topic, group);
    }

    private void processUser(User user) {
        log.info("Processing user with ID: {}", user.getId());
        // Handle user-related business logic
    }
}

5. निर्माता उदाहरण कोड

बैच संदेश तैयार करने के लिए, आप निम्न उपयोगकर्ता निर्माता वर्ग का उपयोग कर सकते हैं:

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.List;

public class UserProducer {

    private DefaultMQProducer producer;

    public void sendBatchMessages(List users, String topic) {
        List messages = new ArrayList();
        for (User user : users) {
            messages.add(new Message(topic, JSONObject.toJSONString(user).getBytes()));
        }
        try {
            producer.send(messages);
        } catch (Exception e) {
            log.error("Error sending batch messages", e);
        }
    }
}

6. अतिरिक्त अनुकूलन सुझाव

  • प्रदर्शन अनुकूलन: आप उपभोक्ता थ्रेड पूल के आकार को समायोजित कर सकते हैं। डिफ़ॉल्ट रूप से, यह कंज्यूमथ्रेडमिन=20 और कंज्यूमथ्रेडमैक्स=20 पर सेट है। उच्च-समवर्ती परिदृश्यों में, थ्रेड पूल का आकार बढ़ाने से प्रदर्शन में वृद्धि हो सकती है।

  • त्रुटि प्रबंधन: जब उपभोग विफल हो जाता है, तो अनंत पुनः प्रयास लूप से बचने के लिए RECONSUME_LATER से सावधान रहें। अपनी व्यावसायिक आवश्यकताओं के आधार पर अधिकतम पुनः प्रयास संख्या निर्धारित करें।

  • किरायेदार अलगाव: गलत समूह से डेटा उपभोग से बचने के लिए विभिन्न व्यावसायिक मॉड्यूल के लिए अलग-अलग समूहों का उपयोग करें। यह उत्पादन परिवेश में विशेष रूप से महत्वपूर्ण है।

विज्ञप्ति वक्तव्य यह आलेख यहां पुन: प्रस्तुत किया गया है: https://dev.to/wilson_evan_1efa5910f8855/how-to-implement-batch-message-consemption-with-rocketmq-in-spring-boot-6gi?1 यदि कोई उल्लंघन है, तो कृपया स्टडी_गोलंग से संपर्क करें @163.com हटाएं
नवीनतम ट्यूटोरियल अधिक>

चीनी भाषा का अध्ययन करें

अस्वीकरण: उपलब्ध कराए गए सभी संसाधन आंशिक रूप से इंटरनेट से हैं। यदि आपके कॉपीराइट या अन्य अधिकारों और हितों का कोई उल्लंघन होता है, तो कृपया विस्तृत कारण बताएं और कॉपीराइट या अधिकारों और हितों का प्रमाण प्रदान करें और फिर इसे ईमेल पर भेजें: [email protected] हम इसे आपके लिए यथाशीघ्र संभालेंगे।

Copyright© 2022 湘ICP备2022001581号-3