/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.people.business.profile.impl;

import de.justsoftware.people.business.AbstractProcessorServiceImpl;
import de.justsoftware.people.business.profile.impl.ProfileAttributeConfigProcessor;
import de.justsoftware.people.business.profile.model.ProfileAttributeConfigKafkaModel;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.support.serializer.JsonSerde;
import org.springframework.stereotype.Service;

/*
 * Exception performing whole class analysis ignored.
 */
@Service
@ParametersAreNonnullByDefault
@Profile(value={"!test"})
public class ProfileAttributeConfigProcessorServiceImpl
extends AbstractProcessorServiceImpl {
    private static final Long COMMIT_INTERVAL = TimeUnit.HOURS.toMillis(1L);

    @Autowired
    ProfileAttributeConfigProcessorServiceImpl(@Value(value="${kafka.bootstrapServers}") String kafkaBootstrapServers, @Value(value="${kafka.streams.stateDir}") String stateDir, ProfileAttributeConfigProcessor processor) {
        super(() -> ProfileAttributeConfigProcessorServiceImpl.getKafkaStreams((String)kafkaBootstrapServers, (String)stateDir, (ProfileAttributeConfigProcessor)processor));
    }

    @Nonnull
    static KafkaStreams getKafkaStreams(String kafkaBootstrapServers, String stateDir, ProfileAttributeConfigProcessor processor) {
        Properties config = new Properties();
        config.put("application.id", "just.people.profileAttributConfigProcessor" + UUID.randomUUID().toString());
        config.put("default.key.serde", Serdes.String().getClass());
        config.put("bootstrap.servers", kafkaBootstrapServers);
        config.put("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class);
        config.put("commit.interval.ms", COMMIT_INTERVAL);
        config.put("cache.max.bytes.buffering", (Object)0);
        config.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        config.put("state.dir", stateDir);
        StreamsBuilder builder = new StreamsBuilder();
        KStream configStream = builder.stream("just.persons.profileAttributesConfig", Consumed.with((Serde)Serdes.String(), (Serde)new JsonSerde(ProfileAttributeConfigKafkaModel[].class), (TimestampExtractor)new WallclockTimestampExtractor(), (Topology.AutoOffsetReset)Topology.AutoOffsetReset.EARLIEST));
        configStream.process(() -> processor, new String[0]);
        return new KafkaStreams(builder.build(), config);
    }
}

