/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.drive.business.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import de.justsoftware.drive.common.item.model.ItemId;
import de.justsoftware.drive.common.tenant.model.TenantId;
import de.justsoftware.drive.persistence.transaction.TransactionSupport;
import de.justsoftware.toolbox.concurrent.RepeatingThread;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.errors.InterruptException;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

/*
 * Exception performing whole class analysis ignored.
 */
/**
 * Base class for Kafka consumers that read a single topic and process each polled batch inside a
 * database transaction.
 *
 * <p>{@link #afterPropertiesSet()} subscribes the consumer to the topic and creates a
 * {@link RepeatingThread} with {@link #pollConsumer()} as its task. Every non-empty batch is handed
 * to {@link #process(ConsumerRecords, java.util.function.Consumer)}; the Kafka offsets are committed
 * from within the transaction and are reverted again when the transaction does not complete.
 *
 * <p>The class also offers small helpers for reading values out of JSON payloads.
 *
 * @param <K> key type of the consumed records
 * @param <V> value type of the consumed records
 */
@ParametersAreNonnullByDefault
public abstract class AbstractKafkaConsumer<K, V> implements InitializingBean {
    /** Shared JSON mapper used by {@link #parseJson(String)}; also available to subclasses. */
    protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    /** Timeout passed to every {@code poll} call. */
    private static final java.time.Duration POLL_TIMEOUT = java.time.Duration.ofMillis(100L);
    /** When the last commit is older than this, {@link #pollConsumer()} commits before polling. */
    private static final Duration IDLE_COMMIT_INTERVAL = Duration.standardHours(1L);
    /** Upper bound for the back-off sleep after an error. */
    private static final long MAX_WAIT_ON_ERROR_MILLIS = TimeUnit.MINUTES.toMillis(5L);
    /** Back-off sleep after the first error; doubled for each further consecutive error. */
    private static final long WAIT_ON_ERROR_MILLIS_STEP = TimeUnit.SECONDS.toMillis(1L);
    private static final DateTimeFormatter ISO_DATETIME_PARSER = ISODateTimeFormat.dateTimeParser().withOffsetParsed();

    private final Logger _logger = LoggerFactory.getLogger(this.getClass());
    /** May be {@code null}; in that case no thread is created by {@link #afterPropertiesSet()}. */
    private final Consumer<K, V> _consumer;
    private final TransactionSupport _transactionSupport;
    /** Consecutive poll failures; reset to 0 after a successful poll. */
    private final AtomicInteger _pollErrorCount = new AtomicInteger(0);
    /** Consecutive processing failures; reset to 0 after a successfully processed batch. */
    private final AtomicInteger _processErrorCount = new AtomicInteger(0);
    private volatile DateTime _lastCommit = DateTime.now();
    private final String _topic;
    private RepeatingThread _thread;
    private final String _name;

    /**
     * @param consumer           the Kafka consumer to poll, or {@code null} if none is configured
     * @param transactionSupport used to run the processing of each batch in a transaction
     * @param topic              the topic to subscribe to; also used to build the thread name
     */
    public AbstractKafkaConsumer(@Nullable Consumer<K, V> consumer, TransactionSupport transactionSupport, String topic) {
        _topic = topic;
        _consumer = consumer;
        _transactionSupport = transactionSupport;
        _name = "KafkaConsumer-" + topic;
    }

    /**
     * Subscribes to the topic and creates the polling thread.
     *
     * <p>When no consumer was supplied a warning is logged and nothing else happens.
     *
     * @throws IllegalStateException if a thread has already been created by an earlier call
     */
    @Override
    public void afterPropertiesSet() {
        if (_thread != null) {
            throw new IllegalStateException("thread is already started");
        }
        if (_consumer == null) {
            getLog().warn("not starting thread {} because consumer is not configured", _name);
            return;
        }
        subscribe();
        // NOTE(review): whether RepeatingThread starts itself on construction is not visible here.
        _thread = new RepeatingThread(_name, () -> pollConsumer())
            .onException(e -> onException(e))
            .onFinish(() -> _consumer.wakeup());
    }

    /** Subscribes the consumer to exactly the configured topic. */
    private void subscribe() {
        _consumer.subscribe(ImmutableList.of(_topic));
    }

    /**
     * Callback registered with the polling thread's {@code onException}; logs the error.
     * Subclasses may override it.
     *
     * @param e the throwable reported by the thread
     */
    protected void onException(Throwable e) {
        getLog().error("error during polling:", e);
    }

    /** Cancels the polling thread, if one was created. */
    @PreDestroy
    public void destroy() {
        if (_thread != null) {
            getLog().info("shutting down thread {}", _name);
            _thread.cancel();
        }
    }

