日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

SMTP多線程發信示例之Python

重要

風險提示:"用于生產環境之前請務必先做好測試。"

Python中的多線程是一種并發編程模型,它允許多個線程(即輕量級進程)在一個程序中同時執行。多線程可以提高程序的響應性和效率,特別是在處理I/O操作、網絡請求或者其他耗時操作時。

Python中的多線程庫

Python提供了幾種多線程的支持庫:

1. threading模塊:這是Python中最常用的多線程庫,提供了創建和管理線程的基本功能。

2. concurrent.futures模塊:提供了更高層次的抽象,使得編寫并發代碼更簡單。

使用threading模塊創建線程

下面是一個簡單的示例,展示如何使用threading模塊創建和管理線程:

import threading
import time


def worker(num):
    """線程函數"""
    print(f'Thread {num}: starting')
    time.sleep(2)
    print(f'Thread {num}: finishing')


def main():
    threads = []
    for i in range(5):
        t = threading.Thread(target=worker, args=(i,))
        threads.append(t)
        t.start()

    # 等待所有線程完成
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()

使用concurrent.futures簡化多線程編程

concurrent.futures模塊提供了一個更高級別的接口來執行異步任務:

from concurrent.futures import ThreadPoolExecutor
import time


def worker(num):
    print(f'Thread {num}: starting')
    time.sleep(2)
    print(f'Thread {num}: finishing')
    return num


def main():
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(worker, i) for i in range(5)]
        for future in futures:
            print(f'Result: {future.result()}')


if __name__ == '__main__':
    main()

發送郵件

單線程發送示例

SMTP 之 Python3.6 及以上調用示例

多線程發送示例

下述代碼在Python 3.11.9進行的測試。

服務器地址請根據控制臺配置的對應區域選擇,請參考SMTP 服務地址

# -*- coding: utf-8 -*-
import queue
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
from email.utils import make_msgid, formatdate


class SMTPConnectionPool:
    def __init__(self, max_connections, smtp_server, smtp_port, username, password):
        self.pool = queue.Queue(max_connections)
        self.smtp_server = smtp_server
        self.smtp_port = smtp_port
        self.username = username
        self.password = password

        # 初始化連接池
        for _ in range(max_connections):
            connection = self.create_connection()
            self.pool.put(connection)

    def create_connection(self):
        conn = smtplib.SMTP_SSL(smtp_server, smtp_port)
        conn.login(username, password)
        return conn

    def get_connection(self):
        return self.pool.get()

    def return_connection(self, conn):
        self.pool.put(conn)

    def close_all_connections(self):
        while not self.pool.empty():
            conn = self.pool.get()
            conn.quit()


def send_email(pool, subject, body, to_email, retry_enabled, retry_count=0):
    """
    發送郵件函數,支持重試機制。

    參數:
    subject (str): 郵件主題。
    body (str): 郵件正文。
    to_email (list): 收件人郵箱列表。
    retry_enabled (bool): 是否啟用重試機制。
    retry_count (int): 當前重試次數,默認為0。

    返回:
    bool:郵件發送成功或不再重試時返回True,需要重試時返回False。
    """
    conn = pool.get_connection()
    # 獲取當前線程名稱
    thread_name = threading.current_thread().name
    # 記錄開始時間
    threading_start_time = time.time()
    v_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # 根據重試次數打印相應信息
    if retry_count == 0:
        print(f'{thread_name}================================={v_time} 首次發送!\n')
    else:
        print(f"{thread_name}================================={v_time} 開始第{retry_count}次重試!")

    # 發送郵件邏輯
    try:
        # 構建郵件內容和發送郵件的邏輯
        msg = MIMEMultipart('alternative')
        msg['Subject'] = Header(subject)
        msg['From'] = username
        msg['To'] = ",".join(to_email)
        msg['Message-id'] = make_msgid()
        msg['Date'] = formatdate()

        text_html = MIMEText(body, _subtype='html', _charset='UTF-8')
        msg.attach(text_html)

        # 開啟DEBUG模式
        conn.set_debuglevel(1)
        conn.sendmail(username, to_email, msg.as_string())
        # client.quit()
        print(f'{thread_name}--發送成功!\n')
    except Exception as e:
        # 處理發送郵件時的異常
        print(f'{thread_name}--------------------------------Exception\n')
        print(f"{thread_name}--Exception SMTP server reply: {e}\n")

        # 判斷是否需要重試的邏輯
        if retry_enabled is False:
            print(f"{thread_name}--不符合重試條件(重試開關關閉)!\n")
            return True
        if retry_count > retry_limit:
            print(f"{thread_name}--不符合重試條件(重試次數超過上限)!\n")
            return True

        for v_err in ruler_not_retry:
            if v_err in str(e):
                print(f"{thread_name}--不符合重試條件(包含不可重試類型)!\n")
                return True
        print(f"{thread_name}--加入重試隊列,等待下次重試!\n")
        return False

    finally:
        # 記錄結束時間并計算耗時
        threading_end_time = time.time()
        threading_elapsed_time = round(threading_end_time - threading_start_time, 2)
        print(f"Thread {thread_name} has finished sending email to {to_email},{threading_elapsed_time} seconds\n")
        print(f'{thread_name}--------------------------------end \n')
        # 將連接放回連接池
        pool.return_connection(conn)

    return True


