Psycopg是Python用于操作PostgreSQL的庫。Hologres兼容PostgreSQL 11,因此您可以通過psycopg訪問Hologres。本文將指導您使用psycopg2訪問Hologres,示例使用的操作環境為基于CentOS 7系統的Python 3.8版本。
安裝Python3.8
您可以基于Miniconda、Anaconda安裝Python 3.8環境。如下內容以CentOS 7系統為例,安裝Python 3.8版本。
安裝Python 3.8。
您可以下載對應版本的Python,執行如下命令進行安裝。
# yum install centos-release-scl # yum install rh-python38 # scl enable rh-python38 bash # python --version Python 3.8.6
安裝psycopg2模塊。
執行如下命令安裝psycopg2模塊。
# pip install psycopg2-binary
連接Hologres
Python3.8環境和psycopg2安裝完成之后,您可以執行如下操作并連接Hologres。
加載psycopg2。
如果需要使用psycopg2,您可以執行命令
import psycopg2
加載安裝的psycopg2。創建數據庫連接。
您可以使用
psycopg2.connect()
函數連接Hologres,具體語法和參數說明如下所示。conn = psycopg2.connect(host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=<keepalives>, keepalives_idle=<keepalives_idle>, keepalives_interval=<keepalives_interval>, keepalives_count=<keepalives_count>, application_name="<Application Name>" )
參數
描述
host
Hologres實例的網絡地址。
進入Hologres管理控制臺的實例詳情頁,從網絡信息獲取網絡地址。
port
Hologres的實例端口。
您可以進入Hologres管理控制臺的實例詳情頁,從網絡信息獲取端口。
dbname
Hologres創建的數據庫名稱。
user
當前阿里云賬號的AccessKey ID。
您可以單擊AccessKey 管理,獲取AccessKey ID。
password
當前阿里云賬號的AccessKey Secret。
您可以單擊AccessKey 管理,獲取AccessKey Secret。
application_name
可選,應用名,用于記錄查詢日志時識別SQL代表的應用含義。
說明配置該參數,有助于您在慢Query清單中根據Application Name快速定位您的發起請求的應用。
keepalives
可選(推薦使用),連接方式:
1表示使用長連接。
0表示非長連接。
keepalives_idle
空閑時,保持連接連通的時間間隔,單位秒(s)。
keepalives_interval
沒得到回應時,等待重新嘗試保持連通的時間間隔,單位秒(s)。
keepalives_count
嘗試重新保持連通最大次數。
代碼示例如下。
conn = psycopg2.connect(host="<Endpoint>", port=<Port>, dbname="<databases>", user="<Access ID>", password="<Access Key>", keepalives=1, # 保持連接 keepalives_idle=130, # 空閑時,每130秒保持連接連通 keepalives_interval=10, # 沒得到回應時,等待10秒重新嘗試保持連通 keepalives_count=15, # 嘗試最多15次重新保持連通 application_name="<Application Name>" )
使用Hologres
當您成功連接Hologres數據庫之后,即可通過psycopg2進行數據開發操作。如下內容將指導您創建表、插入數據、查詢和釋放資源等操作。如果需要使用Fixed Plan能力實現更高性能的讀寫操作,需要配置相關GUC參數,請參見Fixed Plan加速SQL執行。
創建游標。
在進行數據開發之前,您需要執行命令
cur = conn.cursor()
來創建連接的游標。數據開發。
創建表
您可以執行如下命令,創建一個表
holo_test
并定義表的數據類型為integer。您也可以根據業務需求定義表名稱和數據類型。cur.execute("CREATE TABLE holo_test (num integer);")
插入數據
您可以執行如下命令,為創建的表
holo_test
插入數據1~1000。cur.execute("INSERT INTO holo_test SELECT generate_series(%s, %s)", (1, 1000))
查詢數據
cur.execute("SELECT sum(num) FROM holo_test;") cur.fetchone()
提交事務。
在查詢數據的命令之后,您需要執行命令
conn.commit()
提交事務,此操作可以確保操作已經提交。也可以把autocommit參數設置為true,實現SQL命令的自動提交。釋放資源。
為避免影響后續的操作,當操作執行完成后,您需要執行如下命令關閉游標并斷開數據庫連接。
cur.close() conn.close()
Pandas DataFrame快速寫入Hologres最佳實踐
使用Python時,經常會使用Pandas將數據轉換為DataFrame,并對DataFrame進行處理,最終將DataFrame導入Hologres,此時希望將DataFrame快速導入Hologres。導入時候常用to_sql
函數,詳情請參見Pandas。
需要Pandas為V1.4.2及以上版本,您可以執行如下命令強制安裝V1.5.1版本的Pandas庫。
# pip install Pandas==1.5.1
推薦使用to_sql
函數的callable方式,使用copy方式導入數據,樣例的Python代碼如下。
# 加載依賴
import pandas as pd
import psycopg2
# 生成連接字符串
host="hgpostcn-cn-xxxxxx-cn-hangzhou.hologres.aliyuncs.com"
port=80
dbname="demo"
user="LTAI5xxxxx"
password="fa8Kdgxxxxx"
application_name="Python Test"
conn = "postgresql+psycopg2://{}:{}@{}:{}/{}?application_name={}".format(user, password, host, port, dbname,application_name)
print(conn)
# 生成dataframe
data = [('1','1','1'),('2','2','2')]
cols = ('col1','col2','col3')
pd_data = pd.DataFrame(data, columns=cols)
# 定義callable函數
import csv
from io import StringIO
def psql_insert_copy(table, conn, keys, data_iter):
"""
Execute SQL statement inserting data
Parameters
----------
table : pandas.io.sql.SQLTable
conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
keys : list of str
Column names
data_iter : Iterable that iterates the values to be inserted
"""
# gets a DBAPI connection that can provide a cursor
dbapi_conn = conn.connection
with dbapi_conn.cursor() as cur:
s_buf = StringIO()
writer = csv.writer(s_buf)
writer.writerows(data_iter)
s_buf.seek(0)
columns = ', '.join('"{}"'.format(k) for k in keys)
if table.schema:
table_name = '{}.{}'.format(table.schema, table.name)
else:
table_name = table.name
sql = 'COPY {} ({}) FROM STDIN WITH CSV'.format(
table_name, columns)
cur.copy_expert(sql=sql, file=s_buf)
# 導入數據
pd_data.to_sql(
name="pd_data",
con=conn,
if_exists="append",
index=False,
method=psql_insert_copy
)
查看歷史查詢,驗證已經使用COPY方式寫入數據至Hologres。