云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

大带宽_路由器设置代理服务器_精选特惠

小七 141 0

Introducing Pandas UDF for PySpark

这是来自纽约Two Sigma Investments,LP软件工程师的客座社区帖子。这篇博客也在Two Sigma上发布在数据库里试试这个笔记本更新:此博客于2018年2月22日更新,包含一些更改。这篇博客文章介绍了即将发布的apachespark2.3版本中的Pandas udf(也称为矢量化的udf)特性,它极大地提高了Python中用户定义函数(udf)的性能和可用性。在过去的几年里,Python已经成为数据科学家的默认语言。pandas、numpy、statsmodel和scikitlearn等软件包已获得广泛采用,并成为主流工具包。与此同时,apachespark已经成为处理大数据的事实标准。为了使数据科学家能够利用大数据的价值,Spark在0.7版中添加了一个PythonAPI,并支持用户定义函数。这些用户定义的函数一次操作一行,因此承受着高序列化和调用开销。因此,许多数据管道在Java和Scala中定义udf,然后从Python调用它们。构建在apachearrow之上的Pandas udf为您带来了两个世界中最好的特性:完全用Python定义低开销、高性能的udf。在spark2.3中,将有两种类型的Pandas udf:标量和分组映射。接下来,我们使用四个示例程序来说明它们的用法:加一、累积概率、减去平均值、普通最小二乘线性回归。标量熊猫UDF标量Pandas udf用于将标量操作矢量化。在Python中,只需在Pandas中定义一个标量函数@Pandas熊猫系列作为参数并返回另一个熊猫系列同样大小。下面我们用两个例子来说明:加一和累积概率。加一计算v+1是一个简单的示例,用于演示一次行udf和标量Pandas udf之间的差异。请注意,在这种情况下,内置列运算符的执行速度可以快得多。使用一次性行自定义项:从pyspark.sql.functions导入自定义项#使用自定义项定义一行一次的自定义项@自定义项('double')#输入/输出都是一个双精度值def加1(v):返回v+1df.WITH列("v2",加上1个(df.v))使用熊猫UDF:从pyspark.sql.functions导入pandas_udf,PandasUDFType#使用pandas_udf定义pandas udf@熊猫‘,PandasudType.SCALAR)#输入/输出都是熊猫系列双打定义熊猫加一(v):返回v+1df.WITH列("v2",熊猫加一(df.v))上面的例子定义了一个一次一行的UDF"plus-one"和一个执行相同"plus-one"计算的标量Pandas-UDF"Pandas_-plus-one"。除了函数修饰符:"UDF"vs"pandas_UDF"之外,UDF的定义是相同的。在row-at-a-time版本中,用户定义函数接受一个双"v",并将"v+1"的结果返回为double。在Pandas版本中,用户定义函数采用熊猫系列"v"并返回"v+1"的结果作为熊猫系列. 因为"v+1"被矢量化了熊猫系列,Pandas版本比row-at-time版本快得多。请注意,使用scalar pandas UDF时有两个重要要求:输入和输出系列的大小必须相同。如何将列拆分为多个熊猫系列是Spark的内部函数,因此用户定义函数的结果必须独立于拆分。累积概率这个例子展示了标量Pandas UDF的一个更实际的用法:使用scipy包计算正态分布N(0,1)中某个值的累积概率。将熊猫作为pd导入从scipy import stats@熊猫(双倍)def cdf(v):返回pd系列(stats.norm.cdf(v) )df.WITH列("累积概率",cdf(df.v))stats.norm.cdfworks在标量值和熊猫系列,这个例子也可以用一次一行的udf编写。与前面的示例类似,Pandas版本运行得更快,如后面的"性能比较"部分所示。分组地图熊猫UDFPython用户非常熟悉数据分析中的split-apply-combine模式。分组映射Pandas udf就是为这种情况而设计的,它们对某个组的所有数据进行操作,例如"对于每个日期,应用此操作"。分组映射Pandas udf首先根据groupby运算符中指定的条件将Spark数据帧拆分为组,应用用户定义的函数(熊猫.DataFrame-> 熊猫.DataFrame)对于每个组,组合结果并将其作为新的Spark数据帧返回。分组映射Pandas udf使用与标量Pandas udf相同的函数decorator Pandas_udf,但它们有一些区别:自定义函数的输入:标量:熊猫系列分组映射:熊猫.DataFrame自定义函数的输出:标量:熊猫系列分组映射:熊猫.DataFrame分组语义:标量:没有分组语义分组映射:由"groupby"子句定义输出大小:标量:与输入大小相同分组贴图:任意大小函数修饰符中的返回类型:标量:指定返回的类型的数据类型熊猫系列分组映射:指定返回的每个列名和类型的StructType熊猫.DataFrame接下来,让我们通过两个例子来说明分组map Pandas udf的用例。减去平均值这个例子展示了分组map Pandas udf的一个简单用法:从组中的每个值中减去平均值。@熊猫(数据框模式, PandasudType.GROUPED_映射)#输入/输出都是熊猫.DataFramedef减去平均值(pdf):返回pdf.分配(v=pdf.v-pdf.v.mean())数据框groupby('id')。应用(减去平均值)在这个例子中,我们从每组的每个v值中减去v的平均值。分组语义由"groupby"函数定义,即每个输入熊猫.DataFrame对用户定义函数具有相同的"id"值。此用户定义函数的输入和输出模式是相同的,因此我们传递"数据框模式"用于指定架构的decorator pandas_udf。分组映射Pandas udf也可以被称为驱动程序上的独立Python函数。这对于调试非常有用,例如:样品=测向滤波器(id==1).toPandas()#在上作为独立函数运行熊猫.DataFrame并验证结果减去_平均功能(样品)#现在用火花奔跑数据框groupby('id')。应用(减去平均值)在上面的示例中,我们首先将Spark DataFrame的一小部分转换为熊猫.DataFrame,然后在其上运行subtract_mean作为独立的Python函数。在验证函数逻辑之后,我们可以在整个数据集上使用Spark调用UDF。普通最小二乘线性回归最后一个例子展示了如何使用statsmodels为每组运行OLS线性回归。对于每组,我们根据统计模型Y=bX+c计算X=(x1,x2)的βb=(b1,b2)。进口statsmodels.api作为山猫#df有四列:id,y,x1,x2group_列='id'y\U列='y'x_列=['x1','x2']架构=数据框选择(group_列,*x_列).schema@熊猫(模式,PandasudType.GROUPED_映射)#输入/输出都是熊猫.DataFrame定义ols(pdf):group_key=pdf[group_column].iloc[0]y=pdf[y_列]X=pdf[X_列]X=sm.add_常数(十)型号=sm.OLS公司(y,X).fit()返回pd数据帧([[组密钥]+[模型参数[i] 对于i in x_columns]],columns=[group\u column]+x\u columns)β=数据框groupby(组_列)。应用(ols)此示例演示了分组map Pandas udf可以与任意python函数一起使用:熊猫.DataFrame-> 熊猫.DataFrame. 返回的熊猫.DataFrame可以有不同数量的行和列作为输入。性能比较最后,我们要展示一次行udf和Pandas udf之间的性能比较。我们为上面的三个例子运行了微观基准测试(加一,累积概率和减去平均值)。配置和方法我们在Databricks community edition上的单节点Spark集群上运行基准测试。配置详细信息:数据:一个10M行的数据帧,有一个Int列和一个双列集群:6.0 GB内存,0.88核,1个DBUDatabricks运行时版本:最新RC(4.0,Scala 2.11)关于基准的详细实现,请查看Pandas UDF笔记本。如图所示,Pandas udf的性能远远优于一次一行的udf,从3倍到100倍。结论和今后的工作即将推出的SCAP 2.3发布为基本改进Python中用户定义函数的能力和性能奠定了基础。将来,我们计划在聚合和窗口函数中引入对Pandas udf的支持。相关工作可在SPARK-22216中跟踪。熊猫UDF是Spark社区努力的一个很好的例子。我们要感谢Bryan Cutler,Hyukjin Kwon,Jeff Reback,Liang Chi Hsieh,Leif Walsh,Li Jin,Reynold Xin,Takuya Ueshin,Wes McKinney,Xiao Li和其他许多人的贡献。最后,特别感谢apachearrow社区使这项工作成为可能。下一步是什么您可以尝试Pandas UDF笔记本,此功能现在作为Databricks Runtime 4.0beta的一部分提供。免费试用Databricks。今天就开始吧