日本熟妇hd丰满老熟妇,中文字幕一区二区三区在线不卡 ,亚洲成片在线观看,免费女同在线一区二区

Spark實(shí)時(shí)同步

更新時(shí)間:

本文為您介紹如何通過Spark讀取或?qū)懭霐?shù)據(jù)至Hologres的操作方法。

背景信息

Spark是用于大規(guī)模數(shù)據(jù)處理的統(tǒng)一分析引擎,Hologres已經(jīng)與Spark(社區(qū)版以及EMR Spark版)高效打通,快速助力企業(yè)搭建數(shù)據(jù)倉庫。Hologres提供的Spark Connector,支持Spark以批處理的方式將數(shù)據(jù)寫入Hologres,同時(shí)Spark支持讀取多種數(shù)據(jù)源(例如文件、Hive、MySQL、PostgreSQL等)。

Hologres兼容PostgreSQL,因此Spark也可以用讀取PostgreSQL的方式直接讀取Hologres數(shù)據(jù),進(jìn)行ETL處理,再寫入Hologres及其他數(shù)據(jù)源,完成大數(shù)據(jù)開發(fā)抽取、處理、加載的完整閉環(huán)。

前提條件

  • 實(shí)例版本需為V0.9及以上版本。請?jiān)贖ologres管控臺的實(shí)例詳情頁查看當(dāng)前實(shí)例版本,如實(shí)例是V0.9以下版本,請您使用自助升級或加入Hologres釘釘交流群反饋,詳情請參見如何獲取更多的在線支持?

  • 需要安裝對應(yīng)版本的Spark環(huán)境,能夠運(yùn)行spark-shell命令。

連接數(shù)使用

Hologres Spark Connector在進(jìn)行讀寫時(shí),會(huì)使用一定的JDBC連接數(shù)。可能受到如下因素影響:

  • Spark的并發(fā)特性,在作業(yè)運(yùn)行過程中,可以通過Spark UI觀察到并行執(zhí)行的Task數(shù)量。

  • Connector在操作時(shí),對于固定復(fù)制(fixed copy)方式的寫入,每個(gè)并發(fā)操作都只使用一個(gè)JDBC連接。而采用INSERT方式寫入時(shí),每個(gè)并發(fā)則會(huì)利用write_thread_size數(shù)量的JDBC連接。在進(jìn)行讀取操作時(shí),每個(gè)并發(fā)同樣使用一個(gè)JDBC連接。

  • 其他方面可能使用的連接數(shù):在作業(yè)啟動(dòng)時(shí),可能會(huì)有獲取Schema信息的操作,這可能會(huì)短暫地建立1個(gè)連接。

因此作業(yè)使用的總連接數(shù)可以通過如下公式計(jì)算:

  • fixed copy模式:parallelism*1+1

  • 普通INSERT模式:parallelism*write_thread_size+1

說明

Spark Task并發(fā)可能受到用戶設(shè)置的參數(shù)影響,也可能受到Hadoop對文件分塊策略的影響。

通過Spark Connector寫入(推薦使用)

Hologres當(dāng)前支持使用內(nèi)置的Spark Connector將Spark數(shù)據(jù)寫入Hologres,相比其他寫入方式,調(diào)用基于Holo Client實(shí)現(xiàn)Connector寫入的方式性能更優(yōu)。具體操作步驟如下,阿里云也為您提供了相關(guān)的使用示例,詳情請參見通過Spark Connector寫入使用示例

準(zhǔn)備工作

  1. 獲取JAR包。

    Spark2和Spark3上均已支持Connector寫入,Spark寫入Hologres時(shí)需要引用connector的JAR包,當(dāng)前已經(jīng)發(fā)布到Maven中央倉庫,在項(xiàng)目中參照如下pom文件進(jìn)行配置。

    說明

    相關(guān)Connector也已開源,詳情請參見hologres-connectors

    <dependency>
        <groupId>com.alibaba.hologres</groupId>
        <artifactId>hologres-connector-spark-3.x</artifactId>
        <version>1.4.0</version>
        <classifier>jar-with-dependencies</classifier>
    </dependency>

    當(dāng)前Hologres已自動(dòng)生成JAR文件,下載鏈接如下。

  2. 使用JAR包。

    啟動(dòng)Spark時(shí)執(zhí)行以下命令。

    spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

    或者使用pyspark:

    pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

