云服务器价格_云数据库_云主机【优惠】最新活动-搜集站云资讯

消息队列_海量数据库解决方案_新用户

小七 141 0

挤压消防水龙带:从卡夫卡压缩中获得最大收益

我们Cloudflare是Kafka的长期用户,第一次提到它可以追溯到2014年初,当时最新的版本是0.8.0。我们使用Kafka作为日志来增强分析(HTTP和DNS)、DDOS缓解、日志记录和指标。消防软管CC BY 2.0图像由RSLab提供虽然统一日志抽象的想法从那时起就保持不变(如果你没有读过jaykreps的这篇经典博客文章的话),但从那时起,卡夫卡在其他领域得到了发展。其中一个改进的方面是压缩支持。在过去的日子里,我们尝试过几次启用它,但最终由于协议中未解决的问题而放弃了这个想法。卡夫卡压缩概述就在去年,kafka0.11.0发布了新的改进协议和日志格式。简单的压缩方法是单独压缩日志中的消息:编辑:最初我们说这是卡夫卡在0.11.0之前的工作方式,但这似乎是错误的。如果压缩算法有更多的数据,那么它们的工作效果最好,因此在新的日志格式中,消息(现在称为记录)被打包并成批压缩。在以前的递归日志格式消息(压缩的消息集是一条消息)中,新格式使事情变得更简单:压缩的记录批只是一个批。现在压缩有了更大的空间来完成它的工作。同一个Kafka主题中的记录很有可能共享公共部分,这意味着它们可以被更好地压缩。在数千条信息的规模上,差异变得巨大。这里的缺点是,如果您想读取上面示例中的record3,那么您还必须获取记录1和2,无论批处理是否压缩。实际上,这并不重要,因为消费者通常一批接一批地按顺序读取所有记录。Kafka中压缩的好处在于它可以让您在CPU和磁盘以及网络使用上进行权衡。协议本身也被设计成最小化开销,只需要在几个地方进行解压缩:在日志的接收端,只有使用者需要解压缩消息:实际上,如果不使用加密,数据可以在NIC和磁盘之间复制,而用户空间的拷贝为零,这在一定程度上降低了成本。Cloudflare的卡夫卡瓶颈减少网络和磁盘使用是我们的一大卖点。早在2014年,我们就在卡夫卡的领导下开始旋转磁盘,从未出现过磁盘空间问题。然而,在某些时候,我们开始出现随机io的问题。大多数情况下,使用者和副本(只是另一种类型的使用者)从日志的最顶端读取数据,而这些数据驻留在页缓存中,这意味着您根本不需要从磁盘读取:在这种情况下,您接触磁盘的唯一时间是在写入过程中,顺序写入非常便宜。然而,当你有多个落后的消费者时,事情就会开始分崩离析:每个使用者都希望从物理磁盘读取日志的不同部分,这意味着来回查找。有一个落后的消费者是可以的,但他们中的许多人会开始争夺磁盘io,只会增加他们所有人的延迟。为了解决这个问题,我们升级到了固态硬盘。用户不再为磁盘时间而争了,但在大多数情况下,当用户不落后并且没有读io时,感觉非常浪费。我们没有无聊太久,因为其他问题也出现了:磁盘空间成了一个问题。固态硬盘要贵得多,可用磁盘空间大大减少。随着我们的成长,我们开始使网络饱和。我们使用2x10Gbit网卡,不完美的平衡意味着我们有时会使网络链路饱和。压缩承诺可以解决这两个问题,因此我们渴望在卡夫卡的改进支持下再试一次。性能测试在Cloudflare,我们广泛使用Go,这意味着我们的许多Kafka消费者和生产商都在Go中。这意味着我们不能在每一个服务器版本中都使用Kafka团队提供的现成Java客户机,开始享受压缩带来的好处。我们必须首先从Kafka客户端库获得支持(我们使用Shopify的sarama)。幸运的是,在2017年底增加了支持。有了我们这边的更多修复,我们可以让测试设置工作。Kafka支持4种压缩编解码器:none、gzip、lz4和snapy。我们必须弄清楚这些是如何为我们的主题工作的,所以我们编写了一个简单的生产者,将数据从现有的主题复制到目标主题中。对于每种压缩类型有四个目标主题,我们可以得到以下数字。每个目的地主题收到的消息量大致相同:不同主题的入口带宽不同,这是预期的:消息的平均大小与入口带宽差成正比:更明显的是,以下是这些主题的磁盘使用情况:这看起来很神奇,但它是一个相当低吞吐量的nginx错误主题,包含来自nginx的字符串错误消息。我们的主要目标是请求HTTP日志主题,其中包含更难压缩的capnp编码消息。当然,我们继续尝试请求的一个分区主题。最初的结果非常好:他们很好,因为他们是谎言。如果使用nginx错误日志,我们将未压缩的日志压缩到20Mbps以下,那么在这里,我们将提高30倍到600Mbps,压缩速度无法跟上。不过,作为一个起点,这个实验给了我们一些关于主要目标压缩比的期望。压缩已使用的消息磁盘使用率平均邮件大小没有30.18米48106MB1594亿广州工业园区317米1443兆字节455亿敏捷2099万14807MB705亿LZ4型2093万14731兆703立方从一开始Gzip听起来太贵了(尤其是在Go中),但是snapy应该能够跟上。我们分析了我们的生产商,它只花了2.4%的CPU时间进行快速压缩,从未使单个内核饱和:就连gzip也只花了20%,而且也没有让CPU饱和:卡夫卡似乎在做一些让我们减速的事情。理论上,它应该只对消息进行解压缩以进行冒烟测试,但它还具有一个故障保护机制,可以在消息具有无效偏移时重新压缩它们。对于Snappy,我们可以使用jstack从Kafka获得以下线程stacktrace:"kafka-request-handler-3"#87 daemon prio=5 os_prio=0 tid=0x00007f80d2e97800 nid=0x1194可运行[0x00007f7ee1adc000]java.lang.Thread。状态:可运行在org.xerial.snappy公司.SnappyNative.rawCompress(本地方法)在org.xerial.snappy公司.Snappy.rawCompress公司(Snappy.java:446)在org.xerial.snappy公司.快速压缩(Snappy.java:119)在org.xerial.snappy公司.SnappyOutputStream.compressInput(SnappyOutputStream.java:376)在org.xerial.snappy公司.SnappyOutputStream.write(SnappyOutputStream.java:130)在java.io.DataOutputStream.写入(数据输出流.java:107)-锁定(ajava.io.DataOutputStream)在org.apache.kafka.公用.utils.utils.writeTo公司(Utils.java:861)在org.apache.kafka.common.record.DefaultRecord.writeTo公司(DefaultRecord.java:203)在org.apache.kafka.common.record.MemoryRecordsBuilder.appendDefaultRecord(MemoryRecordsBuilder.java:622)在org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:409)在org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:442)在org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:595)在kafka.log.LogValidator$.$anonfun$构建记录和分配偏移量$1(LogValidator.scala:336)在kafka.log.LogValidator$.$anonfun$构建记录和分配偏移量$1$adapted(LogValidator.scala:335)在kafka.log.LogValidator$$$Lambda$675/1035377790。应用(未知来源)在scala.collection.mutable.调整大小learray.foreach(大小调整learray.scala:59)在scala.collection.mutable.调整大小learray.foreach$(调整大小ClearRay.scala:52)在scala.collection.mutable.ArrayBuffer.foreach(阵列缓冲器。scala:48)在kafka.log.LogValidator$.BuildRecords和AssignOffset(LogValidator.scala:335)在kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(LogValidator.scala:288)在kafka.log.LogValidator$.validateMessagesAndAssignOffsets(LogValidator.scala:71)在卡夫卡.log.log.liftedTree1美元(Log.scala:654)在卡夫卡.log.log.$anonfun$追加$2(Log.scala:642)-锁定(ajava.lang.Object)在卡夫卡.log.log$$Lambda$627/239353060.apply(未知来源)在卡夫卡.log.log.maybeHandleIOException(日志。scala:1669)在卡夫卡.log.log。追加(日志。scala:624)在卡夫卡.log.log.附录标题(日志。scala:597)在kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(分区。scala:499)在kafka.cluster.Partition$$Lambda$625/1001513143.apply(未知来源)在卡夫卡.utils.CoreUtils$.inLock(CoreUtils。斯卡拉:217)在卡夫卡.utils.CoreUtils$.inReadLock(CoreUtils。斯卡拉:223)在kafka.cluster.Partition.appendRecordsToLeader(分区.scala:487)在kafka.server.ReplicaManager.$anonfun$附录本地允许$2(复制管理器.scala:724)在kafka.server.ReplicaManager$$Lambda$624/2052953875。应用(未知来源)在scala.collection.TraversableLike.$anonfun$地图$1(可遍历的。scala:234)在scala.collection.TraversableLike$$Lambda$12/187472540.apply(未知来源)在scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:138)在scala.collection.mutable.HashMap$$Lambda$25/1864869682.apply(未知源)在scala.collection.mutable.哈希表.foreachEntry(HashTable.scala:236)在scala.collection.mutable.哈希表.foreachEntry$(HashTable.scala:229)在scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)在scala.collection.mutable.HashMap.foreach(HashMap.scala:138)在scala.collection.TraversableLike.地图(可遍历的。scala:234)在scala.collection.TraversableLike.地图$(可遍历。scala:227)在scala.collection.AbstractTraversable.地图(可穿越。scala:104)在kafka.server.ReplicaManager.附录(复制管理器.scala:708)在kafka.server.ReplicaManager.附录记录(复制管理器.scala:459)在kafka.server.kafkapis.handleProduceRequest公司(卡夫卡皮斯。斯卡拉:466)在kafka.server.kafkapis.手柄(卡夫卡皮