日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

步驟三:開發單元化應用

在單元化架構中,您需要對本地應用進行開發改造,完成單元化功能配置。本文將基于轉賬、積分等場景分別介紹微服務(MS)中的 SOFARPC、消息隊列(MQ)以及分布式事務(DTX)如何完成 LDC 單元化相關的業務開發。

前提條件

路由參數為 userId,格式如 080066600000002,取第一位 + 0 作為分片位(sharding key)。當 userId = 080066600000002 時,分片位(sharding key)= 0 + 0 = 00當 userId = 180066600000000 時,分片位(sharding key)= 1 + 0 = 10

微服務中的 SOFARPC

按照 RPC 服務引用的標準使用規范,有如下要求:

  1. 接口入參的第一個參數為userId 路由信息。

  2. SOFARPC 默認從這個參數中提取和生成兩位分片位(sharding key),默認提取 UID 的倒數二、三位,參見 com.alipay.sofa.rpc.ldc.DefaultLdcRouteProvider

在本 demo 中,因無法滿足該規范,將自定義一個 PocLdcRouteProvider,實現提取兩位分片位的邏輯,替代 DefaultLdcRouteProvider 的標準實現。

import com.alipay.sofa.rpc.api.ldc.LdcRouteJudgeResult;
import com.alipay.sofa.rpc.api.ldc.LdcRouteProvider;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.zoneclient.api.EnterpriseZoneClientHolder;

public class PocLdcRouteProvider implements LdcRouteProvider {
    @Override
    public LdcRouteJudgeResult uidGenerator(ConsumerConfig consumerConfig, SofaRequest sofaRequest) {
        LdcRouteJudgeResult result = new LdcRouteJudgeResult();

        //如果沒有開啟ldc,那么就直接返回false
        if (!EnterpriseZoneClientHolder.isZoneMode()) {
            return result;
        }

        Object[] methodArgs = sofaRequest.getMethodArgs();
        if (CommonUtils.isEmpty(methodArgs)) {
            result.setSuccess(false);
            return result;
        }
        Object methodArg = methodArgs[0];
        if (methodArg instanceof String){
            String routeUid = (String) methodArg;
            String uid = UIDUtil.parseShardingKeyFromBacc(routeUid);
            result.setSuccess(true);
            result.setRouteId(uid);
            return result;
        }
        result.setSuccess(false);
        return result;
    }

    @Override
    public int order() {
        return 2;
    }
}

UIDUtil 類的代碼如下:

public final class UIDUtil {
    private UIDUtil() { }

    public static String parseShardingKeyFromBacc(String bacc) {
        if (bacc == null) {
            throw new NullPointerException("bacc is null");
        }
        if ("".equals(bacc.trim())) {
            throw new IllegalArgumentException("bacc is empty");
        }
        return bacc.substring(0, 1) + "0";
    }
}

交易啟動時,首先需要向 SOFARPC 注冊定制的路由邏輯,代碼如下:

com.alipay.sofa.rpc.ldc.LdcProviderManager.getInstance().registeLdcRouteProvider(new PocLdcRouteProvider());

在取款接口中,添加一個 UID 參數,代碼如下:

 @TwoPhaseBusinessAction(name = "pocDebitFirstAction", commitMethod = "commit", rollbackMethod = "rollback")
    public AccountTransResult debit(String uid, @BusinessActionContextParameter(isParamInProperty = true) AccountTransRequest accountTransRequest,
                                    BusinessActionContext businessActionContext);

在存款接口中,添加一個 UID 參數,代碼如下:

@TwoPhaseBusinessAction(name ="pocCreditFirstAction", commitMethod ="commit", rollbackMethod ="rollback")
public AccountTransResult credit(String uid,@BusinessActionContextParameter(isParamInProperty =true)AccountTransRequest accountTransRequest,
BusinessActionContext businessActionContext);

有關 SOFARPC 單元化配置的更多信息,參見 單元化配置

消息隊列

此處假設一個存款加積分的場景:在賬戶 A 存入一筆錢后,需要增加賬戶 A 的積分。此場景下,需要消息隊列(MQ)中間件通過一條發送事務消息,通知積分中心執行積分增加操作。

發送事務消息

發送事務消息代碼示例如下:

 // 啟動 producer
    public void afterPropertiesSet()throwsException {
        MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").build();

        Properties properties = new Properties();
        //替換 GID_PGROUP 為您實際創建的 Group ID
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_PGROUP");
        transactionProducer = accessPoint.createTransactionProducer(properties, newLocalTransactionChecker() {
            @Override
            publicTransactionStatus check (Message msg){
                returnTransactionStatus.CommitTransaction;
            }
        });
        transactionProducer.start();
        LogUtil.info(LOGGER, "transaction producer started");
    }

    // 發送消息
    public void publishMessage(TxnFlowRequest request) {
        try {
            PointAcctDTO pointAcctDTO = buildPointReturnDTO(request);
            Message message = new Message(TOPIC, EVENT_CODE,     hessianSerializer.serialize(pointAcctDTO));
            String shardingKey = UIDUtil.parseShardingKeyFromBacc(request.getBacc());
            message.putUserProperties(UserPropKey.CELL_UID, shardingKey);
            transactionProducer.send(message, (msg, arg) -> {
                returnTransactionStatus.CommitTransaction;
            }, null);
            LogUtil.info(LOGGER, "Public a message, success. TOPIC [{}] EVENTCODE [{}] id [{}] bacc [{}] payload [{}]",
                    message.getTopic(), EVENT_CODE, message.getMsgID(), request.getBacc(), request);
        } catch (Exception e) {
            LogUtil.error(LOGGER, e, "Public a message, failure. TOPIC [{}] EVENTCODE [{}] bacc [{}] error [{}]",
                    TOPIC, EVENT_CODE, request.getBacc(), e.getMessage());
            throw new TxnFlowException(PropertyConstant.CODE_SYS_ERR, "call msgBorker error", e);
        }
    }

