/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.drive.content.impl;

import com.google.common.annotations.VisibleForTesting;
import de.justsoftware.drive.content.impl.ExtractionRunner;
import de.justsoftware.drive.content.kafka.AbstractKafkaConsumer;
import de.justsoftware.drive.filepersistence.DriveFilePersistenceScanMarker;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan(basePackageClasses={DriveFilePersistenceScanMarker.class})
public class ContentExtractorConfiguration {
    private static final Logger LOG = LoggerFactory.getLogger(ContentExtractorConfiguration.class);
    @VisibleForTesting
    final List<ExtractionRunner> _extractionRunners;

    @Autowired
    public ContentExtractorConfiguration(ApplicationContext applicationContext, @Value(value="${just.drive.contextExtractor.numberOfThreads:2}") int numberOfThreads) {
        LOG.info("Content Extractor is using {} threads", (Object)numberOfThreads);
        this._extractionRunners = Stream.generate(() -> (ExtractionRunner)applicationContext.getBean(ExtractionRunner.class)).limit(numberOfThreads).collect(Collectors.toList());
    }

    @PreDestroy
    public void destroy() {
        this._extractionRunners.forEach(AbstractKafkaConsumer::destroy);
    }

    @Bean
    public HealthIndicator kafkaHealthIndicator(@Value(value="${just.drive.kafka.boostrapServers:localhost:9092}") String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        AdminClient adminClient = AdminClient.create((Properties)props);
        DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(Integer.valueOf(1000));
        return () -> {
            DescribeClusterResult describeCluster = adminClient.describeCluster(describeClusterOptions);
            try {
                String clusterId = (String)describeCluster.clusterId().get();
                int nodeCount = ((Collection)describeCluster.nodes().get()).size();
                return Health.up().withDetail("clusterId", (Object)clusterId).withDetail("nodeCount", (Object)nodeCount).build();
            }
            catch (InterruptedException | ExecutionException e) {
                return Health.down().withException((Throwable)e).build();
            }
        };
    }
}