通過Spark Connector寫入使用示例

根據(jù)如下示例步驟為您介紹,如何通過Spark Connector將數(shù)據(jù)寫入Hologres。

  1. 創(chuàng)建Hologres表。

    在Hologres中執(zhí)行如下SQL命令創(chuàng)建目標(biāo)表,用來接收數(shù)據(jù)。

    CREATE TABLE tb008 (
      id BIGINT primary key,
      counts INT,
      name TEXT,
      price NUMERIC(38, 18),
      out_of_stock BOOL,
      weight DOUBLE PRECISION,
      thick FLOAT,
      time TIMESTAMPTZ,
      dt DATE, 
      by bytea,
      inta int4[],
      longa int8[],
      floata float4[],
      doublea float8[],
      boola boolean[],
      stringa text[]
    );
  2. Spark準(zhǔn)備數(shù)據(jù)并寫入Hologres。

    1. 在命令行運(yùn)行命令開啟Spark。

      spark-shell --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
    2. spark-shell里使用命令load spark-test.scala執(zhí)行測試文件,加載測試示例。

      spark-test.scala文件示例如下。

      import java.sql.{Timestamp, Date}
      import org.apache.spark.sql.types._
      import org.apache.spark.sql.Row
      
      val byteArray = Array(1.toByte, 2.toByte, 3.toByte, 'b'.toByte, 'a'.toByte)
      val intArray = Array(1, 2, 3)
      val longArray = Array(1L, 2L, 3L)
      val floatArray = Array(1.2F, 2.44F, 3.77F)
      val doubleArray = Array(1.222, 2.333, 3.444)
      val booleanArray = Array(true, false, false)
      val stringArray = Array("abcd", "bcde", "defg")
      
      val data = Seq(
        Row(-7L, 100, "phone1", BigDecimal(1234.567891234), false, 199.35, 6.7F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("2021-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(6L, -10, "phone2", BigDecimal(1234.56), true, 188.45, 7.8F, Timestamp.valueOf("2021-01-01 00:00:00"), Date.valueOf("1970-01-01"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray),
        Row(1L, 10, "phone3\"", BigDecimal(1234.56), true, 111.45, null, Timestamp.valueOf("2020-02-29 00:12:33"), Date.valueOf("2020-07-23"), byteArray, intArray, longArray, floatArray, doubleArray, booleanArray, stringArray)
      )
      
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false), //false表示此Field不允許為null
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType),
        StructField("weight", DoubleType),
        StructField("thick", FloatType),
        StructField("time", TimestampType),
        StructField("dt", DateType),
        StructField("by", BinaryType),
        StructField("inta", ArrayType(IntegerType)),
        StructField("longa", ArrayType(LongType)),
        StructField("floata", ArrayType(FloatType)),
        StructField("doublea", ArrayType(DoubleType)),
        StructField("boola", ArrayType(BooleanType)),
        StructField("stringa", ArrayType(StringType))
      ))
      
      
      val df = spark.createDataFrame(
        spark.sparkContext.parallelize(data),
        schema
      )
      df.show()
      
      //配置導(dǎo)入數(shù)據(jù)至Hologres的信息。
      df.write.format("hologres") //必須配置為hologres
        .option("username", "your_username") //阿里云賬號的AccessKey ID。
        .option("password", "your_password") //阿里云賬號的Accesskey SECRET。
        .option("endpoint", "Ip:Port") //Hologres的Ip和Port。
        .option("database", "test_database") //Hologres的數(shù)據(jù)庫名稱,示例為test_database。
        .option("table", "tb008") //Hologres用于接收數(shù)據(jù)的表名稱,示例為tb008。
        .option("write_batch_size", 512) // 寫入攢批大小,詳見下方參數(shù)介紹
        .option("input_data_schema_ddl", df.schema.toDDL) // Dataframe對應(yīng)的DDL,僅spark3.x需要
        .mode(SaveMode.Append) // spark DataFrameWriter接口的SaveMode, 必須為Append;注意與WRITE_MODE不是同一個(gè)參數(shù), 自hologres-connector1.3.3版本開始,支持SaveMode.OverWrite,會(huì)清理原始表中的數(shù)據(jù),請謹(jǐn)慎使用
        .save()
  3. 查詢寫入的數(shù)據(jù)。

    在Hologres側(cè)查詢目標(biāo)表,即可確認(rèn)寫入的數(shù)據(jù),示例如下圖所示。測試示例數(shù)據(jù)

