/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.notificationservice.kafka;

import de.justsoftware.browserpushclient.BrowserPushKafkaProducer;
import de.justsoftware.browserpushclient.BrowserPushProducer;
import de.justsoftware.mobilepush.PushNotificationKafkaProducer;
import de.justsoftware.mobilepush.PushNotificationProducer;
import de.justsoftware.notificationclient.NotificationDeserializer;
import de.justsoftware.notificationclient.model.Notification;
import de.justsoftware.notificationservice.kafka.KafkaConfiguration;
import de.justsoftware.toolbox.kafka.client.KafkaCleaningProducer;
import de.justsoftware.toolbox.kafka.client.KafkaCleaningProducerImpl;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

@Configuration
@DependsOn(value={"liquibase"})
@ParametersAreNonnullByDefault
public class KafkaConfiguration {
    @Value(value="${kafka.bootstrapServers:}")
    private String _bootstrapServers;

    @Bean(destroyMethod="close")
    @Nonnull
    public KafkaCleaningProducer<String, String> kafkaCleaningProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", this._bootstrapServers);
        props.put("acks", "all");
        props.put("retries", (Object)0);
        props.put("batch.size", (Object)16384);
        props.put("linger.ms", (Object)1);
        props.put("buffer.memory", (Object)0x2000000);
        1 delegate = new /* Unavailable Anonymous Inner Class!! */;
        return new KafkaCleaningProducerImpl((Producer)delegate, arg_0 -> this.createCleaningConsumer(arg_0));
    }

    @Nonnull
    private Consumer<String, ?> createCleaningConsumer(TopicPartition tp) {
        Properties props = new Properties();
        props.put("bootstrap.servers", this._bootstrapServers);
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", (Object)100);
        return KafkaCleaningProducerImpl.createStringConsumer((TopicPartition)tp, (Properties)props);
    }

    @Bean
    @Nonnull
    public static PushNotificationProducer pushNotificationKafkaProducer(KafkaCleaningProducer<String, String> kafkaProducer) {
        return new PushNotificationKafkaProducer(kafkaProducer);
    }

    @Bean
    @Nonnull
    public static BrowserPushProducer browserPushProducer(KafkaCleaningProducer<String, String> kafkaProducer) {
        return new BrowserPushKafkaProducer(kafkaProducer);
    }

    @Nonnull
    private Properties defaultConsumerConfig() {
        Properties config = new Properties();
        config.put("bootstrap.servers", this._bootstrapServers);
        config.put("enable.auto.commit", "false");
        config.put("auto.offset.reset", "earliest");
        config.put("key.deserializer", ErrorHandlingDeserializer.class);
        config.put("value.deserializer", ErrorHandlingDeserializer.class);
        config.put("max.poll.records", (Object)100);
        return config;
    }

    @Bean
    @Nonnull
    public ConcurrentKafkaListenerContainerFactory<String, String> userGroupMemberListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory containerFactory = new ConcurrentKafkaListenerContainerFactory();
        Properties consumerConfig = this.defaultConsumerConfig();
        consumerConfig.put("group.id", "just.notificationservice.userGroupMemberConsumer");
        consumerConfig.put("spring.deserializer.key.delegate.class", StringDeserializer.class);
        consumerConfig.put("spring.deserializer.value.delegate.class", StringDeserializer.class);
        containerFactory.setConsumerFactory((ConsumerFactory)new DefaultKafkaConsumerFactory((Map)consumerConfig));
        containerFactory.setBatchListener(Boolean.valueOf(true));
        return containerFactory;
    }

    @Bean
    @Nonnull
    public ConcurrentKafkaListenerContainerFactory<String, Notification> notificationListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory containerFactory = new ConcurrentKafkaListenerContainerFactory();
        Properties consumerConfig = this.defaultConsumerConfig();
        consumerConfig.put("group.id", "just.notificationservice.notificationConsumer");
        consumerConfig.put("spring.deserializer.key.delegate.class", StringDeserializer.class);
        consumerConfig.put("spring.deserializer.value.delegate.class", NotificationDeserializer.class);
        containerFactory.setConsumerFactory((ConsumerFactory)new DefaultKafkaConsumerFactory((Map)consumerConfig));
        containerFactory.setBatchListener(Boolean.valueOf(true));
        return containerFactory;
    }
}

