Merge pull request #26113 from lkxed/master

调整代码缩进
This commit is contained in:
六开箱 2022-06-17 20:01:36 +08:00 committed by GitHub
commit 77d892a54e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -169,36 +169,36 @@ The Fleet class simulates the Kafka producer applications running on six buses o
```
public class Fleet {
public static void main(String[] args) throws Exception {
String broker = “localhost:9092”;
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
public static void main(String[] args) throws Exception {
String broker = “localhost:9092”;
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class.getName());
Producer<String, String> producer = new KafkaProducer<String, String>(props);
String topic = “abc-bus-location”;
Map<String, String> locations = new HashMap<>();
locations.put(“BLRHBL001”, “13.071362, 77.461906”);
locations.put(“BLRHBL002”, “14.399654, 76.045834”);
locations.put(“BLRHBL003”, “15.183959, 75.137622”);
locations.put(“BLRHBL004”, “13.659576, 76.944675”);
locations.put(“BLRHBL005”, “12.981337, 77.596181”);
locations.put(“BLRHBL006”, “13.024843, 77.546983”);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
String topic = “abc-bus-location”;
Map<String, String> locations = new HashMap<>();
locations.put(“BLRHBL001”, “13.071362, 77.461906”);
locations.put(“BLRHBL002”, “14.399654, 76.045834”);
locations.put(“BLRHBL003”, “15.183959, 75.137622”);
locations.put(“BLRHBL004”, “13.659576, 76.944675”);
locations.put(“BLRHBL005”, “12.981337, 77.596181”);
locations.put(“BLRHBL006”, “13.024843, 77.546983”);
IntStream.range(0, 10).forEach(i -> {
for (String trip : locations.keySet()) {
ProducerRecord<String, String> record
= new ProducerRecord<String, String>(
topic, trip, locations.get(trip));
producer.send(record);
}
});
producer.flush();
producer.close();
}
IntStream.range(0, 10).forEach(i -> {
for (String trip : locations.keySet()) {
ProducerRecord<String, String> record
= new ProducerRecord<String, String>(
topic, trip, locations.get(trip));
producer.send(record);
}
});
producer.flush();
producer.close();
}
}
```
@ -208,34 +208,33 @@ The Dashboard class implements the Kafka consumer application and it runs at the
```
public static void main(String[] args) {
String broker = “127.0.0.1:9092”;
String groupId = “abc-dashboard”;
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
String broker = “127.0.0.1:9092”;
String groupId = “abc-dashboard”;
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@SuppressWarnings(“resource”)
Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(“abc-bus-location”));
while (true) {
ConsumerRecords<String, String> records
= consumer.poll(Duration.ofMillis(1000));
@SuppressWarnings(“resource”)
Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(“abc-bus-location”));
while (true) {
ConsumerRecords<String, String> records
= consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
String topic = record.topic();
int partition = record.partition();
String key = record.key();
String value = record.value();
System.out.println(String.format(
“Topic=%s, Partition=%d, Key=%s, Value=%s”,
topic, partition, key, value));
}
}
}
for (ConsumerRecord<String, String> record : records) {
String topic = record.topic();
int partition = record.partition();
String key = record.key();
String value = record.value();
System.out.println(String.format(
“Topic=%s, Partition=%d, Key=%s, Value=%s”,
topic, partition, key, value));
}
}
}
```
@ -245,14 +244,14 @@ A JDK of 8+ version is required to compile and run this code. The following Mave
```
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
```