使用pyspark加載Connector寫入示例

  1. 啟動(dòng)pyspark并加載Connector。

    pyspark --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar

  2. 與spark-shell類似,使用元數(shù)據(jù)創(chuàng)建DataFrame之后調(diào)用Connector進(jìn)行寫入。

    data = [[1, "Elia"], [2, "Teo"], [3, "Fang"]]
    df = spark.createDataFrame(data, schema="id LONG, name STRING")
    df.show()
    
    df2.write.format("hologres").option(
      "username", "your_username").option(
      "password", "your_password").option(
      "endpoint", "hologres_endpoint").option(
      "database", "test_database").option(
      "table", "tb008").save()
    

使用Spark SQL加載Connector進(jìn)行寫入

說明

僅Spark3版本的Connector支持SQL方式。

  1. 啟動(dòng)Spark SQL并加載Connector。

    spark-sql --jars hologres-connector-spark-3.x-1.4.0-SNAPSHOT-jar-with-dependencies.jar
  2. 通過Spark SQL DDL,分別創(chuàng)建CSV和Hologres View,進(jìn)行寫入。

    CREATE TEMPORARY VIEW csvTable (
      c_custkey bigint,
      c_name string,
      c_address string,
      c_nationkey int,
      c_phone string,
      c_acctbal decimal(15, 2),
      c_mktsegment string,
      c_comment string)
    USING csv OPTIONS (
      path "resources/customer1.tbl", sep "|"
    );
    
    CREATE TEMPORARY VIEW hologresTable (
      c_custkey bigint,
      c_name string,
      c_address string,
      c_nationkey int,
      c_phone string,
      c_acctbal decimal(15, 2),
      c_mktsegment string,
      c_comment string)
    USING hologres OPTIONS (
      jdbcurl "jdbc:postgresql://hologres_endpoint/test_database",
      username "your_username", 
      password "your_password", 
      table "customer_holo_table", 
      copy_write_mode "true", 
      bulk_load "true", 
      copy_write_format "text"
    );
    
    -- 目前通過sql創(chuàng)建的hologres view不支持寫入部分列(如insert into hologresTable(c_custkey) select c_custkey from csvTable),寫入時(shí)需要寫入DDL中聲明的所有字段。如果希望寫入部分列,可以建表時(shí)僅聲明需要寫入的字段。
    INSERT INTO hologresTable SELECT * FROM csvTable;

