/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.chat.kafka;

import de.justsoftware.chat.kafka.KafkaConsumerContext;
import de.justsoftware.toolbox.concurrent.RepeatingThread;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import kotlin.Metadata;
import kotlin.jvm.internal.Intrinsics;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.InterruptException;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;

/*
 * Illegal identifiers - consider using --renameillegalidents true
 */
@Metadata(mv={1, 7, 1}, k=1, xi=48, d1={"\u0000\u0082\u0001\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\u0000\n\u0000\n\u0002\u0018\u0002\n\u0000\n\u0002\u0010\"\n\u0002\u0010\u000e\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\t\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0010\u0002\n\u0002\b\u0004\n\u0002\u0010\u0003\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0018\u0002\n\u0002\b\u0003\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0010\b\n\u0002\b\u0003\b&\u0018\u0000*\u0004\b\u0000\u0010\u0001*\u0004\b\u0001\u0010\u00022\u00020\u0003B'\u0012\u0012\u0010\u0004\u001a\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u0005\u0012\f\u0010\u0006\u001a\b\u0012\u0004\u0012\u00020\b0\u0007\u00a2\u0006\u0002\u0010\tJ\b\u0010\u001f\u001a\u00020 H\u0002J\b\u0010!\u001a\u00020 H\u0007J \u0010\"\u001a\u00020 2\u0006\u0010#\u001a\u00020\b2\u0006\u0010$\u001a\u00020%2\u0006\u0010&\u001a\u00020\u0019H\u0002J\u0010\u0010'\u001a\u00020 2\u0006\u0010(\u001a\u00020%H\u0002J\u0014\u0010)\u001a\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010*H\u0002J\b\u0010+\u001a\u00020 H\u0002J)\u0010,\u001a\u00020 2\u0012\u0010-\u001a\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010*2\u0006\u0010.\u001a\u00020/H \u00a2\u0006\u0002\b0J\u0016\u00101\u001a\u00020 2\f\u00102\u001a\b\u0012\u0004\u0012\u00020403H\u0002J\u0010\u00105\u001a\u00020\u00142\u0006\u00106\u001a\u000207H\u0002J\b\u00108\u001a\u00020 H\u0007J\b\u00109\u001a\u00020 H\u0002R\u001a\u0010\u0004\u001a\u000e\u0012\u0004\u0012\u00028\u0000\u0012\u0004\u0012\u00028\u00010\u0005X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\n\u001a\u00020\u000bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0016\u0010\f\u001a\n \u000e*\u0004\u0018\u00010\r0\rX\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u0012\u0010\u000f\u001a\u00020\u0010X\u00a4\u0004\u00a2\u0006\u0006\u001a\u0004\b\u0011\u0010\u0012R\u000e\u0010\u0013\u001a\u00020\u0014X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0014\u0010\u0015\u001a\u00020\b8BX\u0082\u0004\u00a2\u0006\u0006\u001a\u0004\b\u0016\u0010\u0017R\u000e\u0010\u0018\u001a\u00020\u0019X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u001a\u001a\u00020\u0019X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0010\u0010\u001b\u001a\u0004\u0018\u00010\u001cX\u0082\u000e\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u001d\u001a\u00020\u000bX\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u0014\u0010\u0006\u001a\b\u0012\u0004\u0012\u00020\b0\u0007X\u0082\u0004\u00a2\u0006\u0002\n\u0000R\u000e\u0010\u001e\u001a\u00020\u0014X\u0082\u0004\u00a2\u0006\u0002\n\u0000\u00a8\u0006:"}, d2={"Lde/justsoftware/chat/kafka/AbstractKafkaConsumer;", "K", "V", "", "consumer", "Lorg/apache/kafka/clients/consumer/Consumer;", "topics", "", "", "(Lorg/apache/kafka/clients/consumer/Consumer;Ljava/util/Set;)V", "idleCommitInterval", "Ljava/time/Duration;", "lastCommit", "Ljava/time/Instant;", "kotlin.jvm.PlatformType", "log", "Lorg/slf4j/Logger;", "getLog", "()Lorg/slf4j/Logger;", "maxWaitOnErrorInMillis", "", "name", "getName", "()Ljava/lang/String;", "pollErrorCount", "Ljava/util/concurrent/atomic/AtomicInteger;", "processErrorCount", "thread", "Lde/justsoftware/toolbox/concurrent/RepeatingThread;", "timeout", "waitOnErrorMillisStep", "commit", "", "destroy", "handleException", "msg", "throwable", "", "errorCounter", "onException", "e", "poll", "Lorg/apache/kafka/clients/consumer/ConsumerRecords;", "pollConsumer", "process", "records", "context", "Lde/justsoftware/chat/kafka/KafkaConsumerContext;", "process$chat_service", "runQueue", "queue", "Ljava/util/Queue;", "Ljava/lang/Runnable;", "sleepTime", "errorCount", "", "start", "subscribe", "chat-service"})
public abstract class AbstractKafkaConsumer<K, V> {
    @NotNull
    private final Consumer<K, V> consumer;
    @NotNull
    private final Set<String> topics;
    @NotNull
    private final AtomicInteger pollErrorCount;
    @NotNull
    private final AtomicInteger processErrorCount;
    private volatile Instant lastCommit;
    @Nullable
    private RepeatingThread thread;
    @NotNull
    private final Duration timeout;
    @NotNull
    private final Duration idleCommitInterval;
    private final long maxWaitOnErrorInMillis;
    private final long waitOnErrorMillisStep;

