Kafka生产者

发送消息步骤

ProducerRecord 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。

  1. 在发送 ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
  2. 数据被传给分区器。如果之前在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据 ProducerRecord 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。
  3. 紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。
  4. 有一个独立的线程负责把这些记录批次发送到相应的 broker 上。
    ———–问题是这样节省了带宽,不会影响实时性么?生产者并不会等待批次满了才发,半满甚至只包含一个消息就会发。感觉有个间隔,间隔到了不管有多少都发。
  5. 服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误
  6. 生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。

发送消息方式

发送并忘记(fire-and-forget)

我们把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发(可通过参数配置)。不过,使用这种方式有时候也会丢失一些消息。

1
2
3
4
5
6
7
8
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products",
"France"); ➊
try {
producer.send(record); ➋
} catch (Exception e) {
e.printStackTrace(); ➌
}

❶ 生产者的 send() 方法将 ProducerRecord 对象作为参数,所以我们要先创建一个 ProducerRecord 对象。它需要目标主题的名字和要发送的键和值对象,它们都是字符串。键和值对象的类型必须与序列化器和生产者对象相匹配。

❷ 我们使用生产者的 send() 方法发送 ProducerRecord 对象。从生产者的架构图里可以看到,消息先是被放进缓冲区,然后使用单独的线程发送到服务器端。send() 方法会返回一个包含 RecordMetadata 的 Future 对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。比如,记录 Twitter 消息日志,或记录不太重要的应用程序日志。

❸ 我们可以忽略发送消息时可能发生的错误或在服务器端可能发生的错误,但在发送消息之前,生产者还是有可能发生其他的异常。这些异常有可能是 SerializationException(说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。

同步发送

我们使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待,就可以知道消息是否发送成功。

1
2
3
4
5
6
7
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get(); ➊
} catch (Exception e) {
e.printStackTrace(); ➋
}

❶ 在这里,producer.send() 方法先返回一个 Future 对象,然后调用 Future 对象的 get() 方法等待 Kafka 响应。如果服务器返回错误,get() 方法会抛出异常。如果没有发生错误,我们会得到一个 RecordMetadata 对象,可以用它获取消息的偏移量。

❷ 如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker 返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。我们只是简单地把异常信息打印出来。

KafkaProducer 一般会发生两类错误。其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。

异步发送

我们调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。

1
2
3
4
5
6
7
8
9
10
11
12
private class DemoProducerCallback implements Callback {➊
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace(); ➋
}
}
}

ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); ➌
producer.send(record, new DemoProducerCallback()); ➍

❶ 为了使用回调,需要一个实现了 org.apache.kafka.clients.producer.Callback 接口的类,这个接口只有一个 onCompletion 方法。

❷ 如果 Kafka 返回一个错误,onCompletion 方法会抛出一个非空(non null)异常。这里我们只是简单地把它打印出来,但是在生产环境应该有更好的处理方式。

❸ 记录与之前的一样。

❹ 在发送消息时传进去一个回调对象。

生产者参数

acks

acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响”

acks = 0:生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了

acks = 1:只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。

acks = all:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。

buffer.memory

该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。

如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。

这个时候, send() 方法调用要么被阻塞,要么抛出异常.

取决于如何设置 block.on.buffer.full 参数(在 0.9.0.0 版本里被替换成了 max.block.ms,表示在抛出异常之前可以阻塞一段时间)。

retries
产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。

在这种情况下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。

默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔。

建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。

一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。只需要处理那些不可重试的错误或重试次数超出上限的情况。

batch.size

“当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。

该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。

当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。

所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销”

linger.ms

指定生产者在发送批次之前等待更多消息加入批次的时间。

KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。

默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。

把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。

虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。

分区

默认分区策略

kafka消息可以是一个个键值对,键有两个作用:一个是作为消息的附加信息,也可以决定消息被写道主题哪个分区。

key为空且使用默认分区器,分区器使用轮询算法将消息均衡分不到各个分区。

如果key不为空且使用默认分区器,对key进行散列(kafka自己的散列算法),每次都是散列到同一分区。
可能散列到的分区不可用,出现问题。
增加新分区也会出现问题,所以永远不要增加新分区

自定义分区策略

实现Partitioner接口

参考资料

《kafka权威指南》

于大帅 wechat
打钱! 打钱! 打钱😡😡😡
打赏完了让我夸夸你🤨