Java是一种广泛使用的编程语言,用于创建各种类型的应用程序。其中,Kafka是一个流行的开源消息队列,它使用发布订阅模式来传递消息。Json是一种用于数据交换的轻量级格式,广泛运用于Web开发和API交互中。
使用Java与Kafka进行消息传递时,Json格式的数据常常被使用。下面的代码示例展示如何使用Java向Kafka发送Json数据:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer<>(props); JsonObject jsonObject = new JsonObject(); jsonObject.addProperty("id", 1); jsonObject.addProperty("name", "John"); jsonObject.addProperty("age", 25); String jsonString = jsonObject.toString(); ProducerRecord record = new ProducerRecord<>("test_topic", "key", jsonString); producer.send(record); producer.close();
首先,我们使用了KafkaProducer类创建了一个生产者实例。然后,我们使用JsonObject类创建了一个Json对象,并添加了id、name和age三个属性。最后,我们通过ProducerRecord类创建了一个包含Key、Value和Topic信息的数据记录,并将其发送到Kafka。
接收消息时,我们也可以使用Json格式的数据。以下示例演示如何在Java中解析从Kafka接收到的Json数据:
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test_group"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); Consumerconsumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test_topic")); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { String jsonString = record.value(); JsonObject jsonObject = new JsonParser().parse(jsonString).getAsJsonObject(); int id = jsonObject.get("id").getAsInt(); String name = jsonObject.get("name").getAsString(); int age = jsonObject.get("age").getAsInt(); System.out.println("id: " + id + ", name: " + name + ", age: " + age); } }
首先,我们创建了一个消费者实例,并订阅了test_topic主题。在接收到消息时,我们可以使用ConsumerRecord类来解析数据,并使用JsonParser和JsonObject来将Json字符串转换为Json对象。最后,我们可以通过JsonObject的get()方法来获取Json对象中的属性值,并将其用于后续的操作。