日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

通過Python SDK管理告警

本文介紹通過Python SDK使用告警的代碼示例。

前提條件

  • 已創建RAM用戶并完成授權。具體操作,請參見創建RAM用戶并完成授權。

  • 已配置環境變量ALIBABA_CLOUD_ACCESS_KEY_IDALIBABA_CLOUD_ACCESS_KEY_SECRET。具體操作,請參見在Linux、macOS和Windows系統配置環境變量。

    重要
    • 阿里云賬號的AccessKey擁有所有API的訪問權限,建議您使用RAM用戶的AccessKey進行API訪問或日常運維。

    • 強烈建議不要把AccessKey ID和AccessKey Secret保存到工程代碼里,否則可能導致AccessKey泄露,威脅您賬號下所有資源的安全。

  • 已安裝0.7.9及以上版本的Python SDK。更多信息,請參見安裝Python SDK。

費用說明

在新版告警公測期間,僅短信通知和語音通知會產生費用。詳細價格,請參見產品定價。

操作說明
短信通知告警短信通知費用,按通知次數收費。
說明 某些運營商可能將內容過長(例如超過70字符)的短信拆分成2條發送,所以當您編輯的短信內容過長時,您可能收到2條短信,但日志服務只按照1條短信收費。
語音通知告警語音通知費用,按通知次數收費。
說明
  • 告警語音未撥通時,不會重復撥打,將以短信方式發送一次通知。
  • 無論告警語音是否撥通均按一次計費。未撥通的提示短信,不會額外產生短信費用。

管理告警監控規則

代碼示例如下。具體的參數說明,請參見告警監控規則數據結構。

import os
from aliyun.log import LogClient
# 日志服務的服務接入點。
endpoint = 'cn-huhehaote.log.aliyuncs.com'
# 本示例從環境變量中獲取AccessKey ID和AccessKey Secret。
accesskey_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accesskey_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 創建日志服務Client。
client = LogClient(endpoint, accesskey_id, accesskey_secret)

project = 'demo-alert'
alert_id = 'nginx-status-error'


def create_alert():
    alert = {
        'name': alert_id,
        'displayName': 'Nginx Status Error',
        'type': 'Alert',
        'state': 'Enabled',
        'schedule': {
            'type': 'FixedRate',
            'interval': '1m'
        },
        'configuration': {
            'version': '2.0',
            'type': 'default',
            'dashboard': 'internal-alert-analysis',
            'queryList': [{
                # 告警類型 log:日志庫 metric:時序庫   meta:資源數據
                'storeType': 'log',
                # 開服地域
                'region': 'cn-huhehaote',
                # 項目名稱
                'project': 'demo-alert',
                # 日志庫或者時序庫名稱
                'store': 'nginx-access-log',
                # 查詢語句
                'query': 'status >= 400 | select count(*) as cnt',
                # 時間片類型
                'timeSpanType': 'Truncated',
                # 開始時間
                'start': '-1m',
                # 結束時間
                'end': 'absolute',
                # 是否開啟獨享 SQL
                'powerSqlMode': 'auto'
            }],
            'groupConfiguration': {
                'type': 'no_group',
                'fields': []
            },
            'joinConfigurations': [],
            'severityConfigurations': [{
                'severity': 6,
                'evalCondition': {
                    'condition': 'cnt > 0',
                    'countCondition': ''
                }
            }],
            'labels': [{
                'key': 'service',
                'value': 'nginx'
            }],
            'annotations': [{
                'key': 'title',
                'value': 'Nginx Status Error'
            }, {
                'key': 'desc',
                'value': 'Nginx Status Error, count: ${cnt}'
            }],
            'autoAnnotation': True,
            'sendResolved': False,
            'threshold': 1,
            'noDataFire': False,
            'noDataSeverity': 6,
            'policyConfiguration': {
                'alertPolicyId': 'sls.builtin.dynamic',
                'actionPolicyId': 'test-action-policy',
                'repeatInterval': '1m',
                'useDefault': False
            }
        }
    }
    res = client.create_alert(project, alert)
    res.log_print()


