pt平台通用客户端 当前位置:首页>pt平台通用客户端>正文

pt平台通用客户端

发布时间:2018-10-23

原标题:《Spark Python API 官方文档中文版》 之 pyspark.sql (二)

《Spark Python API 官方文档中文版》 之 pyspark.sql (二)


摘要:在Spark开发中,由于需要用Python实现,发现API与Scala的略有不同,而Python API的中文资料相对很少。每次去查英文版API的说明相对比较慢,还是中文版比较容易get到所需,所以利用闲暇之余将官方文档翻译为中文版,并亲测Demo的代码。在此记录一下,希望对那些对Spark感兴趣和从事大数据开发的人员提供有价值的中文资料,对PySpark开发人员的工作和学习有所帮助。

官网地址:http://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html            

pyspark.sql module

Module Context

Spark SQL和DataFrames重要的类有:
pyspark.sql.SQLContext DataFrame和SQL方法的主入口
pyspark.sql.DataFrame 将分布式数据集分组到指定列名的数据框中
pyspark.sql.Column DataFrame中的列
pyspark.sql.Row DataFrame数据的行
pyspark.sql.HiveContext 访问Hive数据的主入口
pyspark.sql.GroupedData 由DataFrame.groupBy()创建的聚合方法集
pyspark.sql.DataFrameNaFunctions 处理丢失数据(空数据)的方法
pyspark.sql.DataFrameStatFunctions 统计功能的方法
pyspark.sql.functions DataFrame可用的内置函数
pyspark.sql.types 可用的数据类型列表
pyspark.sql.Window 用于处理窗口函数

3.class pyspark.sql.DataFrame(jdf, sql_ctx)

分布式的收集数据分组到命名列中。
一个DataFrame相当于在Spark SQL中一个相关的表,可在SQLContext使用各种方法创建,如:

people = sqlContext.read.parquet("...")

一旦创建, 可以使用在DataFrame、Column中定义的不同的DSL方法操作。
从data frame中返回一列使用对应的方法:

ageCol = people.age

一个更具体的例子:

# To create DataFrame using SQLContext
people = sqlContext.read.parquet("...")
department = sqlContext.read.parquet("...")
people.filter(people.age > 30).join(department, people.deptId == department.id)).groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})

3.1 agg(*exprs)

没有组的情况下聚集整个DataFrame (df.groupBy.agg()的简写)。

>>> l=[("jack",5),("john",4),("tom",2)]
>>> df = sqlContext.createDataFrame(l,["name","age"])
>>> df.agg({"age": "max"}).collect()
[Row(max(age)=5)]
>>> from pyspark.sql import functions as F
>>> df.agg(F.min(df.age)).collect()
[Row(min(age)=2)]

3.2 alias(alias)

返回一个设置别名的新的DataFrame。

>>> l=[("Alice",2),("Bob",5)]
>>> df = sqlContext.createDataFrame(l,["name","age"])
>>> from pyspark.sql.functions import *
>>> df_as1 = df.alias("df_as1")
>>> df_as2 = df.alias("df_as2")
>>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), "inner")
>>> joined_df.select(col("df_as1.name"), col("df_as2.name"), col("df_as2.age")).collect()
[Row(name=u"Alice", name=u"Alice", age=2), Row(name=u"Bob", name=u"Bob", age=5)]

3.3 cache()

用默认的存储级别缓存数据(MEMORY_ONLY_SER).

3.4 coalesce(numPartitions)

返回一个有确切的分区数的分区的新的DataFrame。
与在一个RDD上定义的合并类似, 这个操作产生一个窄依赖。 如果从1000个分区到100个分区,不会有shuffle过程, 而是每100个新分区会需要当前分区的10个。

>>> df.coalesce(1).rdd.getNumPartitions()
1

3.5 collect()

返回所有的记录数为行的列表。

>>> df.collect()
[Row(age=2, name=u"Alice"), Row(age=5, name=u"Bob")]

3.6 columns

返回所有列名的列表。

>>> df.columns
["age", "name"]

3.7 corr(col1, col2, method=None)

计算一个DataFrame相关的两列为double值。通常只支持皮尔森相关系数。DataFrame.corr()和DataFrameStatFunctions.corr()类似。
参数:●  col1 – 第一列的名称  
      ●  col2 – 第二列的名称
           ●  method – 相关方法.当前只支持皮尔森相关系数

3.8 count()

返回DataFrame的行数。

>>> df.count()
2

3.9 cov(col1, col2)

计算由列名指定列的样本协方差为double值。DataFrame.cov()和DataFrameStatFunctions.cov()类似。
参数:●  col1 – 第一列的名称
      ●  col2 – 第二列的名称

3.10 crosstab(col1, col2)

