怎么自动设置 kafka 的kafka offset 管理

关于kafka中的timestamp与offset的对应关系
转自:/关于kafka中的timestamp与offset的对应关系.html
关于kafka中的timestamp与offset的对应关系
Aug 11, 2015
获取单个分区的情况
kafka通过offset记录每条日志的偏移量,详见《Kafka文件存储机制那些事》。但是当用户想读取之前的信息时,他是不可能知道这些消息对应的offset的,用户只能指定时间,比如说我从昨天的12点开始读取消息。
这就有个问题了,怎么样将用户定义的时间转化为集群内部的offset呢?
先简单重温一下kafka的物理存储机制:每个topic分成多个分区,而一个分区对应磁盘中的一个目录,目录中会有多个文件,比如:
01145974.index
01145974.log
可以看出来,每个segment
file其实有2部分,一个index文件,一个log文件。文件名是这个文件内的第一个消息的offset。log文件记录的是实际的消息内容。而index对log文件作了索引,当需要查看某个消息时,如果指定offset,很容易就定位到log文件中的具体位置。详见上面说的文章。
但正如刚才所说,用户不知道offset,而只知道时间,所以就需要转换了。
kafka用了一个很直观很简单的方法:将文件名中的offset与文件的最后修改时间放入一个map中,然后再查找。详细步骤如下:
(1)将文件名及文件的最后时间放入一个map中,时间使用的是13位的unix时间戳
(2)当用户指定一个时间t0时,在map中找到最后一个时间t1早于t0的时间,然后返回这个文件名,即这个文件的第一个offset。
(3)这里只返回了一个分区的offset,而事实上需要返回所有分区的offset,所以对所有分区采取上述步骤。
(4)使用取到的消息,开始消费消息。
举个例子:
w-r--r-- 1 hadoop hadoop
8?? 11 10:20 .log
-rw-r--r-- 1 hadoop hadoop
8?? 11 10:20 .index
-rw-r--r-- 1 hadoop hadoop
8?? 11 10:40 .log
-rw-r--r-- 1 hadoop hadoop
8?? 11 10:40 .index
-rw-r--r-- 1 hadoop hadoop
8?? 11 11:04 .log
-rw-r--r-- 1 hadoop hadoop
8?? 11 11:04 .index
-rw-r--r-- 1 hadoop hadoop
8?? 11 11:25 .log
-rw-r--r-- 1 hadoop hadoop
8?? 11 11:25 .index
-rw-r--r-- 1 hadoop hadoop
?? 11 11:28 .index
-rw-r--r-- 1 hadoop hadoop
8?? 11 11:28 .log
我们有上述几个文件
(1)当我需要消费从8月11日11:00开始的数据时,它会返回最后修改时间早于8月11日11:00的文件名,此外是修改时间第10:40的文件,offset为.其实由于它的最后修改时间在10:40,我们需要的数据不可能在它里面,它直接返回11:40的文件即可,但可能是出于更保险的考虑,它返回了上一个文件。
(2)其它类似,当我消费11:20的数据,返回的offset为.
(3)而当我消费的数据早于10:20的话,则返回的offset为空,如果是通过数组保存offset的,则提取第一个offset时会出现
java.lang.ArrayIndexOutOfBoundsException
异常。如在kafka编程指南中的SimpleConsumer中的代码:
long[] offsets = response.offsets(topic, partition);
return offsets[0]; 当然,也可以合理处理,当返回为空时,直接返回最早的offset即可。
(4)当消费的数据晚于最晚时刻,返回最新的消息。
(1)这里对kafka集群本身没有任何的负担,kafka消息也不需要记录时间点这个字段,只有在需要定位的时候,才临时构建一个map,然后将offset与时间读入这个map中。
(2)冗余很多消息。这种方法粒度非常粗,是以文件作为粒度的,因此冗余的消息数据和文件的大小有关系,默认为1G。如果这个topic的数据非常少,则这1G的数据可以就是几天前的数据了。
(3)有2个特殊的时间点: 需要查找的 timestamp 是 -1 或者 -2时,特殊处理
case OffsetRequest.LatestTime =&
// OffsetRequest.LatestTime = -1
startIndex = offsetTimeArray.length -1
case OffsetRequest.EarliestTime =& // OffsetRequest.EarliestTime = -2
startIndex =0
同时从所有分区获取消息的情况
1、当同时从多个分区读取消息时,只要有其中一个分区,它的所有文件的修改时间均晚于你指定的时间,就会出错,因为这个分区返回的offset为空,除非你作了合理的处理。
2、storm!!!
storm0.9x版本遇到上述问题时,同样会出错,出现以下异常
storm.kafka.UpdateOffsetException 而从0.10版本开始,改为了从最早时间开始消费消息。
3、还有个问题,如何将消息均匀的分布但各个分区中。比如在我们一个topic中,其中一个分区已经有60G数据,而另一个分区还不足2G,如果指定时间的话,由于小的那个分区的修改时间肯定是在近期的,所以当指定一个较前的时间点就会出错。而且即使不出错,从不同分区返回的消息也可能时间相差很远。
如何将消息均匀的分布到不同的分区???也就是说如果自定义自己的分区函数。
参考内容:相关源码略读
http://blog.csdn.net/lizhitao/article/details/
Server 处理 Client 发送来的请求的入口在 文件夹: core/src/main/scala/kafka/server
类:kafka.server.KafkaApis&方法:
handle 处理offset请求的函数:&handleOffsetRequest&###2、处理逻辑
处理逻辑主要分为四步
获取partition 从partition中获取offset high water mark 处理(这一段的资料太少了) 异常处理
由于request中包含查询多个partition的offset的请求。所以最终会返回一个map,保存有每个partition对应的offset
这里主要介绍从某一个partition中获取offset的逻辑,代码位置kafka.log.Log#getOffsetsBefore(timestamp,
maxNumOffsets)&从一个partition中获取offset
(1)建立offset与timestamp的对应关系,并保存到数据中
//每个Partition由多个segment file组成。获取当前partition中的segment列表
val segsArray = segments.view
// 创建数组
var offsetTimeArray: Array[(Long, Long)] =null
if(segsArray.last.size &0)
offsetTimeArray =newArray[(Long, Long)](segsArray.length +1)
offsetTimeArray =newArray[(Long, Long)](segsArray.length)
// 将 offset 与 timestamp 的对应关系添加到数组中
for(i &&span class="hljs-title" style="box-sizing: border- color: rgb(249, 38, 114);"&-0until segsArray.length)
// 数据中的每个元素是一个二元组,(segment file 的起始 offset,segment file的最近修改时间)
offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified)
if(segsArray.last.size &0)
// 如果最近一个 segment file 不为空,将(最近的 offset, 当前之间)也添加到该数组中
offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds)
通过这段逻辑,获的一个数据 offsetTimeArray,每个元素是一个二元组,二元组内容是(offset, timestamp)
(2)找到最近的最后一个满足 timestamp & target_timestamp 的 index
var startIndex = -1
timestamp match {
// 需要查找的 timestamp 是 -1 或者 -2时,特殊处理
caseOffsetRequest.LatestTime =&
// OffsetRequest.LatestTime = -1
startIndex = offsetTimeArray.length -1
caseOffsetRequest.EarliestTime =& // OffsetRequest.EarliestTime = -2
startIndex =0
var isFound =false
debug("Offset time array = "+ offsetTimeArray.foreach(o =&"%d, %d".format(o._1, o._2)))
startIndex = offsetTimeArray.length -1
// 从最后一个元素反向找
while(startIndex &=0&& !isFound) {
// 找到满足条件或者
if(offsetTimeArray(startIndex)._2 &= timestamp)
// offsetTimeArray 的每个元素是二元组,第二个位置是 timestamp
isFound =true
startIndex -=1
} 通过这段逻辑,实际找到的是 “最近修改时间早于目标timestamp的最近修改的segment file的起始offset” 但是获取offset的逻辑并没有结束,后续仍有处理
(3)找到满足该条件的offset数组
实际上这个函数的功能是找到一组offset,而不是一个offset。第二个参数 maxNumOffsets 指定最多找几个满足条件的
获取一组offset的逻辑
// 返回的数据的长度 = min(maxNumOffsets, startIndex + 1),startIndex是逻辑2中找到的index
val retSize = maxNumOffsets.min(startIndex +1)
val ret = newArray[Long](retSize)
// 逐个将满足条件的offset添加到返回的数据中
for(j &&span class="hljs-title" style="box-sizing: border- color: rgb(249, 38, 114);"&-0until retSize) {
ret(j) = offsetTimeArray(startIndex)._1
startIndex -=1
// 降序排序返回。offset 越大数据越新。
// ensure that the returned seq is in descending order of offsets
ret.toSeq.sortBy(- _) 最终返回这个数组
3、注意事项
实际找到的offset并不是从目标timestamp开始的第一个offset。需要注意 当 timestamp
小于最老的数据文件的最近修改时间时,返回值是一个空数组。可能会导致使用时的问题。 调整segment
file文件拆分策略的配置时,需要注意可能会造成的影响。
已投稿到:
以上网友发言只代表其个人观点,不代表新浪网的观点或立场。Kafka使用入门教程 - 简单介绍_服务器应用_Linux公社-Linux系统门户网站
你好,游客
Kafka使用入门教程
来源:Linux社区&
作者:红磊
Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特的设计。这个独特的设计是什么样的呢?
首先让我们看几个基本的消息系统术语:
Kafka将消息以topic为单位进行归纳。
将向Kafka topic发布消息的程序成为producers.
将预订topics并消费消息的程序成为consumer.
Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
producers通过网络将消息发送到Kafka集群,集群向消费者提供消息,如下图所示:
客户端和服务端通过TCP协议通信。Kafka提供了Java客户端,并且对都提供了支持。
相关阅读:
分布式发布订阅消息系统 Kafka 架构设计
Apache Kafka 代码实例
Apache Kafka 教程笔记
Topics 和Logs
先来看一下Kafka提供的一个抽象概念:topic.
一个topic是对一组消息的归纳。对每个topic,Kafka 对它的日志进行了分区,如下图所示:
每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。分区中的每个消息都有一个连续的序列号叫做offset,用来在分区中唯一的标识这个消息。
在一个可配置的时间段内,Kafka集群保留所有发布的消息,不管这些消息有没有被消费。比如,如果消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的。之后它将被丢弃以释放空间。Kafka的性能是和数据量无关的常量级的,所以保留太多的数据并不是问题。
实际上每个consumer唯一需要维护的数据是消息在日志中的位置,也就是offset.这个offset有consumer来维护:一般情况下随着consumer不断的读取消息,这offset的值不断增加,但其实consumer可以以任意的顺序读取消息,比如它可以将offset设置成为一个旧的值来重读之前的消息。
以上特点的结合,使Kafka consumers非常的轻量级:它们可以在不对集群和其他consumer造成影响的情况下读取消息。你可以使用命令行来"tail"消息而不会对其他正在消费消息的consumer造成影响。
将日志分区可以达到以下目的:首先这使得每个日志的数量不会太大,可以在单个服务上保存。另外每个分区可以单独发布和消费,为并发操作topic提供了一种可能。
每个分区在Kafka集群的若干服务中都有副本,这样这些持有副本的服务可以共同处理数据和请求,副本数量是可以配置的。副本使Kafka具备了容错能力。
每个分区都由一个服务器作为&leader&,零或若干服务器作为&followers&,leader负责处理消息的读和写,followers则去复制leader.如果leader down了,followers中的一台则会自动成为leader。集群中的每个服务都会同时扮演两个角色:作为它所持有的一部分分区的leader,同时作为其他分区的followers,这样集群就会据有较好的负载均衡。
Producer将消息发布到它指定的topic中,并负责决定发布到哪个分区。通常简单的由负载均衡机制随机选择分区,但也可以通过特定的分区函数选择分区。使用的更多的是第二种。
发布消息通常有两种模式:队列模式()和发布-订阅模式()。队列模式中,consumers可以同时从服务端读取消息,每个消息只被其中一个consumer读到;发布-订阅模式中消息被广播到所有的consumer中。
Consumers可以加入一个consumer 组,共同竞争一个topic,topic中的消息将被分发到组中的一个成员中。同一组中的consumer可以在不同的程序中,也可以在不同的机器上。如果所有的consumer都在一个组中,这就成为了传统的队列模式,在各consumer中实现负载均衡。
如果所有的consumer都不在不同的组中,这就成为了发布-订阅模式,所有的消息都被分发到所有的consumer中。
更常见的是,每个topic都有若干数量的consumer组,每个组都是一个逻辑上的&订阅者&,为了容错和更好的稳定性,每个组由若干consumer组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个consumer。
由两个机器组成的集群拥有4个分区 (P0-P3) 2个consumer组. A组有两个consumerB组有4个
相比传统的消息系统,Kafka可以很好的保证有序性。
传统的队列在服务器上保存有序的消息,如果多个consumers同时从这个服务器消费消息,服务器就会以消息存储的顺序向consumer分发消息。虽然服务器按顺序发布消息,但是消息是被异步的分发到各consumer上,所以当消息到达时可能已经失去了原来的顺序,这意味着并发消费将导致顺序错乱。为了避免故障,这样的消息系统通常使用&专用consumer&的概念,其实就是只允许一个消费者消费消息,当然这就意味着失去了并发性。
在这方面Kafka做的更好,通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。
Kafka只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要topic中所有消息的有序性,那就只能让这个topic只有一个分区,当然也就只有一个consumer组消费它。
更多详情见请继续阅读下一页的精彩内容:
【内容导航】
相关资讯 & & &
& (03月06日)
& (02月24日)
& (03月24日)
& (03月05日)
& (12/23/:16)
   同意评论声明
   发表
尊重网上道德,遵守中华人民共和国的各项有关法律法规
承担一切因您的行为而直接或间接导致的民事或刑事法律责任
本站管理人员有权保留或删除其管辖留言中的任意内容
本站有权在网站内转载或引用您的评论
参与本评论即表明您已经阅读并接受上述条款
Linux 发表于 回复 bighero 的评论点击这里下载JAR包 , 这里是哪里 ?加QQ 3165270
(5) bighero 发表于 点击这里下载JAR包 , 这里是哪里 ?
(6) bighero 发表于 请问是哪两个jar 包 ? 下载地址在哪里啊 ?Kafka文件存储机制及partition和offset-linux-操作系统-壹聚教程网Kafka文件存储机制及partition和offset
kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作是在现代网络上的许多社会功能的一个关键因素。
Kafka是什么Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。1.前言一个商业化消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。2.Kafka文件存储机制Kafka部分名词解释如下:Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。Segment:partition物理上由多个segment组成,下面2.2和2.3有详细说明。offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.分析过程分为以下4个步骤:topic中partition存储分布partiton中文件存储方式partiton中segment文件存储结构在partition中如何通过offset查找message通过上述4过程详细分析,我们就可以清楚认识到kafka文件存储机制的奥秘。2.1 topic中partition存储分布假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文件存储根目录,在Kafka
broker中server.properties文件配置(参数log.dirs=xxx/message-folder),例如创建2个topic名称分别为report_push、launch_info,
partitions数量都为partitions=4存储路径和目录规则为:xxx/message-folder|--report_push-0|--report_push-1|--report_push-2|--report_push-3|--launch_info-0|--launch_info-1|--launch_info-2|--launch_info-3在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。如果是多broker分布情况,请参考kafka集群partition分布原理分析2.2 partiton中文件存储方式下面示意图形象说明了partition中文件存储方式:图1每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment
file消息数量不一定相等,这种特性方便old segment file快速被删除。每个partiton只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。2.3 partiton中segment文件存储结构读者从2.2节了解到Kafka文件系统partition存储方式,本节深入分析partion中segment file组成和物理结构。segment file组成:由2大部分组成,分别为index file和data
file,此2个文件一一对应,成对出现,后缀&.index&和“.log”分别表示为segment索引文件、数据文件.segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。下面文件列表是笔者在Kafka broker上做的一个实验,创建一个topicXXX包含1
partition,设置每个segment大小为500MB,并启动producer向Kafka
broker写入大量数据,如下图2所示segment文件列表形象说明了上述2个规则:图2以上述图2中一对segment file文件为例,说明segment中index&―-&data file对应关系物理结构如下:图3上述图3中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。从上述图3了解到segment data file由许多message组成,下面详细说明message物理结构如下:图4参数说明:关键字解释说明8 byte offset在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message4 byte message sizemessage大小4 byte CRC32用crc32校验message1 byte “magic&表示本次发布Kafka服务程序协议版本号1 byte “attributes&表示为独立版本、或标识压缩类型、或编码类型。4 byte key length表示key的长度,当key为-1时,K byte key字段不填K byte key可选value bytes payload表示实际消息数据。2.4 在partition中如何通过offset查找message例如读取offset=368776的message,需要通过下面2个步骤查找。第一步查找segment file上述图2为例,其中.index表示最开始的文件,起始偏移量(offset)为0.第二个文件.index的消息量起始偏移量为368770
= 368769 + 1.同样,第三个文件.index的起始偏移量为337 +
1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset **二分查找**文件列表,就可以快速定位到具体文件。当offset=368776时定位到.index|log第二步通过segment file查找message通过第一步定位到segment file,当offset=368776时,依次定位到.index的元数据物理位置和
.log的物理偏移地址,然后再通过.log顺序查找直到
offset=368776为止。从上述图3可知这样做的优点,segment index
file采取稀疏索引存储方式,它减少索引文件大小,通过mmap可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。3 Kafka文件存储机制?实际运行效果实验环境:Kafka集群:由2台虚拟机组成cpu:4核物理内存:8GB网卡:千兆网卡jvm heap: 4GB详细Kafka服务端配置及其优化请参考:kafka server.properties配置详解图5从上述图5可以看出,Kafka运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高效。这跟Kafka文件存储中读写message的设计是息息相关的。Kafka中读写message有如下特点:写message消息从java堆转入page cache(即物理内存)。由异步线程刷盘,消息从page cache刷入磁盘。读message消息直接从page cache转入socket发送出去。当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁盘Load消息到page cache,然后直接从socket发出去4.总结Kafka高效文件存储设计特点Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。通过索引信息可以快速定位message和确定response的最大大小。通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。kafka中的partition和offsetLog机制说到分区,就要说kafka对消息的存储.在官方文档中.分区读写日志图首先,kafka是通过log(日志)来记录消息发布的.每当产生一个消息,kafka会记录到本地的log文件中,这个log和我们平时的log有一定的区别.这里可以参考一下The
Log,不多解释.这个log文件默认的位置在config/server.properties中指定的.默认的位置是log.dirs=/tmp/kafka-logs,linux不用说,windows的话就在你对应磁盘的根目录下.我这里是D盘.分区partitionkafka是为分布式环境设计的,因此如果日志文件,其实也可以理解成消息,放在同一个地方,那么必然会带来可用性的下降,一挂全挂,如果全量拷贝到所有的机器上,那么数据又存在过多的冗余,而且由于每台机器的磁盘大小是有限的,所以即使有再多的机器,可处理的消息还是被磁盘所限制,无法超越当前磁盘大小.因此有了partition的概念.kafka对消息进行一定的计算,通过hash来进行分区.这样,就把一份log文件分成了多份.如上面的分区读写日志图,分成多份以后,在单台broker上,比如快速上手中,如果新建topic的时候,我们选择了--replication-factor
1 --partitions 2,那么在log目录里,我们会看到test-0目录和test-1目录.就是两个分区了.你可能会想,这特么没啥区别呀.注意,当有了多个broker之后,这个意义就存在了.这里上一张图,原文在参考链接里有kafka分布式分区存储这是一个topic包含4个Partition,2
Replication(拷贝),也就是说全部的消息被放在了4个分区存储,为了高可用,将4个分区做了2份冗余,然后根据分配算法.将总共8份数据,分配到broker集群上.结果就是每个broker上存储的数据比全量数据要少,但每份数据都有冗余,这样,一旦一台机器宕机,并不影响使用.比如图中的Broker1,宕机了.那么剩下的三台broker依然保留了全量的分区数据.所以还能使用,如果再宕机一台,那么数据不完整了.当然你可以设置更多的冗余,比如设置了冗余是4,那么每台机器就有了0123完整的数据,宕机几台都行.需要在存储占用和高可用之间做衡量.至于宕机后,zookeeper会选出新的partition leader.来提供服务.这个等下篇文章偏移offset上一段说了分区,分区就是一个有序的,不可变的消息队列.新来的commit
log持续往后面加数据.这些消息被分配了一个下标(或者偏移),就是offset,用来定位这一条消息.消费者消费到了哪条消息,是保持在消费者这一端的.消息者也可以控制,消费者可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.也可以重置offset如何通过offset算出分区其实partition存储的时候,又分成了多个segment(段),然后通过一个index,索引,来标识第几段.这里先可以去看一下本地log目录的分区文件夹.在我这里,test-0,这个分区里面,会有一个index文件和一个log文件,index和log对于某个指定的分区,假设每5个消息,作为一个段大小,当产生了10条消息的情况想,目前有会得到(只是解释)0.index (表示这里index是对0-4做的索引)5.index (表示这里index是对5-9做的索引)10.index (表示这里index是对10-15做的索引,目前还没满)和0.log5.log10.log,当消费者需要读取offset=8的时候,首先kafka对index文件列表进行二分查找,可以算出.应该是在5.index对应的log文件中,然后对对应的5.log文件,进行顺序查找,5-&6-&7-&8,直到顺序找到8就好了.
上一页: &&&&&下一页:相关内容}

我要回帖

更多关于 kafka 手动提交offset 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信