/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.permission.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import de.justsoftware.kafka.client.RepeatingThread;
import de.justsoftware.permission.MonitoredService;
import de.justsoftware.permission.business.offsets.ConsumerOffsetService;
import de.justsoftware.permission.kafka.AbstractKafkaConsumer;
import de.justsoftware.permission.kafka.KafkaConfigurationProvider;
import de.justsoftware.permission.persistence.AfterCommitCallback;
import de.justsoftware.permission.persistence.TransactionSupport;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.slf4j.Logger;

/*
 * Exception performing whole class analysis ignored.
 */
@ParametersAreNonnullByDefault
public abstract class AbstractKafkaConsumer<K, V>
implements MonitoredService {
    /** Shared JSON mapper used by {@link #parseJson(String)}. */
    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    /** Timeout passed to every {@code Consumer.poll} call. */
    private static final java.time.Duration POLL_TIMEOUT = java.time.Duration.ofMillis(100L);
    /** Once the last commit is older than this, the next poll cycle commits even without new records. */
    private static final Duration IDLE_COMMIT_INTERVALL = Duration.standardMinutes(1L);
    /** Upper bound for the back-off sleep after repeated errors. */
    private static final long MAX_WAIT_ON_ERROR_MILLIS = TimeUnit.MINUTES.toMillis(5L);
    /** Back-off sleep after the first error; doubled for every further consecutive error. */
    private static final long WAIT_ON_ERROR_MILLIS_STEP = TimeUnit.SECONDS.toMillis(1L);
    /** Creates the Kafka consumer; queried by {@link #initConsumer()}. */
    private final Supplier<Consumer<K, V>> _consumerSupplier;
    /** The consumer obtained from the supplier; stays {@code null} until {@link #initConsumer()} ran. */
    private Consumer<K, V> _consumer;
    /** Consecutive poll failures; reset to 0 after every successful poll. */
    private final AtomicInteger _pollErrorCount = new AtomicInteger(0);
    /** Consecutive subscribe/process failures; reset to 0 after a successfully processed batch. */
    private final AtomicInteger _processErrorCount = new AtomicInteger(0);
    /** {@code System.currentTimeMillis()} of the last successful poll, 0 while there was none. */
    private volatile long _lastPoll = 0L;
    /**
     * End offsets of all topic partitions as seen after the first successful poll; empty until then.
     * The diamond is required here: an {@code AtomicReference<ImmutableMap>} is not assignable to the declared type.
     */
    private final AtomicReference<Map<TopicPartition, Long>> _initialOffsets = new AtomicReference<>(ImmutableMap.of());
    /** Becomes {@code true} once the committed offsets have reached {@link #_initialOffsets}. */
    private final AtomicBoolean _initialized = new AtomicBoolean(false);
    /** Time of the most recent {@link #commit()} call (initially the construction time). */
    private volatile DateTime _lastCommit = DateTime.now();
    /** The single topic this consumer reads. */
    private final String _topic;
    /** The worker created by {@link #start()}; {@code null} while not started. */
    private RepeatingThread _thread;
    /** Name built from topic and thread identifier; used for the worker thread and returned by {@link #getName()}. */
    private final String _name;
    private final ConsumerOffsetService _consumerOffsetService;
    private final TransactionSupport _transactionSupport;
    private final KafkaConfigurationProvider _configurationProvider;

    /**
     * Stores the collaborators and derives the consumer name; no Kafka interaction happens here.
     *
     * @param consumerSupplier      factory for the Kafka consumer, queried later by {@link #initConsumer()}
     * @param topic                 the topic to consume
     * @param threadIdentifier      number appended to the name to tell several consumers of one topic apart
     * @param consumerOffsetDAO     service that stores consumed offsets and tracks topic initialization
     * @param configurationProvider tells whether Kafka is enabled at all
     * @param transactionSupport    runs record processing and offset storing in one transaction
     */
    public AbstractKafkaConsumer(Supplier<Consumer<K, V>> consumerSupplier, String topic, int threadIdentifier, ConsumerOffsetService consumerOffsetDAO, KafkaConfigurationProvider configurationProvider, TransactionSupport transactionSupport) {
        _consumerSupplier = consumerSupplier;
        _topic = topic;
        _name = "KafkaConsumer-" + topic + "-" + threadIdentifier;
        _consumerOffsetService = consumerOffsetDAO;
        _transactionSupport = transactionSupport;
        _configurationProvider = configurationProvider;
    }

    /**
     * Creates the consumer and the repeating worker that drives {@link #pollConsumer()}.
     * Does nothing when Kafka is disabled, and only logs a warning when the supplier yields no consumer.
     *
     * @throws IllegalStateException if a worker thread has already been created
     */
    @PostConstruct
    public void start() {
        final boolean kafkaEnabled = _configurationProvider.isKafkaEnabled();
        if (!kafkaEnabled) {
            return;
        }
        if (_thread != null) {
            throw new IllegalStateException("thread is already started");
        }

        initConsumer();
        if (_consumer == null) {
            getLog().warn("not starting thread {} because consumer is not configured", _name);
            return;
        }

        _consumerOffsetService.addTopicRequiredToReadAtStartup(_topic);
        getLog().info("starting thread {}", _name);
        // NOTE(review): nothing here calls an explicit start; presumably RepeatingThread starts itself - confirm.
        final RepeatingThread worker = new RepeatingThread(_name, this::pollConsumer);
        _thread = worker
            .onException(this::onException)
            // kept as a lambda so that _consumer is read when the worker finishes, not when it is created
            .onFinish(() -> _consumer.wakeup())
            .setPriority(getPriority());
    }

    /** Asks the supplier for a consumer and keeps the result, which may be {@code null}. */
    @VisibleForTesting
    void initConsumer() {
        final Consumer<K, V> supplied = _consumerSupplier.get();
        _consumer = supplied;
    }

    /**
     * Priority that {@link #start()} hands to the worker thread.
     *
     * @return by default the priority of the thread calling this method
     */
    protected int getPriority() {
        final Thread caller = Thread.currentThread();
        return caller.getPriority();
    }

    /** Subscribes the consumer to the topic unless its current subscription already is exactly that topic. */
    protected void subscribe() {
        final ImmutableSet<String> wanted = ImmutableSet.of(_topic);
        final boolean alreadySubscribed = wanted.equals(_consumer.subscription());
        if (alreadySubscribed) {
            return;
        }
        _consumer.subscribe(wanted);
    }

    /**
     * Asks the consumer to seek to the beginning of the given partitions.
     * Has the callback shape accepted by {@link #assign(java.util.function.Consumer)}.
     *
     * @param partitions the partitions to rewind
     */
    protected void seekToBeginning(ImmutableSet<TopicPartition> partitions) {
        _consumer.seekToBeginning(partitions);
    }

    /**
     * Asks the consumer to seek to the end of the given partitions.
     * Has the callback shape accepted by {@link #assign(java.util.function.Consumer)}.
     *
     * @param partitions the partitions to fast-forward
     */
    protected void seekToEnd(ImmutableSet<TopicPartition> partitions) {
        _consumer.seekToEnd(partitions);
    }

    /**
     * Deliberately does nothing with the given partitions.
     * Has the callback shape accepted by {@link #assign(java.util.function.Consumer)} for callers that want no seek.
     *
     * @param partitions ignored
     */
    protected void noSeek(ImmutableSet<TopicPartition> partitions) {
    }

    /**
     * Manually assigns every partition of the topic to the consumer.
     * Nothing happens when the consumer's assignment already equals that set.
     *
     * @param onReassignAction invoked with the new partition set right after a (re-)assignment
     */
    protected void assign(java.util.function.Consumer<ImmutableSet<TopicPartition>> onReassignAction) {
        final ImmutableSet.Builder<TopicPartition> collected = ImmutableSet.builder();
        _consumer.partitionsFor(_topic)
            .forEach(info -> collected.add(new TopicPartition(_topic, info.partition())));
        final ImmutableSet<TopicPartition> wanted = collected.build();

        if (wanted.equals(_consumer.assignment())) {
            return;
        }
        _consumer.assign(wanted);
        onReassignAction.accept(wanted);
        getLog().info("Successfully (re-)assigned partitions {}.", wanted);
    }

    /**
     * Error callback registered on the worker thread in {@link #start()}.
     * Interruptions (Java's and Kafka's) are not logged; everything else is logged as an error.
     *
     * @param e the throwable reported by the worker
     */
    protected void onException(Throwable e) {
        final boolean interruption = e instanceof InterruptedException || e instanceof InterruptException;
        if (interruption) {
            return;
        }
        getLog().error("error during polling:", e);
    }

    /** Cancels the worker thread if {@link #start()} created one; otherwise a no-op. */
    @PreDestroy
    public void destroy() {
        if (_thread == null) {
            return;
        }
        getLog().info("shutting down thread {}", _name);
        _thread.cancel();
    }

    /**
     * One iteration of the consume loop: (re-)subscribe, commit if idle for too long, poll, then process the
     * polled records and store their offsets inside one transaction, followed by the Kafka commit.
     * <p>
     * Failures while subscribing or processing are logged and answered with an unsubscribe plus a back-off
     * sleep (see {@link #handleException(AtomicInteger)}); they are not rethrown.
     */
    @VisibleForTesting
    public void pollConsumer() {
        try {
            subscribe();
        }
        catch (RuntimeException e) {
            // pass the exception along, otherwise the reason for the failed subscription is lost
            getLog().error("could not subscribe!", e);
            handleException(_processErrorCount);
            return;
        }

        if (_lastCommit.plus(IDLE_COMMIT_INTERVALL).isBeforeNow()) {
            getLog().debug("Idle commit for {}", _topic);
            commit();
        }

        final ConsumerRecords<K, V> records = poll();
        if (records.isEmpty()) {
            return;
        }

        try {
            final AfterCommitCallback afterCommitCallback = (AfterCommitCallback)_transactionSupport.doInTransaction(() -> {
                process(records);
                return storeConsumedOffsets(records);
            });
            // the Kafka commit happens only after the transaction above completed
            commit();
            if (afterCommitCallback != null) {
                afterCommitCallback.afterCommit();
            }
            _processErrorCount.set(0);
        }
        catch (StopProcessingException e) {
            // only the message is logged for this exception type, no stack trace
            getLog().error("Could not process events " + e.getMessage());
            handleException(_processErrorCount);
        }
        catch (RuntimeException e) {
            getLog().error("Could not process events", e);
            handleException(_processErrorCount);
        }
    }

    /**
     * Hands the offset of the last record of every partition in the batch to the offset service.
     *
     * @param records the batch that was just processed
     * @return the callback produced by the offset service
     */
    @Nonnull
    private AfterCommitCallback storeConsumedOffsets(ConsumerRecords<K, V> records) {
        final ImmutableMap.Builder<String, Long> lastOffsets = ImmutableMap.builder();
        for (final TopicPartition partition : records.partitions()) {
            final ConsumerRecord<K, V> newest = Iterables.getLast(records.records(partition));
            // keyed by the partition's string form
            lastOffsets.put(partition.toString(), newest.offset());
        }
        return _consumerOffsetService.storeOffsets(lastOffsets.build());
    }

    /**
     * Polls the consumer once and, while no initial offsets are known, records the end offsets of all
     * partitions of the topic.
     *
     * @return the polled records, or an empty batch if polling failed with a runtime exception
     * @throws InterruptException rethrown after unsubscribing when Kafka reports an interruption
     */
    @Nonnull
    private ConsumerRecords<K, V> poll() {
        try {
            final ConsumerRecords<K, V> polled = _consumer.poll(POLL_TIMEOUT);

            final boolean endOffsetsUnknown = _initialOffsets.get().isEmpty();
            if (endOffsetsUnknown) {
                final Set<TopicPartition> allPartitions = _consumer.partitionsFor(_topic)
                    .stream()
                    .map(info -> new TopicPartition(_topic, info.partition()))
                    .collect(Collectors.toSet());
                _initialOffsets.set(_consumer.endOffsets(allPartitions));
                getLog().info("initialOffsets for {} - {}", _topic, _initialOffsets.get().values());
            }

            _lastPoll = System.currentTimeMillis();
            _pollErrorCount.set(0);
            return polled;
        }
        catch (InterruptException interrupted) {
            unsubscribe();
            throw interrupted;
        }
        catch (RuntimeException failure) {
            getLog().error("Could not poll kafka", failure);
            handleException(_pollErrorCount);
            return ConsumerRecords.empty();
        }
    }

    /**
     * Reaction to a failure: unsubscribe, bump the given counter and sleep for the matching back-off time.
     * An interrupt during the sleep is turned back into the thread's interrupt flag.
     *
     * @param errorCounter the counter of consecutive failures of the affected kind
     */
    private void handleException(AtomicInteger errorCounter) {
        unsubscribe();
        final int consecutiveErrors = errorCounter.incrementAndGet();
        final long pauseMillis = sleepTime(consecutiveErrors);
        try {
            Thread.sleep(pauseMillis);
        }
        catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /** Drops the consumer's current subscription; called after errors and Kafka interruptions. */
    protected void unsubscribe() {
        final Consumer<K, V> consumer = _consumer;
        consumer.unsubscribe();
    }

    /**
     * Exponential back-off: starts at {@code WAIT_ON_ERROR_MILLIS_STEP}, doubles for every error after the
     * first and never exceeds {@code MAX_WAIT_ON_ERROR_MILLIS}.
     *
     * @param errorCount number of consecutive errors; values below 2 yield the start value
     * @return the time to sleep in milliseconds
     */
    private static long sleepTime(int errorCount) {
        long delay = WAIT_ON_ERROR_MILLIS_STEP;
        // one doubling per error beyond the first one
        for (int remaining = errorCount; remaining > 1; remaining--) {
            delay *= 2;
            if (delay > MAX_WAIT_ON_ERROR_MILLIS) {
                return MAX_WAIT_ON_ERROR_MILLIS;
            }
        }
        return delay;
    }

    /**
     * Synchronously commits the consumer's offsets and then re-checks whether the initial offsets are reached.
     * The commit timestamp is updated before the commit itself, so it also advances when the commit throws.
     */
    protected void commit() {
        final DateTime commitStarted = DateTime.now();
        _lastCommit = commitStarted;
        _consumer.commitSync();
        checkInitialOffsetsReached();
    }

    /**
     * Unless already initialized, looks up the committed offsets of all partitions of the topic and compares
     * them with the initial offsets via {@link #checkInitialOffsetsReached(Map)}.
     * <p>
     * {@code Consumer.committed} maps a partition without any committed offset to {@code null}. Such a
     * partition cannot have reached its initial offset, so the check is skipped until every partition has a
     * committed offset (dereferencing the {@code null} would throw a NullPointerException instead).
     */
    protected void checkInitialOffsetsReached() {
        if (_initialized.get()) {
            return;
        }
        final Set<TopicPartition> topicPartitions = _consumer.partitionsFor(_topic)
            .stream()
            .map(it -> new TopicPartition(_topic, it.partition()))
            .collect(Collectors.toSet());
        final Map<TopicPartition, OffsetAndMetadata> committed = _consumer.committed(topicPartitions);

        final boolean anyPartitionUncommitted = committed.values().stream().anyMatch(offset -> offset == null);
        if (anyPartitionUncommitted) {
            getLog().debug("{} has partitions without committed offset, initial offsets not reached yet", _topic);
            return;
        }

        final Map<TopicPartition, Long> currentOffsets = committed.entrySet()
            .stream()
            .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
        checkInitialOffsetsReached(currentOffsets);
    }

    /**
     * Unless already initialized, decides whether every given offset has reached the initial offset recorded
     * for its partition, stores the result and, on success, marks the topic as initialized in the offset
     * service. A partition without a recorded initial offset counts as not reached.
     *
     * @param currentOffsets the current offset per partition
     */
    protected void checkInitialOffsetsReached(Map<TopicPartition, Long> currentOffsets) {
        if (_initialized.get()) {
            return;
        }

        boolean allReached = true;
        for (final Map.Entry<TopicPartition, Long> current : currentOffsets.entrySet()) {
            final Long initialOffset = _initialOffsets.get().get(current.getKey());
            if (initialOffset == null || current.getValue() < initialOffset) {
                allReached = false;
                break;
            }
        }

        getLog().info("{} initialized: {}", _topic, allReached);
        _initialized.set(allReached);
        if (allReached) {
            _consumerOffsetService.markTopicAsInitialized(_topic);
        }
    }

    /**
     * Supplies the logger used for all log output of this class.
     *
     * @return the logger of the concrete consumer, never {@code null}
     */
    @Nonnull
    protected abstract Logger getLog();

    /**
     * Handles one non-empty batch of polled records; called by {@code pollConsumer()} inside the transaction
     * that also stores the consumed offsets.
     *
     * @param var1 the polled records
     * @throws StopProcessingException to abort the batch; {@code pollConsumer()} logs its message and backs off
     */
    protected abstract void process(ConsumerRecords<K, V> var1) throws StopProcessingException;

    /**
     * Parses a JSON document with the shared mapper.
     *
     * @param jsonString the JSON text
     * @return the parsed tree, or {@code null} if parsing failed (the failure is logged)
     */
    @CheckForNull
    protected JsonNode parseJson(String jsonString) {
        JsonNode parsed;
        try {
            parsed = OBJECT_MAPPER.readTree(jsonString);
        }
        catch (IOException e) {
            getLog().error("Could not parse JSON: " + jsonString, e);
            parsed = null;
        }
        return parsed;
    }

    /**
     * Null-tolerant child lookup.
     *
     * @param json the node to look into, may be {@code null}
     * @param key  the field name, or {@code null} to address {@code json} itself
     * @return {@code null} for a {@code null} input, {@code json} for a {@code null} key, otherwise the
     *         result of {@code json.get(key)}
     */
    @CheckForNull
    protected static JsonNode getNode(@Nullable JsonNode json, @Nullable String key) {
        if (json == null) {
            return null;
        }
        if (key == null) {
            return json;
        }
        return json.get(key);
    }

    /**
     * Reads a boolean from the node addressed by {@code json} and {@code key}.
     *
     * @param json         the node to look into, may be {@code null}
     * @param key          the field name, or {@code null} to read {@code json} itself
     * @param defaultValue returned when the node is absent, and passed on to {@code JsonNode.asBoolean}
     * @return the boolean value or {@code defaultValue}
     */
    protected static boolean parseBoolean(@Nullable JsonNode json, @Nullable String key, boolean defaultValue) {
        final JsonNode node = getNode(json, key);
        if (node == null) {
            return defaultValue;
        }
        return node.asBoolean(defaultValue);
    }

    /**
     * Reads the text of the node addressed by {@code json} and {@code key}.
     * <p>
     * A JSON {@code null} value is treated like a missing field: its node reports {@code "null"} as text,
     * which must not leak to callers as if it were a real value.
     *
     * @param json the node to look into, may be {@code null}
     * @param key  the field name, or {@code null} to read {@code json} itself
     * @return the node's text, or empty if the node is absent or a JSON {@code null}
     */
    @Nonnull
    protected static Optional<String> parseString(@Nullable JsonNode json, @Nullable String key) {
        final JsonNode node = getNode(json, key);
        if (node == null || node.isNull()) {
            return Optional.empty();
        }
        return Optional.ofNullable(node.asText());
    }

    /**
     * Views the node addressed by {@code json} and {@code key} as a sequence of nodes.
     *
     * @param json the node to look into, may be {@code null}
     * @param key  the field name, or {@code null} to use {@code json} itself
     * @return nothing for an absent node, the array node itself for an array, otherwise a one-element list
     *         holding the node
     */
    @Nonnull
    static Iterable<JsonNode> parseArrayLike(@Nullable JsonNode json, @Nullable String key) {
        final JsonNode node = getNode(json, key);
        final Iterable<JsonNode> elements;
        if (node == null) {
            elements = ImmutableList.of();
        } else if (node.isArray()) {
            elements = node;
        } else {
            elements = ImmutableList.of(node);
        }
        return elements;
    }

    /**
     * @return the consumer name built in the constructor ({@code KafkaConsumer-<topic>-<threadIdentifier>})
     */
    public String getName() {
        final String name = _name;
        return name;
    }

    /**
     * Health summary of this consumer. The checks run in this order and the first hit wins:
     * not initialized, never polled successfully, last successful poll more than two minutes ago,
     * more than one consecutive process error.
     *
     * @return a description of the current problem, or empty if there is none
     */
    public Optional<String> getError() {
        final boolean initialized = _initialized.get();
        if (!initialized) {
            return Optional.of("not completetly initialized yet");
        }
        if (_lastPoll == 0L) {
            return Optional.of("no successful polling");
        }

        final long millisSincePoll = System.currentTimeMillis() - _lastPoll;
        final long maxPollAgeMillis = TimeUnit.MINUTES.toMillis(2L);
        if (millisSincePoll > maxPollAgeMillis) {
            final long secondsSincePoll = TimeUnit.MILLISECONDS.toSeconds(millisSincePoll);
            return Optional.of("last successful poll was " + secondsSincePoll + "s ago");
        }

        final int processErrors = _processErrorCount.get();
        return processErrors > 1
            ? Optional.of("process errors: " + processErrors)
            : Optional.empty();
    }
}

