/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.authentication.application;

import de.justsoftware.authentication.application.KafkaStreamsService;
import de.justsoftware.authentication.model.kafka.KafkaPerson;
import de.justsoftware.authentication.model.kafka.KafkaPersonExternalId;
import de.justsoftware.authentication.service.KafkaPersonService;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import javax.annotation.PreDestroy;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Service;

@Service
@ParametersAreNonnullByDefault
public class KafkaStreamsService
extends AbstractHealthIndicator
implements KafkaStreams.StateListener {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsService.class);
    private static final Serde<KafkaPerson> PROFILE_SERDE = new JsonSerde(KafkaPerson.class);
    private static final Serde<KafkaPersonExternalId> PROFILE_EXTERNAL_ID_SERDE = new JsonSerde(KafkaPersonExternalId.class);
    private static final int KAFKA_STREAMS_CLOSE_TIMEOUT_SECONDS = 10;
    private final KafkaPersonService _kafkaPersonService;
    private final KafkaStreams _kafkaStreams;
    private boolean _shuttingDown = false;

    public KafkaStreamsService(@Value(value="${kafka.bootstrapServers}") String kafkaBootstrapServers, @Value(value="${kafka.streams.stateDir:/tmp/kafka-streams}") String kafkaStreamsStateDir, @Value(value="${sso.useEmailForUserLookup}") boolean useEmailForUserLookup, KafkaPersonService kafkaPersonService) {
        this._kafkaPersonService = kafkaPersonService;
        this._kafkaStreams = this.initKafkaStreams(kafkaBootstrapServers, kafkaStreamsStateDir, useEmailForUserLookup);
    }

    @Nonnull
    private KafkaStreams initKafkaStreams(String kafkaBootstrapServers, String kafkaStreamsStateDir, boolean useEmailForUserLookup) {
        Properties config = new Properties();
        config.put("application.id", "just-authentication-" + UUID.randomUUID().toString());
        config.put("bootstrap.servers", kafkaBootstrapServers);
        config.put("default.key.serde", Serdes.String().getClass());
        config.put("default.value.serde", Serdes.String().getClass());
        config.put("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class);
        config.put("state.dir", kafkaStreamsStateDir);
        config.put("cache.max.bytes.buffering", (Object)0);
        config.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("just.persons", Consumed.with((Serde)Serdes.String(), (Serde)PROFILE_SERDE)).process(() -> this._kafkaPersonService.personProcessor(), new String[0]);
        if (!useEmailForUserLookup) {
            builder.stream("just.persons.externalId", Consumed.with((Serde)Serdes.String(), (Serde)PROFILE_EXTERNAL_ID_SERDE)).process(() -> this._kafkaPersonService.externalIdProcessor(), new String[0]);
        }
        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), config);
        kafkaStreams.setStateListener((KafkaStreams.StateListener)this);
        kafkaStreams.start();
        return kafkaStreams;
    }

    @PreDestroy
    public void close() {
        LOG.info("Stopping KafkaStreams");
        this._shuttingDown = true;
        if (this._kafkaStreams.state() == KafkaStreams.State.CREATED) {
            this._kafkaStreams.setStateListener(null);
        }
        this._kafkaStreams.close();
    }

    public void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) {
        LOG.info("state change from {} to {}", (Object)oldState, (Object)newState);
        switch (1.$SwitchMap$org$apache$kafka$streams$KafkaStreams$State[newState.ordinal()]) {
            case 1: {
                this._kafkaStreams.close(10L, TimeUnit.SECONDS);
                return;
            }
            case 2: {
                if (!this._shuttingDown) {
                    this._kafkaStreams.start();
                }
                return;
            }
        }
    }

    protected void doHealthCheck(Health.Builder builder) {
        Status status = this._kafkaStreams.state().isRunning() ? Status.UP : Status.DOWN;
        builder.withDetail("KafkaStream.State", (Object)this._kafkaStreams.state()).status(status).build();
    }
}

