/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.onx.export.ldap.business;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import de.justsoftware.onx.common.shared.model.PersonId;
import de.justsoftware.onx.common.shared.server.TransactionHelper;
import de.justsoftware.onx.container.business.EntityService;
import de.justsoftware.onx.container.business.EntityVersionService;
import de.justsoftware.onx.container.business.model.EntityVersion;
import de.justsoftware.onx.container.shared.model.EntityId;
import de.justsoftware.onx.container.shared.model.EntityMemberRole;
import de.justsoftware.onx.container.shared.model.EntityStatus;
import de.justsoftware.onx.container.shared.model.db.DBEntity;
import de.justsoftware.onx.export.ldap.business.LDAPMemberGroupExporter;
import de.justsoftware.onx.export.ldap.model.MemberGroupExportException;
import de.justsoftware.onx.export.ldap.model.MemberRoleUpdateAction;
import de.justsoftware.onx.export.ldap.model.MemberRoleUpdateKafkaKey;
import de.justsoftware.onx.kafka.AbstractKafkaConsumer;
import de.justsoftware.onx.kafka.KafkaConfigurationProvider;
import de.justsoftware.onx.kafka.KafkaConsumerContext;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ParametersAreNonnullByDefault
public class MemberRoleExportUpdateConsumer
extends AbstractKafkaConsumer<String, String> {
    static final String TOPIC = "just.export.memberRoleUpdates";
    private static final Logger LOG = LoggerFactory.getLogger(MemberRoleExportUpdateConsumer.class);
    private final EntityService _entityService;
    private final EntityVersionService _entityVersionService;
    private final ImmutableList<LDAPMemberGroupExporter> _memberRoleExporters;

    public MemberRoleExportUpdateConsumer(TransactionHelper transactionHelper, KafkaConfigurationProvider kafkaConfigurationProvider, EntityService entityService, EntityVersionService entityVersionService, List<LDAPMemberGroupExporter> memberGroupExporters) {
        this(kafkaConfigurationProvider.stringConsumer("just.toro.MemberRoleExportUpdateConsumer"), transactionHelper, entityService, entityVersionService, memberGroupExporters);
    }

    @VisibleForTesting
    MemberRoleExportUpdateConsumer(@Nullable Consumer<String, String> consumer, TransactionHelper transactionHelper, EntityService entityService, EntityVersionService entityVersionService, List<LDAPMemberGroupExporter> memberGroupExporters) {
        super(consumer, transactionHelper, TOPIC);
        this._entityService = entityService;
        this._entityVersionService = entityVersionService;
        this._memberRoleExporters = ImmutableList.copyOf(memberGroupExporters);
    }

    @Override
    protected Logger getLog() {
        return LOG;
    }

    @Override
    protected void process(ConsumerRecords<String, String> records, KafkaConsumerContext context) throws MemberGroupExportException {
        records.forEach(record -> {
            MemberRoleUpdateKafkaKey key = MemberRoleUpdateKafkaKey.parse((String)record.key());
            EntityId entityId = key.getEntityId();
            PersonId personId = key.getPersonId();
            EntityMemberRole entityMemberRole = key.getEntityMemberRole();
            MemberRoleUpdateAction memberRoleUpdateAction = MemberRoleUpdateAction.valueOf((String)record.value());
            DBEntity entity = this._entityService.getById(entityId);
            EntityVersion newestVersion = this._entityVersionService.getNewestVersionWithStatus(entityId, (Set<EntityStatus>)ImmutableSet.of());
            if (entity == null || newestVersion == null) {
                LOG.warn("Could not export non existent entity " + entityId);
                return;
            }
            try {
                this._memberRoleExporters.stream().filter(exporter -> !newestVersion.getStatus().isArchived()).forEach(exporter -> exporter.processMemberUpdate(entity, personId, entityMemberRole, memberRoleUpdateAction));
            }
            catch (MemberGroupExportException e) {
                LOG.error("Processing record with offset " + record.offset() + " failed. Could not process member update " + memberRoleUpdateAction + " " + key, (Throwable)e);
                throw e;
            }
        });
    }
}

