云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

天翼云_cdn回本周期_高性能

小七 141 0

使用Databricks Delta构建移动游戏事件数据管道

如何使用结构化流建立端到端的数据管道在数据库里试试这个笔记本移动游戏的世界节奏很快,需要快速扩展的能力。随着全球数百万用户通过游戏方式每秒产生数百万事件,您需要实时计算关键指标(分数调整、游戏内购买、游戏内操作等)。同样重要的是,一个受欢迎的游戏发布或功能将增加事件流量的数量级,你将需要基础设施来处理这种快速的规模。由于低延迟洞察力和快速可扩展基础设施的复杂性,为诸如移动游戏分析之类的高容量流媒体使用案例构建数据管道可能会非常复杂和令人困惑。负责这项工作的开发人员将遇到许多体系结构问题。首先,他们应该考虑什么样的技术可以减少他们的学习曲线,并且能够很好地集成?第二,构建后的体系结构的可扩展性如何?最后,组织中的不同角色将如何协作?最终,他们将需要构建一个端到端的数据管道,包括以下三个功能组件:数据摄取/流传输;数据转换(ETL);以及数据分析和可视化。解决这些问题的一种方法是选择提供这些功能的统一平台。Databricks提供了一个统一的分析平台,它将大数据和人工智能结合在一起,并允许组织中的不同角色在一个工作区中进行协作。在本博客中,我们将探讨如何:使用AWS服务(如API Gateway、Lambda和Kinesis Streams)构建移动游戏数据管道使用Spark结构化流媒体构建流接收服务使用Databricks Delta作为流媒体操作的接收器探索如何直接在此表上执行分析,最大限度地减少数据延迟说明Databricks Delta如何解决流数据的传统问题高级基础设施组件构建移动游戏数据管道非常复杂,因为您需要快速可扩展的基础设施来处理数百万用户的数百万事件,并实时获得可操作的见解。这就是用AWS和Databricks构建数据管道的好处所在。Kinesis碎片可以动态地重新配置以处理增加的负载,而databrick会自动扩展集群以处理数据的增加。在我们的示例中,我们使用事件生成器模拟来自移动用户的游戏事件。这些事件被推送到一个REST端点,并通过摄取到Databricks Delta表来跟踪我们的数据管道。此事件生成器的代码可以在此处找到。 Amazon API网关、Lambda和Kinesis流对于本例,我们使用amazonapi网关构建REST端点。到达这个端点的事件会自动触发一个无服务器的lambda函数,该函数将这些事件导入Kinesis流中供我们使用。您需要将lambda与端点的集成设置为自动触发,并调用将这些事件写入kinesis的函数。设置Python lambda函数,如下所示:导入json进口boto3随机导入导入base64导入时间def lambda_处理程序(事件,上下文):打印"接收到的事件:{}"。格式(事件)stream_name='streamdemo_传入'记录=json.loads(事件['body'])record['eventTime']=int(时间。时间())event['body']=记录client=boto3.client('kinesis')client.put_记录(流名称=流名称,数据=json.dumps文件(事件),PartitionKey=str(随机.randint(1100)))无返回Kinesis流是由吞吐量提供的,因此您可以根据需要提供尽可能多的碎片来处理预期的数据吞吐量。每个碎片的写入吞吐量为1MB/秒,读吞吐量为2MB/秒,或每秒1000条记录。有关Kinesis流吞吐量的更多信息,请查看文档。如果有多个碎片,随机分区键对于均匀分布非常重要。使用结构化流媒体从Kinesis摄取从动觉流中摄取数据是直接的。在生产环境中,您需要设置适当的IAM角色策略,以确保集群能够访问Kinesis流。此操作的最小权限如下所示:{"Version":"2012-10-17","声明":[{"Effect":"允许","行动":["动觉:描述流","动觉:获取记录","运动:GetShardIterator"],"Resource":"ARNđYOURđ流"}]}或者,您也可以使用AWS访问键并将它们作为选项传入,然而,IAM角色是生产用例的最佳实践方法。在本例中,假设集群具有适当的IAM角色设置。首先创建如下数据帧:KinesDataFrame=火花\.读流\.format('kinesis')\.option('streamName','MY'u KINESIS'u STREAM_NAME')\.option('initialPosition','STREAM_POSITION')\.option('region','KINESIS'u region')\.加载()您还需要定义传入数据的模式。Kinesis数据的包装方式如下:kinessschema=结构类型()\.add('body',StringType())\.add('resource',StringType())\.add('requestContext',StringType())\.add('queryStringParameters',StringType())\.add('httpMethod',StringType())\.add('pathParameters',StringType())\.add('headers',StringType())\.add('stageVariables',StringType())\.add('path',StringType())\.add('isBase64Encoded',StringType())eventSchema=StructType().add('eventName',StringType())\.add('eventTime',TimestampType())\.add('eventParams',StructType()\.add('游戏关键词',StringType())\.add('应用程序名称',StringType())\.add('scoreAdjustment',IntegerType())\.add('平台',StringType())\.add('app_version',StringType())\.add('device_id',StringType())\.add('客户端事件\时间',TimestampType())\.add('amount',DoubleType()))对于这个演示,我们只对kinesschema的主体感兴趣,它将包含我们在eventSchema中描述的数据。someEventDF=KinesDataFrame.selectExpr("cast(data as STRING)jsonData")\.select(from_json('jsonData',kinesschema).alias('requestBody'))\.select(from_json('请求正文.body,eventSchema).alias('body'))\.select('正文属性1', '正文属性2', '身体等')使用Databricks Delta的实时数据管道现在我们已经定义了流数据帧,让我们继续进行一些简单的转换。事件数据通常是基于时间序列的,所以最好按事件日期之类的数据进行分区。但是,我们的传入流没有event date参数,因此我们将通过转换eventTime列来创建自己的参数。我们还将抛出一个检查,以确保eventTime不为空:base_path='/path/to/mobile_events_stream/'事件流=游戏事件过滤器(gamingEventDF.eventTime.isNotNull())。With列("eventDate",结束日期(游戏事件df.eventTime)) \.writeStream公司\.partitionBy('eventDate')\.format('delta')\.option('检查点位置',基本路径+'/\u checkpoint')\.start(基本路径)让我们借此机会定义表的位置。创建表如果不存在mobile_events_delta_raw使用DELTA位置"/path/to/mobile_events_stream/";实时分析、KPI和可视化现在我们已经将数据流实时传输到我们的Databricks Delta表中,我们可以继续研究一些kpi。传统上,公司只会每天查看这些内容,但是使用结构化流媒体和Databricks Delta,您可以在Databricks笔记本中实时地可视化这些内容。让我们从一个简单的开始。在过去的一小时里我看到了多少事件?计数df=带水印的游戏事件("事件时间","180分钟").groupBy(窗口("事件时间","60分钟")).count()countsQuery=countsDF.writeStream计数\.format('内存')\.queryName('传入的\u事件\u计数')\。开始()然后,我们可以在笔记本上看到这一点,比如一个条形图:也许我们可以让事情更有趣一点。我在最后一小时赚了多少钱?让我们检查一下预订情况。了解每小时的预订量是一个重要的指标,因为它可以指示我们的应用程序/生产系统的运行情况。例如,如果在新的游戏补丁发布后,预订量突然下降,我们马上就会知道出了问题。我们可以使用相同的数据帧,但是过滤所有purchaseEvents,按60分钟的窗口分组。预订DF=带水印的游戏事件("事件时间","180分钟")。筛选器(游戏事件df.eventName=='purchaseEvent').groupBy(窗口("eventTime","60分钟")).sum("事件参数金额")预订查询=预订DF.writeStream\.format('内存')\.queryName('传入事件\预订')\。开始()让我们选择一个线图来可视化这个:对于SQL爱好者,可以直接查询Databricks Delta表。让我们看一个简单的查询来显示当前的每日活动用户(DAU)。我知道我们实际上是在看设备id,因为我们的示例集不包含用户id,所以为了示例起见,让我们假设用户和设备之间存在1-1映射(尽管在现实世界中,情况并非总是如此)。计数(选择不同eventParams.device_id事件参数)作为mobile_events_delta_raw中的DAU,其中to_date(eventTime)=当前日期;用Databricks Delta解决传统流式"小文件"问题许多人在流媒体方面面临的一个共同挑战是经典的"小文件"问题。取决于您的写入被触发的频率和tra的音量