diff --git a/sources/tech/20211102 Apache Kafka- Asynchronous Messaging for Seamless Systems.md b/sources/tech/20211102 Apache Kafka- Asynchronous Messaging for Seamless Systems.md index 8646536175..958d99e847 100644 --- a/sources/tech/20211102 Apache Kafka- Asynchronous Messaging for Seamless Systems.md +++ b/sources/tech/20211102 Apache Kafka- Asynchronous Messaging for Seamless Systems.md @@ -169,36 +169,36 @@ The Fleet class simulates the Kafka producer applications running on six buses o ``` public class Fleet { - public static void main(String[] args) throws Exception { - String broker = “localhost:9092”; - Properties props = new Properties(); - props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); - props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getName()); - props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getName()); + public static void main(String[] args) throws Exception { + String broker = “localhost:9092”; + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); + props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getName()); + props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getName()); - Producer<String, String> producer = new KafkaProducer<String, String>(props); - String topic = “abc-bus-location”; - Map<String, String> locations = new HashMap<>(); - locations.put(“BLRHBL001”, “13.071362, 77.461906”); - locations.put(“BLRHBL002”, “14.399654, 76.045834”); - locations.put(“BLRHBL003”, “15.183959, 75.137622”); - locations.put(“BLRHBL004”, “13.659576, 76.944675”); - locations.put(“BLRHBL005”, “12.981337, 77.596181”); - locations.put(“BLRHBL006”, “13.024843, 77.546983”); + Producer<String, String> producer = new KafkaProducer<String, String>(props); + String topic = “abc-bus-location”; + Map<String, String> locations = new HashMap<>(); + locations.put(“BLRHBL001”, “13.071362, 77.461906”); + locations.put(“BLRHBL002”, “14.399654, 76.045834”); + locations.put(“BLRHBL003”, “15.183959, 75.137622”); + locations.put(“BLRHBL004”, “13.659576, 76.944675”); + locations.put(“BLRHBL005”, “12.981337, 77.596181”); + locations.put(“BLRHBL006”, “13.024843, 77.546983”); - IntStream.range(0, 10).forEach(i -> { - for (String trip : locations.keySet()) { - ProducerRecord<String, String> record - = new ProducerRecord<String, String>( - topic, trip, locations.get(trip)); - producer.send(record); - } - }); - producer.flush(); - producer.close(); - } + IntStream.range(0, 10).forEach(i -> { + for (String trip : locations.keySet()) { + ProducerRecord<String, String> record + = new ProducerRecord<String, String>( + topic, trip, locations.get(trip)); + producer.send(record); + } + }); + producer.flush(); + producer.close(); + } } ``` @@ -208,34 +208,33 @@ The Dashboard class implements the Kafka consumer application and it runs at the ``` public static void main(String[] args) { - String broker = “127.0.0.1:9092”; - String groupId = “abc-dashboard”; - Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); - props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getName()); - props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - StringDeserializer.class.getName()); - props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); + String broker = “127.0.0.1:9092”; + String groupId = “abc-dashboard”; + Properties props = new Properties(); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); + props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getName()); + props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + StringDeserializer.class.getName()); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId); - @SuppressWarnings(“resource”) - Consumer<String, String> consumer = new KafkaConsumer<String, String>(props); - consumer.subscribe(Arrays.asList(“abc-bus-location”)); - while (true) { - ConsumerRecords<String, String> records - = consumer.poll(Duration.ofMillis(1000)); + @SuppressWarnings(“resource”) + Consumer<String, String> consumer = new KafkaConsumer<String, String>(props); + consumer.subscribe(Arrays.asList(“abc-bus-location”)); + while (true) { + ConsumerRecords<String, String> records + = consumer.poll(Duration.ofMillis(1000)); - for (ConsumerRecord<String, String> record : records) { - String topic = record.topic(); - int partition = record.partition(); - String key = record.key(); - String value = record.value(); - System.out.println(String.format( - “Topic=%s, Partition=%d, Key=%s, Value=%s”, - topic, partition, key, value)); - } - } - } + for (ConsumerRecord<String, String> record : records) { + String topic = record.topic(); + int partition = record.partition(); + String key = record.key(); + String value = record.value(); + System.out.println(String.format( + “Topic=%s, Partition=%d, Key=%s, Value=%s”, + topic, partition, key, value)); + } + } } ``` @@ -245,14 +244,14 @@ A JDK of 8+ version is required to compile and run this code. The following Mave ``` <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka-clients</artifactId> - <version>2.8.0</version> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + <version>2.8.0</version> </dependency> <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-simple</artifactId> - <version>1.7.25</version> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-simple</artifactId> + <version>1.7.25</version> </dependency> ```