/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.people.business.search.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import de.justsoftware.people.business.search.VisibilityService;
import de.justsoftware.people.domain.RepositoryCommunicationException;
import de.justsoftware.people.domain.RepositoryException;
import de.justsoftware.people.domain.model.ItemType;
import de.justsoftware.people.domain.model.profile.ProfileId;
import de.justsoftware.people.domain.model.usergroup.UserGroupId;
import de.justsoftware.people.domain.search.SearchIndexService;
import de.justsoftware.people.kafka.AbstractKafkaConsumer;
import de.justsoftware.people.kafka.KafkaConfigurationProvider;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@ParametersAreNonnullByDefault
@Service
public class SearchIndexQueueConsumer
extends AbstractKafkaConsumer<String, String> {
    static final String SEARCH_INDEX_TOPIC = "just.people.search.index";
    private static final Logger LOGGER = LoggerFactory.getLogger(SearchIndexQueueConsumer.class);
    private final VisibilityService _visibilityService;
    private final SearchIndexService _searchIndexService;

    @Autowired
    public SearchIndexQueueConsumer(KafkaConfigurationProvider kafkaConfigurationProvider, SearchIndexService searchIndexService, VisibilityService visibilityService) {
        this(kafkaConfigurationProvider.modelConsumer("just.people.search.indexConsumer", (Deserializer)new StringDeserializer()), searchIndexService, visibilityService);
    }

    @VisibleForTesting
    SearchIndexQueueConsumer(Consumer<String, String> consumer, SearchIndexService searchIndexService, VisibilityService visibilityService) {
        super(consumer, new String[]{SEARCH_INDEX_TOPIC});
        this._searchIndexService = searchIndexService;
        this._visibilityService = visibilityService;
    }

    protected void process(ConsumerRecords<String, String> records, java.util.function.Consumer<Runnable> afterCommitCallbackConsumer) {
        LOGGER.debug("Update visibilities for {} records", (Object)records.count());
        ImmutableSet.Builder profiles = ImmutableSet.builder();
        ImmutableSet.Builder userGroups = ImmutableSet.builder();
        records.forEach(r -> {
            if (((String)r.key()).startsWith(ItemType.PROFILE.name())) {
                profiles.add((Object)ProfileId.parse((String)((String)r.key())));
            } else {
                userGroups.add((Object)UserGroupId.parse((String)((String)r.key())));
            }
        });
        this.processRecords(profiles.build(), userGroups.build());
    }

    private void processRecords(ImmutableSet<ProfileId> profileIds, ImmutableSet<UserGroupId> userGroupIds) {
        LOGGER.debug("Updating visibilities for profiles {} and groups {} ", profileIds, userGroupIds);
        try {
            this._searchIndexService.updateVisibleForRolesForProfiles(this._visibilityService.visibilitiesForProfiles(profileIds));
            this._searchIndexService.updateVisibleForRolesForUserGroups(this._visibilityService.visibilitiesForGroups(userGroupIds));
            this._searchIndexService.commit(false);
        }
        catch (RepositoryCommunicationException e) {
            throw e;
        }
        catch (RepositoryException e) {
            LOGGER.error("Failed to process records, see previous log messages for causes.", (Throwable)e);
        }
    }
}

