[提交译文][tech]: 20211102 Apache Kafka- Asynchronous Messaging for Seamless Systems.md

This commit is contained in:
lkxed 2022-06-20 22:11:31 +08:00
parent 0fcf1718c2
commit b82ec795f4
2 changed files with 301 additions and 299 deletions

View File

@ -1,299 +0,0 @@
[#]: subject: "Apache Kafka: Asynchronous Messaging for Seamless Systems"
[#]: via: "https://www.opensourceforu.com/2021/11/apache-kafka-asynchronous-messaging-for-seamless-systems/"
[#]: author: "Krishna Mohan Koyya https://www.opensourceforu.com/author/krishna-mohan-koyya/"
[#]: collector: "lkxed"
[#]: translator: "lkxed"
[#]: reviewer: " "
[#]: publisher: " "
[#]: url: " "
Apache Kafka: Asynchronous Messaging for Seamless Systems
======
Apache Kafka is one of the most popular open source message brokers. Found in almost all microservices environments, it has become an important component of Big Data manipulation. This article gives a brief description of Apache Kafka, followed by a case study that demonstrates how it is used.
![Digital-backgrund-connecting-in-globe][1]
Have you ever wondered how e-commerce platforms are able to handle immense traffic without getting stuck? Ever thought about how OTT platforms are able to deliver content to millions of users, smoothly and simultaneously? The key lies in their distributed architecture.
A system designed around distributed architecture is made up of multiple functional components. These components are usually spread across several machines, which collaborate with each other by exchanging messages asynchronously over a network. Asynchronous messaging is what enables scalable, non-blocking communication among components, thereby allowing smooth functioning of the overall system.
### Asynchronous messaging
The common features of asynchronous messaging are:
* The producers and consumers of the messages are not aware of each other. They join and leave the system without the knowledge of the others.
* A message broker acts as the intermediary between the producers and consumers.
* The producers associate each of the messages with a type, known as topic. A topic is just a simple string.
* It is possible that producers send messages on multiple topics, and multiple producers send messages on the same topic.
* The consumers register with the broker for messages on one or more topics.
* The producers send the messages only to the broker, and not to the consumers.
* The broker, in turn, delivers the messages to all the consumers that are registered against the topic.
* The producers do not expect any response from the consumers. In other words, the producers and consumers do not block each other.
There are several message brokers available in the market, and Apache Kafka is one of the most popular among them.
### Apache Kafka
Apache Kafka is an open source distributed messaging system with streaming capabilities, developed by the Apache Software Foundation. Architecturally, it is a cluster of several brokers that are coordinated by the Apache Zookeeper service. These brokers share the load on the cluster while receiving, persisting, and delivering the messages.
#### Partitions
Kafka writes messages into buckets known as partitions. A given partition holds messages only on one topic. For example, Kafka writes messages on the topic heartbeats into the partition named *heartbeats-0*, irrespective of the producer of the messages.
![Figure 1: Asynchronous messaging][2]
However, in order to leverage the cluster-wide parallel processing capabilities of Kafka, administrators often create more than one partition for a given topic. For instance, if the administrator creates three partitions for the topic heartbeats, Kafka names them as *heartbeats-0, heartbeats-1,* and *heartbeats-2.* Kafka writes the heartbeat messages across all the three partitions in such a way that the load is evenly distributed.
There is yet another possible scenario in which the producers associate each of the messages with a key. For example, a component uses C1 as the key while another component uses C2 as the key for the messages that they produce on the topic heartbeats. In this scenario, Kafka makes sure that the messages on a topic with a specific key are always found only in one partition. However, it is quite possible that a given partition may hold messages with different keys. Figure 2 presents a possible message distribution among the partitions.
![Figure 2: Message distribution among the partitions][3]
#### Leaders and ISRs
Kafka maintains several partitions across the cluster. The broker on which a partition is maintained is called the leader for the specific partition. Only the leader receives and serves the messages from its partitions.
But what happens to a partition if its leader crashes? To ensure business continuity, every leader replicates its partitions on other brokers. The latter act as the in-sync-replicas (ISRs) for the partition. In case the leader of a partition crashes, Zookeeper conducts an election and names an ISR as the new leader. Thereafter, the new leader takes the responsibility of writing and serving the messages for that partition. Administrators can choose how many ISRs are to be maintained for a topic.
![Figure 3: Command-line producer][4]
#### Message persistence
The brokers map each of the partitions to a specific file on the disk, for persistence. By default, they keep the messages for a week on the disk! The messages and their order cannot be altered once they are written to a partition. Administrators can configure policies like message retention, compaction, etc.
![Figure 4: Command-line consumer][5]
#### Consuming the messages
Unlike most other messaging systems, Apache Kafka does not actively deliver the messages to its consumers. Instead, it is the responsibility of the consumers to listen to the topics and read the messages. A consumer can read messages from more than one partition of a topic. And it is also possible that multiple consumers read messages from a given partition. Kafka guarantees that no message is read more than once by a given consumer.
Kafka also expects that every consumer is identified with a group ID. Consumers with the same group ID form a group. Typically, in order to read messages from N number of topic partitions, an administrator creates a group with N number of consumers. This way, each consumer of the group reads messages from its designated partition. If the group consists of more consumers than the available partitions, the excess consumers remain idle.
In any case, Apache Kafka guarantees that a message is read only once at the group level, irrespective of the number of consumers in the group. This architecture gives consistency, high-performance, high scalability, near-real-time delivery, and message persistence along with zero-message loss.
### Installing and running Kafka
Although, in theory, the Apache Kafka cluster can consist of any number of brokers, most of the clusters in production environments usually consist of three or five of these.
Here, we will set up a single-broker cluster that is good enough for the development environment.
Download the latest version of Kafka from *https://kafka.apache.org/downloads* using a browser. It can also be downloaded with the following command, on a Linux terminal:
```
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.12-2.8.0.tgz
```
We can move the downloaded archive *file kafka_2.12-2.8.0.tgz* to some other folder, if needed. Extracting the archive creates a folder by the name *kafka_2.12-2.8.0*, which will be referred to as *KAFKA_HOME* hereafter.
Open the file server.properties under the *KAFKA_HOME/config* folder and uncomment the line with the following entry:
```
listeners=PLAINTEXT://:9092
```
This configuration enables Apache Kafka to receive plain text messages on port 9092, on the local machine. Kafka can also be configured to receive messages over a secure channel as well, which is recommended in the production environments.
Irrespective of the number of brokers, Apache Zookeeper is required for broker management and coordination. This is true even in the case of single-broker clusters. Since Zookeeper is already bundled with Kafka, we can start it with the following command from *KAFKA_HOME*, on a terminal:
```
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
```
Once Zookeeper starts running, Kafka can be started in another terminal, with the following command:
```
./bin/kafka-server-start.sh ./config/server.properties
```
With this, a single-broker Kafka cluster is up and running.
### Verifying Kafka
Let us publish and receive messages on the topic topic-1. A topic can be created with a chosen number of partitions with the following command:
```
./bin/kafka-topics.sh --create --topic topic-1 --zookeeper localhost:2181 --partitions 3 --replication-factor 1
```
The above command also specifies the replication factor, which should be less than or equal to the number of brokers in the cluster. Since we are working on a single-broker cluster, the replication factor is set to one.
Once the topic is created, producers and consumers can exchange messages on that topic. The Kafka distribution includes a producer and a consumer for test purposes. Both of these are command-line tools.
To invoke the producer, open the third terminal and run the following command:
```
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-1
```
This command displays a prompt at which we can key in simple text messages. Because of the given options on the command, the producer sends the messages on *topic-1* to the Kafka that is running on port 9092 on the local machine.
Open the fourth terminal and run the following command to start the consumer tool:
```
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-1 from-beginning
```
This command starts the consumer that connects to the Kafka on port number 9092 on the local machine. It registers for reading the messages on topic-1. Because of the last option on the command line, the consumer receives all the messages on the chosen topic from the beginning.
Since the producer and consumer are connecting to the same broker and referring the same topic, the consumer receives and displays the messages on its terminal.
Now, lets use Kafka in the context of a practical application.
### Case study
ABC is a hypothetical bus transport company, which has a fleet of passenger buses that ply between different cities across the country. Since ABC wants to track each bus in real-time for improving the quality of its operations, it comes up with a solution around Apache Kafka.
ABC first equips all its buses with devices to track their location. An operations centre is set up with Apache Kafka, to receive the location updates from each of the hundreds of buses. A dashboard is developed to display the current status of all the buses at any point in time. Figure 5 represents this architecture.
![Figure 5: Kafka based architecture][6]
In this architecture, the devices on the buses act as the message producers. They send their current location to Kafka on the topic *abc-bus-location*, periodically. For processing the messages from different buses, ABC chooses to use the trip code as the key. For example, if the bus from Bengaluru to Hubballi runs with the trip code*BLRHBL003*, then *BLRHBL003* becomes the key for all the messages from that specific bus during that specific trip.
The dashboard application acts as the message consumer. It registers with the broker against the same topic *abc-bus-location*. Consequently, the topic becomes the virtual channel between the producers (buses) and the consumer (dashboard).
The devices on the buses never expect any response from the dashboard application. In fact, none of them is even aware of the presence of the others. This architecture enables non-blocking communication between hundreds of buses and the central office.
#### Implementation
Lets assume that ABC wants to create three partitions for maintaining the location updates. Since the development environment has only one broker, the replication factor should be set to one.
The following command creates the topic accordingly:
```
./bin/kafka-topics.sh --create --topic abc-bus-location --zookeeper localhost:2181 --partitions 3 --replication-factor 1
```
The producer and consumer applications can be written in multiple languages like Java, Scala, Python, JavaScript, and a host of others. The code in the following sections provides a peek into the way they are written in Java.
##### Java producer
The Fleet class simulates the Kafka producer applications running on six buses of ABC. It sends location updates on *abc-bus-location* to the specified broker. Please note that the topic name, message keys, message body, and broker address are hard-coded only for simplicity.
```
public class Fleet {
public static void main(String[] args) throws Exception {
String broker = “localhost:9092”;
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<String, String>(props);
String topic = “abc-bus-location”;
Map<String, String> locations = new HashMap<>();
locations.put(“BLRHBL001”, “13.071362, 77.461906”);
locations.put(“BLRHBL002”, “14.399654, 76.045834”);
locations.put(“BLRHBL003”, “15.183959, 75.137622”);
locations.put(“BLRHBL004”, “13.659576, 76.944675”);
locations.put(“BLRHBL005”, “12.981337, 77.596181”);
locations.put(“BLRHBL006”, “13.024843, 77.546983”);
IntStream.range(0, 10).forEach(i -> {
for (String trip : locations.keySet()) {
ProducerRecord<String, String> record
= new ProducerRecord<String, String>(
topic, trip, locations.get(trip));
producer.send(record);
}
});
producer.flush();
producer.close();
}
}
```
##### Java consumer
The Dashboard class implements the Kafka consumer application and it runs at the ABC Operations Centre. It listens to *abc-bus-location* with the group ID *abc-dashboard* and displays the location details from different buses as soon as messages are available. Here, too, many details are hard coded which are otherwise supposed to be configured:
```
public static void main(String[] args) {
String broker = “127.0.0.1:9092”;
String groupId = “abc-dashboard”;
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@SuppressWarnings(“resource”)
Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(“abc-bus-location”));
while (true) {
ConsumerRecords<String, String> records
= consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
String topic = record.topic();
int partition = record.partition();
String key = record.key();
String value = record.value();
System.out.println(String.format(
“Topic=%s, Partition=%d, Key=%s, Value=%s”,
topic, partition, key, value));
}
}
}
```
##### Dependencies
A JDK of 8+ version is required to compile and run this code. The following Maven dependencies in the *pom.xml* download and add the required Kafka client libraries to the classpath:
```
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
```
#### Deployment
As the topic *abc-bus-location* is created with three partitions, it makes sense to run three consumers to read the location updates quickly. For that, run the Dashboard in three different terminals simultaneously. Since all the three instances of Dashboard register with the same group ID, they form a group. Kafka attaches each Dashboard instance with a specific partition.
Once the Dashboard instances are up and running, start the *Fleet* on a different terminal. Figure 6, Figure 7, and Figure 8 are sample console messages on the Dashboard terminals.
![Figure 6: Dashboard Terminal 1][7]
A closer look at the console messages reveals that the consumers on the first, second and third terminals are reading messages from *partition-2, partition-1,* and *partition-0,* in that order. Also, it can be observed that the messages with the keys BLRHBL002, *BLRHBL004* and *BLRHBL006* are written into *partition-2*, the messages with the key *BLRHBL005* are written into *partition-1*, and the remaining are written into *partition-0*.
![Figure 7: Dashboard Terminal 2][8]
The good thing about Kafka is that it can be scaled horizontally to support a large number of buses and millions of messages as long as the cluster is designed appropriately.
![Figure 8: Dashboard Terminal 3][9]
### Beyond messaging
More than 80 per cent of the Fortune 100 companies are using Kafka, according to its website. It is deployed across many industry verticals like financial services, entertainment, etc. Though Apache Kafka started its journey as a simple messaging service, it has propelled itself into the Big Data ecosystem with industry-level stream processing capabilities. For the enterprises that prefer a managed solution, Confluent offers a cloud based Apache Kafka service for a subscription fee.
--------------------------------------------------------------------------------
via: https://www.opensourceforu.com/2021/11/apache-kafka-asynchronous-messaging-for-seamless-systems/
作者:[Krishna Mohan Koyya][a]
选题:[lkxed][b]
译者:[译者ID](https://github.com/译者ID)
校对:[校对者ID](https://github.com/校对者ID)
本文由 [LCTT](https://github.com/LCTT/TranslateProject) 原创编译,[Linux中国](https://linux.cn/) 荣誉推出
[a]: https://www.opensourceforu.com/author/krishna-mohan-koyya/
[b]: https://github.com/lkxed
[1]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Digital-backgrund-connecting-in-globe.jpg
[2]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-1-Asynchronous-messaging.jpg
[3]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-2-Message-distribution-among-the-partitions.jpg
[4]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-3-Command-line-producer.jpg
[5]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-4-Command-line-consumer.jpg
[6]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-5-Kafka-based-architecture.jpg
[7]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-6-Dashboard-Terminal-1.jpg
[8]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-7-Dashboard-Terminal-2.jpg
[9]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-8-Dashboard-Terminal-3.jpg

View File

@ -0,0 +1,301 @@
[#]: subject: "Apache Kafka: Asynchronous Messaging for Seamless Systems"
[#]: via: "https://www.opensourceforu.com/2021/11/apache-kafka-asynchronous-messaging-for-seamless-systems/"
[#]: author: "Krishna Mohan Koyya https://www.opensourceforu.com/author/krishna-mohan-koyya/"
[#]: collector: "lkxed"
[#]: translator: "lkxed"
[#]: reviewer: " "
[#]: publisher: " "
[#]: url: " "
Apache Kafka为“无缝系统”提供异步消息支持
======
Apache Kafka 是最流行的开源消息代理之一。它已经成为了大数据操作的重要组成部分,你能够在几乎所有的微服务环境中找到它。本文对 Apache Kafka 进行了简要介绍,并提供了一个案例来展示它的使用方式。
![][1]
你有没有想过电子商务平台是如何在处理巨大的流量时做到不会卡顿的呢有没有想过OTT 平台是如何在同时向数百万用户交付内容时,做到平稳运行的呢?其实,关键就在于它们的分布式架构。
采用分布式架构设计的系统由多个功能组件组成。这些功能组件通常分布在许多个机器上,它们通过网络,异步地交换消息,从而实现相互协作。正是由于异步消息的存在,组件之间才能实现可伸缩、无阻塞的通信,整个系统才能够平稳运行。
### 异步消息
异步消息的常见特性有:
* 消息的生产者和消费者都不知道彼此的存在。它们在不知道其他对象的情况下,加入和离开系统。
* 消息代理充当了生产者和消费者之间的中介。
* 生产者把每条消息,都与一个<ruby>“主题”<rt>topic</rt></ruby>相关联。每个主题只是一个简单的字符串。
* 一个生产者可以把消息发往多个主题,不同生产者也可以把消息发送给同一主题。
* 消费者向代理订阅一个或多个主题的消息。
* 生产者只将消息发送给代理,而不发送给消费者。
* 代理会把消息发送给订阅该主题的所有消费者。
* 生产者并不期望得到消费者的任何回应。换句话说,生产者和消费者不会相互阻塞。
市场上的消息代理有很多,而 Apache Kafka 是其中最受欢迎的一种(之一)。
### Apache Kafka
Apache Kafka 是一个支持流处理的、开源的分布式消息系统,它由 Apache 软件基金会开发。在架构上,它是多个代理组成的集群,这些代理间通过 Apache ZooKeeper 服务来协调。在接收、持久化和发送消息时,这些代理共享集群上的负载。
#### 分区
Kafka 将消息写入称为<ruby>“分区”<rt>partitions</rt></ruby>的桶中。一个特定分区只保存一个主题上的消息。例如Kafka 会把 `heartbeats` 主题上的消息写入名为 “heartbeats-0” 的分区(假设它是个单分区主题),这个过程和生产者无关。
![图 1异步消息][2]
不过,为了利用 Kafka 集群所提供的并行处理能力,管理员通常会为指定主题创建多个分区。举个例子,假设管理员为 `heartbeats` 主题创建了三个分区Kafka 会将它们分别命名为 `heartbeats-0`、`heartbeats-1` 和 `heartbeats-2`。Kafka 会以某种方式,把消息分配到这三个分区中,并使它们均匀分布。
还有另一种可能的情况,生产者将每条消息与一个<ruby>消息键<rt>key</rt></ruby>相关联。例如,同样都是在 `heartbeats` 主题上发送消息,有个组件使用 `C1` 作为消息键,另一个则使用 `C2`。在这种情况下Kafka 会确保,在一个主题中,带有相同消息键的消息,总是会被写入到同一个分区。不过,在一个分区中,消息的消息键却不一定相同。下面的图 2 显示了消息在不同分区中的一种可能分布。
![图 2消息在不同分区中的分布][3]
#### 领导者和同步副本
Kafka 在(由多个代理组成的)集群中维护了多个分区。其中,负责维护分区的那个代理被称为<ruby>“领导者”<rt>leader</rt></ruby>。只有领导者能够在它的分区上接收和发送消息。
可是,万一分区的领导者发生故障了,又该怎么办呢?为了确保业务连续性,每个领导者(代理)都会把它的分区复制到其他代理上。此时,这些其他代理就称为该分区的<ruby>同步副本<rt>in-sync-replicas</rt></ruby>ISR。一旦分区的领导者发生故障ZooKeeper 就会发起一次选举,把选中的那个同步副本任命为新的领导者。此后,这个新的领导者将承担该分区的消息接受和发送任务。管理员可以指定分区需要维护的同步副本的大小。
![图 3命令行生产者][4]
#### 消息持久化
代理会将每个分区都映射到一个指定的磁盘文件,从而实现持久化。默认情况下,消息会在磁盘上保留一个星期。当消息写入分区后,它们的内容和顺序就不能更改了。管理员可以配置一些策略,如消息的保留时长、压缩算法等。
![图 4命令行消费者][5]
#### 消费消息
与大多数其他消息系统不同Kafka 不会主动将消息发送给消费者。相反消费者应该监听主题并主动读取消息。一个消费者可以某个主题的多个分区中读取消息。多个消费者也可以读取来自同一个分区的消息。Kafka 保证了同一条消息不会被同一个消费者重复读取。
Kafka 中的每个消费者都有一个组 ID。那些组 ID 相同的消费者们共同组成了一个消费者组。通常,为了从 N 个主题分区读取消息,管理员会创建一个包含 N 个消费者的消费者组。这样一来,组内的每个消费者都可以从它的指定分区中读取消息。如果组内的消费者比可用分区还要多,那么多出来的消费者就会处于闲置状态。
在任何情况下Kafka 都保证:不管组内有多少个消费者,同一条消息只会被该消费者组读取一次。这个架构提供了一致性、高性能、高可扩展性、准实时交付和消息持久性,以及零消息丢失。
### 安装、运行 Kafka
尽管在理论上Kafka 集群可以由任意数量的代理组成,但在生产环境中,大多数集群通常由三个或五个代理组成。
在这里,我们将搭建一个单代理集群,对于生产环境来说,它已经够用了。
在浏览器中访问 [https://kafka.apache.org/downloads][5a],下载 Kafka 的最新版本。在 Linux 终端中,我们也可以使用下面的命令来下载它:
```
wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.12-2.8.0.tgz
```
如果需要的话,我们也可以把下载来的档案文件 `kafka_2.12-2.8.0.tgz` 移动到另一个目录下。解压这个档案,你会得到一个名为 `kafka_2.12-2.8.0` 的目录,它就是之后我们要设置的 `KAFKA_HOME`
打开 `KAFKA_HOME/config` 目录下的 `server.properties` 文件,取消注释下面这一行配置:
```
listeners=PLAINTEXT://:9092
```
这行配置的作用是让 Kafka 在本机的 `9092` 端口接收普通文本消息。我们也可以配置 Kafka 通过<ruby>安全通道<rt>secure channel</rt></ruby>接收消息,在生产环境中,我们也推荐这么做。
无论集群中有多少个代理Kafka 都需要 ZooKeeper 来管理和协调它们。即使是单代理集群也是如此。Kafka 在安装时,会附带安装 ZooKeeper因此我们可以在 `KAFKA_HOME` 目录下,在命令行中使用下面的命令来启动它:
```
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
```
当 ZooKeeper 运行起来后,我们就可以在另一个终端中启动 Kafka 了,命令如下:
```
./bin/kafka-server-start.sh ./config/server.properties
```
到这里,一个单代理的 Kafka 集群就运行起来了。
### 验证 Kafka
让我们在 `topic-1` 主题上尝试下发送和接收消息吧!我们可以使用下面的命令,在创建主题时为它指定分区的个数:
```
./bin/kafka-topics.sh --create --topic topic-1 --zookeeper localhost:2181 --partitions 3 --replication-factor 1
```
上述命令还同时指定了<ruby>复制因子<rt>replication factor</rt></ruby>,它的值不能大于集群中代理的数量。我们使用的是单代理集群,因此,复制因子只能设置为 1。
当主题创建完成后生产者和消费者就可以在上面交换消息了。Kafka 的发行版内附带了命令行工具生产者和消费者,供测试时用。
打开第三个终端,运行下面的命令,启动命令行生产者:
```
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-1
```
上述命令显示了一个提示符,我们可以在后面输入简单文本消息。由于我们指定的命令选项,生产者会把 `topic-1` 上的消息,发送到运行在本机的 9092 端口的 Kafka 中。
打开第四个终端,运行下面的命令,启动命令行消费者:
```
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-1 -from-beginning
```
上述命令启动了一个消费者,并指定它连接到本机 9092 端口的 Kafka。它订阅了 `topic-1` 主题,以读取其中的消息。由于命令行的最后一个选项,这个消费者会从最开头的位置,开始读取该主题的所有消息。
我们注意到,生产者和消费者连接的是同一个代理,访问的是同一个主题,因此,消费者在收到消息后会把消息打印到终端上。
下面,让我们在实际应用场景中,尝试使用 Kafka 吧!
### 案例
假设有一家叫做 ABC 的公共汽车运输公司,它拥有一支客运车队,往返于全国不同城市之间。由于 ABC 希望实时跟踪每辆客车,以提高其运营质量,因此,它提出了一个基于 Apache Kafka 的解决方案。
首先ABC 公司为所有公交车都配备了位置追踪设备。然后,它使用 Kafka 建立了一个操作中心,以接收来自数百辆客车的位置更新。它还开发了一个<ruby>仪表盘<rt>dashboard</rt></ruby>,以显示任一时间点所有客车的当前位置。图 5 展示了上述架构:
![图 5基于 Kafka 的架构][6]
在这种架构下,客车上的设备扮演了消息生产者的角色。它们会周期性地把当前位置发送到 Kafka 的 `abc-bus-location` 主题上。ABC 公司选择以客车的<ruby>行程码<rt>trip code</rt></ruby>作为消息键,以处理来自不同客车的消息。例如,对于从 Bengaluru 到 Hubballi 的客车,它的行程码就会是 `BLRHL003`,那么在这段旅程中,对于所有来自该客车的消息,它们的消息键都会是 `BLRHL003`
仪表盘应用扮演了消息消费者的角色。它在代理上注册了同一个主题 `abc-bus-location`。如此,这个主题就成为了生产者(客车)和消费者(仪表盘)之间的虚拟通道。
客车上的设备不会期待得到来自仪表盘应用的任何回复。事实上,它们相互之间都不知道对方的存在。得益于这种架构,数百辆客车和操作中心之间实现了非阻塞通信。
#### 实现
假设 ABC 公司想要创建三个分区来维护位置更新。由于我们的开发环境只有一个代理,因此复制因子应设置为 1。
相应地,以下命令创建了符合需求的主题:
```
./bin/kafka-topics.sh --create --topic abc-bus-location --zookeeper localhost:2181 --partitions 3 --replication-factor 1
```
生产者和消费者应用可以用多种语言编写,如 Java、Scala、Python 和 JavaScript 等。下面几节中的代码展示了它们在 Java 中的编写方式,好让我们有一个初步了解。
##### Java 生产者
下面的 `Fleet` 类模拟了在 ABC 公司的 6 辆客车上运行的 Kafka 生产者应用。它会把位置更新发送到指定代理的 `abc-bus-location` 主题上。请注意,简单起见,主题名称、消息键、消息内容和代理地址等,都在代码里写死了。
```
public class Fleet {
public static void main(String[] args) throws Exception {
String broker = “localhost:9092”;
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<String, String>(props);
String topic = “abc-bus-location”;
Map<String, String> locations = new HashMap<>();
locations.put(“BLRHBL001”, “13.071362, 77.461906”);
locations.put(“BLRHBL002”, “14.399654, 76.045834”);
locations.put(“BLRHBL003”, “15.183959, 75.137622”);
locations.put(“BLRHBL004”, “13.659576, 76.944675”);
locations.put(“BLRHBL005”, “12.981337, 77.596181”);
locations.put(“BLRHBL006”, “13.024843, 77.546983”);
IntStream.range(0, 10).forEach(i -> {
for (String trip : locations.keySet()) {
ProducerRecord<String, String> record
= new ProducerRecord<String, String>(
topic, trip, locations.get(trip));
producer.send(record);
}
});
producer.flush();
producer.close();
}
}
```
##### Java 消费者
下面的 `Dashboard` 类实现了一个 Kafka 消费者应用,运行在 ABC 公司的操作中心。它会监听 `abc-bus-location` 主题,并且它的消费者组 ID 是 `abc-dashboard`。当收到消息后,它会立即显示来自客车的详细位置信息。我们本该配置这些详细位置信息,但简单起见,它们也是在代码里写死的:
```
public static void main(String[] args) {
String broker = “127.0.0.1:9092”;
String groupId = “abc-dashboard”;
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@SuppressWarnings(“resource”)
Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(“abc-bus-location”));
while (true) {
ConsumerRecords<String, String> records
= consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
String topic = record.topic();
int partition = record.partition();
String key = record.key();
String value = record.value();
System.out.println(String.format(
“Topic=%s, Partition=%d, Key=%s, Value=%s”,
topic, partition, key, value));
}
}
}
```
##### 依赖
为了编译和运行这些代码,我们需要 JDK 8 及以上版本。看到下面的 `pom.xml` 文件中的 Maven 依赖了吗?它们会把所需的 Kafka 客户端库,下载并添加到类路径中:
```
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
```
#### 部署
由于 `abc-bus-location` 主题在创建时指定了 3 个分区,我们自然就会想要运行 3 个消费者,来让读取位置更新的过程更快一些。为此,我们需要同时在 3 个不同的终端中运行仪表盘。因为所有这 3 个仪表盘都注册在同一个组 ID 下它们自然就构成了一个消费者组。Kafka 会为每个仪表盘都分配一个特定的分区(来消费)。
当所有仪表盘实例都运行起来后,在另一个终端中启动 `Fleet` 类。图 6、7、8 展示了仪表盘终端中的控制台示例输出。
![图 6仪表盘终端之一][7]
仔细看看控制台消息,我们会发现第一个、第二个和第三个终端中的消费者,正在分别从 `partition-2`、`partition-1` 和 `partition-0` 中读取消息。另外,我们还能发现,消息键为 `BLRHBL002`、`BLRHBL004` 和 `BLRHBL006` 的消息写入了 `partition-2`,消息键为 `BLRHBL005` 的消息写入了 `partition-1`,剩下的消息写入了 `partition-0`
![图 7仪表盘终端之二][8]
使用 Kafka 的好处在于,只要集群设计得当,它就可以水平扩展,从而支持大量客车和数百万条消息。
![图 8仪表盘终端之三][9]
### 不止是消息
根据 Kafka 官网上的数据在《财富》100 强企业中,超过 80% 都在使用 Kafka。它部署在许多垂直行业如金融服务、娱乐等。虽然 Kafka 起初只是一种简单的消息服务但它已凭借行业级的流处理能力成为了大数据生态系统的一环。对于那些喜欢托管解决方案的企业Confluent 提供了基于云的 Kafka 服务只需支付订阅费即可。LCTT 译注Confluent 是一个基于 Kafka 的商业公司,它提供的 Confluent Kafka 在 Apache Kafka 的基础上,增加了许多企业级特性,被认为是“更完整的 Kafka”。
--------------------------------------------------------------------------------
via: https://www.opensourceforu.com/2021/11/apache-kafka-asynchronous-messaging-for-seamless-systems/
作者:[Krishna Mohan Koyya][a]
选题:[lkxed][b]
译者:[lkxed](https://github.com/lkxed)
校对:[校对者ID](https://github.com/校对者ID)
本文由 [LCTT](https://github.com/LCTT/TranslateProject) 原创编译,[Linux中国](https://linux.cn/) 荣誉推出
[a]: https://www.opensourceforu.com/author/krishna-mohan-koyya/
[b]: https://github.com/lkxed
[1]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Digital-backgrund-connecting-in-globe.jpg
[2]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-1-Asynchronous-messaging.jpg
[3]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-2-Message-distribution-among-the-partitions.jpg
[4]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-3-Command-line-producer.jpg
[5]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-4-Command-line-consumer.jpg
[5a]: https://kafka.apache.org/downloads
[6]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-5-Kafka-based-architecture.jpg
[7]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-6-Dashboard-Terminal-1.jpg
[8]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-7-Dashboard-Terminal-2.jpg
[9]: https://www.opensourceforu.com/wp-content/uploads/2021/09/Figure-8-Dashboard-Terminal-3.jpg