单kafka 集群某节点挂机kafka可以做数据采集吗

在云上搭建大规模实时数据流处理系统
发表于 17:54|
作者郝峻晟,钟波,罗伟
摘要:如今数据规模变得越来越大,传统的软硬件工具已很难满足数据处理的需求。本文将结合汽车状态实时监控系统,讲述如何在阿里云上使用Kafka和Storm搭建大规模消息分发和实时数据流处理系统,及其中遇到的挑战。
在大数据时代,数据规模变得越来越大。由于数据的增长速度和非结构化的特性,常用的软硬件工具已无法在用户可容忍的时间内对数据进行采集、管理和处理。本文主要介绍如何在阿里云上使用Kafka和Storm搭建大规模消息分发和实时数据流处理系统,以及这个过程中主要遭遇的一些挑战。实践主要立足建立一套汽车状态实时监控系统,可以在阿里云上立即进行部署。
实时大数据处理利器——Storm和Kafka
大数据时代,随着可获取数据的渠道增多,比如常见的电子商务、网络、传感器的数据流、太空数据等,数据规模也变得越来越大;同时,不同的渠道往往产生更多的数据类型,这些衍生的数据增长非常之快,规模非常之大。大数据时代各个机构可谓是坐拥金山,然而目前大数据技术的应用却仍然存在众多挑战,主要出现在数据收集、存储、处理和可视化几个过程。
1. 数据收集
Gartner的Merv Adrian对大数据有这样一个定义:“大数据让常用硬件软件工具无法在用户可容忍时间内对数据进行采集、管理和处理。”
麦肯锡全球研究院在2011年5月也有这样一个概念:“大数据是指超出典型数据库软件工具采集、存储、管理和分析能力的数据集。”
从上面的定义可以看出,大数据最大的挑战在于如何在有限时间内对数据进行处理和分析,并得到有用信息。
2. 数据处理
大数据处理中最著名的工具是Hadoop,不过它并不是一套实时系统。为了解决这个问题,计算机工程师们又开发了Storm和Kafka。Apache
Storm是一套开源的分布式实时计算系统。最早由Nathan Marz
开发,在被Twitter收购后开源,并在2014年9月起成为Apache顶级开源项目。Storm被广泛用于各种商业网站,包括Twitter、Yelp、Groupon、百度、淘宝等。Storm的使用场景非常广泛,例如实时分析、在线机器学习、连续计算、分部署RPC、ET|等。Storm有着非常快的处理速度,单节点可以达到百万个元组每秒,此外它还具有高扩展、容错、保证数据处理等特性。
图1是Storm的一个简单的架构。
图1 &Storm架构
Apache Kafka也是一个开源的系统,旨在提供一个统一的,高吞吐、低延迟的分布式消息处理平台来对实时数据进行处理。它最早由LinkedIn开发,开源于2011年并被贡献给了Apache。Kafka区别于传统RabbitMQ、Apache
ActiveMQ等消息系统的地方主要在于:分布式系统特性,易于扩展;为发布和订阅提供高吞吐量;支持多订阅,可以自动平衡消费者;可以将消息持久化到磁盘,可以用于批量消费,例如ETL等。
图2 &Kafka架构
在阿里云上部署Storm和Kafka
我们需要设计一个实时车辆监控系统,这个系统要将汽车驾驶过程中实时的位置,速度,转速,油耗以及转速发送到系统中,从而可以实时计算出车流量和污染物排放量。该系统的目标是要能同事支持10万辆车同时发送消息,在最高峰能满足100万辆车。为了实现如此规模的消息分发和吞吐,我们基于Kafka和Storm来设计实现。同时为了满足高扩展性,我们将Storm和Kafka分别部署到不同的服务器上,如果需要更多的计算能力,可以随时通过创建新的服务器的方式来完成。此外为了满足高可用性,每台相同功能的服务器也需要至少部署2台,这样一旦一台服务器出现问题,另外一台服务器也可以持续提供服务。
在实体服务器上部署Storm和Kafka等系统涉及到大量服务器集群和软件的安装部署,这个过程需要花费大量时间,而云计算则很好的弥补了这一点——提供各种虚拟服务器和镜像功能,加快基础设施和软件的部署过程。
基于云的车联网监控系统架构
图3 &车联网监控系统架构
我们需要2台服务器来构建Kafka代理服务器,在Storm中还需要2台服务器来运行Spout和2个Bolt,另外在Redis层则需要2台服务器来部署缓存,再加上2台服务器作为Web服务器。服务器架构图如图4所示。
图4 &车联网监控系统架构
在部署车联网监控系统之前,我们首先需要在每台服务器上部署相应的软件,包括Git、Libzmq、Java、G++等,用于代码编译和相关软件安装。可以使用SSH连接到相应的机器。用户名密码则会由阿里云以邮件或者短消息的方式提供。
在车联网实时监控系统中,我们需要部署4种不同类型的服务器,分别是网站前台服务器、Kafka服务器、Storm服务器和缓存服务器,以满足上面提到的高扩展性的要求。在每一种类型的服务器部署完成之后,都可以通过阿里云镜像的功能,创建一个能随时使用的镜像,这样在扩展服务器的时候就不需要重新安装软件,直接通过镜像创建服务器就可以了。
以下命令需要在所有服务器上运行以安装相应的软件:以下命令安装在缓存服务器和Kafka服务器上:另外,我们还需要在Storm的服务器安装maven和lein用于代码编译:在Kafka服务器上安装Kafka:
对于Storm和Kafka的安装,到这一步已基本完成,接下去需要分别创建镜像。创建镜像的方法是先创建阿里云快照,然后通过将快照转换为镜像的方式完成。具体步骤如下:
在阿里云的管理界面选择云服务器,随后选择该服务器的磁盘列表,点击创建快照。输入快照名称并确认。阿里云会自动为云服务器的系统盘创建快照,当创建完成以后,会出现“创建自定义镜像”按钮。点击“创建自定义镜像”的按钮,阿里云就会将这个快照转换为镜像,可以在阿里云ECS管理界面的自定义镜像栏中看到。
图5 &自定义的镜像
接下来,我们通过镜像可以直接创建相同配置的ECS服务器。
图6 &从自定义镜像中创建云服务器
当然,在自动扩展实现上,云服务并不需要用户去手动执行,这里我们使用阿里云的ECS REST API自动通过镜像创建机器。可以参考以下Python代码,自动创建阿里云ECS虚拟机:基于Storm和Kafka的车辆信息实时监控系统打造
接下来做的就是将车辆信息实时监控系统部署到系统中。这个系统演示了如何编写一个Storm的Topology,从Kafka消息系统中将信息读取出来。我们使用Kafka的客户端模拟从世界各地发送车辆实时信息给Kafka集群,然后Storm
Topology会把这些消息通过Bolts将坐标转换为Json对象,并且使用GeoJSON在Bing Map上显示车辆的实时位置、温度、转速以及速度等等信息。Topology还会将信息写到Redis缓存中,然后Node.js通过socket.io读取Redis中的信息,并且使用d3js显示在页面上。
首先,我们需要编写Kafka 生产者的部分代码,主要是模拟读取汽车的实时数据并向Kafka集群进行发送,我们实现了一个KafkaCarDataProducer类,通过配置ProducerConfig来创建一个Producer对象来发送数据。它可以用来连接到Zookeeper,或者直接是Kafka
代理。例如:kafkaclient.cloudapp.net:2181或者0:kafkaclient.cloudapp.net:9092。代码中我们根据不同的连接字符串设置不同配置。伪代码如下:然后就可以直接通过下面代码来发送消息:接下来我们需要编写3个Storm类,首先是创建Storm的Topology,这个类叫KafkaCarTopology,我们创建了一个叫car的topic,然后定义本机一个hosts和Zookeeper
hosts,最后创建一个Spout,叫做KafkaSpout,然后添加ParseCarDataBolt连接到KafkaSout,再创建一个RedisCarBolt,用于将结果写入Redis缓存。最后根据参数创建3个Worker,提交Storm
Topology。在这个拓扑结构中,我们有2个Bolt用于数据的处理,第一个叫ParserCarDataBolt,这个Bolt主要将Kafka传出的消息转换为Json格式,它继承BaseBasicBolt,在execute函数中通过collector提交数据,同时重载了declareOutputFields函数,通知下一个Bolt的数据格式。代码如下:数据会被写入RedisCarBolt,再写入到Redis缓存中。它继承自BaseRichBolt,需要重载prepare和excute方法来处理消息元组。此外还需要重载prepare和cleanup函数,几个关键的函数如下:最后我们还需要编写一些Node.js的代码,保证在页面上通过socket.io进行通讯,实时将最终数据从Redis里面读取出来,并在BingMap上显示。到此为止,一个简单的车辆信息实时监控系统就实现了,我们通过bash脚本进行编译,并安装到相应的服务器上,比如下列代码需要被安装在Storm的服务器上:有一点需要注意的是,由于在编译过程中需要自动下载Storm库,在阿里云的国内机房的虚拟机很有可能需要设置代理进行。设置代理的方法也很简单,通过对lein命令增加以下参数就可以了:http_proxy=http://URL:PORT
接着我们在网页上访问http://webhostname或者运行node.js的服务器,就会看到下面的网页,同时发现网页将同步刷新汽车的实时位置、速度、转速等。
图7 &车联网监控系统演示页面
对车联网监控系统的性能测试
接下来我们对这个系统进行了一个简单的吞吐量测试。我们只有1个Topic,使用5个partition、3个worker、1个Spout和2个Bolt,在一台2核2GB的ECS上运行。我们使用了另外4台客户端,每个客户端有4核8G内存,分别启动40个线程不断向这个系统实时发送汽车信息,模拟160台汽车发送的情况,其消息发送数量和CPU占用率情况如图8所示。
图8 &车联网监控系统性能分析
从图8中可以看出,平均每辆汽车客户端会模拟每秒给系统发送了1000条消息,总的吞吐量达到16万条左右,此时平均的CPU占用率大约在30%左右。如果系统是完全线性的,在系统CPU占用率达到90%的情况下,大约能处理48万条消息。不过实际情况中,在阿里云ECS上,却发现CPU达到50%以后,就不再上升,而客户端发送消息的延时也逐步增加。
经过分析以后发现,由于ECS的磁盘性能无法和物理机的SSD磁盘相比,所以在Kafka消息大量写入磁盘的过程中,吞吐量下降,磁盘读写负担变得非常大。这时我们增加了Kafka的Broker和Storm的Spout的数量,将消息分布式地分发到多台ECS上,从而实现了消息吞吐量的线性增加。
在这个系统中,我们不推荐使用大核和大内存的机器,而推荐使用多台2核2GB的服务器分布式地处理消息。这也是云计算处理大数据的原则所在,使用横向扩展而不用纵向扩展。
至此我们介绍了利用Storm和Kafka实现大数据的实时处理,并且介绍了如何在云上通过镜像快速地创建这套系统。此外,我们还介绍了如何对Storm、Kafka、Redis以及Node.js开发出一个实时的车辆信息监控系统。这个系统能够实现高性能、大吞吐量和高并发。当然,随着大数据的快速发展,我们相信还会有越来越多好的工具和产品出现在市场上,到那时我们从大数据中获取有效的信息将会变得更加容易和便捷。有了云计算的帮助,开发的周期也会变得越来越短。
“It’s going mainstream, and it’s your next opportunity.“, Teradata Magazine,
” Big data: The next frontier for innovation, competition, and productivity”
作者简介:郝峻晟,云角创始人兼CTO,在软件开发、项目管理以及创业领域有超过十年的经验。曾在微软任职,参与开发了微软的多个项目,包括服务器和开发平台事业部的System Center Configuration Manager, 商业事物平台系统等,拥有丰富的软件开发和项目领导经验。创立了云角,致力于公有云的推广和实施工作,还在亚洲为大量的企业和高校进行公有云的培训及合作。他的团队为Microsoft Azure在中国的推广和实施提供了大量的支持。钟波,云角高级工程师,拥有多年的IT开发经验,精通云平台相关技术。独立完成过诸多迁移项目,将现有应用平滑的迁移至云平台,针对云上的架构对现有应用做优化。参与过云平台相关应用的开发、部署、测试。通过项目实践,在云计算应用开发、解决方案和架构设计、迁移、实施等领域具备丰富的实践经验积累。罗伟,云角资深工程师,精通云平台相关技术。独立完成过大量云迁移项目,参与公有云平台相关应用的部署和测试。在云计算解决方案和架构设计、迁移、实施等领域有丰富的实践经验。
推荐阅读相关主题:
CSDN官方微信
扫描二维码,向CSDN吐槽
微信号:CSDNnews
相关热门文章用户名:moviebat
文章数:67
访问量:9338
注册日期:
阅读量:1297
阅读量:3317
阅读量:461637
阅读量:1145929
51CTO推荐博文
& &王家林老师的课程:2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式作业。& & 一、基本背景& & Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式。具体的流程是这样的:& &1、Direct方式是直接连接到kafka的节点上获取数据了。& &2、基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。& &3、当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。& &这种方式有如下优点:& &1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。;& &2、高性能:不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复;& &3、一次且仅一次的事务机制:Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。& &Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。& &二、配置文件及编码& & &flume版本:1.6.0,此版本直接支持到kafka,不用在单独安装插件。& & &kafka版本2.10-0.8.2.1,必须是0.8.2.1,刚开始我用的是0.10,结果出现了下& & & 四、各类错误大全的第2个错误。& & &spark版本:1.6.1。& & & & &&& & & kafka配文件:producer.properties,红色文字为特别要注意的配置坑,呵呵& &&#agentsection&producer.sources= s&producer.channels= cproducer.sinks= r#sourcesectionproducer.sources.s.type= execproducer.mand= tail -f -n+1 /opt/test/test.logproducer.sources.s.channels= c# Eachsink's type must be definedproducer.sinks.r.type= org.apache.flume.plugins.KafkaSinkproducer.sinks.r.metadata.broker.list=192.168.0.10:9092producer.sinks.r.partition.key=0producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartitionproducer.sinks.r.serializer.class=kafka.serializer.StringEncoderproducer.sinks.r.request.required.acks=0producer.sinks.r.max.message.size=1000000producer.sinks.r.producer.type=syncproducer.sinks.r.custom.encoding=UTF-8producer.sinks.r.custom.topic.name=flume2kafka2streaming930#Specifythe channel the sink should useproducer.sinks.r.channel= c# Eachchannel's type is defined.&producer.channels.c.type= memoryproducer.channels.c.capacity= 1000producer.channels.c.transactionCapacity= 100核心代码如下:&SparkConf&conf&=&SparkConf().setMaster().
&&&&&&&&&&&&&&setAppName()
&&&&&&&&&&&&&&.setJars(String[]&{
&&&&&&&&&&&&&&&&&&&&&&})Map&StringString&&kafkaParameters&=&HashMap&StringString&()kafkaParameters.put()Set&String&&topics&=&&HashSet&String&()topics.add()JavaPairInputDStream&StringString&&lines&=&KafkaUtils.(jscString.String.StringDecoder.StringDecoder.kafkaParameterstopics)JavaDStream&String&&words&=&lines.flatMap(FlatMapFunction&Tuple2&StringString&String&()&{&Iterable&String&&(Tuple2&StringString&&tuple)&Exception&{
&&&&&&&&&&&&&&Arrays.(tuple..split())}
&&&&&&})JavaPairDStream&StringInteger&&pairs&=&words.mapToPair(PairFunction&StringStringInteger&()&{
&&&&&&&&&&Tuple2&StringInteger&&(String&word)&Exception&{
&&&&&&&&&&&&&&Tuple2&StringInteger&(word)}
&&&&&&})JavaPairDStream&StringInteger&&wordsCount&=&pairs.reduceByKey(Function2&IntegerIntegerInteger&()&{&Integer&(Integer&v1Integer&v2)&Exception&{
&&&&&&&&&&&&&&v1&+&v2}
&&&&&&})wordsCount.print()jsc.start()jsc.awaitTermination()jsc.close()& & 三、启动脚本启动zookeeperbin/zookeeper-server-start.sh config/zookeeper.properties &启动kafka brokerbin/kafka-server-start.sh config/server.properties &创建topicbin/kafka-topics.sh --create --zookeeper 192.168.0.10:2181 --replication-factor 1 --partitions 1 --topic flume2kafka2streaming930启动flumebin/flume-ng agent --conf conf/ &-f conf/producer.properties &-n producer -Dflume.root.logger=INFO,consolebin/spark-submit --class com.dt.spark.sparkstreaming.SparkStreamingOnKafkaDirected &--jars /lib/kafka_2.10-0.8.2.1/kafka-clients-0.8.2.1.jar,/lib/kafka_2.10-0.8.2.1/kafka_2.10-0.8.2.1.jar,/lib/kafka_2.10-0.8.2.1/metrics-core-2.2.0.jar,/lib/spark-1.6.1/spark-streaming-kafka_2.10-1.6.1.jar --master local[5] SparkApps.jar&echo "hadoop spark hive storm spark hadoop hdfs" && /opt/test/test.log&echo "hive storm " && /opt/test/test.logecho "hdfs" && /opt/test/test.logecho "hadoop spark hive storm spark hadoop hdfs" && /opt/test/test.log& & 输出结果如下:* 结果如下:* -------------------------------------------* Time: 0 ms* -------------------------------------------*(spark,8)*(storm,4)*(hdfs,4)*(hive,4)*(hadoop,8)& & 四、各类错误大全& & 1、Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils& & & & at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main& & & & 一概是没有提交jar包,一概会报错,无法执行,一概在submit脚本里添加:& & & &bin/spark-submit --class com.dt.spark.sparkstreaming.SparkStreamingOnKafkaDirected &--jars /lib/kafka_2.10-0.8.2.1/kafka-clients-0.8.2.1.jar,/lib/kafka_2.10-0.8.2.1/kafka_2.10-0.8.2.1.jar,/lib/kafka_2.10-0.8.2.1/metrics-core-2.2.0.jar,/lib/spark-1.6.1/spark-streaming-kafka_2.10-1.6.1.jar --master local[5] SparkApps.jar && &2、Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker。& & & & &上及spark官网查询,这个是因为版本不兼容引起。官网提供的版本:Spark Streaming 1.6.1 is compatible with Kafka 0.8.2.1& &王家林_DT大数据梦工厂& & 简介: 王家林:DT大数据梦工厂创始人和首席专家.微信公众号DT_Spark .联系邮箱@&电话:&微信号:微博为:/ilovepains
了这篇文章
类别:未分类┆阅读(0)┆评论(0)Apache Kafka 教程笔记 - 基础
Apache Kafka 教程笔记 - 基础
本文基于Kafka 0.81. 引言
  互联网够公司的日志无处不在,web日志,js日志,搜索日志,监控日志等等。对于这些日志的离线分析(Hadoop),wget&rsync虽然人力维护成本较高,但可以满足功能行需求。但对于这些日志的实时分析需求(例如实时推荐,监控系统),则往往必须要引入一些&高大上&的系统。
  传统的企业消息系统(例如WebSphere)并不是非常适合大规模的日志处理系统,理由如下:1) 过于关注可靠性,这些可靠性增加了系统实现&API的复杂度,而在日志处理过程中,丢失几条日志常常&无伤大雅&2) 包括API,scale及消息缓冲的设计理念都不适合Hign Throughput的日志处理系统
  针对这些问题,近些年各个公司都做了一些自己的日志收集系统,例如:Facebook的Scribe、Yahoo的data highway,Cloudera的Flume,Apache的Chukwa,百度的BigPipe,阿里的RocketMQ。
  Kafka是LinkedIn开发并开源出来的一个高吞吐的分布式消息系统。其具有以下特点:1) 支持高Throughput的应用2) scale out:无需停机即可扩展机器3) 持久化:通过将数据持久化到硬盘以及replication防止数据丢失4) 支持online和offline的场景。
  kafka使用scala开发,支持多语言客户端(c++、java、python、go等)其架构如下[2]:Producer:消息发布者Broker:消息中间件处理结点,一个kafka节点就是一个brokerConsumer:消息订阅者
  kafka的消息分几个层次:1) Topic:一类消息,例如page view日志,click日志等都可以以topic的形式存在,kafka集群能够同时负责多个topic的分发2) Partition: Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。3) Message:消息,最小订阅单元
  具体流程:1. Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面2. kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。3. Consumer从kafka集群pull数据,并控制获取消息的offset
  ThroughPutHigh Throughput是kafka需要实现的核心目标之一,为此kafka做了以下一些设计:1)数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能2)zero-copy:减少IO操作步骤3)数据批量发送4)数据压缩5)Topic划分为多个partition,提高parallelism
  load balance&HA1) producer根据用户指定的算法,将消息发送到指定的partition2) 存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上3) 多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over4) 通过zookeeper管理broker与consumer的动态加入与离开
  pull-based system由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:1)简化kafka设计2)consumer根据消费能力自主控制消息拉取速度3)consumer根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等Scale Out当需要增加broker结点时,新增的broker会向zookeeper注册,而producer及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时作出调整。
  Kafka 的详细介绍:请点这里Kafka 的下载地址:请点这里
  相关阅读:
  分布式发布订阅消息系统 Kafka 架构设计
  Apache Kafka 代码实例 本文基于Kafka 0.8
  在一台机器上构建一个3个节点的kafka集群,并测试producer、consumer在正常情况下的行为,以及在lead broker/follow broker失效情况下的行为1.下载并解压kafka 0.8.0 release$ mkdir kafka$ wget $ tar -zxvf kafka_2.8.0-0.8.0.tar.gz$ cd kafka_2.8.0-0.8.0$ lltotal 2560drwxr-xr-x 6 root root 4096 Dec 17 17:44 ./drwxr-xr-x 4 root root 4096 Dec 17 18:20 ../drwxr-xr-x 3 root root 4096 Dec 17 18:16 bin/drwxr-xr-x 2 root root 4096 Dec 17 17:43 config/-rw-r--r-- 1 root root 2520145 Nov 27 06:21 kafka_2.8.0-0.8.0.jardrwxr-xr-x 2 root root 4096 Nov 27 06:21 libs/-rw-r--r-- 1 root root 12932 Nov 27 06:21 LICENSEdrwxr-xr-x 2 root root 4096 Dec 17 18:00 logs/-rw------- 1 root root 47165 Dec 17 18:10 nohup.out-rw-r--r-- 1 root root 162 Nov 27 06:21 NOTICE2.启动一个单节点的zookeeper$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &3. 准备启动一个3个broker节点的kafka集群,因此做如下配置$ cp config/server.properties config/server-1.properties $ cp config/server.properties config/server-2.properties
  并做如下修改:
  config/server-1.properties:broker.id=1port=9093log.dir=/tmp/kafka-logs-1config/server-2.properties:broker.id=2port=9094log.dir=/tmp/kafka-logs-2
  说明:broker.id: broker节点的唯一标识port: broker节点使用端口号log.dir: 消息目录位置4. 启动3个broker节点$ JMX_PORT=9997 bin/kafka-server-start.sh config/server-1.properties &$ JMX_PORT=9998 bin/kafka-server-start.sh config/server-2.properties &$ JMX_PORT=9999 bin/kafka-server-start.sh config/server.properties &
  5. 创建topic并查看$ bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 3 --partition 1 --topic 3testcreation succeeded!$ bin/kafka-list-topic.sh --zookeeper localhost:2181topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,1,0topic: test partition: 0 leader: 0 replicas: 0 isr: 0topic: test_topic partition: 0 leader: 1 replicas: 0,1,2 isr: 1,2,0说明:partiton: partion id,由于此处只有一个partition,因此partition id 为0leader:当前负责读写的lead broker idrelicas:当前partition的所有replication broker listisr:relicas的子集,只包含出于活动状态的broker6.启动consumer & producer,并在producer启动后的console输入一些信息$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic 3testmessage1message3message2$ bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic 3testmessage1message3message2producer发送的数据consumer都能正常消费7. 干掉follow broker杀掉一个非lead broker(lead broker id为2)$ pkill -9 -f server-1.properties查看topic:$ bin/kafka-list-topic.sh --zookeeper localhost:2181topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,0topic: test partition: 0 leader: 0 replicas: 0 isr: 0topic: test_topic partition: 0 leader: 2 replicas: 0,1,2 isr: 2,0
  此时,存活的broker只有2,0测试:produce发送消息,consumer能正常接收到8. 继续干掉leader broker
  干掉leader broker后,连续查看topic状态$ pkill -9 -f server-2.properties $ bin/kafka-list-topic.sh --zookeeper localhost:2181topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,0topic: test partition: 0 leader: 0 replicas: 0 isr: 0topic: test_topic partition: 0 leader: 2 replicas: 0,1,2 isr: 2,0$ bin/kafka-list-topic.sh --zookeeper localhost:2181topic: 3test partition: 0 leader: 2 replicas: 2,1,0 isr: 2,0topic: test partition: 0 leader: 0 replicas: 0 isr: 0topic: test_topic partition: 0 leader: 2 replicas: 0,1,2 isr: 2,0$ bin/kafka-list-topic.sh --zookeeper localhost:2181topic: 3test partition: 0 leader: 0 replicas: 2,1,0 isr: 0topic: test partition: 0 leader: 0 replicas: 0 isr: 0topic: test_topic partition: 0 leader: 0 replicas: 0,1,2 isr: 0$ bin/kafka-list-topic.sh --zookeeper localhost:2181topic: 3test partition: 0 leader: 0 replicas: 2,1,0 isr: 0topic: test partition: 0 leader: 0 replicas: 0 isr: 0topic: test_topic partition: 0 leader: 0 replicas: 0,1,2 isr: 0杀掉leader broker过了一会,broker 0成为新的leader broker测试:produce发送消息,consumer能正常接收到
&&&主编推荐
H3C认证Java认证Oracle认证
基础英语软考英语项目管理英语职场英语
.NETPowerBuilderWeb开发游戏开发Perl
二级模拟试题一级模拟试题一级考试经验四级考试资料
软件测试软件外包系统分析与建模敏捷开发
法律法规历年试题软考英语网络管理员系统架构设计师信息系统监理师
高级通信工程师考试大纲设备环境综合能力
路由技术网络存储无线网络网络设备
CPMP考试prince2认证项目范围管理项目配置管理项目管理案例项目经理项目干系人管理
职称考试题目
招生信息考研政治
网络安全安全设置工具使用手机安全
生物识别传感器物联网传输层物联网前沿技术物联网案例分析
Java核心技术J2ME教程
Linux系统管理Linux编程Linux安全AIX教程
Windows系统管理Windows教程Windows网络管理Windows故障
数据库开发Sybase数据库Informix数据库
&&&&&&&&&&&&&&&
希赛网 版权所有 & &&}

我要回帖

更多关于 kafka 单节点安装 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信