    /**
     * Performs one poll cycle: an idle commit if the last commit is older than one hour, a poll,
     * and, for a non-empty batch, transactional processing followed by the after-commit callbacks.
     *
     * <p>Inside the transaction the batch is passed to {@link #process}, batch statements are
     * flushed and the Kafka offsets are committed. If the transaction call does not return normally
     * after the offsets were committed, the offsets are reverted to the first record of each
     * partition of the batch. A {@link RuntimeException} from processing is logged and followed by a
     * back-off; it is not rethrown. An exception thrown by the idle commit is not caught here.
     *
     * <p>Callbacks registered during processing run only after both the database transaction and
     * the Kafka commit succeeded.
     */
    @VisibleForTesting
    public final void pollConsumer() {
        if (_lastCommit.plus(IDLE_COMMIT_INTERVAL).isBeforeNow()) {
            getLog().debug("Idle commit for {}", _topic);
            commit();
        }

        final ConsumerRecords<K, V> records = poll();
        if (records.isEmpty()) {
            return;
        }

        final Queue<Runnable> afterCommit = new LinkedList<>();
        // Single-element array so the flag can be set from within the lambda.
        final boolean[] kafkaCommitted = {false};
        boolean databaseCommitted = false;
        try {
            try {
                _transactionSupport.doInTransaction(() -> {
                    process(records, callback -> afterCommit.add(Preconditions.checkNotNull(callback)));
                    _transactionSupport.flushBatchStatements();
                    commit();
                    kafkaCommitted[0] = true;
                });
                databaseCommitted = true;
            } finally {
                // Offsets were committed but the transaction call failed: move the offsets back.
                if (!databaseCommitted && kafkaCommitted[0]) {
                    revertCommit(records);
                }
            }
            _processErrorCount.set(0);
        } catch (RuntimeException e) {
            handleException("Could not process events", e, _processErrorCount);
        }

        if (databaseCommitted && kafkaCommitted[0]) {
            runQueue(afterCommit);
        }
    }

    /**
     * Commits, for every partition of the batch, the offset of the first record of that partition,
     * i.e. moves the committed position back to the start of the batch. A failure is only logged.
     *
     * @param records the batch whose commit should be undone
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void revertCommit(ConsumerRecords<K, V> records) {
        // The builder stays raw so that the map's key type (the element type of
        // records.partitions()) does not have to be named; raw usage is confined to this method.
        ImmutableMap.Builder offsetsBuilder = ImmutableMap.builder();
        records.partitions().forEach(tp -> records.records(tp).stream()
            .findFirst()
            .ifPresent(first -> offsetsBuilder.put(tp, new OffsetAndMetadata(first.offset()))));
        ImmutableMap offsets = offsetsBuilder.build();
        try {
            _consumer.commitSync(offsets);
        } catch (RuntimeException e) {
            // Trailing throwable is logged as the exception, the map fills the placeholder.
            getLog().error("unable to revert offsets {}", offsets, e);
        }
    }

    /**
     * Polls the consumer once with {@link #POLL_TIMEOUT}.
     *
     * @return the polled records; an empty batch if the poll was interrupted or failed with a
     *         {@link RuntimeException} (the latter is logged and followed by a back-off)
     */
    @Nonnull
    private ConsumerRecords<K, V> poll() {
        try {
            ConsumerRecords<K, V> polled = _consumer.poll(POLL_TIMEOUT);
            _pollErrorCount.set(0);
            return polled;
        } catch (InterruptException interrupt) {
            getLog().warn("Received interrupt exception. This is expected when stopping the service.");
            return ConsumerRecords.empty();
        } catch (RuntimeException e) {
            handleException("Could not poll kafka", e, _pollErrorCount);
            return ConsumerRecords.empty();
        }
    }