def get_and_update_alert():
    res = client.get_alert(project, alert_id)
    res.log_print()

    alert = res.get_body()
    alert['configuration']['queryList'][0]['query'] = 'status >= 400 | select count(*) as cnt'
    res = client.update_alert(project, alert)
    res.log_print()


def enable_and_disable_alert():
    res = client.disable_alert(project, alert_id)
    res.log_print()

    res = client.enable_alert(project, alert_id)
    res.log_print()


def list_alerts():
    res = client.list_alert(project, offset=0, size=100)
    res.log_print()


def delete_alert():
    res = client.delete_alert(project, alert_id)
    res.log_print()


if __name__ == '__main__':
    create_alert()
    get_and_update_alert()
    enable_and_disable_alert()
    list_alerts()
    delete_alert()

管理告警資源數據

代碼示例如下。具體的參數說明,請參見告警資源數據結構。

管理用戶

import os
from aliyun.log import LogClient
from aliyun.log.resource_params import ResourceRecord
# 日志服務的服務入口。資源數據的寫操作,必須使用河源地域,讀操作可以使用其它地域。
endpoint = 'cn-heyuan.log.aliyuncs.com'
# 本示例從環境變量中獲取AccessKey ID和AccessKey Secret。
accesskey_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')
accesskey_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')
# 創建日志服務Client。
client = LogClient(endpoint, accesskey_id, accesskey_secret)
user_resource_name = 'sls.common.user'


def create_user():
    user = {
        'user_id': 'alex',
        'user_name': 'Alex',
        'email': [
            '****@example.com'
        ],
        'country_code': '86',
        'phone': '133****3333',
        'enabled': True,
        'sms_enabled': True,
        'voice_enabled': True
    }
    record = ResourceRecord(user['user_id'], user['user_name'], user)
    res = client.create_resource_record(user_resource_name, record)
    print('[create user]')
    res.log_print()


def get_user():
    res = client.get_resource_record(user_resource_name, 'alex')
    print('[get user]')
    print(res.get_record().to_dict())


def update_user():
    user = {
        'user_id': 'alex',
        'user_name': 'Alex',
        'email': [
            '****@example.com'
        ],
        'country_code': '86',
        'phone': '133****3333',
        'enabled': False,
        'sms_enabled': True,
        'voice_enabled': True
    }
    record = ResourceRecord(user['user_id'], user['user_name'], user)
    res = client.update_resource_record(user_resource_name, record)
    print('[update user]')
    res.log_print()


def list_users():
    res = client.list_resource_records(user_resource_name, offset=0, size=100)
    print('[list users]')
    print([r.to_dict() for r in res.get_records()])


def delete_user():
    res = client.delete_resource_record(user_resource_name, ['alex'])
    print('[delete user]')
    res.log_print()


if __name__ == '__main__':
    create_user()
    get_user()
    update_user()
    list_users()
    delete_user()

管理用戶組

def create_user_group():
    user_group = {
        'user_group_id': 'devops',
        'user_group_name': 'DevOps Team',
        'enabled': True,
        'members': ['alex']
    }
    record = ResourceRecord(user_group['user_group_id'], user_group['user_group_name'], user_group)
    res = client.create_resource_record('sls.common.user_group', record)
    print('[create user group]')
    res.log_print()

管理Webhook集成

def create_webhook_integration():
    webhooks = [{
        'id': 'dingtalk',
        'name': 'Dingtalk Webhook',
        'type': 'dingtalk',
        'url': 'https://oapi.dingtalk.com/robot/send?access_token=**********',
        'method': 'POST',
        # 釘釘加簽密鑰。如果您在釘釘側設置安全驗證方式為簽名校驗,則需配置該字段。您可以在釘釘機器人管理界面獲取加簽密鑰。
        # 'secret': 'SEC**********',
        'headers': []
    }, {
        'id': 'wechat',
        'name': 'Wechat Webhook',
        'type': 'wechat',
        'url': 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=**********',
        'method': 'POST',
        'headers': []
    }, {
        'id': 'feishu',
        'name': 'Feishu Webhook',
        'type': 'lark',
        'url': 'https://open.feishu.cn/open-apis/bot/v2/hook/**********',
        'method': 'POST',
        # 飛書加簽密鑰。如果您在飛書側設置安全驗證方式為簽名校驗,則需配置該字段。您可以在飛書機器人管理界面獲取加簽密鑰。
        # 'secret': '**********',
        'headers': []
    }, {
        'id': 'slack',
        'name': 'Slack Webhook',
        'type': 'slack',
        'url': 'https://hooks.slack.com/services/**********',
        'method': 'POST',
        'headers': []
    }, {
        'id': 'webhook',
        'name': 'Common Webhook',
        'type': 'custom',
        'url': 'https://example.com/***********',
        'method': 'POST',
        'headers': [{
            'key': 'Authorization',
            'value': 'Basic YWRtaW46Zm9vYmFy'
        }]
    }]

    for webhook in webhooks:
        record = ResourceRecord(webhook['id'], webhook['name'], webhook)
        res = client.create_resource_record('sls.alert.action_webhook', record)
        print('[create webhook integration] ' + webhook['id'])
        res.log_print()