接收事務消息

接收事務消息代碼示例如下:

// 啟動 consumer
    @Override
    public void afterPropertiesSet() throws Exception {
        Properties properties = new Properties();
        //替換 GID_PGROUP 為您實際創建的 Group ID
        properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_SGROUP");
        properties.setProperty(PropertyKeyConst.LDC_SUB_MODE, "RZONE");
        Consumer consumer = OMS.builder().driver("sofamq").build().createConsumer(properties);
        //替換 TP_TEST_POC 為您實際創建的消息 Topic
        consumer.subscribe("TP_TEST_POC", "EC_ACCT_POINT", this);
        consumer.start();
    }

    // 消息
    public Action consume(Message message, ConsumeContext context) {
        try {
            PointAcctDTO pointAcctDTO = hessianSerializer.deserialize(message.getBody(),
                    PointAcctDTO.class.getName());


            serviceTemplate.executeWithTransaction(pointAcctDTO, newServiceCallback() {

                @Override
                public String getServiceName () {
                    return "returnPoint";
                }

                @Override
                public void checkParam () {

                    //參數校驗,如果為null或者空則消息不用重試
                    pointAcctDTO.checkParameters();
                }

                @Override
                public void checkIdempotent () {
                    //消息有可能出現重發的情況,如果這次積分已經更新過了,則直接返回成功。 消息冪等處理有問題
                    List<TPointOrderDO> tPointOrderDOs = tPointOrderDAO.searchTxnSn(
                            pointAcctDTO.getBacc(), pointAcctDTO.getTxnSn());
                    if (tPointOrderDOs != null && tPointOrderDOs.size() != 0) {
                        throwPcException.valueOf(PcStatusCode.IDEMPOTENT);
                    }
                }

                @Override
                public void execute () {

                    //埋點
                    mockActionServices.mockAction(ActionPointConstant.POINTCENTER_MESSAGE,
                            pointAcctDTO.getBacc());

                    List<TPotAcctDO> tPotAcctDOs = tPotAcctDAO.lockForUpdate(pointAcctDTO.getBacc());

                    //根據原來代碼邏輯,如果沒找到則打日志;
                    if (tPotAcctDOs == null || tPotAcctDOs.size() != 1) {
                        LogUtil.error(LOGGER, "error = {}, payload = {}",
                                PcStatusCode.BACC_COUNT_ERROR, "select t_pot_acct record:"
                                        + tPotAcctDOs);
                        return;
                    }
                    TPotAcctDO tPotAcctDO = tPotAcctDOs.get(0);

                    //根據原來代碼邏輯,如果狀態不為0則直接返回。
                    if (!PropertyConstant.ACC_STATUS_0.equals(tPotAcctDO.getStatus())) {
                        return;
                    }

                    tPotAcctDO.setPotBal(tPotAcctDO.getPotBal().add(pointAcctDTO.getPotBal()));
                    tPotAcctDO.setLastTxnSn(pointAcctDTO.getTxnSn());
                    tPotAcctDAO.update(tPotAcctDO);
                    TPointOrderDO tPointOrderDO = new TPointOrderDO(pointAcctDTO.getBacc(),
                            pointAcctDTO.getPotBal(), pointAcctDTO.getTxnSn());
                    tPointOrderDAO.insertOrder(tPointOrderDO);

                }
            });

        } catch (CodecException e) {
            LogUtil.error(LOGGER, e, "consume pointAcctDTO={} exception.");
        }
        // do something
        return Action.CommitMessage;
    }

分布式事務

在事務發起方的 spring 事務模板內,調用 dtxService.start() 方法開啟分布式事務,需要在開啟時,增加單元化架構下的分片參數,用做服務路由和數據庫路由。

代碼示例如下:

Map<String,Object> properties =newHashMap<String,Object>();

// 開啟分布式事務
String shardingKey =UIDUtil.parseShardingKeyFromBacc(request.getBacc());
dtxService.start("accttrans", request.getTxnSn(), shardingKey, properties);

有關分布式事務單元化配置的更多信息,參見 接入單元化能力

任務調度

您無需進行額外配置,即可使用單元化的任務調度能力,請參見 使用跨 zone 網關

說明

建議檢查客戶端的啟動參數是否正常傳入:-D com.alipay.ldc.zone=xxx //指定所屬的邏輯單元

API 網關

您無需進行額外配置,即可使用 API 網關的單元化路由能力,請參見 創建路由規則創建 API

應用參數配置

在應用部署時,您還需要傳入以下參數。各參數均通過 JVM 的 -D 參數傳入到應用中。

中間件相關參數

  • com.alipay.instanceid:當前租戶中間件實例唯一標識,可以在消息隊列控制臺概覽頁 接入配置 > 實例 ID 中獲取。

  • com.antcloud.antvip.endpoint:ACVIP 地址,可以在消息隊列控制臺概覽頁 接入配置 > TCP 協議內網接入點 中獲取。

  • com.alipay.env:環境標識,取值 shared,表示運行在共享模式。

  • com.antcloud.mw.access:中間件訪問控制鍵值。

  • com.antcloud.mw.secret:中間件訪問控制密鑰。

同城雙活相關參數

  • com.alipay.ldc.zone:單元名稱。

  • com.alipay.ldc.datacenter:物理機房名稱。

LDC 單元化相關參數

  • zmode:LDC 單元化開關。取值為 true 時,表示開啟 LDC 單元化。

  • com.alipay.ldc.strictmode:LDC 單元化嚴格模式開關。取值為 true 時,表示開啟 LDC 單元化嚴格模式。