def send_mail_with_fixed_rate(total_emails, to_email_list, subject, body, qps, max_workers, retry_enabled):
    """
    均勻發送郵件函數。

    根據總郵件數、收件人列表、主題、正文、每秒查詢率(QPS)、最大工作線程數和是否啟用重試來發送郵件。
    該函數通過控制郵件發送的間隔和并行處理來實現郵件的均勻發送。

    參數:
    total_emails (int): 總郵件數。
    to_email_list (list): 收件人郵箱列表。
    subject (str): 郵件主題。
    body (str): 郵件正文。
    qps (float): 每秒查詢率,用于控制發送頻率。
    max_workers (int): 最大工作線程數。
    retry_enabled (bool): 是否啟用重試機制。
    """
    # 計算郵件發送間隔,使發信更均勻
    if total_emails == 0:
        print("No emails to send.")
        return
    # 記錄開始時間
    start_time = time.time()
    interval = round(1 / qps, 2)
    retry_queue = []  # 重試隊列

    print(f'max_workers:{max_workers}')
    print(f'retry_enabled:{retry_enabled}')
    print(f'total_emails:{total_emails}')
    print(f'qps:{qps}')
    pool = SMTPConnectionPool(max_connections=5, smtp_server=smtp_server, smtp_port=465,
                              username=username, password=password)
    # 使用線程池來并行處理郵件發送任務
    with ThreadPoolExecutor(max_workers=max_workers) as executor:  # 可以根據需要調整最大線程數
        futures = []
        batch_map = {}  # 用于存儲 Future 和對應的批次

        # 計算總批次數
        num_batches = (total_emails + email_batch_size - 1) // email_batch_size
        print(f'num_batches:{num_batches}')
        print(f'email_batch_size:{email_batch_size}')

        for batch_idx in range(num_batches):
            start_index = batch_idx * email_batch_size
            end_index = min(start_index + email_batch_size, total_emails)
            current_batch = to_email_list[start_index:end_index]

            if current_batch:
                future = executor.submit(send_email, pool, subject, body, current_batch, retry_enabled, 0)
                futures.append(future)  # 儲存未來對象
                batch_map[future] = current_batch  # 儲存未來對象和對應的批次

                # 控制發送頻率
                time.sleep(interval)
                # print(f'等待 {interval} 秒')

        # 等待所有任務完成
        for future in as_completed(futures):
            batch = batch_map[future]
            if not future.result():  # 如果發送時返回False
                retry_queue.append((batch, 1))  # 添加到重試隊列,并記錄重試次數

        # 重試邏輯
        while retry_queue and retry_enabled:
            new_retry_queue = []
            for batch, retry_count in retry_queue:
                future = executor.submit(send_email, pool, subject, body, batch, retry_enabled, retry_count)
                if not future.result():
                    new_retry_count = retry_count + 1
                    if new_retry_count < (retry_limit + 1):
                        new_retry_queue.append((batch, new_retry_count))
                    else:
                        print(f"Maximum retry attempts have been reached, give up retry: {batch}")
            print(f"Retry queue after this round: {new_retry_queue}")
            retry_queue = new_retry_queue

    # 記錄結束時間
    end_time = time.time()
    # 計算耗時
    elapsed_time = round(end_time - start_time, 2)
    print(f"Total time taken: {elapsed_time} seconds")


retry_enabled = False  # True/False, True開啟重試, 大多數情況不建議重試,重試是整組重試(如共120人,分2次請求,單次請求60人,重試會按60人重試)。
retry_limit = 2  # 最多重試次數,開啟重試后生效
# 截取一些關鍵特征,含有該特征的不進行重試
ruler_not_retry = ["too frequency", 'getaddrinfo failed', 'Authentication failure', 'Invalid rcptto']

# 示例參數
qps = 3  # 自定義的QPS,1表示每秒請求1次,不要超過27,約5000次/180秒,預熱發信不宜太高,避免收信方拒絕
max_workers = 200  # 最大線程數,用于保護服務器,避免過度占用系統資源
email_batch_size = 1  # 每次最多處理的收件人數量60,設置多人會都顯示在郵件上

