/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.wikiservice.kafka;

import de.justsoftware.toolbox.concurrent.RepeatingThread;
import de.justsoftware.wikiservice.kafka.KafkaConsumerContext;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import kotlin.Metadata;
import kotlin.jvm.internal.Intrinsics;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.InterruptException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;

/*
 * Illegal identifiers - consider using --renameillegalidents true
 */
@Metadata(mv={1, 5, 1}, k=1, xi=48, d1={"\u0000\u0088\u0001\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\"\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\t\n\u0002\b\u0006\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0002\b\u0004\n\u0002\u0010\u0003\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0003\b&\u0018\u0000*\u0004\b\u0000\u0010\u0001*\u0004\b\u0001\u0010\u00022\u00020\u0003B'\u0012\u0012\u0010\u0004\u001a\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u0005\u0012\f\u0010\u0006\u001a\b\u0012\u0004\u0012\u00020\b0\u0007\u00a2\u0006\u0002\u0010\tJ\b\u0010#\u001a\u00020$H\u0002J\b\u0010%\u001a\u00020$H\u0007J \u0010&\u001a\u00020$2\u0006\u0010'\u001a\u00020\b2\u0006\u0010(\u001a\u00020)2\u0006\u0010*\u001a\u00020\u001bH\u0002J\u0010\u0010+\u001a\u00020$2\u0006\u0010,\u001a\u00020)H\u0002J\u0014\u0010-\u001a\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010.H\u0002J\b\u0010/\u001a\u00020$H\u0002J)\u00100\u001a\u00020$2\u0012\u00101\u001a\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010.2\u0006\u00102\u001a\u000203H \u00a2\u0006\u0002\b4J\u0016\u00105\u001a\u00020$2\f\u00106\u001a\b\u0012\u0004\u0012\u00020807H\u0002J\u0010\u00109\u001a\u00020\u00142\u0006\u0010:\u001a\u00020;H\u0002J\b\u0010<\u001a\u00020$H\u0007J\b\u0010=\u001a\u00020$H\u0002R\u001a\u0010\u0004\u001a\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0016\u0010\n\u001a\n \f*\u0004\u0018\u00010\u000b0\u000bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0016\u0010\r\u001a\n \f*\u0004\u0018\u00010\u000e0\u000eX\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u0012\u0010\u000f\u001a\u00020\u0010X\u00a4\u0004\u00a2\u0006\u0006\u001a\u0004\b\u0011\u0010\u0012R\u0014\u0010\u0013\u001a\u00020\u0014X\u0082\u0004\u00a2\u0006\b\n\u0000\u0012\u0004\b\u0015\u0010\u0016R\u0014\u0010\u0017\u001a\u00020\b8BX\u0082\u0004\u00a2\u0006\u0006\u001a\u0004\b\u0018\u0010\u0019R\u000e\u0010\u001a\u001a\u00020\u001bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u001c\u001a\u00020\u001bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0010\u0010\u001d\u001a\u0004\u0018\u00010\u001eX\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u001c\u0010\u001f\u001a\n \f*\u0004\u0018\u00010 0 X\u0082\u0004\u00a2\u0006\b\n\u0000\u0012\u0004\b!\u0010\u0016R\u0014\u0010\u0006\u001a\b\u0012\u0004\u0012\u00020\b0\u0007X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\"\u001a\u00020\u0014X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006>"}, d2={"Lde/justsoftware/wikiservice/kafka/AbstractKafkaConsumer;", "K", "V", "", "consumer", "Lorg/apache/kafka/clients/consumer/Consumer;", "topics", "", "", "(Lorg/apache/kafka/clients/consumer/Consumer;Ljava/util/Set;)V", "idleCommitInterval", "Lorg/joda/time/Duration;", "kotlin.jvm.PlatformType", "lastCommit", "Lorg/joda/time/DateTime;", "log", "Lorg/slf4j/Logger;", "getLog", "()Lorg/slf4j/Logger;", "maxWaitOnErrorInMillis", "", "getMaxWaitOnErrorInMillis$annotations", "()V", "name", "getName", "()Ljava/lang/String;", "pollErrorCount", "Ljava/util/concurrent/atomic/AtomicInteger;", "processErrorCount", "thread", "Lde/justsoftware/toolbox/concurrent/RepeatingThread;", "timeout", "Ljava/time/Duration;", "getTimeout$annotations", "waitOnErrorMillisStep", "commit", "", "destroy", "handleException", "msg", "throwable", "", "errorCounter", "onException", "e", "poll", "Lorg/apache/kafka/clients/consumer/ConsumerRecords;", "pollConsumer", "process", "records", "context", "Lde/justsoftware/wikiservice/kafka/KafkaConsumerContext;", "process$wiki_service", "runQueue", "queue", "Ljava/util/Queue;", "Ljava/lang/Runnable;", "sleepTime", "errorCount", "", "start", "subscribe", "wiki-service"})
public abstract class AbstractKafkaConsumer<K, V> {
    @NotNull
    private final Consumer<K, V> consumer;
    @NotNull
    private final Set<String> topics;
    @NotNull
    private final AtomicInteger pollErrorCount;
    @NotNull
    private final AtomicInteger processErrorCount;
    private volatile DateTime lastCommit;
    @Nullable
    private RepeatingThread thread;
    private final java.time.Duration timeout;
    private final Duration idleCommitInterval;
    private final long maxWaitOnErrorInMillis;
    private final long waitOnErrorMillisStep;

