在單元化架構中,您需要對本地應用進行開發改造,完成單元化功能配置。本文將基于轉賬、積分等場景分別介紹微服務(MS)中的 SOFARPC、消息隊列(MQ)以及分布式事務(DTX)如何完成 LDC 單元化相關的業務開發。
前提條件
路由參數為 userId,格式如 080066600000002,取第一位 + 0 作為分片位(sharding key)。當 userId = 080066600000002 時,分片位(sharding key)= 0 + 0 = 00當 userId = 180066600000000 時,分片位(sharding key)= 1 + 0 = 10
微服務中的 SOFARPC
按照 RPC 服務引用的標準使用規范,有如下要求:
接口入參的第一個參數為
userId
路由信息。SOFARPC 默認從這個參數中提取和生成兩位分片位(sharding key),默認提取 UID 的倒數二、三位,參見
com.alipay.sofa.rpc.ldc.DefaultLdcRouteProvider
。
在本 demo 中,因無法滿足該規范,將自定義一個 PocLdcRouteProvider,實現提取兩位分片位的邏輯,替代 DefaultLdcRouteProvider 的標準實現。
import com.alipay.sofa.rpc.api.ldc.LdcRouteJudgeResult;
import com.alipay.sofa.rpc.api.ldc.LdcRouteProvider;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.zoneclient.api.EnterpriseZoneClientHolder;
public class PocLdcRouteProvider implements LdcRouteProvider {
@Override
public LdcRouteJudgeResult uidGenerator(ConsumerConfig consumerConfig, SofaRequest sofaRequest) {
LdcRouteJudgeResult result = new LdcRouteJudgeResult();
//如果沒有開啟ldc,那么就直接返回false
if (!EnterpriseZoneClientHolder.isZoneMode()) {
return result;
}
Object[] methodArgs = sofaRequest.getMethodArgs();
if (CommonUtils.isEmpty(methodArgs)) {
result.setSuccess(false);
return result;
}
Object methodArg = methodArgs[0];
if (methodArg instanceof String){
String routeUid = (String) methodArg;
String uid = UIDUtil.parseShardingKeyFromBacc(routeUid);
result.setSuccess(true);
result.setRouteId(uid);
return result;
}
result.setSuccess(false);
return result;
}
@Override
public int order() {
return 2;
}
}
UIDUtil 類的代碼如下:
public final class UIDUtil {
private UIDUtil() { }
public static String parseShardingKeyFromBacc(String bacc) {
if (bacc == null) {
throw new NullPointerException("bacc is null");
}
if ("".equals(bacc.trim())) {
throw new IllegalArgumentException("bacc is empty");
}
return bacc.substring(0, 1) + "0";
}
}
交易啟動時,首先需要向 SOFARPC 注冊定制的路由邏輯,代碼如下:
com.alipay.sofa.rpc.ldc.LdcProviderManager.getInstance().registeLdcRouteProvider(new PocLdcRouteProvider());
在取款接口中,添加一個 UID 參數,代碼如下:
@TwoPhaseBusinessAction(name = "pocDebitFirstAction", commitMethod = "commit", rollbackMethod = "rollback")
public AccountTransResult debit(String uid, @BusinessActionContextParameter(isParamInProperty = true) AccountTransRequest accountTransRequest,
BusinessActionContext businessActionContext);
在存款接口中,添加一個 UID 參數,代碼如下:
@TwoPhaseBusinessAction(name ="pocCreditFirstAction", commitMethod ="commit", rollbackMethod ="rollback")
public AccountTransResult credit(String uid,@BusinessActionContextParameter(isParamInProperty =true)AccountTransRequest accountTransRequest,
BusinessActionContext businessActionContext);
有關 SOFARPC 單元化配置的更多信息,參見 單元化配置。
消息隊列
此處假設一個存款加積分的場景:在賬戶 A 存入一筆錢后,需要增加賬戶 A 的積分。此場景下,需要消息隊列(MQ)中間件通過一條發送事務消息,通知積分中心執行積分增加操作。
發送事務消息
發送事務消息代碼示例如下:
// 啟動 producer
public void afterPropertiesSet()throwsException {
MessagingAccessPoint accessPoint = OMS.builder().driver("sofamq").build();
Properties properties = new Properties();
//替換 GID_PGROUP 為您實際創建的 Group ID
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_PGROUP");
transactionProducer = accessPoint.createTransactionProducer(properties, newLocalTransactionChecker() {
@Override
publicTransactionStatus check (Message msg){
returnTransactionStatus.CommitTransaction;
}
});
transactionProducer.start();
LogUtil.info(LOGGER, "transaction producer started");
}
// 發送消息
public void publishMessage(TxnFlowRequest request) {
try {
PointAcctDTO pointAcctDTO = buildPointReturnDTO(request);
Message message = new Message(TOPIC, EVENT_CODE, hessianSerializer.serialize(pointAcctDTO));
String shardingKey = UIDUtil.parseShardingKeyFromBacc(request.getBacc());
message.putUserProperties(UserPropKey.CELL_UID, shardingKey);
transactionProducer.send(message, (msg, arg) -> {
returnTransactionStatus.CommitTransaction;
}, null);
LogUtil.info(LOGGER, "Public a message, success. TOPIC [{}] EVENTCODE [{}] id [{}] bacc [{}] payload [{}]",
message.getTopic(), EVENT_CODE, message.getMsgID(), request.getBacc(), request);
} catch (Exception e) {
LogUtil.error(LOGGER, e, "Public a message, failure. TOPIC [{}] EVENTCODE [{}] bacc [{}] error [{}]",
TOPIC, EVENT_CODE, request.getBacc(), e.getMessage());
throw new TxnFlowException(PropertyConstant.CODE_SYS_ERR, "call msgBorker error", e);
}
}
接收事務消息
接收事務消息代碼示例如下:
// 啟動 consumer
@Override
public void afterPropertiesSet() throws Exception {
Properties properties = new Properties();
//替換 GID_PGROUP 為您實際創建的 Group ID
properties.setProperty(PropertyKeyConst.GROUP_ID, "GID_SGROUP");
properties.setProperty(PropertyKeyConst.LDC_SUB_MODE, "RZONE");
Consumer consumer = OMS.builder().driver("sofamq").build().createConsumer(properties);
//替換 TP_TEST_POC 為您實際創建的消息 Topic
consumer.subscribe("TP_TEST_POC", "EC_ACCT_POINT", this);
consumer.start();
}
// 消息
public Action consume(Message message, ConsumeContext context) {
try {
PointAcctDTO pointAcctDTO = hessianSerializer.deserialize(message.getBody(),
PointAcctDTO.class.getName());
serviceTemplate.executeWithTransaction(pointAcctDTO, newServiceCallback() {
@Override
public String getServiceName () {
return "returnPoint";
}
@Override
public void checkParam () {
//參數校驗,如果為null或者空則消息不用重試
pointAcctDTO.checkParameters();
}
@Override
public void checkIdempotent () {
//消息有可能出現重發的情況,如果這次積分已經更新過了,則直接返回成功。 消息冪等處理有問題
List<TPointOrderDO> tPointOrderDOs = tPointOrderDAO.searchTxnSn(
pointAcctDTO.getBacc(), pointAcctDTO.getTxnSn());
if (tPointOrderDOs != null && tPointOrderDOs.size() != 0) {
throwPcException.valueOf(PcStatusCode.IDEMPOTENT);
}
}
@Override
public void execute () {
//埋點
mockActionServices.mockAction(ActionPointConstant.POINTCENTER_MESSAGE,
pointAcctDTO.getBacc());
List<TPotAcctDO> tPotAcctDOs = tPotAcctDAO.lockForUpdate(pointAcctDTO.getBacc());
//根據原來代碼邏輯,如果沒找到則打日志;
if (tPotAcctDOs == null || tPotAcctDOs.size() != 1) {
LogUtil.error(LOGGER, "error = {}, payload = {}",
PcStatusCode.BACC_COUNT_ERROR, "select t_pot_acct record:"
+ tPotAcctDOs);
return;
}
TPotAcctDO tPotAcctDO = tPotAcctDOs.get(0);
//根據原來代碼邏輯,如果狀態不為0則直接返回。
if (!PropertyConstant.ACC_STATUS_0.equals(tPotAcctDO.getStatus())) {
return;
}
tPotAcctDO.setPotBal(tPotAcctDO.getPotBal().add(pointAcctDTO.getPotBal()));
tPotAcctDO.setLastTxnSn(pointAcctDTO.getTxnSn());
tPotAcctDAO.update(tPotAcctDO);
TPointOrderDO tPointOrderDO = new TPointOrderDO(pointAcctDTO.getBacc(),
pointAcctDTO.getPotBal(), pointAcctDTO.getTxnSn());
tPointOrderDAO.insertOrder(tPointOrderDO);
}
});
} catch (CodecException e) {
LogUtil.error(LOGGER, e, "consume pointAcctDTO={} exception.");
}
// do something
return Action.CommitMessage;
}
分布式事務
在事務發起方的 spring 事務模板內,調用 dtxService.start()
方法開啟分布式事務,需要在開啟時,增加單元化架構下的分片參數,用做服務路由和數據庫路由。
代碼示例如下:
Map<String,Object> properties =newHashMap<String,Object>();
// 開啟分布式事務
String shardingKey =UIDUtil.parseShardingKeyFromBacc(request.getBacc());
dtxService.start("accttrans", request.getTxnSn(), shardingKey, properties);
有關分布式事務單元化配置的更多信息,參見 接入單元化能力。
任務調度
您無需進行額外配置,即可使用單元化的任務調度能力,請參見 使用跨 zone 網關。
建議檢查客戶端的啟動參數是否正常傳入:-D com.alipay.ldc.zone=xxx //指定所屬的邏輯單元
。
API 網關
您無需進行額外配置,即可使用 API 網關的單元化路由能力,請參見 創建路由規則、創建 API。
應用參數配置
在應用部署時,您還需要傳入以下參數。各參數均通過 JVM 的 -D 參數傳入到應用中。
中間件相關參數
com.alipay.instanceid
:當前租戶中間件實例唯一標識,可以在消息隊列控制臺概覽頁 接入配置 > 實例 ID 中獲取。com.antcloud.antvip.endpoint
:ACVIP 地址,可以在消息隊列控制臺概覽頁 接入配置 > TCP 協議內網接入點 中獲取。com.alipay.env
:環境標識,取值 shared,表示運行在共享模式。com.antcloud.mw.access
:中間件訪問控制鍵值。com.antcloud.mw.secret
:中間件訪問控制密鑰。
同城雙活相關參數
com.alipay.ldc.zone
:單元名稱。com.alipay.ldc.datacenter
:物理機房名稱。
LDC 單元化相關參數
zmode
:LDC 單元化開關。取值為 true 時,表示開啟 LDC 單元化。com.alipay.ldc.strictmode
:LDC 單元化嚴格模式開關。取值為 true 時,表示開啟 LDC 單元化嚴格模式。