当先锋百科网

首页 1 2 3 4 5 6 7

Java是一种广泛使用的编程语言,用于创建各种类型的应用程序。其中,Kafka是一个流行的开源消息队列,它使用发布订阅模式来传递消息。Json是一种用于数据交换的轻量级格式,广泛运用于Web开发和API交互中。

使用Java与Kafka进行消息传递时,Json格式的数据常常被使用。下面的代码示例展示如何使用Java向Kafka发送Json数据:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producerproducer = new KafkaProducer<>(props);
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("id", 1);
jsonObject.addProperty("name", "John");
jsonObject.addProperty("age", 25);
String jsonString = jsonObject.toString();
ProducerRecordrecord = new ProducerRecord<>("test_topic", "key", jsonString);
producer.send(record);
producer.close();

首先,我们使用了KafkaProducer类创建了一个生产者实例。然后,我们使用JsonObject类创建了一个Json对象,并添加了id、name和age三个属性。最后,我们通过ProducerRecord类创建了一个包含Key、Value和Topic信息的数据记录,并将其发送到Kafka。

接收消息时,我们也可以使用Json格式的数据。以下示例演示如何在Java中解析从Kafka接收到的Json数据:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test_group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumerconsumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test_topic"));
while (true) {
ConsumerRecordsrecords = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecordrecord : records) {
String jsonString = record.value();
JsonObject jsonObject = new JsonParser().parse(jsonString).getAsJsonObject();
int id = jsonObject.get("id").getAsInt();
String name = jsonObject.get("name").getAsString();
int age = jsonObject.get("age").getAsInt();
System.out.println("id: " + id + ", name: " + name + ", age: " + age);
}
}

首先,我们创建了一个消费者实例,并订阅了test_topic主题。在接收到消息时,我们可以使用ConsumerRecord类来解析数据,并使用JsonParser和JsonObject来将Json字符串转换为Json对象。最后,我们可以通过JsonObject的get()方法来获取Json对象中的属性值,并将其用于后续的操作。