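This topic describes how to use the EMR Java SDK to perform common operations, such as creating clusters, creating jobs, and scaling node groups out and in.
Prerequisites
Make sure that the environment variables ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET are set in the environment where your code runs. For details, see Configuration methods.
Create a cluster
Note
If you create your first EMR cluster after 17:00 (UTC+8) on December 19, 2022, you cannot use this operation to create clusters. Use CreateCluster - Create a cluster instead.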
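Every example below reads the two credential variables with `System.getenv`, which silently returns `null` when a variable is missing. The following is a minimal sketch, not part of the SDK, of failing fast with a clear message instead; the class `CredentialCheck` and its methods are hypothetical names used only for illustration.

```java
import java.util.Map;

// Hypothetical helper (not part of the EMR SDK): fail fast when credentials are missing.
final class CredentialCheck {
    // Names of the environment variables the examples in this document read.
    static final String KEY_ID = "ALIBABA_CLOUD_ACCESS_KEY_ID";
    static final String KEY_SECRET = "ALIBABA_CLOUD_ACCESS_KEY_SECRET";

    private CredentialCheck() {}

    /** Returns the value of {@code name} from {@code env}, or throws if it is missing or blank. */
    static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Environment variable " + name + " is not set");
        }
        return value;
    }

    /** Returns {accessKeyId, accessKeySecret}; pass System.getenv() in real code. */
    static String[] requireCredentials(Map<String, String> env) {
        return new String[] { require(env, KEY_ID), require(env, KEY_SECRET) };
    }
}
```

In real code you could call `String[] creds = CredentialCheck.requireCredentials(System.getenv());` and pass `creds[0]` and `creds[1]` to `DefaultProfile.getProfile("cn-hangzhou", ...)`. Taking a `Map` rather than reading the environment directly keeps the check easy to test.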
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
final CreateClusterRequest request = new CreateClusterRequest();
request.setName("Your-Cluster-Name");
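request.setSecurityGroupId("Your-Security-Group-Id"); // If no security group ID is specified, a new security group with the given name is created.
request.setAutoRenew(false);
request.setChargeType("PostPaid"); // Billing method: pay-as-you-go.
request.setClusterType("HADOOP"); // Cluster type.
request.setEmrVer("EMR-1.3.0"); // EMR version.
request.setIsOpenPublicIp(true);
request.setLogEnable(true);
request.setLogPath("oss://Your-Bucket/Your-Folder");
request.setMasterPwdEnable(true); // Enable password login for the master node.
request.setMasterPwd("Aa123456789"); // Master node password. Replace it with your own strong password.
request.setZoneId("cn-hangzhou-b"); // Zone ID.
// The I/O optimization setting, ECS instance generation, and network type determine which hardware configurations (ECS instance types and disk types) are available. Use the combinations and types supported on the ECS purchase page.
request.setIoOptimized(true); // Enable I/O optimization.
request.setInstanceGeneration("ecs-2"); // ECS generation II. Valid values: ecs-1 and ecs-2.
request.setNetType("vpc"); // Network type.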
request.setVpcId("your-vpcId");
request.setVSwitchId("your-switchId");
List<CreateClusterRequest.EcsOrder> ecsOrders = new ArrayList<CreateClusterRequest.EcsOrder>();
CreateClusterRequest.EcsOrder masterOrder = new CreateClusterRequest.EcsOrder();
masterOrder.setIndex(1);
masterOrder.setDiskCapacity(50);
masterOrder.setDiskCount(2);
masterOrder.setDiskType("CLOUD_EFFICIENCY"); // Disk type.
masterOrder.setInstanceType("ecs.n1.large"); // ECS instance type.
masterOrder.setNodeCount(1);
masterOrder.setNodeType("MASTER"); // Master node group.
ecsOrders.add(masterOrder);
CreateClusterRequest.EcsOrder coreOrder = new CreateClusterRequest.EcsOrder();
coreOrder.setIndex(2);
coreOrder.setDiskCapacity(50);
coreOrder.setDiskCount(4);
coreOrder.setDiskType("CLOUD_EFFICIENCY");
coreOrder.setInstanceType("ecs.n1.large");
coreOrder.setNodeCount(3);
coreOrder.setNodeType("CORE");
ecsOrders.add(coreOrder);
request.setEcsOrders(ecsOrders);
try {
CreateClusterResponse response = client.getAcsResponse(request);
String clusterId = response.getClusterId(); // ID of the new cluster.
// Perform operations on the cluster.
} catch (Exception e) {
e.printStackTrace(); // Do not swallow errors silently.
}
}
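The two `EcsOrder` entries above request one master node with two 50-unit disks and three core nodes with four 50-unit disks each. Assuming `DiskCapacity` is the size of each disk in GB (an assumption; check the API reference), the total disk capacity per node group is nodes × disks per node × capacity per disk, which the following minimal sketch computes. `EcsOrderCapacity` is a hypothetical helper, not an SDK class.

```java
// Hypothetical helper (not part of the EMR SDK).
// Assumes DiskCapacity is the capacity of each disk in GB.
final class EcsOrderCapacity {
    private EcsOrderCapacity() {}

    /** Total disk capacity in GB for one node group: nodes x disks per node x GB per disk. */
    static int totalDiskGb(int nodeCount, int diskCount, int diskCapacityGb) {
        // multiplyExact throws ArithmeticException instead of silently overflowing.
        return Math.multiplyExact(nodeCount, Math.multiplyExact(diskCount, diskCapacityGb));
    }
}
```

Under that assumption, the master group gets 1 × 2 × 50 = 100 GB and the core group gets 3 × 4 × 50 = 600 GB, or 700 GB in total.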
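A cluster must belong to a security group. If you do not specify a security group ID, you must specify a security group name, and a new security group with that name is created together with the cluster.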
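The rule above is an either/or choice: an existing security group ID, or a name for a new group. The following minimal sketch checks that choice on the client side before a request is sent. `SecurityGroupChoice` and its `Mode` values are hypothetical names, and letting the ID win when both values are present is an assumption of this sketch.

```java
// Hypothetical client-side check (not part of the EMR SDK).
final class SecurityGroupChoice {
    /** Which path the cluster request would take. */
    enum Mode { USE_EXISTING_ID, CREATE_WITH_NAME }

    private SecurityGroupChoice() {}

    /** Prefers an existing security group ID; otherwise requires a name for a new group. */
    static Mode resolve(String securityGroupId, String securityGroupName) {
        if (securityGroupId != null && !securityGroupId.isEmpty()) {
            return Mode.USE_EXISTING_ID;
        }
        if (securityGroupName != null && !securityGroupName.isEmpty()) {
            return Mode.CREATE_WITH_NAME;
        }
        throw new IllegalArgumentException(
                "Specify a security group ID or a name for a new security group");
    }
}
```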
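To enable high availability, set the following parameter. For details, see the hardware configuration section of Create a cluster.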
request.setHighAvailabilityEnable(true);
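To add optional software components, set the following parameter. For details, see the software configuration section of Create a cluster.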
List<String> soft = new ArrayList<String>();
soft.add("presto");
soft.add("oozie");
request.setOptionSoftWareLists(soft);
To specify configuration items, set the following parameter. For details, see Software configuration.
request.setConfigurations("oss://your-bucket/your-conf.json");
To specify bootstrap actions, set the following parameter. For details, see Bootstrap actions.
List<CreateClusterRequest.BootstrapAction> bootstrapActions = new ArrayList<CreateClusterRequest.BootstrapAction>();
CreateClusterRequest.BootstrapAction bootstrapAction = new CreateClusterRequest.BootstrapAction();
bootstrapAction.setName("bootstrapName");
bootstrapAction.setPath("oss://emr-agent-pack/bootstrap/run-if.py");
// Condition followed by command: run mkdir -p /tmp/abc only on nodes where instance.isMaster=true.
bootstrapAction.setArg("instance.isMaster=true mkdir -p /tmp/abc");
bootstrapActions.add(bootstrapAction);
request.setBootstrapActions(bootstrapActions);
Query cluster details
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
final DescribeClusterRequest request = new DescribeClusterRequest();
request.setId("C-XXXXXXXXXXXXXXXX"); // Cluster ID.
try {
DescribeClusterResponse response = client.getAcsResponse(request);
DescribeClusterResponse.ClusterInfo clusterInfo = response.getClusterInfo();
// Perform operations on the cluster.
} catch (Exception e) {
e.printStackTrace(); // Do not swallow errors silently.
}
}
List clusters
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
final ListClustersRequest request = new ListClustersRequest();
request.setPageNumber(1);
request.setIsDesc(true);
request.setPageSize(20);
try {
ListClustersResponse response = client.getAcsResponse(request);
List<ListClustersResponse.ClusterInfo> clusterInfos = response.getClusters();
for (ListClustersResponse.ClusterInfo clusterInfo : clusterInfos) {
// Perform operations on each cluster.
}
} catch (Exception e) {
e.printStackTrace(); // Do not swallow errors silently.
}
}
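The example above fetches only the first page of 20 clusters. The following minimal sketch collects every page by requesting page 1, 2, 3, and so on until a page comes back shorter than the page size. `PageCollector` and `PageFetcher` are hypothetical names, and the stopping rule is an assumption of this sketch; when the total is an exact multiple of the page size it costs one extra, empty request.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical pagination helper (not part of the EMR SDK).
final class PageCollector {
    private PageCollector() {}

    /** Returns the items on a 1-based page, or an empty list past the last page. */
    interface PageFetcher<T> {
        List<T> fetch(int pageNumber, int pageSize);
    }

    /** Requests pages 1, 2, 3, ... until a page is shorter than pageSize. */
    static <T> List<T> collectAll(PageFetcher<T> fetcher, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        List<T> all = new ArrayList<>();
        for (int page = 1; ; page++) {
            List<T> items = fetcher.fetch(page, pageSize);
            all.addAll(items);
            if (items.size() < pageSize) {
                return all; // A short (or empty) page means there is nothing left.
            }
        }
    }
}
```

With the SDK, the fetcher could set `setPageNumber` and `setPageSize` on a `ListClustersRequest` and return `response.getClusters()`. Because `getAcsResponse` throws checked exceptions, such a lambda would need to wrap them.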
Release a cluster
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
ReleaseClusterRequest request = new ReleaseClusterRequest();
request.setId("C-XXXXXXXXXXXXXXXX"); // ID of the cluster to release.
try {
ReleaseClusterResponse response = client.getAcsResponse(request);
} catch (Exception e) {
e.printStackTrace(); // Do not swallow errors silently.
}
}
Create a job
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
final CreateJobRequest request = new CreateJobRequest();
request.setName("Your-Job-Name");
request.setRunParameter("--master yarn-client --driver-memory 4g --executor-memory 4g --executor-cores 2 --num-executors 4 --class com.test.RemoteDebug ossref://Your-Bucket/Resource.jar 1000");
request.setFailAct("CONTINUE"); // Action to take when the job fails: continue.
request.setType("SPARK"); // Job type.
try {
CreateJobResponse response = client.getAcsResponse(request);
String jobId = response.getId(); // ID of the new job.
} catch (Exception e) {
e.printStackTrace(); // Do not swallow errors silently.
}
}
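The run parameter above is a single string of spark-submit style options followed by the application JAR and its arguments. The following minimal sketch assembles such a string from parts so that options are easier to change. `SparkRunParameter` is a hypothetical helper; it does not quote or escape values, so it assumes no value contains spaces.

```java
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of the EMR SDK). Values are not quoted or escaped.
final class SparkRunParameter {
    private SparkRunParameter() {}

    /** Joins "--option value" pairs in map iteration order, then the JAR and its arguments. */
    static String build(Map<String, String> options, String appJar, List<String> appArgs) {
        StringJoiner joiner = new StringJoiner(" ");
        for (Map.Entry<String, String> e : options.entrySet()) {
            joiner.add("--" + e.getKey()).add(e.getValue());
        }
        joiner.add(appJar);
        for (String arg : appArgs) {
            joiner.add(arg);
        }
        return joiner.toString();
    }
}
```

Pass a `LinkedHashMap` so that the options keep the order in which you insert them; the result can then be passed to `request.setRunParameter(...)`.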
Delete a job
Important
A job that is used by a workflow cannot be deleted. Delete the workflow, or modify it so that it no longer uses the job, first.
public static void main(String[] args) {
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
final DeleteJobRequest request = new DeleteJobRequest();
request.setId("J-XXXXXXXXXXXXXXXX"); // Job ID.
try {
DeleteJobResponse response = client.getAcsResponse(request);
} catch (Exception e) {
e.printStackTrace(); // Do not swallow errors silently.
}
}
Scale out a node group
Scale out a node group by specifying the number of nodes to add.
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
ResizeClusterV2Request request = new ResizeClusterV2Request();
request.setClusterId("C-0E4B90219*****");
List<ResizeClusterV2Request.HostGroup> hostGroups = new ArrayList<>();
ResizeClusterV2Request.HostGroup hostGroup = new ResizeClusterV2Request.HostGroup();
hostGroups.add(hostGroup);
hostGroup.setHostGroupId("G-F0D0661E0A6E****");
// Number of nodes to add.
hostGroup.setNodeCount(1);
request.setHostGroups(hostGroups);
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));
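Scale in a node group
You can scale in a node group either by specifying the number of nodes to release or by specifying the instance IDs of the nodes to release.
Scale in a node group by node count
Important
To use this feature, upgrade aliyun-java-sdk-emr to version 3.3.8.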
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>aliyun-java-sdk-emr</artifactId>
<version>3.3.8</version>
</dependency>
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
ReleaseClusterHostGroupRequest request = new ReleaseClusterHostGroupRequest();
// Cluster ID.
request.setClusterId("C-01A1F4A********");
// Node group ID. You can obtain it by calling ListClusterHostGroup.
request.setHostGroupId("G-D11D3E*******");
// Number of nodes to release.
request.setReleaseNumber(3);
// Enable graceful decommission.
request.setEnableGracefulDecommission(true);
// Decommission timeout, in seconds.
request.setDecommissionTimeout(60);
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));
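Before releasing nodes by count, it can help to check the inputs locally. The following minimal sketch assumes that the release number must be between 1 and the node group's current node count and that the decommission timeout (in seconds) must not be negative; these bounds are assumptions of this sketch, not documented limits, and the service may enforce stricter ones (for example, a minimum number of remaining nodes). `ReleaseCheck` is a hypothetical helper.

```java
// Hypothetical pre-flight check (not part of the EMR SDK); the bounds are assumptions.
final class ReleaseCheck {
    private ReleaseCheck() {}

    /** Validates a count-based scale-in and returns the number of nodes expected to remain. */
    static int remainingAfterRelease(int currentNodeCount, int releaseNumber, int decommissionTimeoutSeconds) {
        if (releaseNumber < 1 || releaseNumber > currentNodeCount) {
            throw new IllegalArgumentException(
                    "releaseNumber must be in [1, " + currentNodeCount + "], got " + releaseNumber);
        }
        if (decommissionTimeoutSeconds < 0) {
            throw new IllegalArgumentException("decommissionTimeout must be >= 0 seconds");
        }
        return currentNodeCount - releaseNumber;
    }
}
```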
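If you enable YARN graceful decommission, go to the configuration page of the YARN service in the EMR console, search for the parameter yarn.resourcemanager.nodes.exclude-path, change its value to /etc/ecm/hadoop-conf/yarn-exclude.xml, and then save and deploy the configuration. Alternatively, make the change with the following code.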
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
ModifyClusterServiceConfigRequest modifyClusterServiceConfigRequest = new ModifyClusterServiceConfigRequest();
modifyClusterServiceConfigRequest.setClusterId("C-01A1F4A********");
modifyClusterServiceConfigRequest.setRegionId("cn-hangzhou");
modifyClusterServiceConfigRequest.setServiceName("YARN");
modifyClusterServiceConfigRequest.setConfigParams("{\"yarn-site\":{\"yarn.resourcemanager.nodes.exclude-path\":\"/etc/ecm/hadoop-conf/yarn-exclude.xml\"}}");
modifyClusterServiceConfigRequest.setCustomConfigParams("{}");
modifyClusterServiceConfigRequest.setComment("for decommission gracefully");
modifyClusterServiceConfigRequest.setRefreshHostConfig(Boolean.TRUE);
System.out.println(JSON.toJSONString(client.getAcsResponse(modifyClusterServiceConfigRequest)));
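`setConfigParams` above takes a JSON string that maps a configuration file (`yarn-site`) to key-value pairs, written by hand with escaped quotes. The following minimal sketch builds that string from nested maps instead, using only the JDK. `ConfigParamsJson` is a hypothetical helper; a JSON library would do the same job.

```java
import java.util.Map;

// Hypothetical helper (not part of the EMR SDK): builds {"file":{"key":"value"}} JSON.
final class ConfigParamsJson {
    private ConfigParamsJson() {}

    /** Serializes {configFile: {key: value}} as compact JSON, in the maps' iteration order. */
    static String toJson(Map<String, Map<String, String>> params) {
        StringBuilder sb = new StringBuilder("{");
        boolean firstFile = true;
        for (Map.Entry<String, Map<String, String>> file : params.entrySet()) {
            if (!firstFile) sb.append(',');
            firstFile = false;
            sb.append(quote(file.getKey())).append(":{");
            boolean firstKey = true;
            for (Map.Entry<String, String> kv : file.getValue().entrySet()) {
                if (!firstKey) sb.append(',');
                firstKey = false;
                sb.append(quote(kv.getKey())).append(':').append(quote(kv.getValue()));
            }
            sb.append('}');
        }
        return sb.append('}').toString();
    }

    /** Minimal JSON string escaping: quotes, backslashes, and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
            }
        }
        return sb.append('"').toString();
    }
}
```

For the YARN setting above, passing a map from `yarn-site` to a single entry for `yarn.resourcemanager.nodes.exclude-path` produces the same string that the example passes to `setConfigParams`.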
Scale in a node group by instance ID
IClientProfile profile = DefaultProfile.getProfile("cn-hangzhou", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"), System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
DefaultAcsClient client = new DefaultAcsClient(profile);
ReleaseClusterHostGroupRequest request = new ReleaseClusterHostGroupRequest();
// Cluster ID.
request.setClusterId("C-C52CF4246D10****");
// Node group ID. You can obtain it by calling ListClusterHostGroup.
request.setHostGroupId("G-A24651D939AD****");
// IDs of the instances to release.
List<String> instanceIds = new ArrayList<>();
instanceIds.add("i-xxxxxxx");
instanceIds.add("i-xxxxxxy");
// The instance ID list must be passed as a JSON array string.
request.setInstanceIdList(JSON.toJSONString(instanceIds));
// Enable graceful decommission.
request.setEnableGracefulDecommission(true);
// Decommission timeout, in seconds.
request.setDecommissionTimeout(60);
System.out.println(JSON.toJSONString(client.getAcsResponse(request)));
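`setInstanceIdList` above receives the instance IDs as a JSON array string, produced by a `JSON.toJSONString` call from a JSON library that the snippet does not import. If you prefer to avoid that dependency, the following minimal sketch builds the same array with the JDK alone. `InstanceIdListJson` is a hypothetical helper, and it assumes instance IDs contain only letters, digits, and hyphens, so it rejects anything else rather than escaping it.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper (not part of the EMR SDK).
// Assumes instance IDs contain only letters, digits, and hyphens.
final class InstanceIdListJson {
    private InstanceIdListJson() {}

    /** Serializes instance IDs as a JSON array of strings, for example ["i-a","i-b"]. */
    static String toJsonArray(List<String> instanceIds) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (String id : instanceIds) {
            if (!id.matches("[A-Za-z0-9-]+")) {
                throw new IllegalArgumentException("Unexpected instance ID: " + id);
            }
            joiner.add("\"" + id + "\"");
        }
        return joiner.toString();
    }
}
```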