smtp_server = 'smtpdm.aliyun.com'
username = 'test@t1.example.com'  # username,通過控制臺創建的發信地址
password = 'xxxxxxxxx'  # password,通過控制臺創建的SMTP密碼
smtp_port = 465
subject = '自定義主題'
body = '<h1>自定義內容</h1>'
pool = SMTPConnectionPool(max_connections=5, smtp_server=smtp_server, smtp_port=smtp_port,
                          username=username, password=password)
# 模擬數據
total_emails = 10  # 需要發送的總郵件數,真實發信需要替換為真實發件人數量,如len(to_email_list)
to_email_list = [f'recipient{i}@example.com' for i in range(0, total_emails)]  # 生成1000個示例收件人,真實發信需要傳入真實收件人列表

# 真實發信
# to_email_list = ['a@example.com', 'b@example.com', 'c@example.com']  # 傳入真實收件人列表
# total_emails = len(to_email_list)  # 需要發送的總郵件數,真實發信需要替換為真實發件人數量,如len(to_email_list)

# 調用發送
send_mail_with_fixed_rate(total_emails, to_email_list, subject, body, qps, max_workers, retry_enabled)

# 關閉所有連接
pool.close_all_connections()

send_email 函數

功能:發送郵件,并支持重試機制。

參數

pool:SMTP連接池。

subject:郵件主題。

body:郵件正文。

to_email:收件人郵箱列表。

retry_enabled:是否啟用重試機制。

retry_count:當前重試次數,默認為0。

返回值:郵件發送成功或不再重試時返回True,需要重試時返回False。

send_mail_with_fixed_rate 函數

功能:均勻發送郵件,根據總郵件數、收件人列表、主題、正文、每秒查詢率(QPS)、最大工作線程數和是否啟用重試來發送郵件。

參數

total_emails:總郵件數。

to_email_list:收件人郵箱列表。

subject:郵件主題。

body:郵件正文。

qps:每秒查詢率,用于控制發送頻率。

max_workers:最大工作線程數。

retry_enabled:是否啟用重試機制。

image

最佳實踐

retry_enabled重試機制默認為False關閉,按需設置QPS默認為 3,建議不要超過27,約5000次/180秒,預熱發信QPS不宜太高,避免收信方拒絕,email_batch_size單次請求人數默認為1,設置多人會都顯示在同一封郵件上。

代碼綜述

該段代碼主要用于批量發送電子郵件,并實現了郵件發送的重試機制。主要功能包括:

郵件發送:通過 send_email 函數實現郵件的構建與發送,支持重試機制。

郵件均勻發送:通過 send_mail_with_fixed_rate 函數實現郵件的均勻發送,控制發送頻率和并發度,避免對郵件服務器造成過大壓力。

重試機制:當郵件發送失敗時,可以根據配置決定是否進行重試,以及重試的最大次數。

三種不重試的情況:

  • 不符合重試條件(重試開關關閉)!

  • 不符合重試條件(重試次數超過上限)!

  • 不符合重試條件(包含不可重試類型)!

日志定位:線程名稱{thread_name}收件人地址{to_email}用于匹配單次請求的日志。

注意事項

SMTP服務器配置:確保服務器地址,賬號,密碼正確無誤。

收件人列表格式:to_email_list 應為一個列表,每個元素為一個收件人的郵箱地址。如果直接使用逗號分隔的字符串,可能會導致郵件發送失敗。

并發控制:max_workers 參數控制了并發線程的數量,應根據服務器性能和SMTP服務器限制合理設置。

發送頻率控制:QPS參數用于控制每秒發送的郵件數量,過高的值可能導致SMTP服務器拒絕服務。

重試機制:retry_enabled 和 retry_limit 參數決定了是否啟用重試及最大重試次數,需謹慎設置,避免不必要的重復發送。

常見問題

郵件發送失敗:

檢查SMTP服務器配置是否正確。

確認收件人列表格式是否正確。

檢查網絡連接是否正常。

查看SMTP服務器的日志,確認是否有具體的錯誤信息。

郵件發送速度過慢:

調整QPS參數,適當增加每秒發送的郵件數量。

檢查 max_workers 參數,確保并發度足夠高。

重試機制失效:

確認 retry_enabled 參數是否為 True。

檢查 retry_limit 參數是否設置合理。

確認 ruler_not_retry 列表中的錯誤信息是否匹配實際的異常情況。

日志輸出過多:

調整日志輸出級別,減少不必要的調試信息。

可以將日志輸出到文件,而不是控制臺,以便于查看和管理。

代碼優化建議:

日志記錄:可以使用 Python 的 logging 模塊替代 print 語句,以便更好地管理和控制日志輸出。

配置管理:將配置參數(如 smtp_server, username, password 等)提取到配置文件中,便于管理和維護。

異常處理:增加更多的異常捕獲和處理邏輯,提高程序的健壯性和容錯能力。

性能優化:考慮使用異步編程模型(如 asyncio)進一步提升郵件發送的效率。