云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

分布式存储_网站建设html5_优惠

小七 141 0

使用awslambda和数据块实现ETL自动化和ML模型服务

作为一名数据解决方案架构师,我与客户合作使用Databricks构建端到端解决方案。Databricks构建在AWS之上,与AWS的所有产品兼容,我们的客户都是AWS的热心用户。当然,这意味着我经常就架构问题向他们提供建议,比如如何将数据块与更广泛的AWS生态系统集成。在这个博客中,我将展示如何利用AWS Lambda和Databricks来处理两个用例:基于事件的ETL自动化(例如,使用Databricks的restapi为Spark SQL表或作业触发器创建分区)和提供使用Apache Spark训练的机器学习模型结果。关于AWS Lambda的一些背景知识允许你在没有AWS服务的情况下运行一个特定于事件的服务。计算资源、容量配置、自动伸缩、代码监视、日志记录以及代码和安全补丁部署都由AWS管理。它支持三种编程语言:Java、Python和节点.Js.Lambda是Databricks的完美补充,可以根据其他AWS服务中的事件触发操作。databricksrestapi提供了一种将Spark集群与Lambda连接起来的机制。Databricks的restapi简介Databricks restapi支持对Databricks的编程访问(而不是通过webui)。它可以自动创建和运行作业,生产数据流,等等。有关API如何工作的更多信息,请阅读文档或此博客。对于下一个示例,我将演示如何使用API自动化ETL作业。示例1:ETL自动化在某些情况下,每天的ETL作业无法在设定的时间进行调度。例如,有时您需要特定数量的数据点才能使用,或者可能存在相当大的每日变化性,这使得简单的CRON作业不是一个好的选择。在这种情况和其他类似情况下,AWS Lambda函数可用于检查各种系统的状况(例如,数据着陆是否在S3或Kinesis中),并通过Databricks的REST启动作业API.输入下图所示的示例是,当新数据到达bucket时,S3会触发自定义函数。lambda函数触发器Databricks的作业正在使用restapi。具体地说,各种数据都在S3中着陆(步骤1);事件通知被推送到Amazon Lambda中的自定义函数(步骤2);自定义函数对Databricks进行restapi调用以启动新作业(步骤3);作为ETL作业的一部分,Databricks向S3读写数据(步骤4)。在下图所示的示例中,当新数据到达bucket中时,S3将触发自定义函数。AWS Lambda触发器Databricks的工作是使用restapi。具体来说,各种数据都在S3中(步骤1);事件通知被推送到AWS Lambda中的自定义函数(步骤2);自定义函数对Databricks进行restapi调用以启动新作业(步骤3);作为ETL作业的一部分,Databricks向S3读写数据(步骤4)。图1:ETL自动化:1)Data lands是来自各种源的S3,2)触发事件并调用AWS Lambda中的自定义函数,3)自定义函数对Databricks进行REST API调用以启动新作业,4)作为ETL作业的一部分,Databricks从S3读取和写入数据。图2:AmazonLambda配置页面的屏幕截图。在"前缀"和"后缀"字段中,可以通过提供前缀或后缀(如文件扩展名)进一步限制将触发通知的范围。如果未指定,则bucket中创建的所有对象都会触发通知。下面的代码显示了用节点.js. 它对Databricks中的REST端点进行HTTPs post调用。JSON加载是Databricks中预配置的Spark作业的键/值(作业标识和实际作业编号)。你可以在这个视频中学习Spark jobs如何在Databricks中工作。const https=require("https");exports.handler导出=(事件、上下文、回调)=>{变量数据=JSON.stringify({"作业编号":作业编号});变量选项={主机:"xxx.cloud.databricks.com",港口:443,路径:"/api/2.0/jobs/run now",方法:"POST",//身份验证标头标题:{"授权":"Basic"+新缓冲区("用户:通过").toString("base64"),"Content-Type":"应用程序/json","内容长度":缓冲区长度(数据)}};var请求=https.请求(选项、功能(res){变量体="";恢复("数据",函数(数据){body+=数据;});恢复("结束",函数(){控制台.log(身体);});恢复("错误",功能(e){控制台.log("获取错误:"+e.message);});});请求.写入(数据);请求.结束();};例2:机器学习模型服务在这个例子中,我们使用一个使用Spark ML的随机森林回归器在数据块中训练的预测模型。这些数据是自行车共享系统中每小时的车手快照。每小时,我们会得到登记的,临时的,和目前使用自行车的总人数,以及关于日期和天气的信息。基于这些数据,我们训练了一个机器学习模型来预测在给定的一个小时内直流自行车共享系统中的骑手人数。一旦模型得到训练,我们将其应用于一组测试数据,并将结果预测写入NoSQL数据库(在本例中是Riak TS)。有关此模型的更多信息,请参阅数据建模笔记本的第3部分和第4部分。在上面的例子中,训练数据存储在S3中,模型创建和预测结果以批处理方式写入Riak-TS。awslambda是一个很好的解决方案,可以在持久性层之外提供模型预测的结果,而不必处理伸缩、版本控制和安全性方面的任何麻烦。类似地,Lambda可用于为DynamoDB、Redis或其他适当的数据存储系统提供预测结果。这种方法不仅限于回归模型:它还可以用于构建推荐系统或分类器。这种方法的一个问题是,它仅限于具有类别特征的预测(如本例中的城市和州)。对于连续的特征,如温度和无限个可能值,人们可以离散化连续特征(即,使用量子化分解器),其细节超出了本博客的范围。还可以扩展这个用例,并使用Kinesis(或Kafka)和Spark Streaming将训练数据作为流接收。在ML算法允许流式更新的情况下(例如K-Means或Logistic回归),我们可以近乎实时地更新模型。在接近实时的模型更新和数据库中的预测结果刷新之间会有一些延迟。图3:机器学习模型服务:1)实时数据馈送,例如日志、像素或感官数据,落在Kinesis上;2)Spark的结构化流将数据用于存储和处理,包括批量或接近实时的ML模型创建/更新;3)输出模型预测写入Riak TS,4) AWS Lambda和awsapi网关用于向各种客户端提供预测结果。下面的代码显示了一个用Java编写的自定义lambda函数。它调用Riak服务器,根据城市、州和时间信息来提取预计的乘客数量。JSON加载的参数与笔记本相同,例如{"city":"Washington","state":"D.C.","time":"1356436800000"}。公共类RiakModelServer实现了RequestHandler{static String tableName="spark riak预测时间";静态字符串hostName="hostName";公共响应handleRequest(请求请求,上下文上下文){double predictedCount=0;试试看{RiakClient客户端=RiakClient.newClient(主机名);List inputPKs=new ArrayList();inputPKs.add(新单元(请求.getCity()));inputPKs.add(新单元(请求.getState()));inputPKs.add(Cell.newTimestamp(Long.parseLong(请求.getTime())));Fetch Fetch=新建获取生成器(表名,inputPKs).build();查询结果查询结果=客户端.execute(取回);如果(查询结果.getRowsCount() !=0){预测计数=查询结果.GetRowScope().get(0).GetCellScope().get(3).getDouble();}}catch(异常ex){例如printStackTrace();}返回新的响应("+predictedCount);}}图4:AWS Lambda配置页面的屏幕截图。预计2012年圣诞节中午12点车手数匹配笔记本第4节。下一步是什么现在您已经了解了Databricks如何与AWS Lambda一起工作,您可以自己尝试一下。注册免费试用版,开始试用Databricks的apachespark集群、API等(选择平台试用版获得对API的完全访问)。这是一系列关于如何将Databricks与AWS生态系统中的其他服务一起使用的博客中的第一篇,在Twitter上关注我们,或者注册我们的时事通讯,以便在发布新博客时得到通知。免费试用Databricks。今天就开始吧