/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.justimport.kafka;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import de.justsoftware.justimport.kafka.KafkaConfigurationProvider;
import de.justsoftware.kafka.client.RepeatingThread;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.InterruptException;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;

/*
 * Exception performing whole class analysis ignored.
 */
public abstract class AbstractKafkaConsumer<K, V>
implements HealthIndicator {
    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final java.time.Duration POLL_TIMEOUT = java.time.Duration.ofMillis(100L);
    private static final Duration IDLE_COMMIT_INTERVALL = Duration.standardHours((long)1L);
    private static final long MAX_WAIT_ON_ERROR_MILLIS = TimeUnit.MINUTES.toMillis(5L);
    private static final long WAIT_ON_ERROR_MILLIS_STEP = TimeUnit.SECONDS.toMillis(1L);
    private final Logger _logger = LoggerFactory.getLogger(this.getClass());
    private final @Nullable Consumer<K, V> _consumer;
    private final AtomicInteger _pollErrorCount = new AtomicInteger(0);
    private final AtomicInteger _processErrorCount = new AtomicInteger(0);
    private volatile DateTime _lastCommit = DateTime.now();
    private final String _topic;
    private @Nullable RepeatingThread _thread;
    private final String _name;
    private final KafkaConfigurationProvider _configurationProvider;
    private @Nullable RuntimeException _lastProcessingError;

    public AbstractKafkaConsumer(@Nullable Consumer<K, V> consumer, KafkaConfigurationProvider configurationProvider, String topic) {
        this._configurationProvider = configurationProvider;
        this._topic = topic;
        this._consumer = consumer;
        this._name = "KafkaConsumer-" + topic;
    }

    private void subscribe() {
        if (this._consumer != null) {
            this._consumer.subscribe((Collection)ImmutableList.of((Object)this._topic));
        }
    }

    protected void onException(Throwable e) {
        this.getLog().error("error during polling:", e);
    }

    @PostConstruct
    public void start() {
        if (!this._configurationProvider.isKafkaEnabled()) {
            return;
        }
        if (this._thread != null) {
            throw new IllegalStateException("thread is already started");
        }
        if (this._consumer == null) {
            this.getLog().warn("not starting thread {} because consumer is not configured", (Object)this._name);
            return;
        }
        this.subscribe();
        this._thread = new RepeatingThread(this._name, () -> this.pollConsumer()).onException(arg_0 -> this.onException(arg_0)).onFinish(() -> ((Consumer)this._consumer).wakeup());
    }

    @PreDestroy
    public void destroy() {
        if (this._thread != null) {
            this.getLog().info("shutting down thread {}", (Object)this._name);
            this._thread.cancel();
        }
    }

    @VisibleForTesting
    public final void pollConsumer() {
        ConsumerRecords records;
        if (this._lastCommit.plus((ReadableDuration)IDLE_COMMIT_INTERVALL).isBeforeNow()) {
            this.getLog().debug("Idle commit for {}", (Object)this._topic);
            this.commit();
        }
        if ((records = this.poll()) == null || records.isEmpty()) {
            return;
        }
        try {
            this.process(records);
            this.commit();
            this._processErrorCount.set(0);
        }
        catch (RuntimeException e) {
            this._lastProcessingError = e;
            this.handleException("Could not process events", (Throwable)e, this._processErrorCount);
        }
    }

    private @Nullable ConsumerRecords<K, V> poll() {
        try {
            ConsumerRecords p = null;
            if (this._consumer != null) {
                p = this._consumer.poll(POLL_TIMEOUT);
            }
            this._pollErrorCount.set(0);
            return p;
        }
        catch (InterruptException interrupt) {
            this.getLog().warn("Received interrupt exception. This is expected when stopping the service.");
            return ConsumerRecords.empty();
        }
        catch (RuntimeException e) {
            this.handleException("Could not poll kafka", (Throwable)e, this._pollErrorCount);
            return ConsumerRecords.empty();
        }
    }

    private void handleException(String msg, Throwable throwable, AtomicInteger errorCounter) {
        this.getLog().error(msg, throwable);
        if (this._consumer != null) {
            this._consumer.unsubscribe();
        }
        try {
            Thread.sleep(AbstractKafkaConsumer.sleepTime((int)errorCounter.incrementAndGet()));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.subscribe();
    }

    public Health health() {
        int currentPocessErrorCount = this._processErrorCount.get();
        if (currentPocessErrorCount != 0) {
            Health.Builder health = Health.down().withDetail("processErrorCount", (Object)currentPocessErrorCount);
            if (this._lastProcessingError != null) {
                health.withException((Throwable)this._lastProcessingError);
            }
            return health.build();
        }
        return Health.up().build();
    }

    private static long sleepTime(int errorCount) {
        long current = WAIT_ON_ERROR_MILLIS_STEP;
        for (int i = 1; i < errorCount; ++i) {
            if ((current += current) <= MAX_WAIT_ON_ERROR_MILLIS) continue;
            return MAX_WAIT_ON_ERROR_MILLIS;
        }
        return current;
    }

    private void commit() {
        this._lastCommit = DateTime.now();
        if (this._consumer != null) {
            this._consumer.commitSync();
        }
    }

    protected Logger getLog() {
        return this._logger;
    }

    protected abstract void process(ConsumerRecords<K, V> var1);
}

