/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.drive.content.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import de.justsoftware.drive.content.kafka.AbstractKafkaConsumer;
import de.justsoftware.kafka.client.RepeatingThread;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.InterruptException;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;

/*
 * Exception performing whole class analysis ignored.
 */
/**
 * Base class for components that consume a single Kafka topic on a dedicated
 * {@link RepeatingThread}.
 *
 * <p>One iteration ({@link #pollConsumer()}) performs an idle commit if the last commit is older
 * than {@link #IDLE_COMMIT_INTERVAL}, polls the consumer, hands a non-empty batch to
 * {@link #process(ConsumerRecords)} and commits synchronously afterwards.
 *
 * <p>When polling or processing fails, the error is logged, the consumer is unsubscribed, the
 * calling thread sleeps for a back-off time that doubles with each consecutive error (starting at
 * {@link #WAIT_ON_ERROR_MILLIS_STEP}, capped at {@link #MAX_WAIT_ON_ERROR_MILLIS}) and the consumer
 * is subscribed again. Poll errors and process errors are counted separately.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
@ParametersAreNonnullByDefault
public abstract class AbstractKafkaConsumer<K, V> {
    /** Timeout handed to every {@code Consumer.poll} call. */
    private static final java.time.Duration POLL_TIMEOUT = java.time.Duration.ofMillis(100L);
    /** Maximum age of the last commit before an extra commit is issued ahead of polling. */
    private static final Duration IDLE_COMMIT_INTERVAL = Duration.standardMinutes(1L);
    /** Upper bound for the back-off sleep after an error. */
    private static final long MAX_WAIT_ON_ERROR_MILLIS = TimeUnit.MINUTES.toMillis(5L);
    /** Back-off sleep after the first error; doubled for every further consecutive error. */
    private static final long WAIT_ON_ERROR_MILLIS_STEP = TimeUnit.SECONDS.toMillis(1L);

    /** May be {@code null}; in that case {@link #start()} does not create a thread. */
    private final Consumer<K, V> _consumer;
    private final AtomicInteger _pollErrorCount = new AtomicInteger(0);
    private final AtomicInteger _processErrorCount = new AtomicInteger(0);
    private volatile DateTime _lastCommit = DateTime.now();
    private final String _topic;
    private RepeatingThread _thread;
    private final String _name;

    /**
     * @param consumer the Kafka consumer to poll, or {@code null} to leave this instance inactive
     * @param topic the topic to subscribe to
     * @param threadIdentifier suffix used to build the thread name
     *        {@code "KafkaConsumer-<topic>-<threadIdentifier>"}
     */
    public AbstractKafkaConsumer(@Nullable Consumer<K, V> consumer, String topic, int threadIdentifier) {
        _topic = topic;
        _consumer = consumer;
        _name = "KafkaConsumer-" + topic + "-" + threadIdentifier;
    }

    /**
     * Subscribes to the topic and creates the polling thread.
     *
     * <p>Logs a warning and returns without doing anything else when no consumer was supplied.
     *
     * @throws IllegalStateException if a thread has already been created by an earlier call
     */
    @PostConstruct
    public void start() {
        if (_thread != null) {
            throw new IllegalStateException("thread is already started");
        }
        if (_consumer == null) {
            getLog().warn("not starting thread {} because consumer is not configured", _name);
            return;
        }
        getLog().info("starting thread {}", _name);
        subscribe();
        // NOTE(review): no explicit start call is visible here; presumably RepeatingThread
        // begins running as part of this builder chain - confirm against RepeatingThread.
        _thread = new RepeatingThread(_name, () -> pollConsumer())
            .onException(e -> onException(e))
            .onFinish(() -> _consumer.wakeup())
            .setPriority(getPriority());
    }

    /**
     * Priority passed to the polling thread.
     *
     * @return the priority of the thread calling this method; subclasses may override
     */
    protected int getPriority() {
        return Thread.currentThread().getPriority();
    }

    /** Subscribes the consumer to exactly the one configured topic. */
    private void subscribe() {
        _consumer.subscribe(ImmutableList.of(_topic));
    }

    /**
     * Callback registered with the polling thread for exceptions; logs the error.
     *
     * @param e the reported exception
     */
    protected void onException(Throwable e) {
        getLog().error("error during polling:", e);
    }

    /** Cancels the polling thread if one was created by {@link #start()}. */
    @PreDestroy
    public void destroy() {
        if (_thread != null) {
            getLog().info("shutting down thread {}", _name);
            _thread.cancel();
        }
    }

    /**
     * Runs a single consume iteration: optional idle commit, poll, process, commit.
     *
     * <p>Failures of {@link #process(ConsumerRecords)} or of the commit that follows it are
     * handled with the back-off procedure described on this class and are not rethrown. An
     * exception thrown by the idle commit is not caught here and propagates to the caller.
     */
    @VisibleForTesting
    public final void pollConsumer() {
        if (_lastCommit.plus(IDLE_COMMIT_INTERVAL).isBeforeNow()) {
            getLog().debug("Idle commit for {}", _topic);
            commit();
        }

        final ConsumerRecords<K, V> records = poll();
        if (records.isEmpty()) {
            return;
        }

        try {
            process(records);
            commit();
            // only a fully processed and committed batch resets the back-off
            _processErrorCount.set(0);
        } catch (StopProcessingException e) {
            handleException("Could not process events " + e.getMessage(), e, _processErrorCount);
        } catch (RuntimeException e) {
            handleException("Could not process events", e, _processErrorCount);
        }
    }

    /**
     * Polls the consumer once.
     *
     * @return the polled records, or an empty batch if polling was interrupted or failed
     */
    @Nonnull
    private ConsumerRecords<K, V> poll() {
        try {
            final ConsumerRecords<K, V> records = _consumer.poll(POLL_TIMEOUT);
            _pollErrorCount.set(0);
            return records;
        } catch (InterruptException interrupt) {
            // no back-off and no resubscribe here: an interrupt is the normal shutdown signal
            getLog().warn("Received interrupt exception. This is expected when stopping the service.");
            return ConsumerRecords.empty();
        } catch (RuntimeException e) {
            handleException("Could not poll kafka", e, _pollErrorCount);
            return ConsumerRecords.empty();
        }
    }

    /**
     * Logs the error, unsubscribes, sleeps for the back-off time derived from the incremented
     * error counter and subscribes again.
     *
     * @param msg log message
     * @param throwable the failure to log
     * @param errorCounter counter of consecutive errors of this kind; incremented by this call
     */
    private void handleException(String msg, Throwable throwable, AtomicInteger errorCounter) {
        getLog().error(msg, throwable);
        // presumably the unsubscribe/subscribe cycle makes the consumer resume from the last
        // committed position - confirm against the Kafka consumer documentation
        _consumer.unsubscribe();
        try {
            Thread.sleep(sleepTime(errorCounter.incrementAndGet()));
        } catch (InterruptedException e) {
            // keep the interrupt visible to the caller; the resubscribe below still happens
            Thread.currentThread().interrupt();
        }
        subscribe();
    }

    /**
     * Computes the back-off time for the given number of consecutive errors.
     *
     * @param errorCount number of consecutive errors; values below 2 yield the base step
     * @return {@code WAIT_ON_ERROR_MILLIS_STEP * 2^(errorCount - 1)}, capped at
     *         {@code MAX_WAIT_ON_ERROR_MILLIS}
     */
    private static long sleepTime(int errorCount) {
        long current = WAIT_ON_ERROR_MILLIS_STEP;
        for (int i = 1; i < errorCount; ++i) {
            current += current;
            // returning at the first value above the cap also keeps the doubling from overflowing
            if (current > MAX_WAIT_ON_ERROR_MILLIS) {
                return MAX_WAIT_ON_ERROR_MILLIS;
            }
        }
        return current;
    }

    /**
     * Records the current time as the last commit time and commits synchronously.
     *
     * <p>The timestamp is updated before {@code commitSync()} is called, so it is also advanced
     * when the commit throws.
     */
    protected void commit() {
        _lastCommit = DateTime.now();
        _consumer.commitSync();
    }

    /**
     * @return the logger used for all messages of this consumer
     */
    @Nonnull
    protected abstract Logger getLog();

    /**
     * Handles one non-empty batch of polled records.
     *
     * @param records the polled records; never empty when called from {@link #pollConsumer()}
     * @throws StopProcessingException if processing failed; the batch is then not committed and
     *         the back-off procedure is applied
     */
    protected abstract void process(ConsumerRecords<K, V> records) throws StopProcessingException;
}

