/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.judo.kafka.conversion;

import de.justsoftware.judo.kafka.conversion.ConversionTaskProcessor;
import de.justsoftware.judo.kafka.conversion.ConversionTaskStreamProcessor;
import de.justsoftware.judo.services.converter.ConverterService;
import de.justsoftware.judoclient.conversiontask.ConversionTaskKafkaSerde;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.PreDestroy;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

/*
 * Exception performing whole class analysis ignored.
 */
@Component
@ParametersAreNonnullByDefault
@DependsOn(value={"conversionTaskTopic"})
public class ConversionTaskStreamProcessor
extends AbstractHealthIndicator
implements KafkaStreams.StateListener {
    private static final Logger LOG = LoggerFactory.getLogger(ConversionTaskStreamProcessor.class);
    private static final Duration KAFKA_STREAMS_CLOSE_TIMEOUT = Duration.ofSeconds(10L);
    private static final int POLL_TIMEOUT_IN_MS = Math.toIntExact(TimeUnit.of(ChronoUnit.MINUTES).toMillis(35L));
    private final KafkaStreams _streams;

    @Autowired
    public ConversionTaskStreamProcessor(@Value(value="${kafka.bootstrapServers}") String bootstrapServers, @Value(value="${kafka.stateDir}") String stateDir, @Value(value="${kafka.threads}") String threads, ConverterService converterService) {
        this._streams = new KafkaStreams(ConversionTaskStreamProcessor.createTopology((ConverterService)converterService), ConversionTaskStreamProcessor.createConfig((String)bootstrapServers, (String)stateDir, (String)threads));
        this._streams.setStateListener((KafkaStreams.StateListener)this);
        this._streams.start();
    }

    @Nonnull
    private static Properties createConfig(String bootstrapServers, String stateDir, String threads) {
        Properties config = new Properties();
        config.put("application.id", "just.documents.ConversionTaskProcessorV2");
        config.put("bootstrap.servers", bootstrapServers);
        config.put("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class);
        config.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        config.put("num.stream.threads", threads);
        config.put("state.dir", stateDir);
        config.put("max.poll.interval.ms", (Object)POLL_TIMEOUT_IN_MS);
        return config;
    }

    @Nonnull
    private static Topology createTopology(ConverterService converterService) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("just.documents.conversion.taskv2", Consumed.with((Serde)Serdes.String(), (Serde)ConversionTaskKafkaSerde.getInstance(), (TimestampExtractor)new WallclockTimestampExtractor(), (Topology.AutoOffsetReset)Topology.AutoOffsetReset.EARLIEST)).filter((key, value) -> key.startsWith("CONVERSION,")).process(() -> new ConversionTaskProcessor(converterService), new String[0]);
        return builder.build();
    }

    @Nonnull
    public KafkaStreams.State getState() {
        return this._streams.state();
    }

    protected void doHealthCheck(Health.Builder builder) {
        builder.withDetail("KafkaStreams.State", (Object)this.getState());
        if (this.getState().isRunningOrRebalancing()) {
            builder.up();
        } else {
            builder.down();
        }
    }

    @PreDestroy
    public void close() {
        LOG.info("Stopping KafkaStreams");
        if (this._streams.state() == KafkaStreams.State.CREATED) {
            this._streams.setStateListener(null);
        }
        this._streams.close();
    }

    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
        LOG.info("state change from {} to {}", (Object)oldState, (Object)newState);
        switch (1.$SwitchMap$org$apache$kafka$streams$KafkaStreams$State[newState.ordinal()]) {
            case 1: {
                LOG.warn("KafkaStreams switched into ERROR state. Stopping ...");
                this._streams.close(KAFKA_STREAMS_CLOSE_TIMEOUT);
                return;
            }
            case 2: {
                LOG.warn("KafkaStreams stopped. Restarting ...");
                this._streams.start();
                return;
            }
        }
    }
}