通過Spark讀取數(shù)據(jù)源數(shù)據(jù)并寫入Hologres

  1. Spark從數(shù)據(jù)源讀取數(shù)據(jù)。

    Spark支持從不同數(shù)據(jù)源讀取數(shù)據(jù),具體數(shù)據(jù)源分類如下。

    • Spark支持以Hologres為數(shù)據(jù)源。

      Hologres兼容PostgreSQL,因?yàn)镾park可以用讀取PostgreSQL的方式讀取Hologres中的數(shù)據(jù)。讀取代碼如下。

      說明

      在使用JDBC方式進(jìn)行讀取前,請前往官網(wǎng)下載Postgresql JDBC Jar,本文以postgresql-42.2.18版本為例,在spark-shell啟動(dòng)時(shí)執(zhí)行./bin/spark-shell --jars /path/to/postgresql-42.2.18.jar加載該jar,可以與hologres-connector的jar包一同加載。

      // Read from some table, for example: tb008
      val readDf = spark.read
        .format("jdbc") //使用postgresql jdbc driver讀取holo
        .option("driver","org.postgresql.Driver")
        .option("url", "jdbc:postgresql://Ip:Por/test_database")
        .option("dbtable", "tb008")
        .option("user", "your_username")
        .option("password", "your_password")
        .load()

      Spark Connector從V1.3.2版本開始支持讀取Hologres,并提供了優(yōu)化的并發(fā)讀取功能。相較于使用PostgreSQL JDBC驅(qū)動(dòng)的方法,還可以設(shè)置讀取的并發(fā)參數(shù),根據(jù)Hologres表的Shard進(jìn)行分片,從而實(shí)現(xiàn)并發(fā)讀取,顯著提升了性能。示例如下。

      val spark = SparkSession
      .builder
      .appName("ReadFromHologres")
      .master("local[*]")
      .getOrCreate()
      
      spark.sparkContext.setLogLevel("WARN")
      
      import spark.implicits._
      
      val schema = StructType(Array(
        StructField("id", LongType),
        StructField("counts", IntegerType),
        StructField("name", StringType, false),
        StructField("price", DecimalType(38, 12)),
        StructField("out_of_stock", BooleanType)
      ))
      
      val readDf = spark.read
      .format("hologres")
      .schema(schema) // 可選,如果不指定schema,默認(rèn)讀取holo表全部字段
      .option("username", "your_username")
      .option("password", "your_password")
      .option("jdbcurl", "jdbc:postgresql://hologres_endpoint/test_db")
      .option("table", "tb008")
      .option("scan_parallelism", "10") //讀取Hologres時(shí)的默認(rèn)并發(fā)數(shù),最大為holo表的shardcount
      .load()

    • Spark支持其他數(shù)據(jù)源(如Parquet格式的文件)。

      Spark支持從其他數(shù)據(jù)源中讀取數(shù)據(jù)寫入Hologres中,例如使用Spark從Hive中讀取數(shù)據(jù),代碼如下。

      import org.apache.spark.{SparkConf, SparkContext}
      import org.apache.spark.sql.hive.HiveContext
      
      val sparkConf = new SparkConf()
      val sc = new SparkContext(sparkConf)
      val hiveContext = new HiveContext(sc)
      
      // Read from some table, for example: phone
      val readDf = hiveContext.sql("select * from hive_database.phone")
  2. Spark將讀到的數(shù)據(jù)寫入Hologres。

    import com.alibaba.hologres.spark2.sink.SourceProvider
    
    -- Write to hologres table
    df.write
      .format("hologres")
      .option(SourceProvider.USERNAME, "your_username")
      .option(SourceProvider.PASSWORD, "your_password")
      .option(SourceProvider.ENDPOINT, "Ip:Port")
      .option(SourceProvider.DATABASE, "test_database")
      .option(SourceProvider.TABLE, table)
      .option(SourceProvider.WRITE_BATCH_SIZE, 512) -- 寫入攢批大小
      .option(SourceProvider.INPUT_DATA_SCHEMA_DDL, df.schema.toDDL) -- 僅spark3.x需要
      .mode(SaveMode.Append) // 僅spark3.x需要
      .save()

