PySpark StructType 和 StructField 类用于以编程方式指定 DataFrame 的据类schema并创建复杂的列,如嵌套结构、型定数组和映射列。据类StructType是型定StructField的集合,它定义了列名、据类列数据类型、型定布尔值以指定字段是据类否可以为空以及元数据。
目录
PySpark 提供从pyspark.sql.types import StructType类来定义 DataFrame 的结构。其中,据类StructType 是 StructField 对象的集合或列表。
DataFrame 上的 PySpark printSchema()方法将 StructType 列显示为struct。
DataFrame.printSchema()
PySpark 提供pyspark.sql.types import StructField类来定义列,包括列名(String)、列类型(DataType)、可空列(Boolean)和元数据(MetaData)。
在创建 PySpark DataFrame 时,我们可以使用 StructType 和 StructField 类指定结构。StructType 是 StructField 的集合,用于定义列名、数据类型和是否可为空的标志。使用 StructField 我们还可以添加嵌套结构模式、用于数组的 ArrayType 和用于键值对的 MapType ,我们将在后面的部分中详细讨论。
下面的示例演示了一个非常简单的示例,说明如何在 DataFrame 上创建 StructType 和 StructField 以及它与示例数据一起使用来支持它。
import pysparkfrom pyspark.sql import SparkSessionfrom pyspark.sql.types import StructType,StructField, StringType, IntegerTypespark = SparkSession.builder.master("local[1]") \ .appName('SparkByExamples.com') \ .getOrCreate()data = [("James","","Smith","36636","M",3000), ("Michael","Rose","","40288","M",4000), ("Robert","","Williams","42114","M",4000), ("Maria","Anne","Jones","39192","F",4000), ("Jen","Mary","Brown","","F",-1) ]schema = StructType([ \ StructField("firstname",StringType(),True), \ StructField("middlename",StringType(),True), \ StructField("lastname",StringType(),True), \ StructField("id", StringType(), True), \ StructField("gender", StringType(), True), \ StructField("salary", IntegerType(), True) \ ]) df = spark.createDataFrame(data=data,schema=schema)df.printSchema()df.show(truncate=False)
通过运行上面的代码片段,它会显示在下面的输出中。
root |-- firstname: string (nullable = true) |-- middlename: string (nullable = true) |-- lastname: string (nullable = true) |-- id: string (nullable = true) |-- gender: string (nullable = true) |-- salary: integer (nullable = true)+---------+----------+--------+-----+------+------+|firstname|middlename|lastname|id |gender|salary|+---------+----------+--------+-----+------+------+|James | |Smith |36636|M |3000 ||Michael |Rose | |40288|M |4000 ||Robert | |Williams|42114|M |4000 ||Maria |Anne |Jones |39192|F |4000 ||Jen |Mary |Brown | |F |-1 |+---------+----------+--------+-----+------+------+
在处理 DataFrame 时,我们经常需要使用嵌套的结构列,这可以使用 StructType 来定义。
在下面的示例列中,“name” 数据类型是嵌套的 StructType。
structureData = [ (("James","","Smith"),"36636","M",3100), (("Michael","Rose",""),"40288","M",4300), (("Robert","","Williams"),"42114","M",1400), (("Maria","Anne","Jones"),"39192","F",5500), (("Jen","Mary","Brown"),"","F",-1) ]structureSchema = StructType([ StructField('name', StructType([ StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True) ])), StructField('id', StringType(), True), StructField('gender', StringType(), True), StructField('salary', IntegerType(), True) ])df2 = spark.createDataFrame(data=structureData, schema=structureSchema)df2.printSchema()df2.show(truncate=False)
模式和 DataFrame 下方的输出。
root |-- name: struct (nullable = true) | |-- firstname: string (nullable = true) | |-- middlename: string (nullable = true) | |-- lastname: string (nullable = true) |-- id: string (nullable = true) |-- gender: string (nullable = true) |-- salary: integer (nullable = true)+--------------------+-----+------+------+|name |id |gender|salary|+--------------------+-----+------+------+|[James, , Smith] |36636|M |3100 ||[Michael, Rose, ] |40288|M |4300 ||[Robert, , Williams]|42114|M |1400 ||[Maria, Anne, Jones]|39192|F |5500 ||[Jen, Mary, Brown] | |F |-1 |+--------------------+-----+------+------+
使用 PySpark SQL 函数 struct(),我们可以更改现有 DataFrame 的结构并向其添加新的 StructType。下面学习如何将列从一个结构复制到另一个结构并添加新列。PySpark Column 类还提供了一些函数来处理 StructType 列。
from pyspark.sql.functions import col,struct,whenupdatedDF = df2.withColumn("OtherInfo", struct(col("id").alias("identifier"), col("gender").alias("gender"), col("salary").alias("salary"), when(col("salary").cast(IntegerType()) < 2000,"Low") .when(col("salary").cast(IntegerType()) < 4000,"Medium") .otherwise("High").alias("Salary_Grade") )).drop("id","gender","salary")updatedDF.printSchema()updatedDF.show(truncate=False)
在这里,它将 gender,salary 和 id 复制到新结构 otherInfo,并添加一个新列 Salary_Grade。
root |-- name: struct (nullable = true) | |-- firstname: string (nullable = true) | |-- middlename: string (nullable = true) | |-- lastname: string (nullable = true) |-- OtherInfo: struct (nullable = false) | |-- identifier: string (nullable = true) | |-- gender: string (nullable = true) | |-- salary: integer (nullable = true) | |-- Salary_Grade: string (nullable = false)
SQL StructType 还支持 ArrayType 和 MapType 来分别为数组和地图集合定义 DataFrame 列。在下面的示例中,列hobbies定义为 ArrayType(StringType) ,列properties定义为 MapType(StringType, StringType),表示键和值都为字符串。
arrayStructureSchema = StructType([ StructField('name', StructType([ StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True) ])), StructField('hobbies', ArrayType(StringType()), True), StructField('properties', MapType(StringType(),StringType()), True) ])
输出以下模式。注意字段 Hobbies 是 array类型,properties是 map类型。
root |-- name: struct (nullable = true) | |-- firstname: string (nullable = true) | |-- middlename: string (nullable = true) | |-- lastname: string (nullable = true) |-- hobbies: array (nullable = true) | |-- element: string (containsNull = true) |-- properties: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true)
如果有太多列并且 DataFrame 的结构不时发生变化,一个很好的做法是从 JSON 文件加载 SQL StructType schema。可以使用 df2.schema.json() 获取 schema 并将其存储在文件中,然后使用它从该文件创建 schema。
print(df2.schema.json())
{ "type" : "struct", "fields" : [ { "name" : "name", "type" : { "type" : "struct", "fields" : [ { "name" : "firstname", "type" : "string", "nullable" : true, "metadata" : { } }, { "name" : "middlename", "type" : "string", "nullable" : true, "metadata" : { } }, { "name" : "lastname", "type" : "string", "nullable" : true, "metadata" : { } } ] }, "nullable" : true, "metadata" : { } }, { "name" : "dob", "type" : "string", "nullable" : true, "metadata" : { } }, { "name" : "gender", "type" : "string", "nullable" : true, "metadata" : { } }, { "name" : "salary", "type" : "integer", "nullable" : true, "metadata" : { } } ]}
或者也可以使用 df.schema.simpleString()返回一个相对简单的schema 格式。
现在让我们加载 json 文件并使用它来创建一个 DataFrame。
import jsonschemaFromJson = StructType.fromJson(json.loads(schema.json))df3 = spark.createDataFrame( spark.sparkContext.parallelize(structureData), schemaFromJson)df3.printSchema()
这将打印与上一节相同的输出。还可以在逗号分隔的文件中为可为空的文件提供名称、类型和标志,我们可以使用这些以编程方式创建 StructType。
就像从 JSON 字符串中加载结构一样,我们也可以从 DLL 中创建结构(通过使用SQL StructType 类 StructType.fromDDL 上的 fromDDL()静态函数)。还可以使用 toDDL() 从模式生成 DDL。结构对象上的 printTreeString() 打印模式,类似于 printSchema() 函数返回的结果。
ddlSchemaStr = "`fullName` STRUCT<`first`: STRING, `last`: STRING, `middle`: STRING>,`age` INT,`gender` STRING" ddlSchema = StructType.fromDDL(ddlSchemaStr) ddlSchema.printTreeString()
如果要对DataFrame的元数据进行一些检查,例如,DataFrame中是否存在列或字段或列的数据类型;我们可以使用 SQL StructType 和 StructField 上的几个函数轻松地做到这一点。
print(df.schema.fieldNames.contains("firstname"))print(df.schema.contains( StructField("firstname", StringType,true)))
此示例在两种情况下都返回True。对于第二个,如果是 IntegerType 而不是 StringType,它会返回 False,因为名字列的数据类型是 String,因为它会检查字段中的每个属性。同样,还可以检查两个模式是否相等或更多。
import pysparkfrom pyspark.sql import SparkSessionfrom pyspark.sql.types import StructType,StructField, StringType, IntegerType,ArrayType,MapTypefrom pyspark.sql.functions import col,struct,whenspark = SparkSession.builder.master("local[1]") \ .appName('SparkByExamples.com') \ .getOrCreate()data = [("James","","Smith","36636","M",3000), ("Michael","Rose","","40288","M",4000), ("Robert","","Williams","42114","M",4000), ("Maria","Anne","Jones","39192","F",4000), ("Jen","Mary","Brown","","F",-1) ]schema = StructType([ StructField("firstname",StringType(),True), StructField("middlename",StringType(),True), StructField("lastname",StringType(),True), StructField("id", StringType(), True), StructField("gender", StringType(), True), StructField("salary", IntegerType(), True) ]) df = spark.createDataFrame(data=data,schema=schema)df.printSchema()df.show(truncate=False)structureData = [ (("James","","Smith"),"36636","M",3100), (("Michael","Rose",""),"40288","M",4300), (("Robert","","Williams"),"42114","M",1400), (("Maria","Anne","Jones"),"39192","F",5500), (("Jen","Mary","Brown"),"","F",-1) ]structureSchema = StructType([ StructField('name', StructType([ StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True) ])), StructField('id', StringType(), True), StructField('gender', StringType(), True), StructField('salary', IntegerType(), True) ])df2 = spark.createDataFrame(data=structureData,schema=structureSchema)df2.printSchema()df2.show(truncate=False)updatedDF = df2.withColumn("OtherInfo", struct(col("id").alias("identifier"), col("gender").alias("gender"), col("salary").alias("salary"), when(col("salary").cast(IntegerType()) < 2000,"Low") .when(col("salary").cast(IntegerType()) < 4000,"Medium") .otherwise("High").alias("Salary_Grade") )).drop("id","gender","salary")updatedDF.printSchema()updatedDF.show(truncate=False)""" Array & Map"""arrayStructureSchema = StructType([ StructField('name', StructType([ StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True) ])), StructField('hobbies', ArrayType(StringType()), True), StructField('properties', MapType(StringType(),StringType()), True) ])
在本文中,云朵君和大家一起学习了 SQL StructType、StructField 的用法,以及如何在运行时更改 Pyspark DataFrame 的结构,将案例类转换为模式以及使用 ArrayType、MapType。
责任编辑:武晓燕 来源: 数据STUDIO PySpark数据结构(责任编辑:热点)
“海基一号”平台主体工程海上安装完成 处于南海内波流主通道上
不吹不黑!年均PUE 1.1的百度首个自建超大型数据中心是什么水平?
鹰君(00041.HK)授出499万份购股期权 惟须待承受人接纳方可作实
三星终于发布AI处理器Exynos9820迎战苹果、华为,但用的是8nm工艺
7nm Vega20香归香,但AMD需要的除了新制程,更重要的是新架构