云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

阿里云_国内云虚拟主机_好用

小七 141 0

在apachespark中使用MongoDB

2016年8月4日更新:从这篇文章开始,MongoDB为apachespark发布了一个新的Databricks认证连接器。有关如何使用新的MongoDB连接器forapachespark的教程和笔记本,请参阅更新的博客文章。这是来自MongoDB的高级解决方案架构师mattkalan的客座博客介绍如今广泛的数据管理技术使得用户很难从现实中辨别出炒作。虽然我知道MongoDB作为应用程序的实时分布式操作数据库的巨大价值,但我开始尝试apachespark,因为我想了解分析和批处理操作的可用选项。我从一个简单的例子开始,取股票价格的1分钟时间序列间隔,每个时间间隔的开盘价、最高价、最低价和收盘价,转换成5分钟的时间间隔(称为OHLC条),1分钟的数据存储在MongoDB中,然后通过MongoDB Hadoop在Spark中进行处理连接器,允许MongoDB作为Spark的输入或输出。有人可能会认为,一个更典型的例子是,您在MongoDB中记录这些市场数据,以便进行实时处理,但可能会在另一个离线环境中运行分析模型。当然,模型要复杂得多,这只是一个Hello World级别的例子。我选择OHLC酒吧只是因为这是我很容易找到的数据。摘要用例:将股票价格的1分钟间隔聚合为5分钟的间隔输入:MongoDB数据库中1分钟的股价区间简单分析:用Spark执行输出:Spark中5分钟的股票价格间隔,可以选择写回MongoDB设置环境的步骤:设置Spark environment——我在Mac笔记本电脑的虚拟机上安装了Spark v1.2.0下载示例数据–我从这个网页以1分钟的增量获取这些数据点在VM上安装MongoDB–我很容易在CentOS上用yum安装MongoDB,下面是这个页面的说明启动MongoDB–yum安装了一个默认配置文件,因此您只需运行此文件即可在本地主机和默认端口27017上启动:mongod-f/等/mongod.conf网站加载示例数据–mongoimport允许您在MongoDB中将CSV文件作为平面文档直接加载。命令如下:mongoimport equities-msft-minute-bars-2009.csv--类型csv--headerline-d marketdata-c迷你酒吧安装MongoDB Hadoop Connector–您可以下载Hadoop Connector jar,网址:Using the MongoDB Hadoop Connector with Spark。如果您使用Spark的Java接口,您还可以下载MongoDB Java驱动程序jar。您下载的任何jar都可以使用PySpark命令的–jars选项添加到Spark。我在下面使用了Python和Spark(称为PySpark)。对于下面的示例,下面是MongoDB集合中的文档(通过mongoshell)。只需从MongoDB安装的/bin目录中使用命令"Mongo"启动mongoshell。>使用市场数据> 数据库minbars.findOne(){"_id":ObjectId("54c00d1816526bc59d84b97c"),"Symbol":"MSFT","Timestamp":"2009-08-24 09:30","日":24,"打开":24.41,"高":24.42,"低":24.31,"收盘":24.31,"卷":683713}Spark示例对于我对Spark的最初尝试,我选择使用Python和交互式shell命令"PySpark"。这为我提供了一个利用Spark类的交互式Python环境。Python在quant中似乎很流行,因为与Java或Scala相比,Python是一种更自然的交互式查询语言。我能够在Spark中成功地读取MongoDB,但是请确保升级到Spark v1.2.2或v1.3.0,以解决PySpark早期版本中的一个bug。Spark的好处很快就显现出来了,而且与您在交互式环境中的预期一致—查询返回速度很快,比Hive快得多,部分原因是它们没有被编译为MapReduce。虽然延迟仍然高于MongoDB的内部查询和聚合框架,但是使用Spark进行分布式、多线程分析有更多的选择,因此它显然在数据分析方面扮演着重要角色。设置通过Hadoop输入格式从MongoDB读取的参数配置={"mongo.input.uri": "mongodb://localhost:27017个/市场数据.minbars"}inputFormatClassName="com.mongodb.hadoop.MongoInputFormat"这些价值观起作用,但其他价值观也可能起作用keyClassName="org.apache.hadoop.io.文本"值类名称="org.apache.hadoop.io.MapWritable"将MongoDB中的1分钟条形图读入Spark RDD格式MinbarrarRdd=sc.newAPIHadoopRDD公司(inputFormatClassName,keyClassName,valueClassName,无,无,配置)输出到MongoDB的配置configmongo.output.uri"] = "mongodb://localhost:27017个/市场数据.5分钟"outputFormatClassName="com.mongodb.hadoop.MongoOutputFormat"获取详细的原始结构(带有额外的元数据)并将其分解为仅包含定价数据最小值=最小值()导入日历、时间、数学dateFormatString='%Y-%m-%d%H:%m'按时间排序,然后在5分钟内分组到每个酒吧分组条形图=明巴德·索特比(lambda doc:str(docTimestamp"])).groupBy(lambda doc(docSymbol"],数学地板(日历.timegm(时间.strptime(docTimestamp"],日期格式字符串))/(5*60)))定义查看每组并拉出OHLC的功能假设每个分组是一个元组(symbol,seconds since epoch)和组中1分钟OHLC记录的resultitable编写函数获取(tuple,group);遍历group;手动拉入OHLCdef ohlc(分组):低=系统最大值高=-系统最大值i=0groupKey=分组[0]group=分组[1]对于组中的文档:#花点时间从第一个酒吧开始营业如果i==0:openTime=docTimestamp"]openPrice=docOpen"]#根据需要从条形图中指定最小值和最大值如果doc[";低";]<;低:低=doc[";低";]如果doc[";High";]>;高:high=doc[";high";]i=i+1#接近最后一个酒吧如果i==len(组):close=doc[";close";]outputDoc={";Symbol";:groupKey[0],";Timestamp";:开放时间,";Open";:开盘价,";High";:高,";Low";:低,";Close";:关闭}返回(None,outputDoc)结果dd=groupedBars.map(ohlc)resultRDD.saveAsNewAPIHadoopFile("file:///placeholder",outputFormatClassName,None,None,None,None,config)我从第一次介绍就看到了火花的魅力。它很容易使用。它还有一个特别好的地方,它的操作可以在列表或数据矩阵中的所有元素上运行。我还可以看到像R这样的统计功能的吸引力,但是在这种能力中,数据可以轻松地分布在多个节点上(还有一个针对R的Spark项目)。Spark当然是新的,我不得不使用Spark v1.2.2或更高版本,因为一个bug(Spark-5361)最初阻止我从PySpark写入Hadoop文件(用Java和Scala编写Hadoop&MongoDB应该可以)。我遇到的另一个缺点是在PySpark的交互会话中很难可视化数据。这让我想起了我大学时代在Matlab中调试代表光线轨迹的矩阵时,在他们添加更好的工具之前,他们感到沮丧。同样地,在RDD结构中显示数据仍然存在挑战;虽然有一个函数collect()用于创建更易于打印的列表,但有些元素(如iterables)仍然难以显示。在Spark中使用MongoDB的主要优点Spark很容易与MongoDB集成总的来说,了解如何通过Spark访问MongoDB中的数据是很有用的。回想起来,我花了更多的时间来处理数据,而不是将它们与MongoDB集成,这正是我所希望的。我还从在单个节点上预先配置的VM开始,而不是设置环境。从那以后,我了解到了Databricks云,我希望它可以简化更大的安装。存在许多实际应用程序这种数据操作的真实场景是在MongoDB中存储和查询实时的日内市场数据。价格在当天更新,允许用户实时查询。使用Spark,在一天结束后(即使第二天像FX一样立即开始),可以将单个标记聚合到更高效访问的结构中,例如这些OHLC条,或者按股票代码将一天中的单个滴答数组聚集到大型文档中。这种方法在一天内为捕获提供了很好的写吞吐量,并且可以快速访问周、月或年的价格。有一些MongoDB用户的系统遵循这种方法,他们大大减少了分析的延迟,同时也减少了硬件占用。通过将聚集的数据存储回MongoDB中,您可以灵活地索引数据并快速检索。有关更多信息,请访问:将MongoDB与Spark一起使用的文档MongoDB概述MongoDB大数据白皮书下载MongoDB Enterprise Edition下载MongoDB社区版免费试用Databricks。今天就开始吧