/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.people.business;

import de.justsoftware.people.business.AbstractProcessorServiceImpl;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.PreDestroy;
import javax.servlet.ServletContextListener;
import org.apache.kafka.streams.KafkaStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;

@ParametersAreNonnullByDefault
public abstract class AbstractProcessorServiceImpl
implements KafkaStreams.StateListener,
ServletContextListener {
    private static final int MAX_RETRIES = 11;
    private static final long MAX_BACKOFF_SECONDS = TimeUnit.HOURS.toSeconds(1L);
    private static final int KAFKA_STREAMS_CLOSE_TIMEOUT_SECONDS = 10;
    protected KafkaStreams _kafkaStreams;
    private final Logger _log = LoggerFactory.getLogger(this.getClass());
    private final Supplier<KafkaStreams> _kafkaStreamsSupplier;
    private final ScheduledExecutorService _executorService;
    private int _retries = 0;

    public AbstractProcessorServiceImpl(Supplier<KafkaStreams> kafkaStreamsSupplier) {
        this._kafkaStreamsSupplier = kafkaStreamsSupplier;
        this._executorService = Executors.newSingleThreadScheduledExecutor();
    }

    @EventListener(value={ApplicationReadyEvent.class})
    public void start() {
        if (!this.preConditionsMet()) {
            long backoff = this.getBackoffSeconds();
            this._log.info("Preconditions not met. Waiting for {} seconds", (Object)backoff);
            this._executorService.schedule(() -> this.start(), backoff, TimeUnit.SECONDS);
            ++this._retries;
            return;
        }
        this._log.info("Starting KafkaStreams");
        this._kafkaStreams = (KafkaStreams)this._kafkaStreamsSupplier.get();
        this._kafkaStreams.setStateListener((KafkaStreams.StateListener)this);
        this._kafkaStreams.start();
    }

    @PreDestroy
    public void close() {
        this._log.info("Stopping KafkaStreams");
        if (this._kafkaStreams != null) {
            if (this._kafkaStreams.state() == KafkaStreams.State.CREATED) {
                this._kafkaStreams.setStateListener(null);
            }
            this._kafkaStreams.close();
            this._kafkaStreams = null;
        }
        this._executorService.shutdown();
    }

    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
        this._log.info("state change from {} to {}", (Object)oldState, (Object)newState);
        if (this._kafkaStreams == null) {
            return;
        }
        switch (1.$SwitchMap$org$apache$kafka$streams$KafkaStreams$State[newState.ordinal()]) {
            case 1: {
                this._log.warn("KafkaStreams switched into ERROR state. Stopping ...");
                this._kafkaStreams.close(Duration.ofSeconds(10L));
                return;
            }
            case 2: {
                this._log.warn("KafkaStreams stopped. Restarting ...");
                this.start();
                return;
            }
        }
    }

    public boolean preConditionsMet() {
        return true;
    }

    private long getBackoffSeconds() {
        if (this._retries > 11) {
            return MAX_BACKOFF_SECONDS;
        }
        return (long)Math.pow(2.0, this._retries);
    }
}

