欢迎访问O2O前沿关注移动O2O发展四季彩平台登录!今天是:
O2O前沿
当前位置: O2O前沿 > 区块链 >

源码分析Kafka之Producer

时间:2018-09-27来源:http://www.o2oup.com文章热度:
   Kafka是一款很棒的消息系统,可以看看我之前写的 后端好书阅读与推荐来了解一下它的整体设计。今天我们就来深入了解一下它的实现细节(我fork了一份代码),首先关注Producer这一方。要使用kafka首先要实例化一个KafkaProducer,需要有brokerIP、序列化器等必要Properties以及acks(0、1、n)、compression、retries、batch.size等非必要Properties,通过这个简单的接口可以控制Producer大部分行为,实例化后就可以调用send方法发送消息了。核心实现是这个方法:

通过不同的模式可以实现发送即忘(忽略返回结果)、同步发送(获取返回的future对象,回调函数置为null)、异步发送(设置回调函数)三种消息模式。我们来看看消息类ProducerRecord有哪些属性:

它有多个构造函数,可以适应不同的消息类型:比如有无分区、有无key等。①中ProducerInterceptors(有0 ~ 无穷多个,形成一个拦截链)对ProducerRecord进行拦截处理(比如打上时间戳,进行审计与统计等操作)

如果用户有定义就进行处理并返回处理后的ProducerRecord,否则直接返回本身。然后②中doSend真正发送消息,并且是异步的(源码太长只保留关键):

下面是计算分区的方法:

默认的分区器DefaultPartitioner实现方式是如果partition存在就直接使用,否则根据key计算partition,如果key也不存在就使用round robin算法分配partition。/** * The default partitioning strategy: *
    *
  • If a partition is specified in the record, use it *
  • If no partition is specified but a key is present choose a partition based on a hash of the key *
  • If no partition or key is present choose a partition in a round-robin fashion */public class DefaultPartitioner implements Partitioner { private final ConcurrentMap topicCounterMap = new ConcurrentHashMap<>(); public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) {//key为空 int nextValue = nextValue(topic); List availablePartitions = cluster.availablePartitionsForTopic(topic);//可用的分区 if (availablePartitions.size() > 0) {//有分区,取模就行 int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else {// 无分区, return Utils.toPositive(nextValue) % numPartitions; } } else {// key 不为空,计算key的hash并取模获得分区 return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement();//返回并加一,在取模的配合下就是round robin }}以上就是发送消息的逻辑处理,接下来我们再看看消息发送的物理处理。Sender(是一个Runnable,被包含在一个IO线程ioThread中,该线程不断从RecordAccumulator队列中的读取消息并通过Selector将数据发送给Broker)的wakeup方法,实际上是KafkaClient接口的wakeup方法,由NetworkClient类实现,采用了NIO,也就是java.nio.channels.Selector.wakeup()方法实现。Sender的run中主要逻辑是不停执行准备消息和等待消息:

    ③完成消息设置并保存到信道中,然后监听感兴趣的key,由KafkaChannel实现。public void setSend(Send send) { if (this.send != null) throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id); this.send = send; this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);}// transportLayer的一种实现中的相关方法public void addInterestOps(int ops) { key.interestOps(key.interestOps() | ops);}④主要是Selector的poll,其select被wakeup唤醒:

    其中pollSelectionKeys方法会调用如下方法完成消息发送:

    Send是一次数据发包,一般由ByteBufferSend或者MultiRecordsSend实现,其writeTo调用transportLayer的write方法,一般由PlaintextTransportLayer或者SslTransportLayer实现,区分是否使用ssl:

    到此就把Producer的业务相关逻辑处理和非业务相关的网络 2方面的主要流程梳理清楚了。其他额外的功能是通过一些配置保证的。比如顺序保证就是max.in.flight.requests.per.connection,InFlightRequests的doSend会进行判断(由NetworkClient的canSendRequest调用),只要该参数设为1即可保证当前包未确认就不能发送下一个包从而实现有序性

    再比如可靠性,通过设置acks,Sender中sendProduceRequest的clientRequest加入了回调函数:

    kafka源码一层一层包装很多,错综复杂,如有错误请大家不吝赐教。

    私信【666】即可领到免费资料呦!本文仅代表作者个人观点,不代表巨推链平台发声,对文章观点有疑义请先联系作者本人进行修改,若内容非法请联系平台管理员,邮箱[email protected]。更多区块链资讯,请到百万区块链发烧友聚集平台赤壁资讯网学习区块链技术请到www.zxhsh.com

百度搜索:源码分析Kafka之Producer 查找更多相关信息!

360搜索:源码分析Kafka之Producer 查找更多相关信息!

搜狗搜索:源码分析Kafka之Producer 查找更多相关信息!

------分隔线----------------------------