風險提示:"用于生產環境之前請務必先做好測試。"
Python中的多線程是一種并發編程模型,它允許多個線程(即輕量級進程)在一個程序中同時執行。多線程可以提高程序的響應性和效率,特別是在處理I/O操作、網絡請求或者其他耗時操作時。
Python中的多線程庫
Python提供了幾種多線程的支持庫:
1. threading模塊:這是Python中最常用的多線程庫,提供了創建和管理線程的基本功能。
2. concurrent.futures模塊:提供了更高層次的抽象,使得編寫并發代碼更簡單。
使用threading模塊創建線程
下面是一個簡單的示例,展示如何使用threading模塊創建和管理線程:
import threading
import time
def worker(num):
"""線程函數"""
print(f'Thread {num}: starting')
time.sleep(2)
print(f'Thread {num}: finishing')
def main():
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# 等待所有線程完成
for t in threads:
t.join()
if __name__ == '__main__':
main()
使用concurrent.futures簡化多線程編程
concurrent.futures模塊提供了一個更高級別的接口來執行異步任務:
from concurrent.futures import ThreadPoolExecutor
import time
def worker(num):
print(f'Thread {num}: starting')
time.sleep(2)
print(f'Thread {num}: finishing')
return num
def main():
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(worker, i) for i in range(5)]
for future in futures:
print(f'Result: {future.result()}')
if __name__ == '__main__':
main()
發送郵件
單線程發送示例
多線程發送示例
下述代碼在Python 3.11.9進行的測試。
服務器地址請根據控制臺配置的對應區域選擇,請參考SMTP 服務地址。
# -*- coding: utf-8 -*-
import queue
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.header import Header
from email.utils import make_msgid, formatdate
class SMTPConnectionPool:
def __init__(self, max_connections, smtp_server, smtp_port, username, password):
self.pool = queue.Queue(max_connections)
self.smtp_server = smtp_server
self.smtp_port = smtp_port
self.username = username
self.password = password
# 初始化連接池
for _ in range(max_connections):
connection = self.create_connection()
self.pool.put(connection)
def create_connection(self):
conn = smtplib.SMTP_SSL(smtp_server, smtp_port)
conn.login(username, password)
return conn
def get_connection(self):
return self.pool.get()
def return_connection(self, conn):
self.pool.put(conn)
def close_all_connections(self):
while not self.pool.empty():
conn = self.pool.get()
conn.quit()
def send_email(pool, subject, body, to_email, retry_enabled, retry_count=0):
"""
發送郵件函數,支持重試機制。
參數:
subject (str): 郵件主題。
body (str): 郵件正文。
to_email (list): 收件人郵箱列表。
retry_enabled (bool): 是否啟用重試機制。
retry_count (int): 當前重試次數,默認為0。
返回:
bool:郵件發送成功或不再重試時返回True,需要重試時返回False。
"""
conn = pool.get_connection()
# 獲取當前線程名稱
thread_name = threading.current_thread().name
# 記錄開始時間
threading_start_time = time.time()
v_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
# 根據重試次數打印相應信息
if retry_count == 0:
print(f'{thread_name}================================={v_time} 首次發送!\n')
else:
print(f"{thread_name}================================={v_time} 開始第{retry_count}次重試!")
# 發送郵件邏輯
try:
# 構建郵件內容和發送郵件的邏輯
msg = MIMEMultipart('alternative')
msg['Subject'] = Header(subject)
msg['From'] = username
msg['To'] = ",".join(to_email)
msg['Message-id'] = make_msgid()
msg['Date'] = formatdate()
text_html = MIMEText(body, _subtype='html', _charset='UTF-8')
msg.attach(text_html)
# 開啟DEBUG模式
conn.set_debuglevel(1)
conn.sendmail(username, to_email, msg.as_string())
# client.quit()
print(f'{thread_name}--發送成功!\n')
except Exception as e:
# 處理發送郵件時的異常
print(f'{thread_name}--------------------------------Exception\n')
print(f"{thread_name}--Exception SMTP server reply: {e}\n")
# 判斷是否需要重試的邏輯
if retry_enabled is False:
print(f"{thread_name}--不符合重試條件(重試開關關閉)!\n")
return True
if retry_count > retry_limit:
print(f"{thread_name}--不符合重試條件(重試次數超過上限)!\n")
return True
for v_err in ruler_not_retry:
if v_err in str(e):
print(f"{thread_name}--不符合重試條件(包含不可重試類型)!\n")
return True
print(f"{thread_name}--加入重試隊列,等待下次重試!\n")
return False
finally:
# 記錄結束時間并計算耗時
threading_end_time = time.time()
threading_elapsed_time = round(threading_end_time - threading_start_time, 2)
print(f"Thread {thread_name} has finished sending email to {to_email},{threading_elapsed_time} seconds\n")
print(f'{thread_name}--------------------------------end \n')
# 將連接放回連接池
pool.return_connection(conn)
return True
def send_mail_with_fixed_rate(total_emails, to_email_list, subject, body, qps, max_workers, retry_enabled):
"""
均勻發送郵件函數。
根據總郵件數、收件人列表、主題、正文、每秒查詢率(QPS)、最大工作線程數和是否啟用重試來發送郵件。
該函數通過控制郵件發送的間隔和并行處理來實現郵件的均勻發送。
參數:
total_emails (int): 總郵件數。
to_email_list (list): 收件人郵箱列表。
subject (str): 郵件主題。
body (str): 郵件正文。
qps (float): 每秒查詢率,用于控制發送頻率。
max_workers (int): 最大工作線程數。
retry_enabled (bool): 是否啟用重試機制。
"""
# 計算郵件發送間隔,使發信更均勻
if total_emails == 0:
print("No emails to send.")
return
# 記錄開始時間
start_time = time.time()
interval = round(1 / qps, 2)
retry_queue = [] # 重試隊列
print(f'max_workers:{max_workers}')
print(f'retry_enabled:{retry_enabled}')
print(f'total_emails:{total_emails}')
print(f'qps:{qps}')
pool = SMTPConnectionPool(max_connections=5, smtp_server=smtp_server, smtp_port=465,
username=username, password=password)
# 使用線程池來并行處理郵件發送任務
with ThreadPoolExecutor(max_workers=max_workers) as executor: # 可以根據需要調整最大線程數
futures = []
batch_map = {} # 用于存儲 Future 和對應的批次
# 計算總批次數
num_batches = (total_emails + email_batch_size - 1) // email_batch_size
print(f'num_batches:{num_batches}')
print(f'email_batch_size:{email_batch_size}')
for batch_idx in range(num_batches):
start_index = batch_idx * email_batch_size
end_index = min(start_index + email_batch_size, total_emails)
current_batch = to_email_list[start_index:end_index]
if current_batch:
future = executor.submit(send_email, pool, subject, body, current_batch, retry_enabled, 0)
futures.append(future) # 儲存未來對象
batch_map[future] = current_batch # 儲存未來對象和對應的批次
# 控制發送頻率
time.sleep(interval)
# print(f'等待 {interval} 秒')
# 等待所有任務完成
for future in as_completed(futures):
batch = batch_map[future]
if not future.result(): # 如果發送時返回False
retry_queue.append((batch, 1)) # 添加到重試隊列,并記錄重試次數
# 重試邏輯
while retry_queue and retry_enabled:
new_retry_queue = []
for batch, retry_count in retry_queue:
future = executor.submit(send_email, pool, subject, body, batch, retry_enabled, retry_count)
if not future.result():
new_retry_count = retry_count + 1
if new_retry_count < (retry_limit + 1):
new_retry_queue.append((batch, new_retry_count))
else:
print(f"Maximum retry attempts have been reached, give up retry: {batch}")
print(f"Retry queue after this round: {new_retry_queue}")
retry_queue = new_retry_queue
# 記錄結束時間
end_time = time.time()
# 計算耗時
elapsed_time = round(end_time - start_time, 2)
print(f"Total time taken: {elapsed_time} seconds")
retry_enabled = False # True/False, True開啟重試, 大多數情況不建議重試,重試是整組重試(如共120人,分2次請求,單次請求60人,重試會按60人重試)。
retry_limit = 2 # 最多重試次數,開啟重試后生效
# 截取一些關鍵特征,含有該特征的不進行重試
ruler_not_retry = ["too frequency", 'getaddrinfo failed', 'Authentication failure', 'Invalid rcptto']
# 示例參數
qps = 3 # 自定義的QPS,1表示每秒請求1次,不要超過27,約5000次/180秒,預熱發信不宜太高,避免收信方拒絕
max_workers = 200 # 最大線程數,用于保護服務器,避免過度占用系統資源
email_batch_size = 1 # 每次最多處理的收件人數量60,設置多人會都顯示在郵件上
smtp_server = 'smtpdm.aliyun.com'
username = 'test@t1.example.com' # username,通過控制臺創建的發信地址
password = 'xxxxxxxxx' # password,通過控制臺創建的SMTP密碼
smtp_port = 465
subject = '自定義主題'
body = '<h1>自定義內容</h1>'
pool = SMTPConnectionPool(max_connections=5, smtp_server=smtp_server, smtp_port=smtp_port,
username=username, password=password)
# 模擬數據
total_emails = 10 # 需要發送的總郵件數,真實發信需要替換為真實發件人數量,如len(to_email_list)
to_email_list = [f'recipient{i}@example.com' for i in range(0, total_emails)] # 生成1000個示例收件人,真實發信需要傳入真實收件人列表
# 真實發信
# to_email_list = ['a@example.com', 'b@example.com', 'c@example.com'] # 傳入真實收件人列表
# total_emails = len(to_email_list) # 需要發送的總郵件數,真實發信需要替換為真實發件人數量,如len(to_email_list)
# 調用發送
send_mail_with_fixed_rate(total_emails, to_email_list, subject, body, qps, max_workers, retry_enabled)
# 關閉所有連接
pool.close_all_connections()
send_email 函數
功能:發送郵件,并支持重試機制。
參數:
pool:SMTP連接池。
subject:郵件主題。
body:郵件正文。
to_email:收件人郵箱列表。
retry_enabled:是否啟用重試機制。
retry_count:當前重試次數,默認為0。
返回值:郵件發送成功或不再重試時返回True,需要重試時返回False。
send_mail_with_fixed_rate 函數
功能:均勻發送郵件,根據總郵件數、收件人列表、主題、正文、每秒查詢率(QPS)、最大工作線程數和是否啟用重試來發送郵件。
參數:
total_emails:總郵件數。
to_email_list:收件人郵箱列表。
subject:郵件主題。
body:郵件正文。
qps:每秒查詢率,用于控制發送頻率。
max_workers:最大工作線程數。
retry_enabled:是否啟用重試機制。
最佳實踐
retry_enabled重試機制默認為False關閉,按需設置QPS默認為 3,建議不要超過27,約5000次/180秒,預熱發信QPS不宜太高,避免收信方拒絕,email_batch_size單次請求人數默認為1,設置多人會都顯示在同一封郵件上。
代碼綜述
該段代碼主要用于批量發送電子郵件,并實現了郵件發送的重試機制。主要功能包括:
郵件發送:通過 send_email 函數實現郵件的構建與發送,支持重試機制。
郵件均勻發送:通過 send_mail_with_fixed_rate 函數實現郵件的均勻發送,控制發送頻率和并發度,避免對郵件服務器造成過大壓力。
重試機制:當郵件發送失敗時,可以根據配置決定是否進行重試,以及重試的最大次數。
三種不重試的情況:
不符合重試條件(重試開關關閉)!
不符合重試條件(重試次數超過上限)!
不符合重試條件(包含不可重試類型)!
日志定位:線程名稱{thread_name}收件人地址{to_email}用于匹配單次請求的日志。
注意事項
SMTP服務器配置:確保服務器地址,賬號,密碼正確無誤。
收件人列表格式:to_email_list 應為一個列表,每個元素為一個收件人的郵箱地址。如果直接使用逗號分隔的字符串,可能會導致郵件發送失敗。
并發控制:max_workers 參數控制了并發線程的數量,應根據服務器性能和SMTP服務器限制合理設置。
發送頻率控制:QPS參數用于控制每秒發送的郵件數量,過高的值可能導致SMTP服務器拒絕服務。
重試機制:retry_enabled 和 retry_limit 參數決定了是否啟用重試及最大重試次數,需謹慎設置,避免不必要的重復發送。
常見問題
郵件發送失敗:
檢查SMTP服務器配置是否正確。
確認收件人列表格式是否正確。
檢查網絡連接是否正常。
查看SMTP服務器的日志,確認是否有具體的錯誤信息。
郵件發送速度過慢:
調整QPS參數,適當增加每秒發送的郵件數量。
檢查 max_workers 參數,確保并發度足夠高。
重試機制失效:
確認 retry_enabled 參數是否為 True。
檢查 retry_limit 參數是否設置合理。
確認 ruler_not_retry 列表中的錯誤信息是否匹配實際的異常情況。
日志輸出過多:
調整日志輸出級別,減少不必要的調試信息。
可以將日志輸出到文件,而不是控制臺,以便于查看和管理。
代碼優化建議:
日志記錄:可以使用 Python 的 logging 模塊替代 print 語句,以便更好地管理和控制日志輸出。
配置管理:將配置參數(如 smtp_server, username, password 等)提取到配置文件中,便于管理和維護。
異常處理:增加更多的異常捕獲和處理邏輯,提高程序的健壯性和容錯能力。
性能優化:考慮使用異步編程模型(如 asyncio)進一步提升郵件發送的效率。