    public AbstractKafkaConsumer(@NotNull Consumer<K, V> consumer, @NotNull Set<String> topics) {
        Intrinsics.checkNotNullParameter(consumer, (String)"consumer");
        Intrinsics.checkNotNullParameter(topics, (String)"topics");
        this.consumer = consumer;
        this.topics = topics;
        this.pollErrorCount = new AtomicInteger(0);
        this.processErrorCount = new AtomicInteger(0);
        this.lastCommit = Instant.now();
        int $this$milliseconds$iv = 100;
        boolean $i$f$getMilliseconds = false;
        long $this$milliseconds$iv$iv = $this$milliseconds$iv;
        boolean $i$f$getMilliseconds2 = false;
        Duration duration = Duration.ofMillis($this$milliseconds$iv$iv);
        Intrinsics.checkNotNullExpressionValue((Object)duration, (String)"ofMillis(this)");
        this.timeout = duration;
        int $this$hours$iv = 1;
        boolean $i$f$getHours = false;
        long $this$hours$iv$iv = $this$hours$iv;
        boolean $i$f$getHours2 = false;
        Duration duration2 = Duration.ofHours($this$hours$iv$iv);
        Intrinsics.checkNotNullExpressionValue((Object)duration2, (String)"ofHours(this)");
        this.idleCommitInterval = duration2;
        int $this$minutes$iv = 5;
        boolean $i$f$getMinutes = false;
        long $this$minutes$iv$iv = $this$minutes$iv;
        boolean $i$f$getMinutes2 = false;
        Duration duration3 = Duration.ofMinutes($this$minutes$iv$iv);
        Intrinsics.checkNotNullExpressionValue((Object)duration3, (String)"ofMinutes(this)");
        this.maxWaitOnErrorInMillis = duration3.toMillis();
        int $this$seconds$iv = 1;
        boolean $i$f$getSeconds = false;
        long $this$seconds$iv$iv = $this$seconds$iv;
        boolean $i$f$getSeconds2 = false;
        Duration duration4 = Duration.ofSeconds($this$seconds$iv$iv);
        Intrinsics.checkNotNullExpressionValue((Object)duration4, (String)"ofSeconds(this)");
        this.waitOnErrorMillisStep = duration4.toMillis();
    }

    private final long sleepTime(int errorCount) {
        return Math.min((long)((double)this.waitOnErrorMillisStep * Math.pow(2.0, errorCount)), this.maxWaitOnErrorInMillis);
    }

    private final void onException(Throwable e) {
        this.getLog().error("error during polling:", e);
    }

    @PostConstruct
    public final void start() {
        if (!(this.thread == null)) {
            boolean bl = false;
            String string2 = "thread is already started";
            throw new IllegalStateException(string2.toString());
        }
        this.getLog().info("Start Kafka Consumer thread " + this.getName());
        this.subscribe();
        this.thread = new RepeatingThread(this.getName(), () -> AbstractKafkaConsumer.start$lambda-1(this)).onException(arg_0 -> this.onException(arg_0)).onFinish(() -> AbstractKafkaConsumer.start$lambda-2(this));
    }

