{"id":384,"date":"2018-12-26T11:41:38","date_gmt":"2018-12-26T03:41:38","guid":{"rendered":"https:\/\/www.yinyubo.cn\/?p=384"},"modified":"2022-05-17T09:44:15","modified_gmt":"2022-05-17T01:44:15","slug":"pulsar","status":"publish","type":"post","link":"https:\/\/www.yinyubo.com\/?p=384","title":{"rendered":"java\u6d4b\u8bd5pulsar\u5b9e\u4f8b"},"content":{"rendered":"<h1><span style=\"color: #33cccc;\"><strong>\u9700\u6c42<\/strong><\/span><\/h1>\n<p>\u6700\u8fd1\u516c\u53f8\u4e0a\u4e86pulsar\u670d\u52a1\uff0c\u7136\u540e\u6211\u4eec\u9700\u8981\u5b66\u4e60pulsar\u76f8\u5173\u7684\u5185\u5bb9\u3002\u6700\u597d\u7684\u529e\u6cd5\u5c31\u662f\u81ea\u5df1\u5b66\u4e60pulsar\u73af\u5883\u7684\u642d\u5efa\uff0c\u7136\u540e\u642d\u5efa\u4e00\u4e2apulsar-server.\u5e76\u4e14\u81ea\u5df1\u5efa\u7acbpulsar-client\u7684\u6d88\u8d39\u8005\u548c\u751f\u4ea7\u8005\uff0c\u4e92\u76f8\u8c03\u7528\uff0c\u6d4b\u8bd5\u8fde\u901a<\/p>\n<h1><span style=\"color: #33cccc;\"><strong>pulsar-server<\/strong><\/span><\/h1>\n<p>\u4f7f\u7528docker\u642d\u5efa\u662f\u6700\u65b9\u4fbf\u7684\u3002<br \/>\n\u8f93\u5165\u5982\u4e0b\u547d\u4ee4\u5c31\u53ef\u4ee5\u5566<\/p>\n<pre class=\"lang:python decode:true \">docker run -it -p 28000:80 -p 28080:8080 -p 26650:6650 apachepulsar\/pulsar-standalone<\/pre>\n<p>\u5b83\u4f1a\u53bb\u672c\u5730\u5efa\u7acb\u4e00\u4e2a\u6807\u51c6\u7684pulsar server\uff0c\u5176\u4e2d\u5404\u4e2a\u7aef\u53e3\u7684\u610f\u4e49\u5206\u522b\u662f\uff1a<\/p>\n<blockquote><p>80: the port for pulsar dashboard<br \/>\n8080: the http service url for pulsar service<br \/>\n6650: the binary protocol service url for pulsar service<\/p><\/blockquote>\n<p>\u6211\u8fd9\u8fb9\u6620\u5c04\u5230\u4e8628000,28080\uff0c26650\u4e09\u4e2a\u7aef\u53e3\u3002<\/p>\n<h1><span style=\"color: #33cccc;\"><strong>pulsar-client\u6d4b\u8bd5\u4e4b\u4ee3\u7801\u7ed3\u6784<\/strong><\/span><\/h1>\n<p><img loading=\"lazy\" decoding=\"async\" class=\"alignnone wp-image-385 size-full\" src=\"https:\/\/yinyubo-1257235934.cos.ap-nanjing.myqcloud.com\/content\/pulsar.png\" alt=\"\" width=\"209\" height=\"90\" \/><br \/>\n\u5982\u4e0a\u56fe\u6240\u793a\uff0c\u67094\u4e2a\u6587\u4ef6\uff0c<br \/>\nClient\u662f\u8fde\u63a5\u7684\u4ee3\u7801<br \/>\nMessageConsumer\u662f\u5355\u4e3b\u9898\u8ba2\u9605\uff08<span style=\"color: #ff0000;\">\u6d88\u8d39\u8005<\/span>\uff09<br \/>\nMessageConsumerAll\u662f\u8ba2\u9605\u6240\u6709\u4e3b\u9898\uff08<span style=\"color: #ff0000;\">\u6d88\u8d39\u8005<\/span>\uff09<br \/>\nMessageProducer\u662f\u53d1\u5e03\u6307\u5b9a\u4e3b\u9898\uff08<span style=\"color: #ff0000;\">\u751f\u4ea7\u8005<\/span>\uff09<\/p>\n<h1><span style=\"color: #33cccc;\"><strong>pulsar-client\u6d4b\u8bd5\u4e4bClient.java<\/strong><\/span><\/h1>\n<p>\u914d\u7f6e\u8fde\u63a5\u4fe1\u606f\u30020.0.0.0\u662fIP\u5730\u5740\uff0c\u5982\u679c\u4f60\u9700\u8981\u4f7f\u7528\uff0c\u8bf7\u6362\u6210\u4f60\u81ea\u5df1\u7684pulsar\u670d\u52a1\u5730\u5740<\/p>\n<pre class=\"lang:python decode:true EnlighterJSRAW\" data-enlighter-language=\"golang\"><code class=\"EnlighterJSRAW\" data-enlighter-language=\"golang\">package pulsar.client;\nimport org.apache.pulsar.client.api.PulsarClient;\nimport org.apache.pulsar.client.api.PulsarClientException;\nimport com.sun.xml.internal.ws.Closeable;\npublic class Client {\n    private PulsarClient client;\n    public Client() throws PulsarClientException {\n        client = PulsarClient.builder()\n                .serviceUrl(\"pulsar:\/\/0.0.0.0:26650\/\")\n                .build();\n    }\n    public void Close() throws PulsarClientException {\n    \tclient.close();\n    }\n    public PulsarClient getPulsarClient(){\n        return client;\n    }\n}<\/code><\/pre>\n<h1><span style=\"color: #33cccc;\"><strong>pulsar-client\u6d4b\u8bd5\u4e4bMessageConsumer.java<\/strong><\/span><\/h1>\n<p>\u5355\u4e3b\u9898\u8ba2\u9605\uff0c\u8fd9\u6bb5\u4ee3\u7801\u662f\u6f14\u793a\u5355\u4e3b\u9898\u8ba2\u9605\uff0c\u6253\u5370\u6536\u5230\u7684\u8ba2\u9605\u5185\u5bb9\uff0c\u4e0d\u5173\u95ed\u8fde\u63a5<\/p>\n<pre class=\"lang:python decode:true  EnlighterJSRAW\" data-enlighter-language=\"golang\"><code class=\"EnlighterJSRAW\" data-enlighter-language=\"golang\">package pulsar.client;\nimport org.apache.pulsar.client.api.Consumer;\nimport org.apache.pulsar.client.api.Message;\nimport org.apache.pulsar.client.api.PulsarClientException;\nimport org.apache.pulsar.client.api.SubscriptionType;\nimport java.util.concurrent.CompletableFuture;\nimport java.util.concurrent.ExecutionException;\nimport java.util.concurrent.TimeUnit;\npublic class MessageConsumer {\n\tprivate Client client;\n\tprivate Consumer consumer;\n\tpublic MessageConsumer(String topic, String subscription) throws PulsarClientException {\n\t\tclient = new Client();\n\t\tconsumer = createConsumer(topic, subscription);\n\t}\n\tprivate Consumer createConsumer(String topic, String subscription) throws PulsarClientException {\n\t\treturn client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)\n\t\t\t\t.ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();\n\t}\n\tpublic void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {\n\t\t\/***\n\t\t * \u7528\u6765\u5f02\u6b65\u83b7\u53d6\uff0c\u4fdd\u6301\u56de\u8bdd\n\t\t *\/\n\t\tdo {\n\t\t\t\/\/ Wait for a message\n\t\t\tCompletableFuture&lt;Message&gt; msg = consumer.receiveAsync();\n\t\t\tSystem.out.printf(\"Message received: %s\", new String(msg.get().getData()));\n\t\t\t\/\/ Acknowledge the message so that it can be deleted by the message broker\n\t\t\tconsumer.acknowledge(msg.get());\n\t\t} while (true);\n\t}\n\tpublic String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {\n\t\t\/***\n\t\t * \u83b7\u53d6\u4e00\u6b21\uff0c\u5c31\u5173\u95ed\u4f1a\u8bdd\n\t\t *\/\n\t\t\/\/ Wait for a message\n\t\tSystem.out.printf(\"Start pulsar\");\n\t\tCompletableFuture&lt;Message&gt; msg = consumer.receiveAsync();\n\t\t\/\/ System.out.printf(\"Message received: %s\", new String(msg.get().getData()));\n\t\tString result = \"topic is: \" + msg.get().getTopicName() + \",data is: \" + new String(msg.get().getData());\n\t\t\/\/ Acknowledge the message so that it can be deleted by the message broker\n\t\tconsumer.acknowledge(msg.get());\n\t\tconsumer.close();\n\t\tclient.Close();\n\t\treturn result;\n\t}\n\tpublic static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {\n\t\tMessageConsumer consumer = new MessageConsumer(\"topic1\", \"my-sub\");\n\t\t consumer.receiveMessage();\n\/\/\t\tString reString = consumer.getMessage();\n\/\/\t\tSystem.err.println(reString);\n\t\t\/\/ consumer.client.Close();\n\t}\n}<\/code><\/pre>\n<h1><span style=\"color: #33cccc;\"><strong>pulsar-client\u6d4b\u8bd5\u4e4bMessageConsumerAll.java<\/strong><\/span><\/h1>\n<p>\u4e0b\u9762\u8fd9\u6bb5\u4ee3\u7801\u662f\u6f14\u793a\u8ba2\u9605\u670d\u52a1\u5668\u4e0a\u7684\u6240\u6709\u4e3b\u9898\uff0c\u6536\u5230\u4e00\u6761\u6d88\u606f\u4e4b\u540e\uff0c\u6253\u5370\u4e3b\u9898\u548c\u5185\u5bb9\uff0c\u7136\u540e\u5173\u95ed\u8fde\u63a5<\/p>\n<pre class=\"lang:python decode:true  EnlighterJSRAW\" data-enlighter-language=\"golang\"><code class=\"EnlighterJSRAW\" data-enlighter-language=\"golang\">package pulsar.client;\nimport org.apache.pulsar.client.api.Consumer;\nimport org.apache.pulsar.client.api.Message;\nimport org.apache.pulsar.client.api.PulsarClientException;\nimport org.apache.pulsar.client.api.SubscriptionType;\nimport java.util.concurrent.CompletableFuture;\nimport java.util.concurrent.ExecutionException;\nimport java.util.concurrent.TimeUnit;\npublic class MessageConsumer {\n\tprivate Client client;\n\tprivate Consumer consumer;\n\tpublic MessageConsumer(String topic, String subscription) throws PulsarClientException {\n\t\tclient = new Client();\n\t\tconsumer = createConsumer(topic, subscription);\n\t}\n\tprivate Consumer createConsumer(String topic, String subscription) throws PulsarClientException {\n\t\treturn client.getPulsarClient().newConsumer().topic(topic).subscriptionName(subscription)\n\t\t\t\t.ackTimeout(10, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Exclusive).subscribe();\n\t}\n\tpublic void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {\n\t\t\/***\n\t\t * \u7528\u6765\u5f02\u6b65\u83b7\u53d6\uff0c\u4fdd\u6301\u56de\u8bdd\n\t\t *\/\n\t\tdo {\n\t\t\t\/\/ Wait for a message\n\t\t\tCompletableFuture&lt;Message&gt; msg = consumer.receiveAsync();\n\t\t\tSystem.out.printf(\"Message received: %s\", new String(msg.get().getData()));\n\t\t\t\/\/ Acknowledge the message so that it can be deleted by the message broker\n\t\t\tconsumer.acknowledge(msg.get());\n\t\t} while (true);\n\t}\n\tpublic String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {\n\t\t\/***\n\t\t * \u83b7\u53d6\u4e00\u6b21\uff0c\u5c31\u5173\u95ed\u4f1a\u8bdd\n\t\t *\/\n\t\t\/\/ Wait for a message\n\t\tSystem.out.printf(\"Start pulsar\");\n\t\tCompletableFuture&lt;Message&gt; msg = consumer.receiveAsync();\n\t\t\/\/ System.out.printf(\"Message received: %s\", new String(msg.get().getData()));\n\t\tString result = \"topic is: \" + msg.get().getTopicName() + \",data is: \" + new String(msg.get().getData());\n\t\t\/\/ Acknowledge the message so that it can be deleted by the message broker\n\t\tconsumer.acknowledge(msg.get());\n\t\tconsumer.close();\n\t\tclient.Close();\n\t\treturn result;\n\t}\n\tpublic static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {\n\t\tMessageConsumer consumer = new MessageConsumer(\"topic1\", \"my-sub\");\n\t\t consumer.receiveMessage();\n\/\/\t\tString reString = consumer.getMessage();\n\/\/\t\tSystem.err.println(reString);\n\t\t\/\/ consumer.client.Close();\n\t}\n}<\/code><\/pre>\n<h1><span style=\"color: #33cccc;\"><strong>pulsar-client\u6d4b\u8bd5\u4e4bMessageProducer.java<\/strong><\/span><\/h1>\n<p>\u4e0b\u9762\u8fd9\u6bb5\u4ee3\u7801\u662f\u53d1\u5e03\u4e3b\u9898\u548c\u5185\u5bb9\u5230pulsar\u670d\u52a1\u5668\uff0c\u53d1\u5e03\u4e00\u6b21\u4e4b\u540e\uff0c\u5173\u95ed\u8fde\u63a5<\/p>\n<pre class=\"lang:python decode:true  EnlighterJSRAW\" data-enlighter-language=\"golang\"><code class=\"EnlighterJSRAW\" data-enlighter-language=\"golang\">package pulsar.client;\nimport org.apache.pulsar.client.api.Producer;\nimport org.apache.pulsar.client.api.PulsarClientException;\nimport java.util.concurrent.TimeUnit;\npublic class MessageProducer {\n    private Client client;\n    private Producer&lt;byte[]&gt; producer;\n    public MessageProducer(String topic) throws PulsarClientException {\n        client = new Client();\n        producer = createProducer(topic);\n    }\n    private Producer&lt;byte[]&gt; createProducer(String topic) throws PulsarClientException {\n        return client.getPulsarClient().newProducer()\n                .topic(topic)\n                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)\n                .sendTimeout(10, TimeUnit.SECONDS)\n                .blockIfQueueFull(true)\n                .create();\n    }\n    public void sendMessage(String message) {\n        producer.sendAsync(message.getBytes()).thenAccept(msgId -&gt; {\n            System.out.printf(\"Message with ID %s successfully sent\", msgId);\n        });\n    }\n    public void sendOnce(String message) {\n    \t\/**\n    \t * \u53d1\u9001\u4e00\u6b21\u5c31\u5173\u95ed\n    \t *\/\n    \ttry {\n\t\t\tproducer.send(message.getBytes());\n\t\t\tSystem.out.printf(\"Message with content %s successfully sent\", message);\n\t\t\tproducer.close();\n\t\t\tclient.Close();\n\t\t} catch (PulsarClientException e) {\n\t\t\t\/\/ TODO Auto-generated catch block\n\t\t\te.printStackTrace();\n\t\t}\n    }\n    \/\/ todo add exceptionally().\n    public void close(Producer&lt;byte[]&gt; producer){\n        producer.closeAsync()\n                .thenRun(() -&gt; System.out.println(\"Producer closed\"));\n    }\n    public static void main(String[] args) throws PulsarClientException {\n        MessageProducer producer = new MessageProducer(\"topic1\");\n\/\/        producer.sendMessage(\"Hello World ,lalla\");\n        producer.sendOnce(\"Hello World ,lizhenwei\");\n    }\n}<\/code><\/pre>\n<h1><span style=\"color: #33cccc;\"><strong>\u8fd0\u884c\u6548\u679c<\/strong><\/span><\/h1>\n<p>\u751f\u4ea7\u8005console log:<\/p>\n<pre class=\"lang:python decode:true \"><code class=\"EnlighterJSRAW\" data-enlighter-language=\"golang\">Message with content Hello World ,lizhenwei successfully sent<\/code><\/pre>\n<p>\u6d88\u8d39\u8005console log<\/p>\n<pre class=\"lang:python decode:true\"><code class=\"EnlighterJSRAW\" data-enlighter-language=\"golang\">Start pulsar receive:\ntopic is: persistent:\/\/public\/default\/topic1,data is: Hello World ,lizhenwei<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>\u9700\u6c42 \u6700\u8fd1\u516c\u53f8\u4e0a\u4e86pulsar\u670d\u52a1\uff0c\u7136\u540e\u6211\u4eec\u9700\u8981\u5b66\u4e60pulsar\u76f8\u5173\u7684\u5185\u5bb9\u3002\u6700\u597d\u7684\u529e\u6cd5\u5c31\u662f\u81ea\u5df1\u5b66\u4e60pulsar [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":386,"comment_status":"closed","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[4],"tags":[],"class_list":["post-384","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-k8s"],"_links":{"self":[{"href":"https:\/\/www.yinyubo.com\/index.php?rest_route=\/wp\/v2\/posts\/384","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.yinyubo.com\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.yinyubo.com\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.yinyubo.com\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.yinyubo.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=384"}],"version-history":[{"count":2,"href":"https:\/\/www.yinyubo.com\/index.php?rest_route=\/wp\/v2\/posts\/384\/revisions"}],"predecessor-version":[{"id":927,"href":"https:\/\/www.yinyubo.com\/index.php?rest_route=\/wp\/v2\/posts\/384\/revisions\/927"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/www.yinyubo.com\/index.php?rest_route=\/wp\/v2\/media\/386"}],"wp:attachment":[{"href":"https:\/\/www.yinyubo.com\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=384"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.yinyubo.com\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=384"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.yinyubo.com\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=384"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}