计算给定列的分组频数表,也称为相关表。每一列的去重值的个数应该小于1e4.最多返回1e6个非零对.每一行的第一列会是col1的去重值,列名称是col2的去重值。第一列的名称是$col1_$col2. 没有出现的配对将以零作为计数。DataFrame.crosstab() and DataFrameStatFunctions.crosstab()类似。
参数:●  col1 – 第一列的名称. 去重项作为每行的第一项。
      ●  col2 – 第二列的名称. 去重项作为DataFrame的列名称。

3.11 cube(*cols)

创建使用指定列的当前DataFrame的多维立方体,这样可以聚合这些数据。

>>> l=[("Alice",2),("Bob",5)]
>>> df = sqlContext.createDataFrame(l,["name","age"])
>>> df.cube("name", df.age).count().show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|   2|    1|
|Alice|null|    1|
|  Bob|   5|    1|
|  Bob|null|    1|
| null|   5|    1|
| null|null|    2|
|Alice|   2|    1|
+-----+----+-----+

3.12 describe(*cols)

计算数值列的统计信息。
包括计数,平均,标准差,最小和最大。如果没有指定任何列,这个函数计算统计所有数值列。

>>> df.describe().show()
+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|               3.5|
| stddev|2.1213203435596424|
|    min|                 2|
|    max|                 5|
+-------+------------------+
>>> df.describe(["age", "name"]).show()
+-------+------------------+-----+
|summary|               age| name|
+-------+------------------+-----+
|  count|                 2|    2|
|   mean|               3.5| null|
| stddev|2.1213203435596424| null|
|    min|                 2|Alice|
|    max|                 5|  Bob|
+-------+------------------+-----+

3.13 distinct()

返回行去重的新的DataFrame。

>>> l=[("Alice",2),("Alice",2),("Bob",5)]
>>> df = sqlContext.createDataFrame(l,["name","age"])
>>> df.distinct().count()
2

3.14 drop(col)

返回删除指定列的新的DataFrame。
参数:●  col – 要删除列的字符串类型名称,或者要删除的列。

>>> df.drop("age").collect()
[Row(name=u"Alice"), Row(name=u"Bob")] 
>>> df.drop(df.age).collect()
[Row(name=u"Alice"), Row(name=u"Bob")]
>>> l1=[("Bob",5)]
>>> df = sqlContext.createDataFrame(l,["name","age"])
>>> l2=[("Bob",85)]
>>> df2 = sqlContext.createDataFrame(l2,["name","height"])
>>> df.join(df2, df.name == df2.name, "inner").drop(df.name).collect()
[Row(age=5, height=85, name=u"Bob")]
>>> df.join(df2, df.name == df2.name, "inner").drop(df2.name).collect()
[Row(age=5, name=u"Bob", height=85)]

3.15 dropDuplicates(subset=None)

返回去掉重复行的一个新的DataFrame,通常只考虑某几列。
drop_duplicates()和dropDuplicates()类似。

