यदि आपको काफ्का की मूल बातें जानने की जरूरत है, जैसे कि इसकी प्रमुख विशेषताएं, घटक और फायदे, तो मेरे पास यहां इसे कवर करने वाला एक लेख है। कृपया इसकी समीक्षा करें और निम्नलिखित अनुभागों के साथ आगे बढ़ने के लिए डॉकर का उपयोग करके काफ्का इंस्टॉलेशन पूरा होने तक चरणों का पालन करें।
काफ्का को NodeJS से जोड़ने के बारे में लेख में दिए गए उदाहरण के समान, इस स्रोत कोड में भी दो भाग शामिल हैं: एक निर्माता को प्रारंभ करना संदेश को काफ्का भेजने के लिए और उपभोक्ता का उपयोग करके के संदेशों की सदस्यता लेने के लिए &&&]विषय।
बेहतर समझ के लिए मैं कोड को छोटे भागों में तोड़ दूंगा। सबसे पहले, आइए परिवर्तनीय मानों को परिभाषित करें।
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )- यहां, पैकेज
github.com/confluentinc/confluent-kafka-go/kafka का उपयोग काफ्का से कनेक्ट करने के लिए किया जाता है।
-ब्रोकर होस्ट का पता है; यदि आप ZooKeeper का उपयोग कर रहे हैं, तो तदनुसार होस्ट पता बदलें।
-groupId और विषय को आवश्यकतानुसार बदला जा सकता है।
अगला निर्माता को प्रारंभ कर रहा है।
package main import ( "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" ) var ( broker = "localhost:9092" groupId = "group-id" topic = "topic-name" )उपरोक्त कोड का उपयोग संदेशों की एक श्रृंखला भेजने के लिए किया जाता है
{"संदेश 1", "संदेश 2", "संदेश 3"} किसी विषय पर और गो-रूटीन ई के लिए के साथ ईवेंट के माध्यम से पुनरावृत्त करने के लिए:= रेंज पी.इवेंट्स() और डिलीवरी परिणाम प्रिंट करें, चाहे वह एक हो सफलता या विफलता.
नेक्स्ट एकउपभोक्ता बना रहा है, जो विषय को सब्सक्राइब करता है और संदेश प्राप्त करता है।
funcstartConsumer() {
सी, त्रुटि := kafka.NewConsumer(&kafka.ConfigMap{
"बूटस्ट्रैप.सर्वर": ब्रोकर,
"ग्रुप.आईडी": ग्रुपआईडी,
"auto.offset.reset": "सबसे पहले",
})
यदि त्रुटि !=शून्य {
घबराहट(गलती)
}
सी. सदस्यता लें (विषय, शून्य)
के लिए {
संदेश, त्रुटि := सी.रीडमैसेज(-1)
यदि त्रुटि == शून्य {
fmt.Printf('%s पर संदेश: %s\n', msg.TopicPartition, string(msg.Value))
} अन्य {
fmt.Printf('उपभोक्ता त्रुटि: %v (%v)\n', त्रुटि, संदेश)
तोड़ना
}
}
सी.बंद करें()
}
func startConsumer() { c, err := kafka.NewConsumer(&kafka.ConfigMap{ "bootstrap.servers": broker, "group.id": groupId, "auto.offset.reset": "earliest", }) if err != nil { panic(err) } c.Subscribe(topic, nil) for { msg, err := c.ReadMessage(-1) if err == nil { fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value)) } else { fmt.Printf("Consumer error: %v (%v)\n", err, msg) break } } c.Close() }निर्माता
और उपभोक्ता बनाने के लिए फ़ंक्शन को कॉल करें। वास्तविक दुनिया के परिदृश्य में, निर्माता और उपभोक्ता की तैनाती आमतौर पर माइक्रोसर्विसेज सिस्टम में दो अलग-अलग सर्वरों पर की जाती है।
func मुख्य() {
स्टार्टप्रोड्यूसर()
प्रारंभउपभोक्ता()
}
func main() { startProducer() startConsumer() }
हैप्पी कोडिंग!
यदि आपको यह सामग्री उपयोगी लगती है, तो कृपया लेखक का समर्थन करने और अधिक दिलचस्प सामग्री तलाशने के लिए मेरे ब्लॉग पर मूल लेख पर जाएं।
नोडजेएस
अस्वीकरण: उपलब्ध कराए गए सभी संसाधन आंशिक रूप से इंटरनेट से हैं। यदि आपके कॉपीराइट या अन्य अधिकारों और हितों का कोई उल्लंघन होता है, तो कृपया विस्तृत कारण बताएं और कॉपीराइट या अधिकारों और हितों का प्रमाण प्रदान करें और फिर इसे ईमेल पर भेजें: [email protected] हम इसे आपके लिए यथाशीघ्र संभालेंगे।
Copyright© 2022 湘ICP备2022001581号-3