/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.people.business.usergroup.impl;

import de.justsoftware.people.business.usergroup.impl.UserGroupImportKafkaStreamsSupplier;
import de.justsoftware.people.business.usergroup.impl.UserGroupMemberProcessor;
import de.justsoftware.people.business.usergroup.impl.UserGroupProcessor;
import de.justsoftware.people.business.usergroup.model.IsMember;
import de.justsoftware.people.business.usergroup.model.UserGroupKafkaModel;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
import org.springframework.kafka.support.serializer.JsonSerde;

@ParametersAreNonnullByDefault
class UserGroupImportKafkaStreamsSupplier
implements Supplier<KafkaStreams> {
    private static final Long COMMIT_INTERVAL = TimeUnit.HOURS.toMillis(1L);
    private static final Serde<UserGroupKafkaModel> USER_GROUP_SERDE = new JsonSerde(UserGroupKafkaModel.class);
    private final UserGroupProcessor _userGroupProcessor;
    private final UserGroupMemberProcessor _userGroupMemberProcessor;
    private final String _kafkaBootstrapServers;
    private final String _kafkaStreamsStateDir;

    protected UserGroupImportKafkaStreamsSupplier(String kafkaBootstrapServers, String kafkaStreamsStateDir, UserGroupProcessor userGroupProcessor, UserGroupMemberProcessor userGroupMemberProcessor) {
        this._kafkaBootstrapServers = kafkaBootstrapServers;
        this._kafkaStreamsStateDir = kafkaStreamsStateDir;
        this._userGroupProcessor = userGroupProcessor;
        this._userGroupMemberProcessor = userGroupMemberProcessor;
    }

    @Override
    public KafkaStreams get() {
        Properties config = new Properties();
        config.put("application.id", "just.people.userGroupImportProcessor");
        config.put("bootstrap.servers", this._kafkaBootstrapServers);
        config.put("default.timestamp.extractor", LogAndSkipOnInvalidTimestamp.class);
        config.put("commit.interval.ms", COMMIT_INTERVAL);
        config.put("cache.max.bytes.buffering", (Object)0);
        config.put("default.deserialization.exception.handler", LogAndContinueExceptionHandler.class);
        config.put("state.dir", this._kafkaStreamsStateDir);
        StreamsBuilder builder = new StreamsBuilder();
        KTable userGroups = builder.table("just.people.import.userGroup", Consumed.with((Serde)UserGroupIdSerde.INSTANCE, (Serde)USER_GROUP_SERDE, (TimestampExtractor)new WallclockTimestampExtractor(), (Topology.AutoOffsetReset)Topology.AutoOffsetReset.EARLIEST));
        KTable userGroupProfiles = builder.table("just.people.import.userGroupMembers", Consumed.with((Serde)MembershipSerde.INSTANCE, (Serde)new JsonSerde(IsMember.class), (TimestampExtractor)new WallclockTimestampExtractor(), (Topology.AutoOffsetReset)Topology.AutoOffsetReset.EARLIEST));
        userGroups.toStream().process(() -> this._userGroupProcessor, new String[0]);
        userGroupProfiles.toStream().process(() -> this._userGroupMemberProcessor, new String[0]);
        return new KafkaStreams(builder.build(), config);
    }
}