    public AbstractKafkaConsumer(@NotNull Consumer<K, V> consumer, @NotNull Set<String> topics) {
        Intrinsics.checkNotNullParameter(consumer, (String)"consumer");
        Intrinsics.checkNotNullParameter(topics, (String)"topics");
        this.consumer = consumer;
        this.topics = topics;
        this.pollErrorCount = new AtomicInteger(0);
        this.processErrorCount = new AtomicInteger(0);
        this.lastCommit = DateTime.now();
        this.timeout = java.time.Duration.ofMillis(100L);
        this.idleCommitInterval = Duration.standardHours((long)1L);
        this.maxWaitOnErrorInMillis = TimeUnit.MINUTES.toMillis(5L);
        this.waitOnErrorMillisStep = TimeUnit.SECONDS.toMillis(1L);
    }

    private static /* synthetic */ void getTimeout$annotations() {
    }

    private static /* synthetic */ void getMaxWaitOnErrorInMillis$annotations() {
    }

    private final long sleepTime(int errorCount) {
        double d = 2.0;
        boolean bl = false;
        long l = (long)((double)this.waitOnErrorMillisStep * Math.pow(d, errorCount));
        long l2 = this.maxWaitOnErrorInMillis;
        boolean bl2 = false;
        return Math.min(l, l2);
    }

    private final void onException(Throwable e) {
        this.getLog().error("error during polling:", e);
    }

    @PostConstruct
    public final void start() {
        boolean bl = this.thread == null;
        boolean bl2 = false;
        boolean bl3 = false;
        if (!bl) {
            boolean bl4 = false;
            String string = "thread is already started";
            throw (Throwable)new IllegalStateException(string.toString());
        }
        this.getLog().info("Start Kafka Consumer thread " + this.getName());
        this.subscribe();
        this.thread = new RepeatingThread(this.getName(), () -> AbstractKafkaConsumer.start$lambda-1(this)).onException(arg_0 -> this.onException(arg_0)).onFinish(() -> AbstractKafkaConsumer.start$lambda-2(this));
    }

    private final String getName() {
        return "KafkaConsumer-" + this.getClass().getSimpleName();
    }