>>> from pyspark.sql import Row
>>> df = sc.parallelize([Row(name="Alice", age=5, height=80),Row(name="Alice", age=5, height=80),Row(name="Alice", age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+
>>> df.dropDuplicates(["name", "height"]).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+

3.16 drop_duplicates(subset=None)

与以上相同。

3.17 dropna(how="any", thresh=None, subset=None)

返回一个删除null值行的新的DataFrame。dropna()和dataframenafunctions.drop()类似。
参数:●  how – "any"或者"all"。如果"any",删除包含任何空值的行。如果"all",删除所有值为null的行。
    thresh – int,默认为None,如果指定这个值,删除小于阈值的非空值的行。这个会重写"how"参数。
   ●  subset – 选择的列名称列表。

>>> l=[("Alice",2),("Bob",5)]
>>> df = sqlContext.createDataFrame(l,["name","age"])
>>> dfnew = df.cube("name", df.age).count()
>>> dfnew.show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|   2|    1|
|Alice|null|    1|
|  Bob|   5|    1|
|  Bob|null|    1|
| null|   5|    1|
| null|null|    2|
|Alice|   2|    1|
+-----+----+-----+
>>> dfnew.na.drop().show()
+-----+---+-----+
| name|age|count|
+-----+---+-----+
|  Bob|  5|    1|
|Alice|  2|    1|
+-----+---+-----+

3.18 dtypes

返回所有列名及类型的列表。

>>> df.dtypes
[("age", "int"), ("name", "string")]

3.19 explain(extended=False)

将(逻辑和物理)计划打印到控制台以进行调试。
参数:●  extended – boolean类型,默认为False。如果为False,只打印物理计划。

>>> df.explain()
== Physical Plan ==
Scan ExistingRDD[age#0,name#1]
>>> df.explain(True)
== Parsed Logical Plan ==
...
== Analyzed Logical Plan ==
...
== Optimized Logical Plan ==
...
== Physical Plan ==
...

3.20 fillna(value, subset=None)

替换空值,和na.fill()类似,DataFrame.fillna()和dataframenafunctions.fill()类似。
参数:●  value - 要代替空值的值有int,long,float,string或dict.如果值是字典,subset参数将被忽略。值必须是要替换的列的映射,替换值必须是int,long,float或者string.
      ●  subset - 要替换的列名列表。在subset指定的列,没有对应数据类型的会被忽略。例如,如果值是字符串,subset包含一个非字符串的列,这个非字符串的值会被忽略。

>>> l=[("Alice",2),("Bob",5)]
>>> df = sqlContext.createDataFrame(l,["name","age"])
>>> dfnew = df.cube("name", df.age).count()
>>> dfnew.show()
+-----+----+-----+
| name| age|count|
+-----+----+-----+
| null|   2|    1|
|Alice|null|    1|
|  Bob|   5|    1|
|  Bob|null|    1|
| null|   5|    1|
| null|null|    2|
|Alice|   2|    1|
+-----+----+-----+
>>> dfnew.na.fill(50).show()
+-----+---+-----+
| name|age|count|
+-----+---+-----+
| null|  2|    1|
|Alice| 50|    1|
|  Bob|  5|    1|
|  Bob| 50|    1|
| null|  5|    1|
| null| 50|    2|
|Alice|  2|    1|
+-----+---+-----+
>>> dfnew.na.fill({"age": 50, "name": "unknown"}).show()
+-------+---+-----+
|   name|age|count|
+-------+---+-----+
|unknown|  2|    1|
|  Alice| 50|    1|
|    Bob|  5|    1|
|    Bob| 50|    1|
|unknown|  5|    1|
|unknown| 50|    2|
|  Alice|  2|    1|
+-------+---+-----+

3.21 filter(condition)

用给定的条件过滤行。
where()和filter()类似。
参数:●  条件 - 一个列的bool类型或字符串的SQL表达式。

>>> l=[("Alice",2),("Bob",5)]
>>> df = sqlContext.createDataFrame(l,["name","age"])
>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u"Bob")]
>>> df.where(df.age == 2).collect()
[Row(age=2, name=u"Alice")]
>>> df.filter("age > 3").collect()
[Row(age=5, name=u"Bob")]
>>> df.where("age = 2").collect()
[Row(age=2, name=u"Alice")]

3.22 first()

返回第一行。

>>> df.first()
Row(age=2, name=u"Alice")

3.23 flatMap(f)

返回在每行应用F函数后的新的RDD,然后将结果压扁。
是df.rdd.flatMap()的简写。

>>> df.flatMap(lambda p: p.name).collect()
[u"A", u"l", u"i", u"c", u"e", u"B", u"o", u"b"]

3.24 foreach(f)

应用f函数到DataFrame的所有行。
是df.rdd.foreach()的简写。

>>> def f(person):
...     print(person.name)
>>> df.foreach(f)
Alice
Bob

3.25 foreachPartition(f)

应用f函数到DataFrame的每一个分区。
是 df.rdd.foreachPartition()的缩写。

>>> def f(people):
...     for person in people:
...         print(person.name)
>>> df.foreachPartition(f)
Alice
Bob

3.26 freqItems(cols, support=None)

参数:●  cols – 要计算重复项的列名,为字符串类型的列表或者元祖。
      ●  support – 要计算频率项的频率值。默认是1%。参数必须大于1e-4.

3.27 groupBy(*cols)

使用指定的列分组DataFrame,这样可以聚合计算。可以从GroupedData查看所有可用的聚合方法。
groupby()和groupBy()类似。
参数:●  cols – 分组依据的列。每一项应该是一个字符串的列名或者列的表达式。

>>> df.groupBy().avg().collect()
[Row(avg(age)=3.5)]
>>> df.groupBy("name").agg({"age": "mean"}).collect()
[Row(name=u"Alice", avg(age)=2.0), Row(name=u"Bob", avg(age)=5.0)]
>>> df.groupBy(df.name).avg().collect()
[Row(name=u"Alice", avg(age)=2.0), Row(name=u"Bob", avg(age)=5.0)]
>>> df.groupBy(["name", df.age]).count().collect()
[Row(name=u"Bob", age=5, count=1), Row(name=u"Alice", age=2, count=1)]

3.28 groupby(*cols)

和以上一致

3.29 head(n=None)

返回前n行
参数:●  n – int类型,默认为1,要返回的行数。
返回值: 如果n大于1,返回行列表,如果n为1,返回单独的一行。

>>> df.head()
Row(age=2, name=u"Alice")
>>> df.head(1)
[Row(age=2, name=u"Alice")]

3.30 insertInto(tableName, overwrite=False)

插入DataFrame内容到指定表。
注:在1.4中已过时,使用DataFrameWriter.insertInto()代替。

3.31 intersect(other)

返回新的DataFrame,包含仅同时在当前框和另一个框的行。
相当于SQL中的交集。

3.32 intersect(other)

如果collect()和take()方法可以运行在本地(不需要Spark executors)那么返回True

3.33 join(other, on=None, how=None)

使用给定的关联表达式,关联另一个DataFrame。
以下执行df1和df2之间完整的外连接。
参数:● other – 连接的右侧
   ● on – 一个连接的列名称字符串, 列名称列表,一个连接表达式(列)或者列的列表。如果on参数是一个字符串或者字符串列表,表示连接列的名称,这些名称必须同时存在join的两个表中, 这样执行的是一个等价连接。
   ● how – 字符串,默认"inner"。inner,outer,left_outer,right_outer,leftsemi之一。

>>> l=[("Alice",2),("Bob",5)]
>>> df = sqlContext.createDataFrame(l,["name","age"])
>>> l2=[("Tom",80),("Bob",85)]
>>> df2 = sqlContext.createDataFrame(l2,["name","height"])
>>> df.join(df2, df.name == df2.name, "outer").select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name=u"Alice", height=None), Row(name=u"Bob", height=85)]
>>> df.join(df2, "name", "outer").select("name", "height").collect()
[Row(name=u"Tom", height=80), Row(name=u"Alice", height=None), Row(name=u"Bob", height=85)]
>>> l3=[("Alice",2,60),("Bob",5,80)]
>>> df3 = sqlContext.createDataFrame(l3,["name","age","height"])
>>> cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, "outer").select(df.name, df3.age).collect()
[Row(name=u"Bob", age=5), Row(name=u"Alice", age=2)]
>>> df.join(df2, "name").select(df.name, df2.height).collect()
[Row(name=u"Bob", height=85)]
>>> l4=[("Alice",1),("Bob",5)]
>>> df4 = sqlContext.createDataFrame(l4,["name","age"])
>>> df.join(df4, ["name", "age"]).select(df.name, df.age).collect()
[Row(name=u"Bob", age=5)]