通過Spark實(shí)時(shí)寫入數(shù)據(jù)至Hologres

  1. 在Hologres創(chuàng)建一張表,用于接收數(shù)據(jù),創(chuàng)建代碼如下。

    CREATE TABLE test_table_stream
    (
        value text,
        count bigint
    );
  2. 讀取本地端口輸入行,進(jìn)行詞頻統(tǒng)計(jì)并實(shí)時(shí)寫入Hologres中,相關(guān)示例代碼如下。

    • 代碼:

       val spark = SparkSession
            .builder
            .appName("StreamToHologres")
            .master("local[*]")
            .getOrCreate()
      
          spark.sparkContext.setLogLevel("WARN")
          import spark.implicits._
      
          val lines = spark.readStream
            .format("socket")
            .option("host", "localhost")
            .option("port", 9999)
            .load()
      
          -- Split the lines into words
          val words = lines.as[String].flatMap(_.split(" "))
      
          -- Generate running word count
          val wordCounts = words.groupBy("value").count()
      
          wordCounts.writeStream
              .outputMode(OutputMode.Complete())
              .format("hologres")
              .option(SourceProvider.USERNAME, "your_username")
              .option(SourceProvider.PASSWORD, "your_password")
              .option(SourceProvider.JDBCURL, "jdbc:postgresql://Ip:Port/dbname")
              .option(SourceProvider.TABLE, "test_table_stream")
              .option("batchsize", 1)
              .option("isolationLevel", "NONE")
              .option("checkpointLocation", checkpointLocation)
              .start()
              .awaitTermination()
    • 參數(shù)釋義:

      參數(shù)名

      默認(rèn)值

      是否必填

      參數(shù)描述

      username

      登錄Hologres賬號的AccessKey ID。您可以單擊AccessKey 管理來獲取。

      建議您使用環(huán)境變量的方式調(diào)用用戶名和密碼,降低密碼泄露風(fēng)險(xiǎn)。

      password

      登錄Hologres賬號的AccessKey Secret。您可以單擊AccessKey 管理來獲取。

      建議您使用環(huán)境變量的方式調(diào)用用戶名和密碼,降低密碼泄露風(fēng)險(xiǎn)。

      table

      Hologres用于接收數(shù)據(jù)的表名稱。

      endpoint

      JDBCURL二選一

      Hologres實(shí)例的網(wǎng)絡(luò)域名。

      您可以進(jìn)入Hologres管理控制臺實(shí)例詳情頁,從網(wǎng)絡(luò)信息獲取主機(jī)和端口號。

      database

      JDBCURL二選一

      Hologres接收數(shù)據(jù)的表所在數(shù)據(jù)庫名稱。

      jdbcurl

      ENDPOINT+DATABASE組合設(shè)置二選一

      Hologres的JDBCURL

      copy_write_mode

      true

      是否使用Fixed Copy方式寫入,F(xiàn)ixed Copy是Hologres V1.3新增的能力,相比INSERT方法,F(xiàn)ixed Copy方式可以更高的吞吐(因?yàn)槭橇髂J剑偷臄?shù)據(jù)延時(shí),更低的客戶端內(nèi)存消耗(因?yàn)椴粩€批)。

      說明

      需要Connector為V1.3.0及以上版本,Hologres引擎版本為V1.3.34及以上版本。

      copy_write_format

      false

      僅Copy模式生效,是否進(jìn)行臟數(shù)據(jù)校驗(yàn),打開之后如果有臟數(shù)據(jù),可以定位到寫入失敗的具體行。

      說明

      RecordChecker會(huì)對寫入性能造成一定影響,非排查環(huán)節(jié)不建議開啟。

      bulk_load

      true

      是否采用批量Copy方式寫入(與fixed copy不同,fixed copy是流式的)。

      說明
      • Hologre V2.1版本對無主鍵表的寫入性能進(jìn)行了優(yōu)化。在Hologre V2.1版本中,無主鍵表的批量寫入操作不再會(huì)導(dǎo)致表鎖,而是采用了行鎖機(jī)制。這可以使其能夠與Fixed Plan并行執(zhí)行,從而提高了數(shù)據(jù)處理的效率和并發(fā)性。

      • Connector為V1.4.0及以上版本,Hologres引擎需要V2.1.0及以上版本。

      max_cell_buffer_size

      20971520(20 MB)

      使用Copy模式寫入時(shí),單個(gè)字段的最大長度。

      copy_write_dirty_data_check

      false

      是否進(jìn)行臟數(shù)據(jù)校驗(yàn),打開之后如果有臟數(shù)據(jù),可以定位到寫入失敗的具體行,RecordChecker會(huì)對寫入性能造成一定影響,非排查環(huán)節(jié)不建議開啟。

      說明

      僅Copy模式生效。

      copy_write_direct_connect

      對于可以直連的環(huán)境會(huì)默認(rèn)使用直連。

      僅Copy模式生效,Copy的瓶頸往往是VPC Endpoint的網(wǎng)絡(luò)吞吐,因此Hologres會(huì)測試當(dāng)前環(huán)境能否直連holo fe,如果支持則默認(rèn)使用直連。此參數(shù)設(shè)置為false表示不使用直連。

      input_data_schema_ddl

      spark3.x必填,值為<your_DataFrame>.schema.toDDL

      Spark中DataFrame的DDL。

      write_mode

      INSERT_OR_REPLACE

      當(dāng)INSERT目標(biāo)表為有主鍵的表時(shí)采用不同策略。

      • INSERT_OR_IGNORE:當(dāng)主鍵沖突時(shí),不寫入。

      • INSERT_OR_UPDATE:當(dāng)主鍵沖突時(shí),更新相應(yīng)列。

      • INSERT_OR_REPLACE:當(dāng)主鍵沖突時(shí),更新所有列。

      write_batch_size

      512

      每個(gè)寫入線程的最大批次大小,在經(jīng)過write_mode合并后的Put數(shù)量達(dá)到write_batch_size時(shí)進(jìn)行一次批量提交。

      write_batch_byte_size

      2 MB

      每個(gè)寫入線程的最大批次Byte大小,在經(jīng)過WRITE_MODE合并后的Put數(shù)據(jù)字節(jié)數(shù)達(dá)到WRITE_BATCH_BYTE_SIZE時(shí)進(jìn)行一次批量提交。

      write_max_interval_ms

      10000 ms

      距離上次提交超過write_max_interval_ms會(huì)觸發(fā)一次批量提交。

      write_fail_strategy

      TYR_ONE_BY_ONE

      當(dāng)某一批次提交失敗時(shí),會(huì)將批次內(nèi)的記錄逐條提交(保序),單條提交失敗的記錄將會(huì)跟隨異常被拋出。

      write_thread_size

      1

      寫入并發(fā)線程數(shù)(每個(gè)并發(fā)占用1個(gè)數(shù)據(jù)庫連接)。

      在一個(gè)Spark作業(yè)中,占用的總連接數(shù)與Spark并發(fā)相關(guān),關(guān)系為總連接數(shù)= spark.default.parallelism * WRITE_THREAD_SIZE

      dynamic_partition

      false

      若為true,寫入分區(qū)表父表時(shí),當(dāng)分區(qū)不存在時(shí)自動(dòng)創(chuàng)建分區(qū)。

      retry_count

      3

      當(dāng)連接故障時(shí),寫入和查詢的重試次數(shù)。

      retry_sleep_init_ms

      1000 ms

      每次重試的等待時(shí)間=retry_sleep_init_ms+retry_count*retry_sleep_step_ms

      retry_sleep_step_ms

      10*1000 ms

      每次重試的等待時(shí)間=retry_sleep_init_ms+retry_count*retry_sleep_step_ms

      connection_max_idle_ms

      60000 ms

      寫入線程和點(diǎn)查線程數(shù)據(jù)庫連接的最大IDLE時(shí)間,超過此時(shí)間的連接將被釋放。

      fixed_connection_mode

      false

      非Copy模式(如INSERT默認(rèn))下,寫入和點(diǎn)查場景不占用連接數(shù)。

      說明

      Beta功能,需要Connector版本為V1.2.0及以上版本,Hologres引擎版本為V1.3.0及以上版本。

      scan_batch_size

      256

      在從Hologres讀取數(shù)據(jù)時(shí),Scan操作一次獲取的行數(shù)。

      scan_timeout_seconds

      60

      在從Hologres讀取數(shù)據(jù)時(shí),Scan操作的超時(shí)時(shí)間,單位:秒(s)。

      scan_parallelism

      10

      讀取Hologres時(shí)的分片數(shù)量,最大為Hologres表的Shardcount。作業(yè)運(yùn)行時(shí),這些分片會(huì)被分配到Spark Task上進(jìn)行讀取。

數(shù)據(jù)類型映射

Spark與Hologres的數(shù)據(jù)類型映射如下表所示。

Spark類型

Hologres類型

ShortType

SMALLINT

IntegerType

INT

LongType

BIGINT

StringType

TEXT、JSONB、JSON

DecimalType

NUMERIC(38, 18)

BooleanType

BOOL

DoubleType

DOUBLE PRECISION

FloatType

FLOAT

TimestampType

TIMESTAMPTZ

DateType

DATE

BinaryType

BYTEA、ROARINGBITMAP

ArrayType(IntegerType)

int4[]

ArrayType(LongType)

int8[]

ArrayType(FloatType

float4[]

ArrayType(DoubleType)

float8[]

ArrayType(BooleanType)

boolean[]

ArrayType(StringType)

text[]