Вы находитесь в ситуации, когда хотите слушать сообщения Kafka с определенного смещения. Для этого вам нужно реализовать интерфейс с именем ConsumerSeekAware и искать позицию, с которой вы хотите читать. В этой статье я реализую процесс сброса прослушивателя сообщений Kafka на определенное смещение с помощью Java и Spring Framework.

План:

  1. Реализуйте интерфейс ConsumerSeekAware, который ищет смещение.
  2. Создайте прослушиватель Kafka, который принимает сообщения.
  3. Создайте службу, которая перезапускает прослушиватель Kafka.
  4. Порядок выполнения метода и результат.

1. РеализуйтеинтерфейсConsumerSeekAware, который ищет смещение.

Во-первых, вам нужно реализовать интерфейс ConsumerSeekAware и переопределить метод onPartitionsAssigned(…), который ищет смещение, которое вы хотеть. Вот пример реализации:

//just copy this code
@Service
public class ConsumerSeekAwareImpl implements ConsumerSeekAware {
    long offset = 777l; //your offset number
    String topic = "TopicName"; //your topic name 

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
            assignments.keySet().forEach(partition ->
                    callback.seek(this.topic, partition, this.offset));
        }
    }
}

Метод onPartitionsAssigned(…)будет выполнен после того, как MessageListenerContainer Kafka начнет процесс прослушивания сообщения. Такие параметры, как смещение, тема partition должны быть определены самостоятельно.

2. Создайте прослушиватель Kafka, который принимает сообщения.

Создайте прослушиватель Kafka, который использует сообщения. Мы можем сделать это с помощью разных решений, но описанный подход реализуется с помощью самого простого, аннотации @KafkaListener:

@Service
public class ConsumerSeekAwareImpl implements ConsumerSeekAware {
    long offset = 777l; //your offset number
    String topic = "TopicName"; //your topic name
    String listenerId = "listenerId"; //your listener id

    //id of this listener has to be remembered for further
    //use in method getListenerContainer of KafkaListenerEndpointRegistry
    @KafkaListener(id = listenerId,
            groupId = "groupName",
            topics = topic)
    public void listenServiceCall(@Payload String message,
                                  @Header(KafkaHeaders.OFFSET) Long offset) {
        System.out.println("offset: " + offset ", message: " + message);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
            assignments.keySet().forEach(partition ->
                    callback.seek(this.topic, partition, this.offset));
        }
    }
}

Здесь вы видите метод listenServiceCall(…), помеченный аннотацией @KafkaListener, что делает этот метод обнаруженным контейнером приложения и, в частности, MessageListenerContainer. .

3. Создайте службу, которая перезапускает прослушиватель Kafka.

Давайте обернем и улучшим методы stop() и start() из MessageListenerContainer для нашихпорпусов, а затем выполним их в соответствии с одним методом.

@Service
public class KafkaListenerRestartService {
    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    String listenerId = "listenerId";

    public void restartKafkaListener() {
        stop();
        start();
    }

    public void start() {
        MessageListenerContainer container = kafkaListenerEndpointRegistry
                .getListenerContainer(this.listenerId);
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }

    public void stop() {
        MessageListenerContainer container = kafkaListenerEndpointRegistry
                .getListenerContainer(this.listenerId);
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }
}

Здесь вы видите методы start() и stop(), реализованные с помощью класса KafkaListenerEndpointRegistry, который обеспечивает управление запуском конкретного прослушивателя Kafka через MessageListenerContainer и идентификатор нужного слушателя.

4. Порядок выполнения методов и результат.

Вот порядок выполнения методов с момента, когда Kafka перестает слушать сообщения, до поиска смещения и снова начинает слушать. В результате слушатель Kafka читал сообщения с определенного смещения:

METHODS EXECUTION ORDER:
1. restartKafkaListener() -> 
2. stop() -> 
3. start() -> 
4. onPartitionsAssigned(...) -> 
5. listenServiceCall()

RESULT: 
listening from your offset

Заключение

Мы разработали решение, которое сбрасывает прослушиватель Kafka на определенное смещение во время выполнения. Примеры кода в этой статье можно повторно использовать в вашем собственном приложении и изменить в соответствии с вашими предпочтениями дизайна кода.