/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.onx.kafka;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import de.justsoftware.onx.common.shared.model.PersonId;
import de.justsoftware.onx.common.shared.server.TransactionHelper;
import de.justsoftware.onx.drive.business.model.DriveChangePurpose;
import de.justsoftware.onx.kafka.KafkaConsumerContext;
import de.justsoftware.onx.kafka.metrics.ConsumerMetrics;
import de.justsoftware.toolbox.concurrent.RepeatingThread;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;

/**
 * Base class for Kafka consumers that subscribe to a fixed set of topics and hand every
 * non-empty batch of records to {@link #process(ConsumerRecords, KafkaConsumerContext)}
 * inside a transaction provided by the {@link TransactionHelper}.
 *
 * <p>Per batch the Kafka offsets are committed from within the transaction callback. If the
 * transaction does not complete although the offsets were already committed, the offsets are
 * moved back to the first record of each partition of the batch. Callbacks registered through
 * the {@link KafkaConsumerContext} run only after both commits succeeded.
 *
 * <p>The consumer may be {@code null}; in that case {@link #start()} logs a warning and does
 * nothing.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
@ParametersAreNonnullByDefault
public abstract class AbstractKafkaConsumer<K, V> {
    /** Timeout handed to every {@code Consumer.poll} call. */
    private static final java.time.Duration TIMEOUT = java.time.Duration.ofMillis(100L);
    /** After this much time without a commit, {@link #poll()} commits before polling. */
    private static final Duration IDLE_COMMIT_INTERVAL = Duration.standardHours(1L);
    /** Upper bound of the back-off sleep after an error. */
    private static final long MAX_WAIT_ON_ERROR_MILLIS = TimeUnit.MINUTES.toMillis(5L);
    /** Back-off sleep after the first error; doubled for every further consecutive error. */
    private static final long WAIT_ON_ERROR_MILLIS_STEP = TimeUnit.SECONDS.toMillis(1L);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final DateTimeFormatter ISO_DATETIME_PARSER = ISODateTimeFormat.dateTimeParser().withOffsetParsed();

    private final Consumer<K, V> _consumer;
    /** Consecutive poll failures; reset to 0 after a successful poll. */
    private final AtomicInteger _pollErrorCount = new AtomicInteger(0);
    /** Consecutive processing failures; reset to 0 after a successfully processed batch. */
    private final AtomicInteger _processErrorCount = new AtomicInteger(0);
    private final AtomicBoolean _paused = new AtomicBoolean(false);
    private final AtomicBoolean _commitAll = new AtomicBoolean(false);
    private final TransactionHelper _transactionHelper;
    private final ImmutableSet<String> _topics;
    private volatile DateTime _lastCommit = DateTime.now();
    private RepeatingThread _thread;

    /**
     * Convenience constructor taking the topics as varargs.
     *
     * @param consumer the Kafka consumer, or {@code null} if none is configured
     * @param transactionHelper runs the processing of each batch in a transaction
     * @param topics the topics to subscribe to
     */
    public AbstractKafkaConsumer(@Nullable Consumer<K, V> consumer, TransactionHelper transactionHelper, String ... topics) {
        this(consumer, transactionHelper, ImmutableSet.copyOf(topics));
    }

    /**
     * @param consumer the Kafka consumer, or {@code null} if none is configured
     * @param transactionHelper runs the processing of each batch in a transaction
     * @param topics the topics to subscribe to
     */
    public AbstractKafkaConsumer(@Nullable Consumer<K, V> consumer, TransactionHelper transactionHelper, ImmutableSet<String> topics) {
        _consumer = consumer;
        _transactionHelper = transactionHelper;
        _topics = topics;
    }

    /**
     * Registered as the exception handler of the polling thread; logs the exception.
     * Subclasses may override this.
     *
     * @param e the exception handed over by the polling thread
     */
    protected void onException(Throwable e) {
        getLog().error("error during polling:", e);
    }

    /**
     * Subscribes to the configured topics and creates the polling thread, which repeatedly
     * invokes {@link #poll()}. Does nothing except logging a warning when no consumer is
     * configured.
     *
     * @throws IllegalStateException if a thread was already created by an earlier call
     */
    @PostConstruct
    public void start() {
        if (_thread != null) {
            throw new IllegalStateException("thread is already started");
        }
        if (_consumer == null) {
            getLog().warn("not starting thread {} because consumer is not configured", (Object)getName());
            return;
        }
        subscribe();
        // NOTE(review): no explicit start call is visible here; presumably RepeatingThread
        // starts itself on construction - confirm against RepeatingThread.
        _thread = new RepeatingThread(getName(), this::poll)
                .onException(this::onException)
                .onFinish(() -> _consumer.wakeup());
    }

    /**
     * Requests that the next {@link #poll()} seeks all assigned partitions to their end,
     * polls once and commits, before continuing with the regular poll.
     */
    public void commitAll() {
        _commitAll.set(true);
    }

    /** Requests that the assigned partitions are paused from the next {@link #poll()} on. */
    public void pause() {
        _paused.set(true);
    }

    /** Requests that the assigned partitions are resumed from the next {@link #poll()} on. */
    public void resume() {
        _paused.set(false);
    }

    /** @return the name used for the polling thread and in log messages */
    @Nonnull
    private String getName() {
        return "KafkaConsumer-" + getClass().getSimpleName();
    }

    /** Cancels the polling thread if one was created. */
    @PreDestroy
    public void destroy() {
        if (_thread != null) {
            getLog().info("shutting down thread {}", (Object)getName());
            _thread.cancel();
        }
    }

    /**
     * @return a wrapper around the current metrics of the underlying consumer
     * @throws IllegalStateException if no consumer is configured
     */
    @Nonnull
    public ConsumerMetrics getMetrics() {
        // The constructor explicitly allows a null consumer; fail with a clear message
        // instead of an anonymous NullPointerException.
        Preconditions.checkState(_consumer != null, "consumer is not configured");
        return new ConsumerMetrics(_consumer.metrics());
    }

    /**
     * One iteration of the polling loop: applies the pause/resume flag to the assigned
     * partitions, handles a pending {@link #commitAll()} request, performs an idle commit if
     * the last commit is older than the idle interval, then polls and processes the records.
     */
    @VisibleForTesting
    public final void poll() {
        Set<TopicPartition> partitions = _consumer.assignment();
        if (_paused.get()) {
            _consumer.pause(partitions);
        } else {
            _consumer.resume(partitions);
        }
        if (_commitAll.getAndSet(false)) {
            // Move to the end of every assigned partition and commit that position;
            // whatever this poll returns is not processed.
            _consumer.seekToEnd(partitions);
            _consumer.poll(TIMEOUT);
            _consumer.commitSync();
        }
        if (_lastCommit.plus(IDLE_COMMIT_INTERVAL).isBeforeNow()) {
            getLog().debug("Idle commit for {}", _topics);
            commit();
        }
        ConsumerRecords<K, V> records = pollConsumer();
        processRecords(records);
    }

    /**
     * Processes a batch inside a transaction and commits the Kafka offsets from within the
     * transaction callback. Empty batches are ignored.
     *
     * <p>If the transaction call does not return normally after the offsets were committed,
     * the offsets are reverted. A {@link RuntimeException} is passed to
     * {@link #handleException(String, Throwable, AtomicInteger)}. The collected after-commit
     * callbacks run only when both the transaction and the Kafka commit succeeded.
     *
     * @param records the batch returned by the consumer
     */
    private void processRecords(ConsumerRecords<K, V> records) {
        if (records.isEmpty()) {
            return;
        }
        Queue<Runnable> afterCommit = new ArrayDeque<>();
        // Set from inside the transaction callback, hence a mutable holder.
        AtomicBoolean kafkaCommitted = new AtomicBoolean(false);
        boolean databaseCommitted = false;
        try {
            try {
                _transactionHelper.doInTransactionWithoutResult(status -> {
                    process(records, r -> afterCommit.add(Preconditions.checkNotNull(r)));
                    commit();
                    kafkaCommitted.set(true);
                });
                databaseCommitted = true;
            }
            finally {
                // Offsets were committed but the transaction did not complete.
                if (!databaseCommitted && kafkaCommitted.get()) {
                    revertCommit(records);
                }
            }
            _processErrorCount.set(0);
        }
        catch (RuntimeException e) {
            handleException("Could not process events", e, _processErrorCount);
        }
        if (databaseCommitted && kafkaCommitted.get()) {
            runQueue(afterCommit);
        }
    }

    /** Subscribes the consumer to the configured topics. */
    private void subscribe() {
        _consumer.subscribe(_topics);
    }

    /**
     * Commits, for every partition of the batch, the offset of the first record of that
     * partition. A failure to do so is logged and not propagated.
     *
     * @param records the batch whose offsets were committed
     */
    private void revertCommit(ConsumerRecords<K, V> records) {
        ImmutableMap.Builder<TopicPartition, OffsetAndMetadata> offsetsBuilder = ImmutableMap.builder();
        records.partitions().forEach(tp -> records.records(tp).stream()
                .findFirst()
                .ifPresent(r -> offsetsBuilder.put(tp, new OffsetAndMetadata(r.offset()))));
        Map<TopicPartition, OffsetAndMetadata> offsets = offsetsBuilder.build();
        try {
            _consumer.commitSync(offsets);
        }
        catch (RuntimeException e) {
            // SLF4J treats a trailing Throwable argument as the exception to log.
            getLog().error("unable to revert offsets {}", offsets, e);
        }
    }

    /**
     * Polls the consumer once.
     *
     * @return the polled records, or an empty batch if polling was interrupted or failed
     */
    @Nonnull
    private ConsumerRecords<K, V> pollConsumer() {
        try {
            ConsumerRecords<K, V> polled = _consumer.poll(TIMEOUT);
            _pollErrorCount.set(0);
            return polled;
        }
        catch (InterruptException interrupt) {
            getLog().warn("Received interrupt exception. This is expected when stopping the service.");
            return ConsumerRecords.empty();
        }
        catch (RuntimeException e) {
            handleException("Could not poll kafka", e, _pollErrorCount);
            return ConsumerRecords.empty();
        }
    }

    /**
     * Logs the error, unsubscribes, sleeps for a back-off time derived from the incremented
     * error counter and subscribes again.
     *
     * @param msg log message
     * @param throwable the error to log
     * @param errorCounter counter of consecutive errors of the failing operation
     */
    private void handleException(String msg, Throwable throwable, AtomicInteger errorCounter) {
        getLog().error(msg, throwable);
        _consumer.unsubscribe();
        try {
            Thread.sleep(sleepTime(errorCounter.incrementAndGet()));
        }
        catch (InterruptedException e) {
            // Keep the interrupt status for the code running this thread.
            Thread.currentThread().interrupt();
        }
        subscribe();
    }

    /**
     * Computes the back-off time: the step for the first error, doubled for each further
     * consecutive error and capped at the maximum wait time.
     *
     * @param errorCount number of consecutive errors
     * @return the time to sleep in milliseconds
     */
    private static long sleepTime(int errorCount) {
        long current = WAIT_ON_ERROR_MILLIS_STEP;
        for (int i = 1; i < errorCount; ++i) {
            current += current;
            // Returning at the cap also keeps the doubling from overflowing.
            if (current > MAX_WAIT_ON_ERROR_MILLIS) {
                return MAX_WAIT_ON_ERROR_MILLIS;
            }
        }
        return current;
    }

    /**
     * Drains the queue and runs every callback. A {@link RuntimeException} thrown by a
     * callback is logged and does not prevent the remaining callbacks from running.
     *
     * @param queue the callbacks to run, in queue order
     */
    private void runQueue(Queue<Runnable> queue) {
        while (!queue.isEmpty()) {
            Runnable callback = queue.remove();
            try {
                callback.run();
            }
            catch (RuntimeException e) {
                getLog().error("Could not process after commit callback", (Throwable)e);
            }
        }
    }

    /** Records the current time as last commit time, then commits synchronously. */
    private void commit() {
        _lastCommit = DateTime.now();
        _consumer.commitSync();
    }

    /**
     * Parses a JSON document.
     *
     * @param json the JSON text
     * @return the parsed tree, or {@code null} if an {@link IOException} occurred (logged)
     */
    @CheckForNull
    protected JsonNode parseJson(String json) {
        try {
            return OBJECT_MAPPER.readTree(json);
        }
        catch (IOException e) {
            getLog().error("Could not parse JSON: {}", json, e);
            return null;
        }
    }

    /**
     * @param json the node to look into, may be {@code null}
     * @param key the field name, or {@code null} to address {@code json} itself
     * @return {@code null} if {@code json} is {@code null}; {@code json} if {@code key} is
     *         {@code null}; otherwise the result of {@code json.get(key)}
     */
    @CheckForNull
    protected static JsonNode getNode(@Nullable JsonNode json, @Nullable String key) {
        if (json == null) {
            return null;
        }
        return key != null ? json.get(key) : json;
    }

    /**
     * @return the addressed node as boolean, or {@code defaultValue} if there is no such node
     *         or it cannot be converted
     */
    protected static boolean parseBoolean(@Nullable JsonNode json, @Nullable String key, boolean defaultValue) {
        JsonNode node = getNode(json, key);
        return node != null ? node.asBoolean(defaultValue) : defaultValue;
    }

    /** @return the addressed node as boolean, or absent if there is no such node */
    @Nonnull
    protected static com.google.common.base.Optional<Boolean> parseBoolean(@Nullable JsonNode json, @Nullable String key) {
        JsonNode node = getNode(json, key);
        if (node == null) {
            return com.google.common.base.Optional.absent();
        }
        return com.google.common.base.Optional.of(node.asBoolean());
    }

    /**
     * @return the addressed node as long, or {@code defaultValue} if there is no such node or
     *         it cannot be converted
     */
    protected static long parseLong(@Nullable JsonNode json, @Nullable String key, long defaultValue) {
        JsonNode node = getNode(json, key);
        return node != null ? node.asLong(defaultValue) : defaultValue;
    }

    /**
     * @return the addressed node as int, or {@code defaultValue} if there is no such node or
     *         it cannot be converted
     */
    protected static int parseInt(@Nullable JsonNode json, @Nullable String key, int defaultValue) {
        JsonNode node = getNode(json, key);
        return node != null ? node.asInt(defaultValue) : defaultValue;
    }

    /**
     * @return the text of the addressed node, or absent if there is no such node or the node
     *         is an explicit JSON {@code null}
     */
    @Nonnull
    protected static com.google.common.base.Optional<String> parseString(@Nullable JsonNode json, @Nullable String key) {
        JsonNode node = getNode(json, key);
        // asText() of an explicit JSON null yields the literal string "null", which must not
        // be handed to the date/person parsers as if it were a value.
        if (node == null || node.isNull()) {
            return com.google.common.base.Optional.absent();
        }
        return com.google.common.base.Optional.fromNullable(node.asText());
    }

    /** @return the addressed string wrapped in a {@link DriveChangePurpose}, or absent */
    @Nonnull
    protected static com.google.common.base.Optional<DriveChangePurpose> parseChangePurpose(@Nullable JsonNode json, @Nullable String key) {
        return parseString(json, key).transform(DriveChangePurpose::new);
    }

    /** @return the addressed string parsed as ISO date-time (offset retained), or absent */
    @Nonnull
    protected static com.google.common.base.Optional<DateTime> parseDate(@Nullable JsonNode json, @Nullable String key) {
        return parseString(json, key).transform(ISO_DATETIME_PARSER::parseDateTime);
    }

    /** @return the addressed string parsed with {@code PersonId.parse}, or empty */
    @Nonnull
    protected static Optional<PersonId> parsePerson(JsonNode parsedJson, String key) {
        return parseString(parsedJson, key).toJavaUtil().map(PersonId::parse);
    }

    /** @return the logger used for all log output of this consumer */
    @Nonnull
    protected abstract Logger getLog();

    /**
     * Processes one non-empty batch of records. Called inside the transaction callback,
     * before the Kafka offsets are committed.
     *
     * @param var1 the records to process
     * @param var2 context through which callbacks can be registered that run after both the
     *             transaction and the Kafka commit succeeded
     */
    protected abstract void process(ConsumerRecords<K, V> var1, KafkaConsumerContext var2);
}

