云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

CDN_百度云管家破解版_0元

小七 141 0

用统一的分析平台构建复杂的数据管道

介绍大数据从业者经常在Quora上发布重复出现的问题:什么是数据工程?如何成为数据科学家?什么是数据分析师?除了理解这些角色和各自的职责外,更重要的问题是:三个不同的角色、三个不同的体验和三个不同的需求如何协作和组合它们的工作?或者,他们是否可以采用统一的平台,而不是求助于一次性定制解决方案?是的,他们可以协作使用单一平台。上个月,我们发布了统一的Databricks平台。为了促进数据工程师、数据科学家和数据分析师之间的协作,其两个软件构件Databricks工作区和Notebook工作流实现了这种令人垂涎的协作。在本博客中,我们将探讨每个角色如何使用笔记本工作流与apachespark协作并构建复杂的数据管道将独立的和幂等的笔记本作为一个单独的执行单元进行编排无需定制一次性或独特的解决方案。亚马逊公共产品评级首先,让我们看一下数据场景。将我们的数据场景看作是亚马逊公共产品评级的一个语料库,其中每个角色都期望数据以可消化的格式执行各自的任务。这是一个包含不同数据工件的产品评论的语料库,任何数据科学家或数据分析员都会对这个数据集感兴趣。例如,数据分析员可能希望研究数据,以检查存在哪些类型的评级、产品类别或品牌。相比之下,数据科学家可能希望训练一个机器学习模型,在用户评论中定期使用诸如"great"或"return"或"terror"之类的关键词来预测有利的评级。但是,如果不首先将数据转换为每个角色的可消化格式,那么探索(由数据分析员)或训练模型(由数据科学家)都是不可能的。这就是数据工程师需要考虑的问题:她负责通过创建数据管道将原始数据转换为可消费数据。(我们引用了一个示例,即数据工程师如何将公共数据集摄取到数据块中。)接下来,我们将检查我们的第一个数据管道,第一个笔记本TrainModel,并遍历与每个角色相关的任务。apachespark作业的数据管道探索数据为了简洁起见,我们不讨论将原始数据转换为JSON文件的Python代码,这些代码就在这个页面上。相反,我们将专注于我们的数据管道笔记本,TrainModel,它帮助数据科学家和数据分析师进行协作。一旦我们的数据工程师将产品评论的语料库摄取到Parquet文件中,用Parquet文件创建了一个外部Amazon表,从该外部表创建了一个临时视图来研究表的各个部分,数据分析师和数据科学家都可以在这个TrainModel笔记本中合作工作。数据分析师可以表达SQL查询,而不是用Python代码(数据工程师或数据科学家更熟悉的语言)来表示计算。这里的重点是,笔记本的类型,无论是Scala、Python、R还是SQL,都不如用熟悉的语言(即SQL)表达查询和与他人协作的能力重要。既然我们已经为每个人物角色提供了可消化的数据,作为临时表tmp_amazon,数据分析师可以提出业务问题并可视化数据;例如,她可以通过以下问题查询该表:数据是什么样子的?有多少不同的品牌?品牌的收视率如何?对她的初步分析感到满意,她可能会求助于一位数据科学家,他可以设计一个机器学习模型,使他们能够定期预测用户评论的评分。当用户每天或每周在亚马逊网站上购买和评价产品时,机器学习模型可以在生产中定期使用新数据进行再训练。训练机器学习模型apachespark的机器学习库MLlib包含许多用于分类、回归、聚类和协作过滤的算法。在高层次上火花.ml该包提供了用于特性化、流水线、数学实用程序和持久性的工具、技术和API。当涉及到基于特定关键字的结果为好(1)或坏(0)的二元预测时,最适合这种分类的模型是Logistic回归模型,它是预测有利结果概率的广义线性模型的特例。在我们的例子中,我们希望通过一些有利的关键字来预测评价的结果。我们不仅将使用MLlib提供的logistic回归模型族的二项logistic回归,而且使用火花.ml管道及其变压器和估计器。创建机器学习管道这段Python代码展示了如何使用转换器和估计器创建管道。从pyspark.ml进口*从pyspark.ml.功能进口*从pyspark.ml.功能进口Bucketizer从pyspark.ml.分类进口*从pyspark.ml.调整进口*从pyspark.ml.评估进口*从pyspark.ml.回归进口*##Bucketizer将一列连续特性转换为一列特性桶,其中bucket由用户指定。#它使用公共参数inputCol和outputCol,以及用于bucketization的splits。#大于阈值的特征值被bucketted为1.0;值等于或小于阈值#被二进制化为0.0。inputCol支持Vector和Double类型。#我们将使用rating作为输入,如果大于4.5,则输出标签的值为1##对于这个模型,我们将使用两个特性转换提取器:Bucketizer和Tokenizer#splits=[-float("inf"),4.5,float("inf")]tok=标记器(inputCol="review",outputCol="words")bucket label="输出Col Splitzer,bucks=splittings")##使用HashingTF特性提取器,其输入为"words"#hashTF=HashingTF(inputCol=tok.getOutputCol公司(),numFeatures=10000,outputCol="功能")##使用一些参数创建模型实例#lr=逻辑回归(maxIter=10,regParam=0.0001,elasticNetParam=1.0)##创建带有所有特征变换器的stages管道以创建估计器#管道=管道(阶段=[bucket,tok,hashTF,lr])创建培训和测试数据接下来,我们使用我们的训练数据来拟合模型,最后用我们的测试数据进行评估。转换后的数据帧预测应该有我们的预测和标签。#创建我们的模型估计器#型号=管道.fit(培训数据)#用测试数据给模型打分预测=模型.转换(测试数据)#将dataframe转换成一个表,这样我们就可以使用SQL轻松地查询它预测.createOrReplaceTempView("tmp U预测")正如您从上面对我们的预测数据帧(保存为临时表)的查询中可能注意到,在我们的测试数据中,如果出现return一词,那么预测和标签的值都为0,并且如预期的那样,评级较低。对评估模型的结果感到满意,数据科学家可以持久化该模型,以便与其他数据科学家共享以进行进一步评估,或者与数据工程师共享以部署到生产中。这是通过持久化模型来实现的。维持模型考虑这样的用例和场景:数据科学家生成一个ML模型,并希望对其进行测试和迭代,将其部署到生产中以进行实时预测服务,或者与另一个数据科学家共享以进行验证。你怎么做的?持久化和序列化ML管道是导出MLlib模型的一种方法。另一种方法是使用Databricks dbml本地库,这是具有非常低延迟要求的实时服务的首选方式。一个重要的警告:对于服务于模型时的低延迟要求,我们建议并提倡使用dbmllocal。但是对于这个例子,由于延迟不是定期产品评审的问题或要求,所以我们使用MLlib管道API来导出和导入模型。尽管dbmllocal是我们导出和导入模型的首选方式,但出于许多原因,这两种持久性机制都很重要。首先,它很简单,而且与语言无关,模型被导出为JSON。其次,它可以从一个笔记本中导出,用Python编写,然后导入(加载)到另一个笔记本,用Scala编写,持久化和序列化ML管道,交换格式独立于语言。第三,序列化和持久化管道封装了所有的特性化,而不仅仅是模型。最后,如果您希望使用结构化流式处理实时预测模型。模型.写入().overwrite().save("/mnt/jules/amazon model")在我们的TrainModel笔记本中,我们导出模型,以便它可以被另一个笔记本ServeModel导入到我们的链式笔记本工作流的下游(见下文)。在下一节中,我们将讨论第二条管道CreateStream。创建流考虑这样一个场景:我们可以访问产品评论的实时流,并且,使用我们经过训练的模型,我们希望对我们的模型进行评分。数据工程师可以通过两种方式提供这些实时数据:一种是通过Kafka或Kinesis在Amazon网站上对产品进行评分;另一种是通过插入到表中的新条目(这些条目不属于训练集)将它们转换为S3上的JSON文件。事实上,这是可行的,因为结构化流式API以相同的方式读取数据,无论您的数据源是blob、S3中的文件,还是来自Kinesis或Kafka的流。我们选择了S3作为分布式队列的低成本和低延迟。在我们的例子中,数据工程师可以简单地从我们的表中提取最新的条目,构建在Parquet文件之上。这条短管道包括三个Spark作业:从Amazon表查询新产品数据转换生成的数据帧将我们的数据帧作为JSON文件存储在S3上为了模拟流,我们可以将每个文件视为JSON数据行的集合,作为流数据来为模型评分。这并不是一个罕见的例子