本文為您介紹如何安裝并使用實時計算Flink版Python SDK。
前提條件
已創建AccessKey,詳情請參見創建AccessKey。
說明為避免主賬號泄露AccessKey帶來安全風險,建議您創建RAM用戶,授予RAM用戶Flink相關的訪問權限,再使用RAM用戶的AccessKey調用SDK。相關文檔請參見:
創建RAM用戶以及對應AccessKey,請參見創建RAM用戶或創建AccessKey。
為RAM用戶授權,請參見RAM授權。
已準備Python環境,要求3.6及以上版本。
賬號具有相關訪問及操作權限,詳情請參見權限管理。
安裝 Flink Python SDK
通過pip安裝Python SDK。
進行作業開發和運維等操作時,需要調用實時計算開發控制臺API。安裝和使用詳情請參見開發控制臺SDK中心。
pip3 install alibabacloud_ververica20220718==1.2.1
查看工作空間信息,進行工作空間的購買、資源調整等操作時,需要調用實時計算管理控制臺API。安裝和使用詳情請參見售賣控制臺SDK中心。
pip3 install alibabacloud_foasconsole20211028==1.0.2
在線調試和生成SDK示例
OpenAPI門戶提供了在線調用產品API、動態生成SDK示例代碼和快速檢索接口等功能,可以顯著降低使用API的難度。您可以在實時計算開發控制臺API和實時計算售賣控制臺API頁面查看所需API的SDK示例,并下載使用,具體操作步驟請參見快速開始。
參考示例
查看已購買的工作空間
查詢目標地域下已購買的Flink工作空間的詳細信息。必填請求參數如下,更多參數詳情請參見DescribeInstances - 查看已購買Flink全托管工作空間。
Region
:地域ID,詳情請參見服務接入點。例如cn-hangzhou。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_foasconsole20211028.client import Client as foasconsole20211028Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_foasconsole20211028 import models as foasconsole_20211028_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> foasconsole20211028Client:
"""
使用AK&SK初始化賬號Client
@return: Client
@throws Exception
"""
# 工程代碼泄露可能會導致 AccessKey 泄露,并威脅賬號下所有資源的安全性,建議使用更安全的 STS 方式。以下代碼示例僅供參考。
config = open_api_models.Config(
# 必填,請確保代碼運行環境設置了環境變量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,請確保代碼運行環境設置了環境變量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint請根據實際情況修改
config.endpoint = f'foasconsole.aliyuncs.com'
return foasconsole20211028Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
region='cn-hangzhou'
)
runtime = util_models.RuntimeOptions()
try:
# 調用并打印API的返回值
response=client.describe_instances_with_options(describe_instances_request, runtime)
print(response)
except Exception as error:
# 此處僅做打印展示,請謹慎對待異常處理,在工程項目中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
describe_instances_request = foasconsole_20211028_models.DescribeInstancesRequest(
region='cn-hangzhou'
)
runtime = util_models.RuntimeOptions()
try:
# 復制代碼運行請自行打印 API 的返回值
await client.describe_instances_with_options_async(describe_instances_request, runtime)
except Exception as error:
# 此處僅做打印展示,請謹慎對待異常處理,在工程項目中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])
獲取已部署作業列表
獲取項目空間下所有已部署作業的信息。必填請求參數如下,參數詳情請參見ListDeployments - 獲取已部署作業列表。
workspace
:工作空間ID,可通過查看已購買的工作空間返回的ResourceId獲取。例如adf9e5147a****。namespace
:項目空間名稱,例如script****-default。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
使用AK&SK初始化賬號Client
@return: Client
@throws Exception
"""
# 工程代碼泄露可能會導致 AccessKey 泄露,并威脅賬號下所有資源的安全性,建議使用更安全的 STS 方式。以下代碼示例僅供參考。
config = open_api_models.Config(
# 必填,請確保代碼運行環境設置了環境變量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,請確保代碼運行環境設置了環境變量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint請根據實際情況修改
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
workspace='workspace'
)
list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
runtime = util_models.RuntimeOptions()
try:
#調用并打印API的返回值
request=client.list_deployments_with_options('namespace', list_deployments_request, list_deployments_headers, runtime)
print(request)
except Exception as error:
# 此處僅做打印展示,請謹慎對待異常處理,在工程項目中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
list_deployments_headers = ververica_20220718_models.ListDeploymentsHeaders(
workspace='workspace'
)
list_deployments_request = ververica_20220718_models.ListDeploymentsRequest()
runtime = util_models.RuntimeOptions()
try:
# 復制代碼運行請自行打印 API 的返回值。namespace為項目空間名稱
await client.list_deployments_with_options_async('namespace', list_deployments_request, list_deployments_headers, runtime)
except Exception as error:
# 此處僅做打印展示,請謹慎對待異常處理,在工程項目中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])
啟動作業
啟動項目空間下一個已部署作業。必填請求參數如下,參數詳情請參見StartJobWithParams - 啟動作業實例。
workspace
:工作空間ID,例如adf9e5147a****。namespace
:項目空間名稱,例如script****-default。deploymentId
:作業部署ID,可通過獲取已部署作業列表獲取。例如3171d4d1-5952-4d02-b978-e762493b****。kind
:啟動位點類型。支持NONE(無狀態啟動)、LATEST_SAVEPOINT(最新的作業快照啟動)、FROM_SAVEPOINT(從指定快照啟動)、LATEST_STATE(最新狀態啟動)。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
使用AK&SK初始化賬號Client
@return: Client
@throws Exception
"""
# 工程代碼泄露可能會導致 AccessKey 泄露,并威脅賬號下所有資源的安全性,建議使用更安全的 STS 方式。以下代碼示例僅供參考。
config = open_api_models.Config(
# 必填,請確保代碼運行環境設置了環境變量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,請確保代碼運行環境設置了環境變量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint請根據實際情況修改
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
workspace='workspace'
)
job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
kind='NONE'
)
job_start_parameters = ververica_20220718_models.JobStartParameters(
deployment_id='deploymentId',
restore_strategy=job_start_parameters_deployment_restore_strategy
)
start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
body=job_start_parameters
)
runtime = util_models.RuntimeOptions()
try:
# 復制代碼運行請自行打印 API 的返回值
client.start_job_with_params_with_options('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
except Exception as error:
# 此處僅做打印展示,請謹慎對待異常處理,在工程項目中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
start_job_with_params_headers = ververica_20220718_models.StartJobWithParamsHeaders(
workspace='workspace'
)
job_start_parameters_deployment_restore_strategy = ververica_20220718_models.DeploymentRestoreStrategy(
#作業啟動策略
kind='NONE'
)
job_start_parameters = ververica_20220718_models.JobStartParameters(
deployment_id='deploymentId',
restore_strategy=job_start_parameters_deployment_restore_strategy
)
start_job_with_params_request = ververica_20220718_models.StartJobWithParamsRequest(
body=job_start_parameters
)
runtime = util_models.RuntimeOptions()
try:
# 復制代碼運行請自行打印 API 的返回值
await client.start_job_with_params_with_options_async('namespace', start_job_with_params_request, start_job_with_params_headers, runtime)
except Exception as error:
# 此處僅做打印展示,請謹慎對待異常處理,在工程項目中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])
獲取作業實例列表
獲取某個已部署作業下所有作業實例的信息。必填請求參數如下,參數詳情請參見ListJobs - 獲取作業實例列表。
workspace
:工作空間ID,例如adf9e5147a****。namespace
:項目空間名稱,例如script****-default。deploymentId
:作業部署ID,可以通過獲取已部署作業列表獲取。例如3171d4d1-5952-4d02-b978-e762493b****。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
使用AK&SK初始化賬號Client
@return: Client
@throws Exception
"""
# 工程代碼泄露可能會導致 AccessKey 泄露,并威脅賬號下所有資源的安全性,建議使用更安全的 STS 方式。以下代碼示例僅供參考。
config = open_api_models.Config(
# 必填,請確保代碼運行環境設置了環境變量 ALIBABA_CLOUD_ACCESS_KEY_ID。,
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,請確保代碼運行環境設置了環境變量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。,
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint請根據實際情況修改
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
workspace='workspace'
)
list_jobs_request = ververica_20220718_models.ListJobsRequest(
deployment_id='deploymentId'
)
runtime = util_models.RuntimeOptions()
try:
# 調用并打印API的返回值
request=client.list_jobs_with_options('namespace', list_jobs_request, list_jobs_headers, runtime)
print(request)
except Exception as error:
# 此處僅做打印展示,請謹慎對待異常處理,在工程項目中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
list_jobs_headers = ververica_20220718_models.ListJobsHeaders(
workspace='workspace'
)
list_jobs_request = ververica_20220718_models.ListJobsRequest(
deployment_id='deploymentId'
)
runtime = util_models.RuntimeOptions()
try:
# 復制代碼運行請自行打印 API 的返回值
await client.list_jobs_with_options_async('namespace', list_jobs_request, list_jobs_headers, runtime)
except Exception as error:
# 此處僅做打印展示,請謹慎對待異常處理,在工程項目中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])
停止作業實例
停止一個作業實例。必填請求參數如下,參數詳情請參見StopJob - 停止作業實例。
workspace
:工作空間ID,例如adf9e5147a****。namespace
:項目空間名稱,例如script****-default。jobId
:作業實例ID,您可以通過獲取作業實例列表獲取。例如3171d4d1-5952-4d02-b978-e762493b****。stopStrategy
:作業停止策略。支持NONE(直接停止)、STOP_WITH_SAVEPOINT(生成作業快照后停止)、STOP_WITH_DRAIN(以drain的方式停止)。
# -*- coding: utf-8 -*-
import os
import sys
from typing import List
from alibabacloud_ververica20220718.client import Client as ververica20220718Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_ververica20220718 import models as ververica_20220718_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient
class Sample:
def __init__(self):
pass
@staticmethod
def create_client() -> ververica20220718Client:
"""
使用AK&SK初始化賬號Client
@return: Client
@throws Exception
"""
# 工程代碼泄露可能會導致 AccessKey 泄露,并威脅賬號下所有資源的安全性,建議使用更安全的 STS 方式。以下代碼示例僅供參考。
config = open_api_models.Config(
# 必填,請確保代碼運行環境設置了環境變量 ALIBABA_CLOUD_ACCESS_KEY_ID。
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 必填,請確保代碼運行環境設置了環境變量 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET']
)
# Endpoint請根據實際情況修改
config.endpoint = f'ververica.cn-hangzhou.aliyuncs.com'
return ververica20220718Client(config)
@staticmethod
def main(
args: List[str],
) -> None:
client = Sample.create_client()
stop_job_headers = ververica_20220718_models.StopJobHeaders(
workspace='workspace'
)
stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
# 作業停止策略
stop_strategy='stopStrategy'
)
stop_job_request = ververica_20220718_models.StopJobRequest(
body=stop_job_request_body
)
runtime = util_models.RuntimeOptions()
try:
# 復制代碼運行請自行打印 API 的返回值
client.stop_job_with_options('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
except Exception as error:
# 此處僅做打印展示,請謹慎對待異常處理,在工程項目中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
@staticmethod
async def main_async(
args: List[str],
) -> None:
client = Sample.create_client()
stop_job_headers = ververica_20220718_models.StopJobHeaders(
workspace='workspace'
)
stop_job_request_body = ververica_20220718_models.StopJobRequestBody(
stop_strategy='stopStrategy'
)
stop_job_request = ververica_20220718_models.StopJobRequest(
body=stop_job_request_body
)
runtime = util_models.RuntimeOptions()
try:
# 復制代碼運行請自行打印 API 的返回值
await client.stop_job_with_options_async('namespace', 'jobId', stop_job_request, stop_job_headers, runtime)
except Exception as error:
# 此處僅做打印展示,請謹慎對待異常處理,在工程項目中切勿直接忽略異常。
# 錯誤 message
print(error.message)
# 診斷地址
print(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
if __name__ == '__main__':
Sample.main(sys.argv[1:])
相關文檔
Java SDK參考請參見Java SDK參考。
實時計算售賣控制臺支持的API,以及各API的參數說明等詳情請參見API概覽。
實時計算開發控制臺支持的API,以及各API的參數說明等詳情請參見API概覽。