日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

發(fā)送消息(多線程)

SOFAStack 消息隊列的消費者和生產(chǎn)者客戶端對象是線程安全的,可以在多個線程之間共享使用。

您可以在服務器上(或者多臺服務器)部署多個生產(chǎn)者和消費者實例,也可以在同一個生產(chǎn)者或消費者實例里采用多線程發(fā)送或接收消息,從而提高消息發(fā)送或接收 TPS 的能力。請避免為每個線程創(chuàng)建一個客戶端實例。

在多線程之間共享 Producer 的示例代碼如下:

import java.util.Properties;
import com.alipay.sofa.sofamq.client.PropertyKeyConst;
import io.openmessaging.api.Message;
import io.openmessaging.api.MessagingAccessPoint;
import io.openmessaging.api.OMS;
import io.openmessaging.api.OMSBuiltinKeys;
import io.openmessaging.api.Producer;
import io.openmessaging.api.SendResult;

public class Main {
    public static void main(String... args) {
        Properties credentials = new Properties();
        // 阿里云賬號 AccessKey 擁有所有 API 的訪問權限,風險很高。強烈建議您創(chuàng)建并使用 RAM 用戶進行 API 訪問或日常運維,請登錄 RAM 控制臺創(chuàng)建 RAM 用戶。
        // 此處以把 AccessKey 和 AccessKeySecret 保存在環(huán)境變量為例說明。
        // 強烈建議不要把 AccessKey 和 AccessKeySecret 保存到代碼里,會存在密鑰泄漏風險
        credentials.setProperty(OMSBuiltinKeys.ACCESS_KEY, "SOFA_AK_ENV");        
        credentials.setProperty(OMSBuiltinKeys.SECRET_KEY, "SOFA_SK_ENV");
        // 設置 TCP 接入域名,進入控制臺的概覽頁面查看接入點配置
        MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").endpoint("$endpoint")
                .withCredentials(credentials).build();
        Properties properties = new Properties();
        // 設置用戶實例,進入控制臺的概覽頁面查看接入點配置
        properties.setProperty(PropertyKeyConst.INSTANCE_ID, "$instanceId");

        // 您在控制臺創(chuàng)建的 Group ID
        properties.setProperty(PropertyKeyConst.GROUP_ID, "YOUR_GROUP");
        finalProducer producer = accessPoint.createProducer(properties);
        producer.start();

        //創(chuàng)建的 Producer 和 Consumer 對象為線程安全的,可以在多線程間進行共享,避免每個線程創(chuàng)建一個實例。

        //在 thread 和 anotherThread 中共享 Producer 對象,并發(fā)地發(fā)送消息至消息隊列。
        Thread thread = new Thread(newRunnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message(//
                            // Message 所屬的 Topic
                            "TopicTestMQ",
                            // Message Tag 可理解為 Gmail 中的標簽,對消息進行再歸類,方便 Consumer 指定過濾條件在消息隊列的服務器過濾
                            "TagA",
                            // Message Body 可以是任何二進制形式的數(shù)據(jù),消息隊列不做任何干預,
                            // 需要 Producer 與 Consumer 協(xié)商好一致的序列化和反序列化方式
                            "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // 同步發(fā)送消息,只要不拋異常就是成功
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息發(fā)送失敗,需要進行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進行補償處理
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        thread.start();
        Thread anotherThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Message msg = new Message("TopicTestMQ", "TagA", "Hello MQ".getBytes());
                    SendResult sendResult = producer.send(msg);
                    // 同步發(fā)送消息,只要不拋異常就是成功
                    if (sendResult != null) {
                        System.out.println(new Date() + " Send mq message success. Topic is:" + MqConfig.TOPIC + " msgId is: " + sendResult.getMessageId());
                    }
                } catch (Exception e) {
                    // 消息發(fā)送失敗,需要進行重試處理,可重新發(fā)送這條消息或持久化這條數(shù)據(jù)進行補償處理
                    System.out.println(new Date() + " Send mq message failed. Topic is:" + MqConfig.TOPIC);
                    e.printStackTrace();
                }
            }
        });
        anotherThread.start();
        // Producer 實例若不再使用時,可將 Producer 關閉,進行資源釋放
        // producer.shutdown();
    }
}