    /**
     * Logs the error, unsubscribes, sleeps for a back-off derived from the incremented error
     * counter and subscribes again. If the sleep is interrupted the interrupt flag is restored and
     * the consumer is still re-subscribed.
     *
     * @param msg          log message
     * @param throwable    the failure to log
     * @param errorCounter counter of consecutive failures for the failing operation
     */
    private void handleException(String msg, Throwable throwable, AtomicInteger errorCounter) {
        getLog().error(msg, throwable);
        _consumer.unsubscribe();
        try {
            Thread.sleep(sleepTime(errorCounter.incrementAndGet()));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        subscribe();
    }

    /**
     * Computes the back-off: {@link #WAIT_ON_ERROR_MILLIS_STEP} for the first error, doubled for
     * every further consecutive error and capped at {@link #MAX_WAIT_ON_ERROR_MILLIS}.
     *
     * @param errorCount number of consecutive errors, starting at 1
     * @return the time to sleep in milliseconds
     */
    private static long sleepTime(int errorCount) {
        long delay = WAIT_ON_ERROR_MILLIS_STEP;
        // Stop doubling once the cap is reached; this also bounds the loop for large counts.
        for (int i = 1; i < errorCount && delay < MAX_WAIT_ON_ERROR_MILLIS; i++) {
            delay *= 2;
        }
        return Math.min(delay, MAX_WAIT_ON_ERROR_MILLIS);
    }

    /**
     * Runs the callbacks in queue order. A {@link RuntimeException} thrown by a callback is logged
     * and does not prevent the remaining callbacks from running.
     */
    private void runQueue(Queue<Runnable> queue) {
        for (Runnable callback : queue) {
            if (callback == null) {
                continue;
            }
            try {
                callback.run();
            } catch (RuntimeException e) {
                getLog().error("Could not process after commit callback", e);
            }
        }
    }

    /**
     * Synchronously commits the consumer's offsets. The last-commit timestamp is updated before the
     * commit is attempted, so it is also advanced when the commit throws.
     */
    private void commit() {
        _lastCommit = DateTime.now();
        _consumer.commitSync();
    }

    /**
     * Parses a JSON document.
     *
     * @param jsonString the JSON text
     * @return the parsed tree, or {@code null} if parsing failed with an {@link IOException}
     *         (the failure is logged together with the input)
     */
    @CheckForNull
    protected JsonNode parseJson(String jsonString) {
        try {
            return OBJECT_MAPPER.readTree(jsonString);
        } catch (IOException e) {
            getLog().error("Could not parse JSON: {}", jsonString, e);
            return null;
        }
    }

    /**
     * Looks up a child node.
     *
     * @param json the node to look in; may be {@code null}
     * @param key  the field name, or {@code null} to address {@code json} itself
     * @return {@code null} if {@code json} is {@code null}; {@code json} if {@code key} is
     *         {@code null}; otherwise the result of {@code json.get(key)}
     */
    @CheckForNull
    protected static JsonNode getNode(@Nullable JsonNode json, @Nullable String key) {
        if (json == null) {
            return null;
        }
        return key != null ? json.get(key) : json;
    }

    /**
     * Reads a boolean value.
     *
     * @return the node's value via {@code asBoolean(defaultValue)}, or {@code defaultValue} if the
     *         node does not exist
     */
    protected static boolean parseBoolean(@Nullable JsonNode json, @Nullable String key, boolean defaultValue) {
        JsonNode node = getNode(json, key);
        return node != null ? node.asBoolean(defaultValue) : defaultValue;
    }

    /**
     * Reads a value as text.
     *
     * @return the node's {@code asText()} value; absent if the node does not exist or is an
     *         explicit JSON {@code null}
     */
    @Nonnull
    protected static com.google.common.base.Optional<String> parseString(@Nullable JsonNode json, @Nullable String key) {
        JsonNode node = getNode(json, key);
        // An explicit JSON null would otherwise be rendered as the literal text "null".
        if (node == null || node.isNull()) {
            return com.google.common.base.Optional.absent();
        }
        return com.google.common.base.Optional.fromNullable(node.asText());
    }

    /**
     * Reads a value as an ISO date-time, keeping the parsed offset.
     *
     * @return the parsed date, or absent if {@link #parseString} yields no text; text that the
     *         parser rejects makes the parser's exception propagate
     */
    @Nonnull
    protected static com.google.common.base.Optional<DateTime> parseDate(@Nullable JsonNode json, @Nullable String key) {
        return parseString(json, key).transform(text -> ISO_DATETIME_PARSER.parseDateTime(text));
    }

    /**
     * Reads a value as an {@link ItemId}.
     *
     * @return the result of {@code ItemId.valueOf} for the text (or for {@code null} when there is
     *         no text), wrapped so that a {@code null} result becomes absent
     */
    @Nonnull
    protected static com.google.common.base.Optional<ItemId> parseItemId(@Nullable JsonNode json, @Nullable String key) {
        String text = parseString(json, key).orNull();
        return com.google.common.base.Optional.fromNullable(ItemId.valueOf(text));
    }

    /**
     * Reads a value as a {@link TenantId}.
     *
     * @return the result of {@code TenantId.parse} for the text, or for {@code null} when there is
     *         no text
     */
    @Nonnull
    protected static Optional<TenantId> parseTenantId(@Nullable JsonNode json, String key) {
        String text = parseString(json, key).orNull();
        return TenantId.parse(text);
    }

    /** @return the logger, named after the concrete subclass */
    @Nonnull
    protected Logger getLog() {
        return _logger;
    }

    /**
     * Processes one non-empty batch. Called by {@link #pollConsumer()} inside a transaction.
     *
     * @param var1 the polled records
     * @param var2 accepts non-null callbacks that are run after the database transaction and the
     *             Kafka commit both succeeded
     */
    protected abstract void process(ConsumerRecords<K, V> var1, java.util.function.Consumer<Runnable> var2);
}

