/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.people.business.profile.impl;

import de.justsoftware.people.business.profile.impl.ProfileKafkaStreamsSupplier;
import de.justsoftware.people.business.profile.impl.ProfileProcessor;
import de.justsoftware.people.business.profile.model.DeletableProfile;
import de.justsoftware.people.business.profile.model.ProfileKafkaModel;
import de.justsoftware.people.domain.model.TenantId;
import de.justsoftware.people.domain.model.profile.DynamicProfileFields;
import de.justsoftware.people.domain.model.profile.StaticProfileFields;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.springframework.kafka.support.serializer.JsonSerde;

@ParametersAreNonnullByDefault
class ProfileKafkaStreamsSupplier
implements Supplier<KafkaStreams> {
    private static final Long COMMIT_INTERVAL = TimeUnit.HOURS.toMillis(1L);
    private static final Serde<DeletableProfile> PROFILE_SERDE = new JsonSerde(DeletableProfile.class);
    private static final Serde<DynamicProfileFields> PROFILE_FIELDS_SERDE = new JsonSerde(DynamicProfileFields.class);
    private static final Serde<TenantId[]> TENANT_IDS_SERDE = new JsonSerde(TenantId[].class);
    private final ProfileProcessor _profileProcessor;
    private final String _kafkaBootstrapServers;
    private final String _kafkaStreamsStateDir;

    protected ProfileKafkaStreamsSupplier(String kafkaBootstrapServers, String kafkaStreamsStateDir, ProfileProcessor profileProcessor) {
        this._kafkaBootstrapServers = kafkaBootstrapServers;
        this._profileProcessor = profileProcessor;
        this._kafkaStreamsStateDir = kafkaStreamsStateDir;
    }

    @Override
    public KafkaStreams get() {
        Properties config = new Properties();
        config.put("application.id", "just.people.ProfileProcessorV2");
        config.put("bootstrap.servers", this._kafkaBootstrapServers);
        config.put("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class);
        config.put("commit.interval.ms", COMMIT_INTERVAL);
        config.put("cache.max.bytes.buffering", (Object)0);
        config.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        config.put("state.dir", this._kafkaStreamsStateDir);
        StreamsBuilder builder = new StreamsBuilder();
        KTable profileTable = builder.table("just.persons", Consumed.with((Serde)ProfileIdSerde.INSTANCE, (Serde)PROFILE_SERDE, (TimestampExtractor)new WallclockTimestampExtractor(), (Topology.AutoOffsetReset)Topology.AutoOffsetReset.EARLIEST));
        KTable fieldsTable = builder.table("just.persons.profileAttributes", Consumed.with((Serde)ProfileIdSerde.INSTANCE, (Serde)PROFILE_FIELDS_SERDE, (TimestampExtractor)new WallclockTimestampExtractor(), (Topology.AutoOffsetReset)Topology.AutoOffsetReset.EARLIEST));
        KTable tenantIdTable = builder.table("just.persons.tenantIds", Consumed.with((Serde)ProfileIdSerde.INSTANCE, (Serde)TENANT_IDS_SERDE, (TimestampExtractor)new WallclockTimestampExtractor(), (Topology.AutoOffsetReset)Topology.AutoOffsetReset.EARLIEST));
        profileTable.leftJoin(fieldsTable, (deletableProfile, fields) -> new ProfileKafkaModel.ProfileKafkaModelBuilder((StaticProfileFields)deletableProfile, fields, deletableProfile.isDeleted())).leftJoin(tenantIdTable, (profileKafkaModelBuilder, tenantIds) -> profileKafkaModelBuilder.withTenantIds(tenantIds).build()).toStream().process(() -> this._profileProcessor, new String[0]);
        return new KafkaStreams(builder.build(), config);
    }
}

