/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.drive.business.change.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.LongNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.base.Optional;
import com.google.common.collect.Multimap;
import de.justsoftware.drive.business.change.ChangeTrigger;
import de.justsoftware.drive.business.change.ChangeWithTriggers;
import de.justsoftware.drive.business.change.DriveChangePublisher;
import de.justsoftware.drive.business.change.impl.DriveChangePublisherImpl;
import de.justsoftware.drive.business.kafka.KafkaUtil;
import de.justsoftware.drive.common.document.model.DocumentVersionId;
import de.justsoftware.drive.common.item.model.ItemId;
import de.justsoftware.drive.common.person.model.PersonId;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.joda.time.ReadableInstant;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

/**
 * Publishes drive changes as JSON messages to the Kafka topic {@value #TOPIC}.
 *
 * <p>The Kafka producer is optional: when none is injected, every publish method returns
 * immediately without doing anything. Errors reported to the producer's send callback are
 * written to the error log.
 */
@Service
@ParametersAreNonnullByDefault
public class DriveChangePublisherImpl implements DriveChangePublisher {
    static final String TOPIC = "just.drive.changes";
    private static final Logger LOG = LoggerFactory.getLogger(DriveChangePublisherImpl.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Maximum number of serialized triggers written into a single message. */
    private static final int TRIGGER_NODE_LIMIT = 1000;

    // JSON field names of the published payload.
    private static final String ITEM_ID = "itemId";
    private static final String VERSION = "version";
    private static final String CHANGE_DATE = "changeDate";
    private static final String EXECUTED_BY = "executedBy";
    private static final String TRIGGERS = "triggers";
    private static final String PURPOSE = "purpose";

    // NOTE(review): the following names are not referenced anywhere in this class as shown;
    // presumably they are used by the trigger serialization - confirm before removing.
    private static final String NAME = "name";
    private static final String VERSION_ID = "versionId";
    private static final String DOCUMENT_ID = "documentId";
    private static final String TYPE = "type";
    private static final String MIME_TYPE = "mimeType";
    private static final String FILE_SIZE = "fileSize";
    private static final String OWNER_ID = "ownerId";

    /** Producer used for all messages; {@code null} disables publishing. */
    @Nullable
    private final Producer<String, String> _kafkaProducer;

    /**
     * @param kafkaStringProducer the producer to publish with, or {@code null} to turn this
     *        publisher into a no-op
     */
    @Autowired
    public DriveChangePublisherImpl(@Nullable @Qualifier(value="KAFKA_STRING_PRODUCER") Producer<String, String> kafkaStringProducer) {
        this._kafkaProducer = kafkaStringProducer;
    }

    /**
     * Send callback: logs the failure if the producer reports one, otherwise does nothing.
     *
     * @param metadata  metadata handed over by the producer (unused)
     * @param exception the failure, or {@code null} when the send succeeded
     */
    private static void logErrorCallback(@Nullable RecordMetadata metadata, @Nullable Exception exception) {
        if (exception != null) {
            LOG.error("Could not publish change to topic " + TOPIC, exception);
        }
    }

    /**
     * Computes the target partition for an item from its id and the topic's current
     * partition count.
     */
    private static Integer partitionFor(Producer<String, String> producer, String itemIdString) {
        return KafkaUtil.partition(itemIdString, producer.partitionsFor(TOPIC).size());
    }

    /**
     * Builds the JSON payload for a change: item id, version, change date (ISO date-time),
     * the executing person if present, at most {@link #TRIGGER_NODE_LIMIT} serialized
     * triggers, and the purpose if one is set.
     */
    private static ObjectNode toJson(ChangeWithTriggers change, String itemIdString) {
        ObjectNode json = OBJECT_MAPPER.createObjectNode();
        json.put(ITEM_ID, itemIdString);
        json.put(VERSION, (int) change.getVersion());
        json.put(CHANGE_DATE, ISODateTimeFormat.dateTime().print(change.getChangeDate()));
        if (change.getExecutedBy().isPresent()) {
            PersonId executedBy = (PersonId) change.getExecutedBy().get();
            json.put(EXECUTED_BY, (long) executedBy.getId());
        }

        if (change.getChangeTriggers().size() > TRIGGER_NODE_LIMIT) {
            LOG.info("To avoid exceeding Kafka payload size, only {} triggers are processed for change '{}'. This message is triggered for compressed changes and can be ignored.", TRIGGER_NODE_LIMIT, change.getId());
        }
        ArrayNode triggers = OBJECT_MAPPER.createArrayNode();
        change.getChangeTriggers().stream()
            .map(trigger -> trigger.accept(SerializeTriggerVisitor.INSTANCE))
            // triggers for which the visitor yields no node are skipped and do not count
            // against the limit
            .filter(Optional::isPresent)
            .map(Optional::get)
            .limit(TRIGGER_NODE_LIMIT)
            .forEach(triggers::add);
        json.set(TRIGGERS, triggers);

        String purpose = change.getPurpose();
        if (purpose != null) {
            json.put(PURPOSE, purpose);
        }
        return json;
    }

    /**
     * Serializes the change to JSON and sends it to {@value #TOPIC}. The record key is the
     * change id; the partition is derived from the item id. A JSON serialization failure is
     * logged and the change is not sent.
     *
     * @param change the change to publish
     */
    public void publishChange(ChangeWithTriggers change) {
        final Producer<String, String> producer = this._kafkaProducer;
        if (producer == null) {
            return;
        }
        try {
            String itemIdString = (String) change.getItemId().getId();
            ObjectNode json = toJson(change, itemIdString);
            Integer partition = partitionFor(producer, itemIdString);
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                TOPIC, partition, change.getId().getId().toString(), OBJECT_MAPPER.writeValueAsString(json));
            producer.send(record, DriveChangePublisherImpl::logErrorCallback);
        }
        catch (JsonProcessingException e) {
            // Fixed message with placeholders: the exception text must not act as the format string.
            LOG.error("Could not serialize change '{}' for topic {}", change.getId(), TOPIC, e);
        }
    }

    /**
     * Sends one record with a {@code null} value per document version id, keyed by that id
     * and routed to the partition of the item it is mapped under.
     *
     * @param change document version ids to remove, grouped by item id
     */
    public void publishRemovedChange(Multimap<ItemId, DocumentVersionId> change) {
        final Producer<String, String> producer = this._kafkaProducer;
        if (producer == null) {
            return;
        }
        change.asMap().forEach((itemId, versionIds) -> deleteChange(producer, itemId, versionIds));
    }

    /**
     * Sends a {@code null}-valued record for each of the given ids to the item's partition.
     * The partition is resolved once per item.
     */
    private static void deleteChange(Producer<String, String> producer, ItemId itemId, Collection<DocumentVersionId> changes) {
        String itemIdString = (String) itemId.getId();
        Integer partition = partitionFor(producer, itemIdString);
        changes.forEach(documentVersionId -> {
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                TOPIC, partition, documentVersionId.getId().toString(), null);
            producer.send(record, DriveChangePublisherImpl::logErrorCallback);
        });
    }

    /**
     * Publishes every remaining change of the iterator via
     * {@link #publishChange(ChangeWithTriggers)}, in iteration order.
     *
     * @param changes the changes to publish; consumed by this call
     */
    public void publishAllChanges(Iterator<ChangeWithTriggers> changes) {
        if (this._kafkaProducer == null) {
            return;
        }
        changes.forEachRemaining(this::publishChange);
    }
}

