kafka笔记

消息系统:

负责将数据从一个应用程序传输到另一个应用程序,应用程序可以专注于数据,无需关注数据在两个或多个应用间是如何传递的。

消息在客户端应用程序和消息传递系统之间异步排队

点对点消息系统:

消息被保留在队列中。一个或多个消费者可以消费队列中的消息,但是特定消息只能由一个消费者消费;

一旦消费者读取队列中的消息,它就从该队列中消失。

发布-订阅消息系统:

消息被保留在主题中,消费者可以订阅一个或多个主题并使用该主题中的所有消息。

消息生产者成为发布者,消息使用者成为订阅者。(定时删除、达到容量删除)

kafka特点:

可靠性:分布式,支持多分区,多副本和容错

可扩展性:消息传递系统轻松缩放,无需停机

耐用性:kafka使用分布式提交日志,这意味着消息会尽可能快的保留在磁盘上,因为它是持久的

性能:对于发布和订阅消息都有高吞吐量。即使存储了许多TB的消息,它也保持稳定的性能。

Topics(主题)

属于特定类别的消息流,数据存储在主题中。(一个消息集合,物理上来说,不同的topic消息是分开 存储的,每个Topic可以有多个生产者向它发送消息,可以有多个消费者去消费其中的消息)

Partion(分区)

主题可以有许多分区,因此它可以处理任意数量的数据

Partion offset(分区偏移)

每个分区消息具有成为offset的唯一序列标识

Brokers(代理)

每个kafka service 称为一个Broker,多个broker组成kafka cluster。一个机器上可以部署一个或多个 broker,这多个Broker连接到相同的ZooKeeper就组成了kafka集群。一个Broker上可以常见一个或多个Topic

同一个topic可以在同一集群下的多个Broker中分布

Producers(生产者)

发送一个或多个kafka主题的消息的发布者

Consumers(消费者)

从代理处读取数据

Leader(领导者)

负责给定分区的所有读取和写入的节点,每个分区都有一个服务器充当Leader

Follower(追随者)

跟随领导指令的节点,如果领导失败,一个追随者将自动称为心得领导者

isr(副本同步列表): 从isr中剔除:刚开始包含所有副本,若leader挂掉,从isr从新选一个leader,然后将这个副本剔除

 ./zkServer.sh start(启动zookepper)

./bin/kafka-server-start.sh config/server.properties (启动kafka)

 ./kafka-topics.sh --create --zookeeper h1m1:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka(创建topic)
                    (副本)			(分区数)		
./kafka-console-producer.sh --broker-list h1m1:9092 --topic Hello-Kafka(启动生产者)

./kafka-console-consumer.sh --bootstrap-server h1m1:9092 --topic Hello-Kafka --from-beginning(启动消费者)

./kafka-topics.sh --describe --zookeeper h1m1:2181 --topic JCHome(查看主题信息)


修改主题: bin/kafka-topics.sh --zookeeper h1m1:2181 --alter --topic Hello-Kafka --partitions 2
删除主题:	 bin/kafka-topics.sh --zookeeper h1m1:2181 --delete --topic Hello-Kafka

查看所有主题:bin/kafka-topics.sh  --list --zookeeper h1m1:2181

启动消费者指定分组:bin/kafka-console-consumer.sh --bootstrap-server h1m1:9092 --topic Test3 --group console-consumer-25119

查看分组:bin/kafka-consumer-groups.sh -bootstrap-server h1m1:9092 --list
查看分组详情:bin/kafka-consumer-groups.sh --bootstrap-server h1m1:9092 --describe --group console-consumer-25119

Kafka和spark streaming的集成

1.下载相关安装包和jar包

  1. 启动spark-shell。相关jar包放在/opt目录下,进入/opt目录执行下述命令

     /usr/lib/spark/bin/spark-shell --jars kafka-clients-2.2.0.jar,kafka_2.12-2.2.0.jar,spark-streaming-kafka-0-10_2.11-2.4.5.jar,metrics-core-3.0.1.jar,zkclient-0.10.jar -v --conf spark.rpc.netty.dispatcher.numThreads=2
    
  2. 在spark-shell中执行。注意这里是 kafka010

     import org.apache.spark.streaming.kafka010._
     import org.apache.kafka.common.serialization.StringDeserializer
     import org.apache.spark.streaming.{Seconds, StreamingContext}
     import kafka.serializer.StringDecoder
     import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
     import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    
     val ssc = new StreamingContext(sc, Seconds(1))
     ssc.checkpoint("checkpoint")
    
     //注意这里改为自己环境的地址和端口
     val kafkaParams = Map[String, Object](
     "bootstrap.servers" -> "h1m1:9092",
     "key.deserializer" -> classOf[StringDeserializer],
     "value.deserializer" -> classOf[StringDeserializer],
     "group.id" -> "myGroup",
     "auto.offset.reset" -> "latest",
     "enable.auto.commit" -> (false: java.lang.Boolean)
     )
    
     val topics=Set("Test3")
    
     val dStream = KafkaUtils.createDirectStream[String, String](
     ssc,
     PreferConsistent,
     Subscribe[String, String](topics, kafkaParams)
     )
    
     val lines = dStream.map(_.value())
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map( x=>(x,1) ).reduceByKey(_+_)
     wordCounts.print
     ssc.start
    

已有 0 条评论

    欢迎您,新朋友,感谢参与互动!