云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

服务器_成都网站开发建设_测评

小七 141 0

介绍Flint:apachespark的时间序列库

这是Two Sigma的Li Jin和Databricks的Kevin Rasmussen的联合客户社区博客;他们分享了如何在ApacheSpark中使用Flint。在数据库里试试这个笔记本介绍如今,数据科学家面临的数据量不断增加,而我们现在发现,传统的单机解决方案已不能满足这些数据集的需求。在过去的几年里,apachespark已经成为处理大数据工作负载的标准,我们认为它为数据科学家提供了分析大型时间序列的巨大潜力。我们在Two Sigma开发了Flint,以增强Spark用于时间序列分析的功能。Flint是一个开源库,可以通过Maven和PyPI获得。时间序列分析时间序列分析有两个组成部分:时间序列处理和时间序列建模。时间序列操作是操作数据并将其转换为用于训练模型的特征的过程。时间序列操作用于数据清理和特征工程等任务。时间序列操作中的典型功能包括:连接:连接两个时间序列数据集,通常按时间窗口化:基于时间窗口的特征转换重采样:更改数据的频率填充缺少的值或删除NA行。时间序列建模是在时间序列数据中识别模式并训练模型进行预测的过程。这是一个复杂的主题;它包括诸如ARIMA和自相关等特定技术,以及应用于时间序列数据的各种通用机器学习技术(例如线性回归)。弗林特专注于时间序列操纵。在这篇博文中,我们将演示Flint在时间序列操作中的功能,以及它如何与其他库(如Spark ML)一起工作,以完成一个简单的时间序列建模任务。弗林特概况Flint从Two Sigma的一个内部库中获得了灵感,该库在处理时间序列数据方面非常强大。Flint的主要API是它的PythonAPI。入口点TimeSeriesDataFrame是PySpark DataFrame的扩展,并公开了其他时间序列功能。下面是一个简单的示例,演示如何将数据读入Flint并同时使用PySpark DataFrame和Flint功能:从燧石导入FlintContext,摘要生成器flintContext=flintContext(sqlContext)df=spark.createDataFrame([('2018-08-20',1.0),('2018-08-21',2.0),('2018-08-24',3.0)],['时间','v']).withColumn('time',from_utc_timestamp(col('time'),'utc'))#转换为Flint数据帧燧石df=flintContext.read.dataframe测向(df)#使用Spark DataFrame功能flint_df=燧石_df.WITH列('v',flint_df['v']+1)#使用Flint功能flint_df=燧石_数据框摘要循环(摘要生成器.count())弗林特功能在本节中,我们将介绍几个处理时间序列数据的核心功能。Asof连接Asof Join意味着准时加入,使用不精确的匹配条件。它接受一个公差参数,例如"1day",并将每个左侧行与该公差内最近的右侧行连接起来。Flint有两个asof连接函数:LeftJoin和FutureLeftJoin。唯一的区别是连接的时间方向:是连接过去的行还是将来的行。例如…左=。。。#时间,v1#20180101100号#20180102,50#20180104,-50#20180105100号右=。。。#时间,v2#20171231100.0#20180104105.0#20180105102.0加入=left.leftJoin(对,公差="1天")#时间,v1,v2#201801010100100.0#20180102,50,空#20180104,-50,105.0#20180105100102.0Asof Join对于处理具有不同频率、未对齐时间戳等的数据非常有用。下面的案例研究部分将进一步说明该函数。添加列循环与"周期数据"中的"时间戳"相同。人们通常希望转换具有相同时间戳的数据,例如,对具有相同时间戳的特征进行排序。AddColumnsForCycle对于这种类型的计算是一个方便的函数。AddColumnsForCycle接受一个用户定义的函数,该函数将一个Pandas系列映射到另一个长度相同的Pandas系列。一些例子包括:每个周期的排名值:从燧石导入自定义项@自定义项('double')定义等级(v):#v是a熊猫系列返回v.rank(pct=True)df=…#时间,v#20180101,1.0#20180101,2.0#20180101,3.0df=df.addColumns循环({'rank':等级(df['v'])})#时间,等级,等级#20180101,1.0,0.333#20180101,2.0,0.667#20180101,3.0,1.0Box-Cox变换是一种有用的数据转换技术,可以使数据更接近正态分布。以下示例对每个循环执行Box-Cox变换:将熊猫作为pd导入从scipy import stats@自定义项('double')def boxcox(五):返回pd系列(stats.box考克斯(v) [0])df=…#时间,v#20180101,1.0#20180101,2.0#20180101,3.0df=df.addColumns循环({'v_boxcox':boxcox(df['v'])})#时间,v,v逯boxcox#20180101,1.0,0.0#20180101,2.0,0.852#20180101,3.0,1.534摘要生成器Flint摘要生成器类似于Spark SQL聚合函数。摘要生成器从值列表中计算单个值。查看Flint摘要生成器的完整描述: flint.readthedocs.io/en/latest/reference.html模块-ts.flint.summary生成器.Flint的摘要生成器功能包括:汇总:在整个数据框中聚合数据summary ecycles:使用相同的时间戳聚合数据summarientervals:聚合属于同一时间范围的数据摘要窗口:聚合属于同一窗口的数据addSummaryColumns:计算累计聚合,例如累计和一个例子包括计算最大压降:进口pyspark.sql.functions作为F#特定股票的回报。#1.01表示股票上涨1%;0.95表示股票下跌5%df=。。。#时间,回来#20180101,1.01#20180102,0.95#20180103,1.05# ...#第一个addSummaryColumns添加了一列"return_product",它是每天的累计收益#第二个addSummaryColumns添加一列"return_product_max",这是每天的最大累积回报累计回报=df.addSummary列(摘要生成器.product('return')\.AddSummary列(summariers.max("退货产品")\.toDF('时间','返回','累计返回','最大值''提款=cum_返回.withColumn("提款",1-cum_returns['cum_return']/cum_returns['max_cum_return'])最大下降=提款.agg(F.max('提款')窗口Flint的summarizeWindows函数类似于sparksql中的滚动窗口函数,因为它可以计算滚动平均值之类的东西。主要的区别是summaryewindows不需要分区键,因此可以处理单个大型时间序列。一些例子包括:计算滚动指数移动平均值:从燧石导入窗口w=windows.过去的绝对时间("7天")df=。。。#时间,v#20180101,1.0#20180102,2.0#20180103,3.0df=df.summary窗口(w,摘要生成器.ewma("v",α=0.5)#时间,v,v,u ewma#20180101,1.0,1.0#20180102,2.0,2.5#20180103,3.0,4.25案例研究现在我们考虑一个例子,其中Flint函数执行简单的时间序列分析。资料准备我们已将标准普尔500指数的每日价格数据下载到CSV文件中。首先,我们将文件读入Flint数据帧并添加"return"列:从燧石导入FlintContextflintContext=flintContext(sqlContext)标准普尔500=flintContext.read.dataframe(spark.read.option('header',True).option('inferSchema',True).csv('sp500.csv'))sp500_return=sp500.withColumn('return',10000*(sp500['Close']-sp500['Open'])/sp500['Open'])。select('time','return')在这里,我们想测试一个非常简单的想法:前一天的收益可以用来预测第二天的收益吗?为了测试这个想法,我们首先需要自连接返回表,以便创建一个"preview_day_return"列:sp500?上一天?返回=sp500_返回.shiftTime(windows.future_绝对时间('1day')).toDF('时间','上一天'u return')sp500?加入?返回=sp500_return.leftJoin(sp500返回前一天)但是合并结果有一个问题:前一个星期一的返回值为空!这是因为我们在周末没有任何回报数据,所以周一不能简单地将周日的回报数据加入其中。为了解决这个问题,我们将leftJoin的tolerance参数设置为'3days',这个持续时间足以覆盖两天的周末,因此星期一可以加入上周五的返回值:sp500?加入?返回=sp500_return.leftJoin(sp500_上一天返回,容差='3days').dropna()特征工程接下来我们使用Flint进行一些特征转换。在时间序列分析中,基于特征过去的值来转换特征是很常见的。Flint的SummaryWidows函数可以用于这种类型的转换。下面我们提供两个使用summarywindows进行基于时间的特征转换的示例:一个带有内置的summary生成器,另一个带有用户定义函数(UDF)。内置摘要生成器:从燧石导入摘要生成器sp500_decaged_return=sp500_加入_return.summary窗口(窗口=windows.过去的绝对时间("7天"),摘要生成器=摘要生成器.ewma("上一天返回",alpha=0.5))自定义项:从燧石导入自定义项@udf('double',arg\u type='numpy')def衰变(列):v=列[0]衰变=np.功率(0.5,np.arange公司(长度(v))[::-1]返回(v*衰变).sum()sp500_decaged_return=sp500_加入_return.summary窗口(窗口=windows.过去的绝对时间("7天"),summary={'previous_day_decaged_return':衰变(sp500加入了\u return[['previous'u day\u return']])})模特培训既然我们已经准备好了数据,我们就可以在上面训练一个模型了。这里我们使用Spark-ML来拟合线性回归模型:从pyspark.ml.回归导入线性回归从pyspark.ml.功能导入向量汇编程序汇编器=矢量汇编器(inputCols=previous_day_return","前一天\u decaged_return"],outputCol="功能")输出=汇编程序.转换(sp500_decaged_return)。select('return','features').toDF('label','features')lr=线性REG