/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.drive.business.migrationconversation;

import com.google.common.collect.ImmutableSet;
import de.justsoftware.drive.business.kafka.AbstractKafkaConsumer;
import de.justsoftware.drive.business.kafka.KafkaConfigurationProvider;
import de.justsoftware.drive.common.item.model.ItemId;
import de.justsoftware.drive.common.item.model.ItemType;
import de.justsoftware.drive.persistence.conversationmigration.ConversationMigrationDAO;
import de.justsoftware.drive.persistence.search.SearchIndexQueueDAO;
import de.justsoftware.drive.persistence.search.model.SearchIndexType;
import de.justsoftware.drive.persistence.transaction.TransactionSupport;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
@ParametersAreNonnullByDefault
public class MigrationTopicConsumer
extends AbstractKafkaConsumer<String, String> {
    public static final String TOPIC = "just.drive.conversationidmigration";
    private static final Logger LOG = LoggerFactory.getLogger(MigrationTopicConsumer.class);
    private static final Pattern OLD_CONVERSATION_ID_PATTERN = Pattern.compile("^CONVERSATION,\\d+$");
    private static final Pattern NEW_CONVERSATION_ID_PATTERN = Pattern.compile("^CONVERSATION,[0-9a-fA-F]{8}\\b-[0-9a-fA-F]{4}\\b-[0-9a-fA-F]{4}\\b-[0-9a-fA-F]{4}\\b-[0-9a-fA-F]{12}$");
    private final ConversationMigrationDAO _dao;
    private final SearchIndexQueueDAO _indexQueueDAO;

    @Autowired
    public MigrationTopicConsumer(KafkaConfigurationProvider kafkaConfigurationProvider, TransactionSupport transactionSupport, ConversationMigrationDAO dao, SearchIndexQueueDAO indexQueueDAO) {
        super(kafkaConfigurationProvider.stringConsumer("just.drive.MigrationTopicConsumer"), transactionSupport, TOPIC);
        this._dao = dao;
        this._indexQueueDAO = indexQueueDAO;
    }

    protected void process(ConsumerRecords<String, String> records, Consumer<Runnable> afterCommitCallbackConsumer) {
        ImmutableSet.Builder processedIds = ImmutableSet.builderWithExpectedSize((int)records.count());
        for (ConsumerRecord migration : records) {
            ItemId originalId = ItemId.valueOf((String)((String)migration.key()));
            ItemId migratedId = ItemId.valueOf((String)((String)migration.value()));
            if (originalId == null || originalId.getType() != ItemType.CONVERSATION || !OLD_CONVERSATION_ID_PATTERN.matcher((CharSequence)migration.key()).matches()) {
                LOG.warn("Original conversation id {} not valid for migration. Only CONVERSATION,%long% is allowed.", (Object)originalId);
                continue;
            }
            if (migratedId == null || migratedId.getType() != ItemType.CONVERSATION || !NEW_CONVERSATION_ID_PATTERN.matcher((CharSequence)migration.value()).matches()) {
                LOG.warn("Migrated conversation id {} not valid for migration. Only CONVERSATION,%uuid% is allowed.", (Object)migratedId);
                continue;
            }
            this._dao.migrate(originalId, migratedId);
            processedIds.add((Object)migratedId);
        }
        this._indexQueueDAO.insertAllDocumentsOfItems((Set)processedIds.build(), SearchIndexType.METADATA);
    }
}

