發(fā)送消息(多線程)
更新時間:
SOFAStack 消息隊列的消費者和生產(chǎn)者客戶端對象是線程安全的,可以在多個線程之間共享使用。
您可以在服務器上(或者多臺服務器)部署多個生產(chǎn)者和消費者實例,也可以在同一個生產(chǎn)者或消費者實例里采用多線程發(fā)送或接收消息,從而提高消息發(fā)送或接收 TPS 的能力。請避免為每個線程創(chuàng)建一個客戶端實例。
在多線程之間共享 Producer 的示例代碼如下:
import java.util.Properties;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
import io.openmessaging.api.Producer;
import io.openmessaging.api.SendResult;
public class Main {
public static void main(String... args) {
Properties credentials = new Properties();
// 阿里云賬號 AccessKey 擁有所有 API 的訪問權限,風險很高。強烈建議您創(chuàng)建并使用 RAM 用戶進行 API 訪問或日常運維,請登錄 RAM 控制臺創(chuàng)建 RAM 用戶。
// 此處以把 AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
// 強烈建議不要把 AccessKey 和 AccessKeySecret 保存到代碼里,會存在密鑰泄漏風險
credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");
credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
// 設置 TCP 接入域名,進入控制臺的概覽頁面查看接入點配置
MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
.withCredentials(credentials).build();
Properties properties = new Properties();
// 設置用戶實例,進入控制臺的概覽頁面查看接入點配置
properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");
// 您在控制臺創(chuàng)建的 Group ID
properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
finalProducer producer = accessPoint.createProducer(properties);
producer.start();
//創(chuàng)建的 Producer 和 Consumer 對象為線程安全的,可以在多線程間進行共享,避免每個線程創(chuàng)建一個實例。
//在 thread 和 anotherThread 中共享 Producer 對象,并發(fā)地發(fā)送消息至消息隊列。
Thread thread = new Thread(newRunnable() {
@Override
public void run() {
try {
Message msg = new Message(//
// Message 所屬的 Topic
"TopicTestMQ",
// Message Tag 可理解為 Gmail 中的標簽,對消息進行再歸類,方便 Consumer 指定過濾條件在消息隊列的服務器過濾
"TagA",
// Message Body 可以是任何二進制形式的數(shù)據(jù),消息隊列不做任何干預,
// 需要 Producer 與 Consumer 協(xié)商好一致的序列化和反序列化方式
"Hello MQ".getBytes());
SendResult sendResult = producer.send(msg);
// 同步發(fā)送消息,只要不拋異常就是成功
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息發(fā)送失敗,需要進行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進行補償處理
System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
e.printStackTrace();
}
}
});
thread.start();
Thread anotherThread = new Thread(new Runnable() {
@Override
public void run() {
try {
Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
SendResult sendResult = producer.send(msg);
// 同步發(fā)送消息,只要不拋異常就是成功
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
}
} catch (Exception e) {
// 消息發(fā)送失敗,需要進行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進行補償處理
System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
e.printStackTrace();
}
}
});
anotherThread.start();
// Producer 實例若不再使用時,可將 Producer 關閉,進行資源釋放
// producer.shutdown();
}
}
文檔內(nèi)容是否對您有幫助?