Generic deserialization in Kafka

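I have two classes for serialization and deserialization in Kafka. Serialization works fine, but I have a problem with deserialization.

I found many solutions, but none of them works.

Deserializer with generic class T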

public class DeserializerU<T> implements Deserializer<T> {

    @Override
    public void configure(Map map, boolean bln) {
    }

    @Override
    public void close() {
    }

    @Override
    public T deserialize(String string, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        T object = null;
        try {
            object = mapper.readValue(bytes, new TypeReference<T>() {});
        } catch (Exception e) {
            e.printStackTrace();
        }
        return object;
    }
}

Serializer

public class MyObjectSerializer implements Serializer {

    @Override
    public void configure(Map map, boolean bln) {
    }

    @Override
    public byte[] serialize(String string, Object t) {
        byte[] retVal = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            retVal = objectMapper.writeValueAsString(t).getBytes();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return retVal;
    }

    @Override
    public void close() {
    }
}

Properties

Properties props = new Properties();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, new DeserializerU<MyOwnObject>().getClass());

If I replace "new TypeReference<T>() {}" with a specific type, the deserializer works, but I need a deserializer for many objects. I also tried convertValue instead of readValue, but everything returns a LinkedHashMap, which cannot be cast to my object. Any advice on how to do this? Thanks for your help.
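
The LinkedHashMap result is consistent with Java type erasure: inside a generic class, an anonymous TypeReference<T> can only record the type variable T, not the class the caller had in mind, so Jackson has no concrete target and falls back to generic maps for JSON objects. The sketch below reproduces the effect with the JDK alone. TypeToken, GenericHolder and ErasureDemo are hypothetical names for illustration; TypeToken only imitates the super type token idea and is not Jackson's class.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class ErasureDemo {

    // Hypothetical stand-in for a super type token such as Jackson's TypeReference
    abstract static class TypeToken<T> {
        final Type captured;

        TypeToken() {
            // Read the type argument written in the anonymous subclass declaration
            ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
            captured = superType.getActualTypeArguments()[0];
        }
    }

    // Mirrors DeserializerU<T>: the token is created where T is still a variable
    static class GenericHolder<T> {
        Type capture() {
            return new TypeToken<T>() {}.captured;
        }
    }

    // The token is created with a concrete type argument
    static Type captureConcrete() {
        return new TypeToken<String>() {}.captured;
    }

    public static void main(String[] args) {
        Type generic = new GenericHolder<String>().capture();
        // Only the variable T is recorded, even though the holder was built for String
        System.out.println(generic instanceof TypeVariable);
        // The concrete token records the real class
        System.out.println(captureConcrete() == String.class);
    }
}
```

This is why the answers below pass a Class object to the deserializer instead of relying on T.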

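Answers

My answer may be a little late, but it may help someone.

This way, I don't need to create a ConsumerFactory for each topic, and I can deserialize the JSON as needed.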

@KafkaListener(topics = "${topic...}")
public void consume(MyObject message) { ... }

There is no need to create a JsonDeserializer for each object.

My custom deserializer class:

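import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

@Component
public class CustomJsonDeserializer implements Deserializer<Object> {

    private final ObjectMapper mapper;

    // Maps each topic name to the class its messages should be converted to.
    private final Map<String, Class<?>> maps;

    public CustomJsonDeserializer(
            // Gives access to the application properties that were defined
            final Environment environment,
            final ObjectMapper mapper
    ) {
        this.mapper = mapper;
        maps = new HashMap<>(2);
        maps.put(environment.getProperty("my-topic-1"), MyClass1.class);
        maps.put(environment.getProperty("my-topic-2"), MyClass2.class);
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        if (Objects.isNull(data) || data.length == 0) {
            return null;
        }
        // Look up the target class registered for this topic
        final Class<?> target = maps.get(topic);
        if (target == null) {
            throw new IllegalStateException("No target class configured for topic " + topic);
        }
        try {
            return mapper.readValue(data, target);
        } catch (IOException e) {
            // TODO: replace with proper error handling
            throw new RuntimeException(e);
        }
    }
}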
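
The core of the deserializer above is the lookup from topic name to target class. A minimal sketch of that lookup on its own, using only the JDK, might look like the following. TopicTypeRegistry, the topic names and the registered classes are hypothetical placeholders, and failing fast on an unmapped topic is my assumption about the desired behaviour.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicTypeRegistry {

    // Topic name -> class that messages on that topic should be converted to
    private final Map<String, Class<?>> targets = new HashMap<>();

    public TopicTypeRegistry register(String topic, Class<?> target) {
        targets.put(topic, target);
        return this;
    }

    // Returns the registered class, or fails fast when the topic is unknown
    public Class<?> targetFor(String topic) {
        Class<?> target = targets.get(topic);
        if (target == null) {
            throw new IllegalArgumentException("No target class registered for topic: " + topic);
        }
        return target;
    }

    public static void main(String[] args) {
        // Placeholder topics and classes, for illustration only
        TopicTypeRegistry registry = new TopicTypeRegistry()
                .register("orders", StringBuilder.class)
                .register("payments", Integer.class);
        System.out.println(registry.targetFor("orders").getSimpleName());
    }
}
```

Failing on an unknown topic surfaces a missing mapping at the first message instead of passing null to the JSON mapper.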

My Kafka configuration class:

@EnableKafka
@Configuration
public class KafkaConfiguration {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    // An ObjectMapper bean must exist so that Spring can build and inject this deserializer
    @Autowired
    private CustomJsonDeserializer customJsonDeserializer;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        final var properties = new HashMap<String, Object>(6);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
        return new DefaultKafkaConsumerFactory<String, Object>(properties, new StringDeserializer(), customJsonDeserializer);
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Object>> kafkaListenerContainerFactory() {
        final var factory = new ConcurrentKafkaListenerContainerFactory<String, Object>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Then, on my consumers, I can write:
MyConsumers.java


    @KafkaListener(topics = "${my-topic-1}")
    public void consume(final MyClass1 item) {
        // Do whatever you want
    }

    @KafkaListener(topics = "${my-topic-2}")
    public void consume(final MyClass2 item) {
        // Do whatever you want
    }

You can set a message converter on your factory with JsonMessageConverter. Note that I did not specify any custom class.

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setMessageConverter(new JsonMessageConverter());
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
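
As far as I understand it, no class has to be named here because the converter is given the parameter type of the @KafkaListener method and uses it as the JSON target type. The sketch below only illustrates that idea with plain JDK reflection. It is not Spring's implementation, and ListenerPayloadTypeDemo, Listeners, consumeFirst, consumeSecond and payloadType are hypothetical names.

```java
import java.lang.reflect.Method;

public class ListenerPayloadTypeDemo {

    // Placeholder payload classes, standing in for MyClass1 and MyClass2
    static class MyClass1 {}
    static class MyClass2 {}

    // Placeholder listener with one handler per payload type
    static class Listeners {
        public void consumeFirst(MyClass1 item) {}
        public void consumeSecond(MyClass2 item) {}
    }

    // Returns the declared type of the single parameter of the named method
    static Class<?> payloadType(Class<?> listenerClass, String methodName) {
        for (Method method : listenerClass.getDeclaredMethods()) {
            if (method.getName().equals(methodName) && method.getParameterCount() == 1) {
                return method.getParameterTypes()[0];
            }
        }
        throw new IllegalArgumentException("No single-argument method named " + methodName);
    }

    public static void main(String[] args) {
        // The handler signature alone is enough to pick the conversion target
        System.out.println(payloadType(Listeners.class, "consumeFirst").getSimpleName());
        System.out.println(payloadType(Listeners.class, "consumeSecond").getSimpleName());
    }
}
```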

Late, but I hope my answer can help you.

I assume your Kafka consumer properties look like this:

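example.kafka.deserializer.mapper=com.example.consumer.model.Example

public class DeserializerU<T> implements Deserializer<T> {

    private Class<T> type;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean isKey) {
        // The value may be a Class object or, when read from a properties file, a class name
        Object value = map.get("example.kafka.deserializer.mapper");
        try {
            this.type = (Class<T>) (value instanceof Class ? value : Class.forName(String.valueOf(value)));
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown target class: " + value, e);
        }
    }

    @Override
    public void close() {
    }

    @Override
    public T deserialize(String string, byte[] bytes) {
        ObjectMapper mapper = new ObjectMapper();
        T object = null;
        try {
            // The concrete class resolved in configure() replaces the erased type variable T
            object = mapper.readValue(bytes, type);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return object;
    }
}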



