消息系统:
负责将数据从一个应用程序传输到另一个应用程序,应用程序可以专注于数据,无需关注数据在两个或多个应用间是如何传递的。
消息在客户端应用程序和消息传递系统之间异步排队
点对点消息系统:
消息被保留在队列中。一个或多个消费者可以消费队列中的消息,但是特定消息只能由一个消费者消费;
一旦消费者读取队列中的消息,它就从该队列中消失。
发布-订阅消息系统:
消息被保留在主题中,消费者可以订阅一个或多个主题并使用该主题中的所有消息。
消息生产者成为发布者,消息使用者成为订阅者。(定时删除、达到容量删除)
kafka特点:
可靠性:分布式,支持多分区,多副本和容错
可扩展性:消息传递系统轻松缩放,无需停机
耐用性:kafka使用分布式提交日志,这意味着消息会尽可能快的保留在磁盘上,因为它是持久的
性能:对于发布和订阅消息都有高吞吐量。即使存储了许多TB的消息,它也保持稳定的性能。
Topics(主题)
属于特定类别的消息流,数据存储在主题中。(一个消息集合,物理上来说,不同的topic消息是分开 存储的,每个Topic可以有多个生产者向它发送消息,可以有多个消费者去消费其中的消息)
Partion(分区)
主题可以有许多分区,因此它可以处理任意数量的数据
Partion offset(分区偏移)
每个分区消息具有成为offset的唯一序列标识
Brokers(代理)
每个kafka service 称为一个Broker,多个broker组成kafka cluster。一个机器上可以部署一个或多个 broker,这多个Broker连接到相同的ZooKeeper就组成了kafka集群。一个Broker上可以常见一个或多个Topic
同一个topic可以在同一集群下的多个Broker中分布
Producers(生产者)
发送一个或多个kafka主题的消息的发布者
Consumers(消费者)
从代理处读取数据
Leader(领导者)
负责给定分区的所有读取和写入的节点,每个分区都有一个服务器充当Leader
Follower(追随者)
跟随领导指令的节点,如果领导失败,一个追随者将自动称为心得领导者
isr(副本同步列表): 从isr中剔除:刚开始包含所有副本,若leader挂掉,从isr从新选一个leader,然后将这个副本剔除
./zkServer.sh start(启动zookepper)
./bin/kafka-server-start.sh config/server.properties (启动kafka)
./kafka-topics.sh --create --zookeeper h1m1:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka(创建topic)
(副本) (分区数)
./kafka-console-producer.sh --broker-list h1m1:9092 --topic Hello-Kafka(启动生产者)
./kafka-console-consumer.sh --bootstrap-server h1m1:9092 --topic Hello-Kafka --from-beginning(启动消费者)
./kafka-topics.sh --describe --zookeeper h1m1:2181 --topic JCHome(查看主题信息)
修改主题: bin/kafka-topics.sh --zookeeper h1m1:2181 --alter --topic Hello-Kafka --partitions 2
删除主题: bin/kafka-topics.sh --zookeeper h1m1:2181 --delete --topic Hello-Kafka
查看所有主题:bin/kafka-topics.sh --list --zookeeper h1m1:2181
启动消费者指定分组:bin/kafka-console-consumer.sh --bootstrap-server h1m1:9092 --topic Test3 --group console-consumer-25119
查看分组:bin/kafka-consumer-groups.sh -bootstrap-server h1m1:9092 --list
查看分组详情:bin/kafka-consumer-groups.sh --bootstrap-server h1m1:9092 --describe --group console-consumer-25119
Kafka和spark streaming的集成
1.下载相关安装包和jar包
-
启动spark-shell。相关jar包放在/opt目录下,进入/opt目录执行下述命令
/usr/lib/spark/bin/spark-shell --jars kafka-clients-2.2.0.jar,kafka_2.12-2.2.0.jar,spark-streaming-kafka-0-10_2.11-2.4.5.jar,metrics-core-3.0.1.jar,zkclient-0.10.jar -v --conf spark.rpc.netty.dispatcher.numThreads=2
-
在spark-shell中执行。注意这里是 kafka010
import org.apache.spark.streaming.kafka010._ import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.{Seconds, StreamingContext} import kafka.serializer.StringDecoder import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe val ssc = new StreamingContext(sc, Seconds(1)) ssc.checkpoint("checkpoint") //注意这里改为自己环境的地址和端口 val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "h1m1:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "myGroup", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) val topics=Set("Test3") val dStream = KafkaUtils.createDirectStream[String, String]( ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams) ) val lines = dStream.map(_.value()) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map( x=>(x,1) ).reduceByKey(_+_) wordCounts.print ssc.start
注意:本文归作者所有,未经作者允许,不得转载