3.34 limit(num)

将结果计数限制为指定的数字。

>>> df.limit(1).collect()
[Row(age=2, name=u"Alice")]
>>> df.limit(0).collect()
[]

3.35 map(f)

通过每行应用f函数返回新的RDD。
是 df.rdd.map()的缩写。

>>> df.map(lambda p: p.name).collect()
[u"Alice", u"Bob"]

3.36 mapPartitions(f, preservesPartitioning=False)

通过每个分区应用f函数返回新的RDD
是df.rdd.mapPartitions()的缩写。

>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(iterator): yield 1
...
>>> rdd.mapPartitions(f).sum()
4

3.37 na

返回DataFrameNaFunctions用于处理缺失值。

3.38 orderBy(*cols, **kwargs)

返回按照指定列排序的新的DataFrame。
参数:● cols – 用来排序的列或列名称的列表。
      ● ascending – 布尔值或布尔值列表(默认 True). 升序排序与降序排序。指定多个排序顺序的列表。如果指定列表, 列表的长度必须等于列的长度。

>>> l=[("Alice",2),("Bob",5)]
>>> df = sqlContext.createDataFrame(l,["name","age"])
>>> df.sort(df.age.desc()).collect()
[Row(name=u"Bob", age=5), Row(name=u"Alice", age=2)]
>>> df.sort("age", ascending=False).collect()
[Row(name=u"Bob", age=5), Row(name=u"Alice", age=2)]
>>> df.orderBy(df.age.desc()).collect()
[Row(name=u"Bob", age=5), Row(name=u"Alice", age=2)]
>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(name=u"Alice", age=2), Row(name=u"Bob", age=5)]
>>> df.orderBy(desc("age"), "name").collect()
[Row(name=u"Bob", age=5), Row(name=u"Alice", age=2)]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(name=u"Bob", age=5), Row(name=u"Alice", age=2)]

3.39 persist(storageLevel=StorageLevel(False, True, False, False, 1))

设置存储级别以在第一次操作运行完成后保存其值。这只能用来分配新的存储级别,如果RDD没有设置存储级别的话。如果没有指定存储级别,默认为(memory_only_ser)。

3.40 printSchema()

打印schema以树的格式

>>> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

 

当前文章:http://radiokey.biz/kan/34irr.html

发布时间:2018-10-23 00:44:20

博多利线上娱乐 菲律宾黄金城国际娱乐 通宝娱乐真人老虎机 经典老虎机游戏必赢 老虎机出现0909 ag8879环亚手机登陆 铂金pt和pd有什么区别 手机免费试玩mg电子游戏  

28037 14070 40717 42814 74043 4270460032 12561 64797

责任编辑:龙安董海

随机推荐