/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.people.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import de.justsoftware.toolbox.concurrent.RepeatingThread;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.InterruptException;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*
 * Exception performing whole class analysis ignored.
 */
/**
 * Base class for components that poll a Kafka {@link Consumer} on a background
 * {@link RepeatingThread} and hand the received records to {@link #process}.
 *
 * <p>Each polling cycle ({@link #pollConsumer()}) performs an idle commit when the last
 * commit is older than {@link #IDLE_COMMIT_INTERVAL}, polls the consumer, lets the subclass
 * process the records, commits synchronously and finally runs the callbacks the subclass
 * registered to be executed after the commit.
 *
 * <p>When polling or processing fails with a {@link RuntimeException}, the consumer is
 * unsubscribed, the thread sleeps for a growing back-off time and the consumer is
 * subscribed again.
 *
 * <p>NOTE(review): this class never closes the wrapped consumer; presumably its owner is
 * responsible for that - confirm.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
@ParametersAreNonnullByDefault
public abstract class AbstractKafkaConsumer<K, V> {
    /** Maximum time a single {@code poll} call blocks. */
    private static final java.time.Duration TIMEOUT = java.time.Duration.ofMillis(100L);
    /** A commit is issued at the start of a polling cycle when the last commit is older than this. */
    private static final Duration IDLE_COMMIT_INTERVAL = Duration.standardHours(1L);
    /** Upper bound of the back-off sleep after an error. */
    private static final long MAX_WAIT_ON_ERROR_MILLIS = TimeUnit.MINUTES.toMillis(5L);
    /** Back-off sleep after the first error; doubled for every further consecutive error. */
    private static final long WAIT_ON_ERROR_MILLIS_STEP = TimeUnit.SECONDS.toMillis(1L);

    private final Logger _logger = LoggerFactory.getLogger(this.getClass());
    private final Consumer<K, V> _consumer;
    /** Consecutive poll failures; reset to zero after a successful poll. */
    private final AtomicInteger _pollErrorCount = new AtomicInteger(0);
    /** Consecutive process/commit failures; reset to zero after a successful commit. */
    private final AtomicInteger _processErrorCount = new AtomicInteger(0);
    private final ImmutableSet<String> _topics;
    /** Time of the most recent commit attempt (see {@link #commit()}). */
    private volatile DateTime _lastCommit = DateTime.now();
    private RepeatingThread _thread;

    /**
     * Creates a consumer wrapper for the given topics.
     *
     * @param consumer the Kafka consumer to poll
     * @param topics   the topics to subscribe to
     */
    public AbstractKafkaConsumer(Consumer<K, V> consumer, String... topics) {
        // copyOf(String[]) keeps the element type; no widening to Object[] is needed.
        this(consumer, ImmutableSet.copyOf(topics));
    }

    /**
     * Creates a consumer wrapper for the given topics.
     *
     * @param consumer the Kafka consumer to poll
     * @param topics   the topics to subscribe to
     */
    public AbstractKafkaConsumer(Consumer<K, V> consumer, ImmutableSet<String> topics) {
        this._consumer = consumer;
        this._topics = topics;
    }

    /**
     * Callback registered as the exception handler of the polling thread.
     * The default implementation logs the error.
     *
     * @param e the throwable reported by the polling thread
     */
    protected void onException(Throwable e) {
        this.getLog().error("error during polling:", e);
    }

    /**
     * Subscribes the consumer to the configured topics and creates the polling thread.
     *
     * @throws IllegalStateException if a thread has already been created
     */
    @PostConstruct
    public void start() {
        if (this._thread != null) {
            throw new IllegalStateException("thread is already started");
        }
        this.subscribe();
        // NOTE(review): nothing here starts the thread explicitly; presumably RepeatingThread
        // starts itself on construction - confirm.
        this._thread = new RepeatingThread(this.getName(), () -> this.pollConsumer())
                .onException(e -> this.onException(e))
                .onFinish(() -> this._consumer.wakeup());
    }

    /**
     * @return the name used for the polling thread, derived from the concrete class name
     */
    @Nonnull
    private String getName() {
        return "KafkaConsumer-" + this.getClass().getSimpleName();
    }

    /**
     * Cancels the polling thread if one has been created; does nothing otherwise.
     */
    @PreDestroy
    public void destroy() {
        if (this._thread != null) {
            this.getLog().info("shutting down thread {}", this.getName());
            this._thread.cancel();
        }
    }

    /** Subscribes the consumer to the configured topics. */
    private void subscribe() {
        this._consumer.subscribe(this._topics);
    }

    /**
     * Runs one polling cycle: optional idle commit, poll, process, commit and after-commit
     * callbacks. Returns early when the poll yields no records.
     *
     * <p>The after-commit callbacks are executed only when both processing and the commit
     * succeeded. If either throws a {@link RuntimeException}, the error is handled with the
     * back-off logic and the callbacks collected in this cycle are discarded.
     */
    @VisibleForTesting
    public final void pollConsumer() {
        if (this._lastCommit.plus(IDLE_COMMIT_INTERVAL).isBeforeNow()) {
            this.getLog().debug("Idle commit for {}", this._topics);
            this.commit();
        }

        final ConsumerRecords<K, V> records = this.poll();
        if (records.isEmpty()) {
            return;
        }

        final Queue<Runnable> afterCommit = new LinkedList<>();
        try {
            this.process(records, r -> afterCommit.add(Preconditions.checkNotNull(r)));
            this.commit();
            this._processErrorCount.set(0);
        } catch (RuntimeException e) {
            this.handleException("Could not process events", e, this._processErrorCount);
            // Processing or the commit failed, so nothing was committed in this cycle:
            // callbacks that were registered to run *after* the commit must not be executed.
            return;
        }
        this.runQueue(afterCommit);
    }

    /**
     * Drains the queue and runs every callback in insertion order. A callback failing with a
     * {@link RuntimeException} is logged and does not prevent the remaining callbacks from
     * running.
     *
     * @param queue the callbacks to run; empty after this method returns normally
     */
    private void runQueue(Queue<Runnable> queue) {
        while (!queue.isEmpty()) {
            final Runnable callback = queue.remove();
            try {
                callback.run();
            } catch (RuntimeException e) {
                this.getLog().error("Could not process after commit callback", e);
            }
        }
    }

    /**
     * Polls the consumer once with {@link #TIMEOUT}.
     *
     * @return the polled records, or an empty result if the poll was interrupted or failed
     *         with a {@link RuntimeException} (the latter triggers the back-off handling)
     */
    @Nonnull
    private ConsumerRecords<K, V> poll() {
        try {
            final ConsumerRecords<K, V> polled = this._consumer.poll(TIMEOUT);
            this._pollErrorCount.set(0);
            return polled;
        } catch (InterruptException interrupt) {
            this.getLog().warn("Received interrupt exception. This is expected when stopping the service.");
            return ConsumerRecords.empty();
        } catch (RuntimeException e) {
            this.handleException("Could not poll kafka", e, this._pollErrorCount);
            return ConsumerRecords.empty();
        }
    }

    /**
     * Logs the error, unsubscribes the consumer, sleeps for the back-off time derived from
     * the incremented error counter and subscribes again. If the sleep is interrupted, the
     * interrupt flag is restored and the consumer is still re-subscribed.
     *
     * <p>NOTE(review): the unsubscribe/subscribe cycle presumably makes the consumer resume
     * from the last committed offsets so that failed records are delivered again - confirm.
     *
     * @param msg          log message describing what failed
     * @param throwable    the failure
     * @param errorCounter counter of consecutive failures of this kind; incremented here
     */
    private void handleException(String msg, Throwable throwable, AtomicInteger errorCounter) {
        this.getLog().error(msg, throwable);
        this._consumer.unsubscribe();
        try {
            Thread.sleep(sleepTime(errorCounter.incrementAndGet()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.subscribe();
    }

    /**
     * Computes the back-off time: {@link #WAIT_ON_ERROR_MILLIS_STEP} for the first error,
     * doubled for every further consecutive error and capped at
     * {@link #MAX_WAIT_ON_ERROR_MILLIS}.
     *
     * @param errorCount number of consecutive errors
     * @return the time to sleep in milliseconds
     */
    private static long sleepTime(int errorCount) {
        long current = WAIT_ON_ERROR_MILLIS_STEP;
        for (int i = 1; i < errorCount; ++i) {
            current += current;
            // Returning as soon as the cap is exceeded also keeps the doubling from overflowing.
            if (current > MAX_WAIT_ON_ERROR_MILLIS) {
                return MAX_WAIT_ON_ERROR_MILLIS;
            }
        }
        return current;
    }

    /**
     * Commits the consumer synchronously. The commit timestamp is recorded before the commit
     * is attempted, so a failing commit still resets the idle-commit timer.
     */
    private void commit() {
        this._lastCommit = DateTime.now();
        this._consumer.commitSync();
    }

    /**
     * @return the logger of the concrete subclass
     */
    @Nonnull
    protected Logger getLog() {
        return this._logger;
    }

    /**
     * Processes a non-empty batch of polled records. Throwing a {@link RuntimeException}
     * prevents the commit for this batch and triggers the back-off handling.
     *
     * @param records     the records returned by the last poll; never empty
     * @param afterCommit accepts non-null callbacks to run once the batch has been
     *                    processed and committed successfully
     */
    protected abstract void process(ConsumerRecords<K, V> records, java.util.function.Consumer<Runnable> afterCommit);
}

