本文介紹如何通過編寫代碼的方式,離線導(dǎo)入大數(shù)據(jù)量到PolarDB-X 1.0數(shù)據(jù)庫。

背景信息

假設(shè)當(dāng)前數(shù)據(jù)庫有一個表需要導(dǎo)入到PolarDB-X 1.0數(shù)據(jù)庫中,數(shù)據(jù)量大致為814萬,表結(jié)構(gòu)如下。

CREATE TABLE `post` (
  `postingType` int NOT NULL,
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `acceptedAnswer` bigint(20) DEFAULT NULL,
  `parentId` bigint(20) DEFAULT NULL,
  `score` int DEFAULT NULL
  `tags` varchar(128) DEFAULT NULL,
  PRIMARY KEY (`id`)
);
            

數(shù)據(jù)庫之間大數(shù)據(jù)量的遷移,建議把原始數(shù)據(jù)導(dǎo)出成一個文本文件,然后通過程序或者命令的方式導(dǎo)入到目標(biāo)數(shù)據(jù)庫。

對于上一節(jié)的 post 表,可以通過 SELECT INTO語法將數(shù)據(jù)從MySQL導(dǎo)出到一個名為stackoverflow.csv的文件中。在MySQL客戶端執(zhí)行以下命令:

SELECT postingType,id,acceptedAnswer,parentId,score,tags 
INTO OUTFILE '/tmp/stackoverflow.csv' 
FIELDS TERMINATED BY ',' 
OPTIONALLY ENCLOSED BY '"'
LINES TERMINATED BY '\n'
FROM test_table;
            

PolarDB-X 1.0數(shù)據(jù)庫上建表

由于導(dǎo)出的數(shù)據(jù)文件不包括表結(jié)構(gòu),所以需要手工在PolarDB-X 1.0目標(biāo)數(shù)據(jù)庫上建立表,并且根據(jù)實際情況設(shè)置拆分鍵。

例如以下是按照 idpost 表進(jìn)行分庫。

CREATE TABLE `post` (
  `postingType` int NOT NULL,
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `acceptedAnswer` bigint(20) DEFAULT NULL,
  `parentId` bigint(20) DEFAULT NULL,
  `score` int DEFAULT NULL,
  `tags` varchar(128) DEFAULT NULL,
  PRIMARY KEY (`id`)
) DBPARTITION BY hash(id) ENGINE=InnoDB DEFAULT CHARSET=utf8;
            

導(dǎo)入數(shù)據(jù)到PolarDB-X 1.0數(shù)據(jù)庫

導(dǎo)出數(shù)據(jù)文件以后,可以通過代碼的方式讀取文件內(nèi)容,然后導(dǎo)入到PolarDB-X 1.0數(shù)據(jù)庫中。為了提高效率,建議通過批量插入的方式。

以下是用 Java 寫的一個 Demo。

測試場景:插入8143801條數(shù)據(jù),耗時916秒,TPS 在9000左右。

測試客戶端配置:i5、8G、SSD。

測試PolarDB-X 1.0配置:4C4G。

public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException,
        SQLException {

        URL url = Main.class.getClassLoader().getResource("stackoverflow.csv");

        File dataFile = new File(url.toURI());

        String sql = "insert into post(postingType,id,acceptedAnswer,parentId,score,tags) values(?,?,?,?,?,?)";

        int batchSize = 10000;

        try (

            Connection connection = getConnection("XXXXX.drds.aliyuncs.com", 3306, "XXXXX",
                "XXXX",
                "XXXX");
            BufferedReader br = new BufferedReader(new FileReader(dataFile))) {
            String line;
            PreparedStatement st = connection.prepareStatement(sql);
            long startTime = System.currentTimeMillis();
            int batchCount = 0;
            while ((line = br.readLine()) != null) {
                String[] data = line.split(",");
                st.setInt(1, Integer.valueOf(data[0]));
                st.setInt(2, Integer.valueOf(data[1]));

                st.setObject(3, "".equals(data[2]) ? null : Integer.valueOf(data[2]));
                st.setObject(4, "".equals(data[3]) ? null : Integer.valueOf(data[3]));
                st.setObject(5, "".equals(data[4]) ? null : Integer.valueOf(data[4]));
                if (data.length >= 6) {
                    st.setObject(6, data[5]);
                }
                st.addBatch();
                if (++batchCount % batchSize == 0) {
                    st.executeBatch();
                    System.out.println(String.format("insert %d record", batchCount));
                }
            }
            if (batchCount % batchSize != 0) {
                st.executeBatch();
            }
            long cost = System.currentTimeMillis() - startTime;

            System.out.println(String.format("Take %d second,insert %d record, tps %d", cost/1000,batchCount, batchCount/(cost/1000)  ));

        }

    }

    /**
     * 獲取數(shù)據(jù)庫連接
     *
     * @param host     數(shù)據(jù)庫地址
     * @param port     端口
     * @param database 數(shù)據(jù)庫名稱
     * @param username 用戶名
     * @param password 密碼
     * @return
     * @throws ClassNotFoundException
     * @throws SQLException
     */
    private static Connection getConnection(String host, int port, String database, String username, String password)
        throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.jdbc.Driver");
        String url = String.format(
            "jdbc:mysql://%s:%d/%s?autoReconnect=true&socketTimeout=600000&rewriteBatchedStatements=true", host, port,
            database);
        Connection con = DriverManager.getConnection(url, username, password);
        return con;
    }