zoukankan      html  css  js  c++  java
  • 学习kafka的内容总结

      kafka的基本介绍:

      Kafka 是linkedin 公司用于日志处理的分布式消息队列,同时支持离线和在线日志处理。kafka 对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka 集群有多个kafka 实例组成,每个实例(server)称为broker。无论是kafka集群,还是producer和consumer 都依赖于zookeeper 来保证系统可用性,为集群保存一些meta 信息。Kafka 是一种分布式的、分区的、多副本的基于发布/订阅的消息系统。它是通过 zookeeper 进行协调,常见可以用于 web/nginx 日志、访问日志、消息服务等。主要应用场景为:日志收集系统和消息系统。

         kafka的主要设计目标:
    • 以时间复杂度为 O(1) 的方式提供持久化能力,即使对 TB 级别以上的数据也能保证常数时间的访问性能。
    • 高吞吐率,即使在十分廉价的机器上也能实现单机支持每秒 100K 条消息的传输。
    • 支持 Kafka Server (即 Kafka 集群的服务器)间的消息分区,及分布式消费,同时保证每个 partition 内的消息顺序传输。
    • 同时支持离线数据处理和实时数据处理

           kafka中最主要的生产者和消费者:

        生产者(Producers)

      Producer 将消息发布到指定的Topic中,同时Producer 也能决定将此消息归属于哪个partition;这可以通过简单的循环的方式来实现,或者使用一些分区方法(比如根据消息的key来分区)

      消费者(Consumers)

      传统的消息传递有两种方式: 队列方式(queuing)、发布-订阅(publish-subscribe)方式.

    • 队列方式:一组消费者从机器上读消息,每个消息只传递给这组消费者中的一个。

    • 分布-订阅方式:消息被广播到所有的消费者。Kafka提供了一个消费组(consumer group)的说法来概括这两种方式。



    利用python来操作kafka:消费者
    from kafka import KafkaConsumer
    from kafka import TopicPartition
    from kafka import KafkaProducer
    
    #kafka consumer
    consumer = KafkaConsumer(
        group_id= 'test', #消费组id
        auto_offset_reset='earliest',
        enable_auto_commit = True, # 每过一段时间自动提交所有已消费的消息(在迭代时提交)
        auto_commit_interval_ms = 1000, # 自动提交的周期(毫秒)
        max_poll_records=10, # 每次最大消费数量
        key_deserializer=bytes.decode, #bytes
        value_deserializer=bytes.decode, #bytes
        bootstrap_servers= "node01:9092,node02:9092,node03:9092",#kafka集群地址
    )
    consumer.assign([
        TopicPartition(topic= 'news_tag', partition= 0),
        TopicPartition(topic= 'news_tag', partition= 1),
        TopicPartition(topic= 'news_tag', partition= 2),
        TopicPartition(topic= 'news_tag', partition= 3),
        TopicPartition(topic= 'news_tag', partition= 4),
        TopicPartition(topic= 'news_tag', partition= 5),
    ])
    
    
    for msg in consumer:
        recv = "offset=%d Partition=%d key=%s value=%s" % (msg.offset, msg.partition,  msg.key, msg.value)
        print(recv)
    

    利用python来操作kafka:生产者

     1 from kafka import KafkaConsumer
     2 from kafka import TopicPartition
     3 from kafka import KafkaProducer
     4 
     5 # Kafka Producer
     6 producer=KafkaProducer(
     7 bootstrap_servers="node01:9092,node02:9092,node03:9092",  # kafka集群地址
     8 key_serializer=str.encode, #str
     9 value_serializer=str.encode, #str
    10 )
    11 
    12 parts = int(my_key[0:2]) % 6
    13 result_msg = str(result)
    14 producer.send('news_tag_label', key=my_key, value=result_msg, partition=parts)
    15 print('发送成功')
    参考资料:
    1、https://www.jianshu.com/p/615ffebf605b
    2、https://pykafka.readthedocs.io/en/latest/index.html
    3、https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
    4、https://www.jianshu.com/p/776c188cefa9
    5、https://www.jianshu.com/p/c89997867d48
    6、https://www.jianshu.com/p/97a02129eea7
    7、https://zhuanlan.zhihu.com/p/38330574
     

    ---------------------------本博客所有内容以学习、研究和分享为主,如需转载,请联系本人,标明作者和出处,并且是非商业用途,谢谢!---------------------

    作者:enhaofrank

    出处:https://www.cnblogs.com/enhaofrank/

    中科院硕士毕业

    现为深漂打工人

  • 相关阅读:
    Linux服务器安装JDK运行环境教程
    Oracle数据库通过DBLINK实现远程访问
    Java中的Number和Math类简单介绍
    使用Netty3或Netty4发布Http协议服务
    同步(Synchronous)和异步(Asynchronous)的概念
    通过注解实现Spring 声明式事务管理
    Spring事务管理入门与进阶
    vmware虚拟机安装CentOS教程
    记录自己的一次pjax性能优化
    让你的网页"抖起来"?!?
  • 原文地址:https://www.cnblogs.com/enhaofrank/p/14281205.html
Copyright © 2011-2022 走看看