管理行動策略

def create_action_policy():
    action_policy = {
        'action_policy_id': 'test-action-policy',
        'action_policy_name': 'Test Action Policy',
        'primary_policy_script': 'fire(type="sms", users=["alex"], groups=[], oncall_groups=[], receiver_type="static", external_url="", external_headers={}, template_id="sls.builtin.cn", period="any")',
        'secondary_policy_script': 'fire(type="voice", users=["alex"], groups=[], oncall_groups=[], receiver_type="static", external_url="", external_headers={}, template_id="sls.builtin.cn", period="any")',
        'escalation_start_enabled': False,
        'escalation_start_timeout': '10m',
        'escalation_inprogress_enabled': False,
        'escalation_inprogress_timeout': '30m',
        'escalation_enabled': True,
        'escalation_timeout': '1h'
    }
    record = ResourceRecord(
        action_policy['action_policy_id'], action_policy['action_policy_name'], action_policy)
    res = client.create_resource_record('sls.alert.action_policy', record)
    print('[create action policy]')
    res.log_print()

管理告警策略

def create_alert_policy():
    alert_policy = {
        'policy_id': 'test-alert-policy',
        'policy_name': 'Test Alert Policy',
        'parent_id': '',
        'group_script': 'fire(action_policy="test-action-policy", group={"alert.alert_id": alert.alert_id}, group_by_all_labels=true, group_wait="15s", group_interval="5m", repeat_interval="1h")',
        'inhibit_script': '',
        'silence_script': ''
    }
    record = ResourceRecord(alert_policy['policy_id'], alert_policy['policy_name'], alert_policy)
    res = client.create_resource_record('sls.alert.alert_policy', record)
    print('[create alert policy]')
    res.log_print()

管理內容模板

def create_content_template():
    template = {
        'template_id': 'test-template',
        'template_name': 'Test Template',
        'templates': {
            'sms': {
                'locale': 'zh-CN',
                'content': ''
            },
            'voice': {
                'locale': 'zh-CN',
                'content': ''
            },
            'email': {
                'locale': 'zh-CN',
                'subject': 'SLS Alert',
                'content': ''
            },
            'message_center': {
                'locale': 'zh-CN',
                'content': ''
            },
            'dingtalk': {
                'locale': 'zh-CN',
                'title': 'SLS Alert',
                'content': ''
            },
            'wechat': {
                'locale': 'zh-CN',
                'title': 'SLS Alert',
                'content': ''
            },
            'lark': {
                'locale': 'zh-CN',
                'title': 'SLS Alert',
                'content': ''
            },
            'slack': {
                'locale': 'zh-CN',
                'title': 'SLS Alert',
                'content': ''
            },
            'webhook': {
                'locale': 'zh-CN',
                'send_type': 'batch',
                'limit': 0,
                'content': ''
            },
            'fc': {
                'locale': 'zh-CN',
                'limit': 0,
                'send_type': 'batch',
                'content': ''
            },
            'event_bridge': {
                'locale': 'zh-CN',
                'subject': 'SLS Alert',
                'content': ''
            },
        }
    }
    record = ResourceRecord(template['template_id'], template['template_name'], template)
    res = client.create_resource_record('sls.alert.content_template', record)
    print('[create content template]')
    res.log_print()