java测试pulsar实例

java测试pulsar实例

需求

最近公司上了pulsar服务,然后我们需要学习pulsar相关的内容。最好的办法就是自己学习pulsar环境的搭建,然后搭建一个pulsar-server,并且自己建立pulsar-client的消费者和生产者,互相调用,测试连通。

pulsar-server

使用docker搭建是最方便的。
输入如下命令就可以啦

docker run -it -p 28000:80 -p 28080:8080 -p 26650:6650 apachepulsar/pulsar-standalone

它会去本地建立一个标准的pulsar server,其中各个端口的意义分别是:

80: the port for pulsar dashboard
8080: the http service url for pulsar service
6650: the binary protocol service url for pulsar service

我这边映射到了28000,28080,26650三个端口。

pulsar-client测试之代码结构


如上图所示,共有4个文件:
Client是连接的代码
MessageConsumer是单主题订阅(消费者)
MessageConsumerAll是订阅所有主题(消费者)
MessageProducer是发布指定主题(生产者)

pulsar-client测试之Client.java

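配置连接信息。0.0.0.0只是IP地址的占位符,使用时请换成你自己的pulsar服务地址(端口26650对应上面docker命令中映射的二进制协议端口6650)。另外,下面代码中导入的com.sun.xml.internal.ws.Closeable并没有被使用,它属于JDK内部API,在较新的JDK中已不可用,可以直接删除这一行。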

package pulsar.client;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import com.sun.xml.internal.ws.Closeable;
/**
 * Thin wrapper that owns a single {@link PulsarClient} connection.
 *
 * <p>Implements {@link AutoCloseable} so an instance can be managed by a
 * try-with-resources statement. The legacy {@link #Close()} method is kept
 * for existing callers.
 */
public class Client implements AutoCloseable {
    /**
     * Service URL used by the no-argument constructor. The host is a
     * placeholder; replace it with the address of your own Pulsar broker.
     */
    private static final String DEFAULT_SERVICE_URL = "pulsar://0.0.0.0:26650/";

    private final PulsarClient client;

    /**
     * Connects using the default service URL.
     *
     * @throws PulsarClientException if the client cannot be built
     */
    public Client() throws PulsarClientException {
        this(DEFAULT_SERVICE_URL);
    }

    /**
     * Connects to the broker at the given service URL.
     *
     * @param serviceUrl binary-protocol URL, e.g. {@code pulsar://host:6650/}
     * @throws IllegalArgumentException if {@code serviceUrl} is null or empty
     * @throws PulsarClientException if the client cannot be built
     */
    public Client(String serviceUrl) throws PulsarClientException {
        if (serviceUrl == null || serviceUrl.isEmpty()) {
            throw new IllegalArgumentException("serviceUrl must not be null or empty");
        }
        client = PulsarClient.builder()
                .serviceUrl(serviceUrl)
                .build();
    }

    /**
     * Closes the underlying client. Kept for backward compatibility;
     * new code should prefer {@link #close()}.
     *
     * @throws PulsarClientException if closing the client fails
     */
    public void Close() throws PulsarClientException {
        client.close();
    }

    /**
     * Closes the underlying client.
     *
     * @throws PulsarClientException if closing the client fails
     */
    @Override
    public void close() throws PulsarClientException {
        client.close();
    }

    /**
     * Returns the wrapped client so callers can create producers and consumers.
     *
     * @return the underlying {@link PulsarClient}
     */
    public PulsarClient getPulsarClient() {
        return client;
    }
}

pulsar-client测试之MessageConsumer.java

单主题订阅,这段代码是演示单主题订阅,打印收到的订阅内容,不关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
 * Single-topic consumer example: subscribes to one topic with an exclusive
 * subscription and prints the payload of every message it receives.
 */
public class MessageConsumer {
    private final Client client;
    private final Consumer<byte[]> consumer;

    /**
     * Connects to the broker and subscribes to the given topic.
     *
     * @param topic        topic to subscribe to
     * @param subscription subscription name
     * @throws PulsarClientException if connecting or subscribing fails
     */
    public MessageConsumer(String topic, String subscription) throws PulsarClientException {
        client = new Client();
        try {
            consumer = createConsumer(topic, subscription);
        } catch (PulsarClientException | RuntimeException e) {
            // Do not leak the already-opened client when subscribing fails.
            try {
                client.Close();
            } catch (PulsarClientException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /** Builds an exclusive consumer with a 10-second acknowledgement timeout. */
    private Consumer<byte[]> createConsumer(String topic, String subscription) throws PulsarClientException {
        return client.getPulsarClient().newConsumer()
                .topic(topic)
                .subscriptionName(subscription)
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
    }

    /**
     * Receives messages forever, keeping the session open. Each message is
     * printed on its own line and then acknowledged.
     *
     * @throws ExecutionException   if the asynchronous receive fails
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws PulsarClientException if acknowledging a message fails
     */
    public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
        while (true) {
            // Wait for a message; resolve the future once and reuse the result.
            CompletableFuture<Message<byte[]>> pending = consumer.receiveAsync();
            Message<byte[]> msg = pending.get();
            System.out.printf("Message received: %s%n",
                    new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8));
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
        }
    }

    /**
     * Receives exactly one message, acknowledges it, and then closes the
     * consumer and the client, even when receiving or acknowledging fails.
     *
     * @return a string containing the topic name and the message payload
     * @throws ExecutionException   if the asynchronous receive fails
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws PulsarClientException if acknowledging or closing fails
     */
    public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
        try {
            System.out.printf("Start pulsar receive:%n");
            // Wait for a message
            CompletableFuture<Message<byte[]>> pending = consumer.receiveAsync();
            Message<byte[]> msg = pending.get();
            String result = "topic is: " + msg.getTopicName() + ",data is: "
                    + new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8);
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
            return result;
        } finally {
            try {
                consumer.close();
            } finally {
                client.Close();
            }
        }
    }

    /**
     * Subscribes to {@code topic1} and prints messages until the process is
     * stopped. To read a single message and exit instead, call
     * {@link #getMessage()} and print its return value.
     */
    public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
        MessageConsumer consumer = new MessageConsumer("topic1", "my-sub");
        consumer.receiveMessage();
    }
}

pulsar-client测试之MessageConsumerAll.java

下面这段代码是演示订阅服务器上的所有主题,收到一条消息之后,打印主题和内容,然后关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
 * Multi-topic consumer example: subscribes to every topic whose name matches
 * a regular expression, receives one message, and reports its topic and
 * payload.
 *
 * <p>NOTE(review): topics created after the subscription is established are
 * presumably only picked up by the client's periodic pattern discovery;
 * confirm the discovery period against the Pulsar client documentation.
 */
public class MessageConsumerAll {
    private final Client client;
    private final Consumer<byte[]> consumer;

    /**
     * Connects to the broker and subscribes to all topics matching the pattern.
     *
     * @param topicsPattern regular expression over fully qualified topic names,
     *                      e.g. {@code persistent://public/default/.*}
     * @param subscription  subscription name
     * @throws PulsarClientException if connecting or subscribing fails
     */
    public MessageConsumerAll(String topicsPattern, String subscription) throws PulsarClientException {
        client = new Client();
        try {
            consumer = createConsumer(topicsPattern, subscription);
        } catch (PulsarClientException | RuntimeException e) {
            // Do not leak the already-opened client when subscribing fails.
            try {
                client.Close();
            } catch (PulsarClientException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /** Builds an exclusive pattern consumer with a 10-second acknowledgement timeout. */
    private Consumer<byte[]> createConsumer(String topicsPattern, String subscription)
            throws PulsarClientException {
        return client.getPulsarClient().newConsumer()
                .topicsPattern(topicsPattern)
                .subscriptionName(subscription)
                .ackTimeout(10, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Exclusive)
                .subscribe();
    }

    /**
     * Receives messages forever, keeping the session open. Each message is
     * printed on its own line and then acknowledged.
     *
     * @throws ExecutionException   if the asynchronous receive fails
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws PulsarClientException if acknowledging a message fails
     */
    public void receiveMessage() throws ExecutionException, InterruptedException, PulsarClientException {
        while (true) {
            // Wait for a message; resolve the future once and reuse the result.
            CompletableFuture<Message<byte[]>> pending = consumer.receiveAsync();
            Message<byte[]> msg = pending.get();
            System.out.printf("Message received: %s%n",
                    new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8));
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
        }
    }

    /**
     * Receives exactly one message, acknowledges it, and then closes the
     * consumer and the client, even when receiving or acknowledging fails.
     *
     * @return a string containing the topic name and the message payload
     * @throws ExecutionException   if the asynchronous receive fails
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws PulsarClientException if acknowledging or closing fails
     */
    public String getMessage() throws ExecutionException, InterruptedException, PulsarClientException {
        try {
            System.out.printf("Start pulsar receive:%n");
            // Wait for a message
            CompletableFuture<Message<byte[]>> pending = consumer.receiveAsync();
            Message<byte[]> msg = pending.get();
            String result = "topic is: " + msg.getTopicName() + ",data is: "
                    + new String(msg.getData(), java.nio.charset.StandardCharsets.UTF_8);
            // Acknowledge the message so that it can be deleted by the message broker
            consumer.acknowledge(msg);
            return result;
        } finally {
            try {
                consumer.close();
            } finally {
                client.Close();
            }
        }
    }

    /**
     * Subscribes to every topic in the {@code public/default} namespace, waits
     * for one message, prints its topic and content, and closes the connection.
     */
    public static void main(String[] args) throws PulsarClientException, ExecutionException, InterruptedException {
        // A dedicated subscription name avoids clashing with the exclusive
        // "my-sub" subscription used by the single-topic example.
        MessageConsumerAll consumer = new MessageConsumerAll("persistent://public/default/.*", "my-sub-all");
        System.out.println(consumer.getMessage());
    }
}

pulsar-client测试之MessageProducer.java

下面这段代码是发布主题和内容到pulsar服务器,发布一次之后,关闭连接

package pulsar.client;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import java.util.concurrent.TimeUnit;
/**
 * Producer example: publishes UTF-8 encoded string messages to a single topic.
 */
public class MessageProducer {
    private final Client client;
    private final Producer<byte[]> producer;

    /**
     * Connects to the broker and creates a producer for the given topic.
     *
     * @param topic topic to publish to
     * @throws PulsarClientException if connecting or creating the producer fails
     */
    public MessageProducer(String topic) throws PulsarClientException {
        client = new Client();
        try {
            producer = createProducer(topic);
        } catch (PulsarClientException | RuntimeException e) {
            // Do not leak the already-opened client when producer creation fails.
            try {
                client.Close();
            } catch (PulsarClientException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /**
     * Builds a producer with a 10 ms batching delay and a 10 s send timeout
     * that blocks the caller when its pending-message queue is full.
     */
    private Producer<byte[]> createProducer(String topic) throws PulsarClientException {
        return client.getPulsarClient().newProducer()
                .topic(topic)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();
    }

    /**
     * Sends a message asynchronously and logs the outcome. The producer and
     * client stay open; the caller is responsible for closing them.
     *
     * @param message text to publish, encoded as UTF-8
     */
    public void sendMessage(String message) {
        producer.sendAsync(message.getBytes(java.nio.charset.StandardCharsets.UTF_8))
                .thenAccept(msgId -> System.out.printf("Message with ID %s successfully sent%n", msgId))
                .exceptionally(error -> {
                    // Report failures instead of dropping them silently.
                    System.err.printf("Failed to send message: %s%n", error);
                    return null;
                });
    }

    /**
     * Sends a single message synchronously, then closes the producer and the
     * client, even when sending fails. Failures are reported on standard error
     * rather than propagated.
     *
     * @param message text to publish, encoded as UTF-8
     */
    public void sendOnce(String message) {
        try {
            try {
                producer.send(message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.printf("Message with content %s successfully sent%n", message);
            } finally {
                try {
                    producer.close();
                } finally {
                    client.Close();
                }
            }
        } catch (PulsarClientException e) {
            System.err.printf("Failed to send message with content %s%n", message);
            e.printStackTrace();
        }
    }

    /**
     * Closes the given producer asynchronously and logs the outcome.
     *
     * @param producer the producer to close
     */
    public void close(Producer<byte[]> producer) {
        producer.closeAsync()
                .thenRun(() -> System.out.println("Producer closed"))
                .exceptionally(error -> {
                    System.err.printf("Failed to close producer: %s%n", error);
                    return null;
                });
    }

    /** Publishes one message to {@code topic1} and closes the connection. */
    public static void main(String[] args) throws PulsarClientException {
        MessageProducer producer = new MessageProducer("topic1");
        producer.sendOnce("Hello World ,lizhenwei");
    }
}

运行效果

生产者console log:

Message with content Hello World ,lizhenwei successfully sent

消费者console log

Start pulsar receive:
topic is: persistent://public/default/topic1,data is: Hello World ,lizhenwei

苏ICP备18047533号-2