/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.onx.usergroup.business.impl.kafka;

import com.google.common.annotations.VisibleForTesting;
import de.justsoftware.onx.common.shared.server.TransactionHelper;
import de.justsoftware.onx.container.shared.model.IdParseException;
import de.justsoftware.onx.kafka.AbstractKafkaConsumer;
import de.justsoftware.onx.kafka.KafkaConfigurationProvider;
import de.justsoftware.onx.kafka.KafkaConsumerContext;
import de.justsoftware.onx.usergroup.business.impl.UserGroupService;
import de.justsoftware.onx.usergroup.business.impl.kafka.UserGroupDeserializer;
import de.justsoftware.onx.usergroup.model.UserGroup;
import de.justsoftware.onx.usergroup.model.UserGroupId;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;

@Service
@Lazy(value=false)
@ParametersAreNonnullByDefault
public class UserGroupKafkaConsumer
extends AbstractKafkaConsumer<String, UserGroup> {
    private static final Logger LOG = LoggerFactory.getLogger(UserGroupKafkaConsumer.class);
    private final UserGroupService _userGroupService;

    @Autowired
    public UserGroupKafkaConsumer(TransactionHelper transactionHelper, KafkaConfigurationProvider kafkaConfigurationProvider, UserGroupService userGroupService) {
        this(kafkaConfigurationProvider.consumer("just.onx.message.userGroupConsumer", new StringDeserializer(), new UserGroupDeserializer()), transactionHelper, userGroupService);
    }

    @VisibleForTesting
    UserGroupKafkaConsumer(@Nullable Consumer<String, UserGroup> consumer, TransactionHelper transactionHelper, UserGroupService userGroupService) {
        super(consumer, transactionHelper, "just.people.userGroup");
        this._userGroupService = userGroupService;
    }

    @Override
    @Nonnull
    protected Logger getLog() {
        return LOG;
    }

    @Override
    public void process(ConsumerRecords<String, UserGroup> records, KafkaConsumerContext context) {
        records.forEach(record -> {
            try {
                if (record.value() == null) {
                    UserGroupId groupId = UserGroupId.parse((String)record.key()).orElseThrow(IdParseException::new);
                    this._userGroupService.removeUserGroup(groupId);
                } else {
                    this._userGroupService.addOrUpdateUserGroup((UserGroup)record.value());
                }
            }
            catch (IdParseException exp) {
                LOG.error(String.format("Failed to update user group with key '%s' and value '%s'. Skip this record.", record.key(), record.value()), (Throwable)exp);
            }
        });
    }
}