    private final String getName() {
        return "KafkaConsumer-" + this.getClass().getSimpleName();
    }

    @PreDestroy
    public final void destroy() {
        if (this.thread != null) {
            this.getLog().info("shutting down thread " + this.getName());
            RepeatingThread repeatingThread = this.thread;
            if (repeatingThread == null) {
                String string2 = "Required value was null.";
                throw new IllegalStateException(string2.toString());
            }
            repeatingThread.cancel();
        }
    }

    private final void subscribe() {
        this.consumer.subscribe((Collection)this.topics);
    }

    private final void pollConsumer() {
        if (this.lastCommit.plus(this.idleCommitInterval).compareTo(Instant.now()) < 0) {
            this.getLog().debug("Idle commit for {}", (Object)this.topics);
            this.commit();
        }
        Queue afterCommitQueue = new LinkedList();
        ConsumerRecords records = this.poll();
        if (records.isEmpty()) {
            return;
        }
        try {
            this.process$chat_service(records, (KafkaConsumerContext)new /* Unavailable Anonymous Inner Class!! */);
            this.commit();
            this.processErrorCount.set(0);
        }
        catch (RuntimeException e) {
            this.handleException("Could not process events", (Throwable)e, this.processErrorCount);
        }
        this.runQueue(afterCommitQueue);
    }

    private final void runQueue(Queue<Runnable> queue) {
        while (!queue.isEmpty()) {
            Runnable callback = queue.remove();
            try {
                callback.run();
            }
            catch (RuntimeException e) {
                this.getLog().error("Could not process after commit callback", (Throwable)e);
            }
        }
    }

    private final ConsumerRecords<K, V> poll() {
        ConsumerRecords consumerRecords;
        ConsumerRecords p = null;
        try {
            ConsumerRecords consumerRecords2 = this.consumer.poll(this.timeout);
            Intrinsics.checkNotNullExpressionValue((Object)consumerRecords2, (String)"consumer.poll(timeout)");
            p = consumerRecords2;
            this.pollErrorCount.set(0);
            consumerRecords = p;
        }
        catch (InterruptException interrupt) {
            this.getLog().warn("Received interrupt exception. This is expected when stopping the service.");
            ConsumerRecords consumerRecords3 = ConsumerRecords.empty();
            Intrinsics.checkNotNullExpressionValue((Object)consumerRecords3, (String)"{\n            log.warn(\"\u2026Records.empty()\n        }");
            consumerRecords = consumerRecords3;
        }
        catch (RuntimeException e) {
            this.handleException("Could not poll kafka", (Throwable)e, this.pollErrorCount);
            ConsumerRecords consumerRecords4 = ConsumerRecords.empty();
            Intrinsics.checkNotNullExpressionValue((Object)consumerRecords4, (String)"{\n            handleExce\u2026Records.empty()\n        }");
            consumerRecords = consumerRecords4;
        }
        return consumerRecords;
    }

    private final void handleException(String msg, Throwable throwable, AtomicInteger errorCounter) {
        this.getLog().error(msg, throwable);
        this.consumer.unsubscribe();
        try {
            Thread.sleep(this.sleepTime(errorCounter.incrementAndGet()));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.subscribe();
    }

    private final void commit() {
        this.lastCommit = Instant.now();
        this.consumer.commitSync();
    }

    @NotNull
    protected abstract Logger getLog();

    public abstract void process$chat_service(@NotNull ConsumerRecords<K, V> var1, @NotNull KafkaConsumerContext var2);

    private static final void start$lambda-1(AbstractKafkaConsumer this$0) {
        Intrinsics.checkNotNullParameter((Object)this$0, (String)"this$0");
        this$0.pollConsumer();
    }

    private static final void start$lambda-2(AbstractKafkaConsumer this$0) {
        Intrinsics.checkNotNullParameter((Object)this$0, (String)"this$0");
        this$0.consumer.wakeup();
    }
}

