云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

CDN_周口网站建设公司_免费

小七 141 0

使用Databricks Delta构建实时属性管道

在数据库里试试这个笔记本在数字广告中,能够向客户提供的最重要的信息之一是关于他们的广告支出如何推动结果的信息。我们能提供的越快越好。为了将转换或参与与广告活动中的印象联系起来,公司必须进行归因。归因可能是一个相当昂贵的过程,如果没有正确的技术,针对不断更新的数据集运行属性是一项挑战。传统上,这不是一个容易解决的问题,因为有很多事情需要考虑:如何确保在不损坏记录的情况下以低延迟将数据写入读取位置?我们如何在不增加成本或性能损失的情况下不断地向大型、可查询的数据集追加数据?我应该在哪里以及如何引入归属的连接?幸运的是,Databricks通过结构化流和Databricks Delta使这变得容易。在这篇博文(以及相关的笔记本)中,我们将快速了解如何使用DataFrame API在Kinesis之上构建结构化流媒体应用程序(对于使用Azure Databricks的用户,可以使用Azure EventHubs、HDInsight上的Apache Kafka或Azure Cosmos DB integration),并使用Databricks Delta来近乎实时地查询流。我们还将向您展示如何使用您选择的BI工具来实时查看您的归因数据。定义流我们需要做的第一件事是建立印象和转换数据流。impression数据流为我们提供了与数字广告(impression)的客户相关联的属性的实时视图,而转换流则表示基于该广告执行了操作(例如单击广告、购买商品等)的客户。使用Databricks中的结构化流,您可以快速插入到流中,因为Databricks支持直接连接到Kafka(ApacheKafka,AWS上的ApacheKafka,HDInsight上的ApacheKafka)和Kinesis,如以下代码片段所述(这是为了获得印象,请重复此步骤进行转换)//阅读印象流瓦尔运动=spark.readStream.format("kinesis").option("streamName",kinessstreamname).option("区域",kinesregion).option("initialPosition","latest").option("awsAccessKey",$awsAccessKeyId$).option("awsSecretKey",$awsSecretKey$).加载()接下来,创建数据流模式,如下面的代码片段所示。//定义印象流模式val schema=结构类型(顺序(StructField("uid",StringType,true),StructField("impTimestamp",TimestampType,true),StructField("exchangeID",IntegerType,true),StructField("publisher",StringType,true),StructField("creativeID",IntegerType,true),StructField("单击",StringType,true),StructField("广告客户ID",IntegerType,true),StructField("browser",StringType,true),StructField("geo",StringType,true),StructField("bidAmount",DoubleType,true)))最后,我们要创建流式印象数据帧。使用Databricks display命令,我们可以在数据旁边实时看到数据和输入/处理速率。//定义流式显示数据帧值imp=kinesis.选择(来自_json('数据.cast("string"),schema)作为"fields"')。选择($"fields.*")//查看印象实时数据显示(imp)将流同步到Databricks增量impression(impression)和conv(conv)流可以直接同步到Databricks Delta,从而为这个实时属性用例提供了更大程度的灵活性和可伸缩性。t允许您将这些实时数据流快速写入S3/Blob存储上的Parquet格式,同时允许用户同时从同一目录中读取数据,而无需自己管理一致性、事务性和性能的开销。如下面的代码片段所述,我们从单个源捕获原始记录并将其写入其自己的Databricks Delta表中。进口org.apache.spark网站.sql.SparkSession进口org.apache.spark网站.sql.expressions.Window//将Impression`imp`数据持久化到Databricks Delta带水印的imp("impTimestamp","1分钟").重新划分(1).writeStream公司.格式("delta").option("路径","/tmp/adtech/impressions").option("检查点位置","/tmp/adtech/impressions checkpoints").触发器(org.apache.spark网站.sql.streaming.Trigger.ProcessingTime("10秒")).start()需要注意的是,使用Databricks Delta,您还可以:此时应用其他ETL、分析和/或扩展步骤将来自不同流或批处理过程和不同源的数据写入同一个表中用于即席报告的Databricks增量视图现在我们已经创建了impression和conversion Databricks Delta表,我们将创建命名视图,这样我们就可以轻松地在sparksql中执行连接,并且可以从您最喜欢的BI工具中查询这些数据。让我们首先从创建Databricks Delta视图开始。%sql语言使用adtech;创建或替换视图印象delta作为select*from delta.`/tmp/adtech/impressions`;创建或替换视图转换增量为select*from delta.`/tmp/adtech/conversions`;计算实时属性现在我们已经建立了Databricks Delta视图,我们可以计算视图上的最后一次触摸属性,然后计算视图上的加权属性。计算视图上的最后一次触摸属性如前几节所述,要计算实时窗口属性,我们将需要连接两个不同的Delta数据流:印象和转换。如下面的代码片段所述,我们将首先定义基于delta的Databricks表示和转换。我们还将定义窗口规范,它将由下面的dense_rank()语句使用。窗口和等级定义了我们的归属逻辑。//定义所需的印象数据值imps=火花.sql("将uid选为impUid,将广告客户id选为impAdv,*from斯巴克萨米特.imps").drop("广告客户ID")//定义所需的转换数据val转换=火花.sql("选择*自sparksummit.convs公司")//定义按印象时间戳排序的Spark SQL窗口//按Impression Uid和Impression Advertiser划分val窗口规范=窗口.分区依据("impUid","impAdv").orderBy(desc("impTimestamp"))//计算实时属性//接合印象。不纯== 转换.uid//确保印模时间在转换时间之前//密级过滤val窗口属性=转换连接(小鬼们,小鬼队("不纯")===转换列("uid")&小鬼队("impTimestamp")