    @PreDestroy
    public final void destroy() {
        if (this.thread != null) {
            this.getLog().info("shutting down thread " + this.getName());
            RepeatingThread repeatingThread = this.thread;
            boolean bl = false;
            boolean bl2 = false;
            bl2 = false;
            boolean bl3 = false;
            if (repeatingThread == null) {
                boolean bl4 = false;
                String string = "Required value was null.";
                throw (Throwable)new IllegalStateException(string.toString());
            }
            repeatingThread.cancel();
        }
    }

    private final void subscribe() {
        this.consumer.subscribe((Collection)this.topics);
    }

    private final void pollConsumer() {
        if (this.lastCommit.plus((ReadableDuration)this.idleCommitInterval).isBeforeNow()) {
            this.getLog().debug("Idle commit for {}", (Object)this.topics);
            this.commit();
        }
        Queue afterCommitQueue = new LinkedList();
        ConsumerRecords records = this.poll();
        if (records.isEmpty()) {
            return;
        }
        try {
            this.process$wiki_service(records, (KafkaConsumerContext)new /* Unavailable Anonymous Inner Class!! */);
            this.commit();
            this.processErrorCount.set(0);
        }
        catch (RuntimeException e) {
            this.handleException("Could not process events", (Throwable)e, this.processErrorCount);
        }
        this.runQueue(afterCommitQueue);
    }

    private final void runQueue(Queue<Runnable> queue) {
        while (!queue.isEmpty()) {
            Runnable callback = queue.remove();
            try {
                callback.run();
            }
            catch (RuntimeException e) {
                this.getLog().error("Could not process after commit callback", (Throwable)e);
            }
        }
    }

    private final ConsumerRecords<K, V> poll() {
        ConsumerRecords consumerRecords;
        ConsumerRecords p = null;
        try {
            consumerRecords = this.consumer.poll(this.timeout);
            Intrinsics.checkNotNullExpressionValue((Object)consumerRecords, (String)"consumer.poll(timeout)");
            p = consumerRecords;
            this.pollErrorCount.set(0);
            consumerRecords = p;
        }
        catch (InterruptException interrupt) {
            this.getLog().warn("Received interrupt exception. This is expected when stopping the service.");
            ConsumerRecords consumerRecords2 = ConsumerRecords.empty();
            Intrinsics.checkNotNullExpressionValue((Object)consumerRecords2, (String)"{\n            log.warn(\"Received interrupt exception. This is expected when stopping the service.\")\n            ConsumerRecords.empty()\n        }");
            consumerRecords = consumerRecords2;
        }
        catch (RuntimeException e) {
            this.handleException("Could not poll kafka", (Throwable)e, this.pollErrorCount);
            ConsumerRecords consumerRecords3 = ConsumerRecords.empty();
            Intrinsics.checkNotNullExpressionValue((Object)consumerRecords3, (String)"{\n            handleException(\"Could not poll kafka\", e, pollErrorCount)\n            ConsumerRecords.empty()\n        }");
            consumerRecords = consumerRecords3;
        }
        return consumerRecords;
    }

    private final void handleException(String msg, Throwable throwable, AtomicInteger errorCounter) {
        this.getLog().error(msg, throwable);
        this.consumer.unsubscribe();
        try {
            Thread.sleep(this.sleepTime(errorCounter.incrementAndGet()));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.subscribe();
    }

    private final void commit() {
        this.lastCommit = DateTime.now();
        this.consumer.commitSync();
    }

    @NotNull
    protected abstract Logger getLog();

    public abstract void process$wiki_service(@NotNull ConsumerRecords<K, V> var1, @NotNull KafkaConsumerContext var2);

    private static final void start$lambda-1(AbstractKafkaConsumer this$0) {
        Intrinsics.checkNotNullParameter((Object)this$0, (String)"this$0");
        this$0.pollConsumer();
    }

    private static final void start$lambda-2(AbstractKafkaConsumer this$0) {
        Intrinsics.checkNotNullParameter((Object)this$0, (String)"this$0");
        this$0.consumer.wakeup();
    }
}

