/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.onx.container.business.impl;

import com.freiheit.toro.admin.shared.server.superoperty.Settings;
import com.freiheit.toro.common.shared.model.DateWithoutTimezone;
import com.freiheit.toro.common.shared.server.util.DateUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import de.justsoftware.kafka.client.KafkaCleaningProducer;
import de.justsoftware.kafka.client.KafkaCleaningProducerState;
import de.justsoftware.onx.common.business.events.ServerEventHandler;
import de.justsoftware.onx.container.business.DriveChangeVersionPublisher;
import de.justsoftware.onx.container.business.EntityVersionReadWriteDataService;
import de.justsoftware.onx.container.business.events.DriveChangesConnectedEvent;
import de.justsoftware.onx.container.business.events.EntityVersionUpdateEvent;
import de.justsoftware.onx.container.business.model.EntityVersion;
import de.justsoftware.onx.container.integration.persistence.EntityVersionDriveChangeDAO;
import de.justsoftware.onx.container.shared.model.DriveChangeId;
import de.justsoftware.onx.container.shared.model.EntityVersionId;
import de.justsoftware.onx.container.util.EntityVersionUtil;
import de.justsoftware.onx.server.business.JucoServerVersionUpdateProcessor;
import de.justsoftware.onx.server.business.UpdateBehaviour;
import de.justsoftware.onx.server.model.JucoVersionUpdateProcess;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.joda.time.DateTime;
import org.joda.time.ReadableInstant;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

/**
 * Publishes a per-drive-change summary of the linked entity versions to the Kafka topic
 * {@code just.change.version}.
 *
 * <p>Each record is keyed by the drive change id. Its value is either a JSON object that may contain
 * {@code firstVersion}, {@code firstPublishedVersion}, {@code firstPublishedDate} and
 * {@code expirationDate}, or {@code null} when no entity version can be resolved for the change.
 *
 * <p>The producer is optional: when none was injected, nothing is sent.
 */
@Service
@ParametersAreNonnullByDefault
public class DriveChangeVersionPublisherImpl
implements DriveChangeVersionPublisher,
ServerEventHandler,
JucoServerVersionUpdateProcessor {
    @VisibleForTesting
    static final String EXPIRATION_DATE = "expirationDate";
    @VisibleForTesting
    static final String FIRST_PUBLISHED_DATE = "firstPublishedDate";
    @VisibleForTesting
    static final String FIRST_PUBLISHED_VERSION = "firstPublishedVersion";
    @VisibleForTesting
    static final String FIRST_VERSION = "firstVersion";
    /** Number of drive changes handed to the publisher per chunk during a full refill. */
    private static final int CHUNK_SIZE = 100;
    private static final String TOPIC = "just.change.version";
    private final EntityVersionDriveChangeDAO _entityVersionDriveChangeDAO;
    private final EntityVersionReadWriteDataService _entityVersionReadWriteDataService;
    /** May be {@code null}; every use is guarded and publishing then becomes a no-op. */
    @CheckForNull
    private final KafkaCleaningProducer<String, String> _producer;
    private final Settings _settings;

    /**
     * @param entityVersionDriveChangeDAO resolves the links between drive changes and entity versions
     * @param entityVersionReadWriteDataService loads the entity versions by id
     * @param producer Kafka producer qualified as {@code KAFKA_STRING_PRODUCER}; may be {@code null}
     * @param settings source of the drive document migration date
     */
    @Autowired
    public DriveChangeVersionPublisherImpl(EntityVersionDriveChangeDAO entityVersionDriveChangeDAO, EntityVersionReadWriteDataService entityVersionReadWriteDataService, @Nullable @Qualifier(value="KAFKA_STRING_PRODUCER") KafkaCleaningProducer<String, String> producer, Settings settings) {
        _entityVersionDriveChangeDAO = entityVersionDriveChangeDAO;
        _entityVersionReadWriteDataService = entityVersionReadWriteDataService;
        _producer = producer;
        _settings = settings;
    }

    /**
     * Publishes the given drive changes without any known change date.
     *
     * <p>Duplicate ids collapse into a single entry; every id is mapped to an absent date and passed
     * on to {@link #publishChanges(Map, Predicate)}.
     *
     * @param changes ids of the drive changes to publish
     * @param versionPredicate only entity versions whose id matches are taken into account
     */
    @VisibleForTesting
    void publishChanges(Iterable<DriveChangeId> changes, Predicate<EntityVersionId> versionPredicate) {
        final Map<DriveChangeId, Optional<DateTime>> changesWithoutDate = StreamSupport.stream(changes.spliterator(), false)
                .collect(Collectors.toMap(Function.identity(), changeId -> Optional.<DateTime>absent(), (l, r) -> r));
        publishChanges(changesWithoutDate, versionPredicate);
    }

    /**
     * Sends one record per drive change describing the entity versions linked to it.
     *
     * <p>Entries with a {@code null} key are skipped. The entity versions of all changes are loaded
     * with a single lookup before the individual records are built.
     *
     * @param changes drive change ids mapped to their optional change date
     * @param versionPredicate only entity versions whose id matches are taken into account
     */
    @VisibleForTesting
    void publishChanges(Map<DriveChangeId, Optional<DateTime>> changes, Predicate<EntityVersionId> versionPredicate) {
        final Map<DriveChangeId, Optional<DateTime>> nonNullChanges = Maps.filterKeys(changes, Objects::nonNull);
        final ImmutableSetMultimap<DriveChangeId, EntityVersionId> versionsByChange = ImmutableSetMultimap.copyOf(
                Multimaps.filterValues(
                        _entityVersionDriveChangeDAO.getEntityVersionsForDriveChanges(nonNullChanges.keySet()),
                        versionPredicate));
        final ImmutableSet<EntityVersionId> entityVersionIds = versionsByChange.inverse().keySet();
        final ImmutableMap<EntityVersionId, EntityVersion> versionMap =
                _entityVersionReadWriteDataService.getByIdsAsMap(entityVersionIds);
        final DateTime driveDocumentMigrationDate = _settings.getDriveDocumentMigrationDate();
        nonNullChanges.forEach((change, optionalChangeDate) ->
                publishChange(change, optionalChangeDate, versionsByChange.get(change), versionMap, driveDocumentMigrationDate));
    }

    /**
     * Builds and sends the record for a single drive change.
     *
     * <p>Version ids that are missing from {@code versionMap} are ignored. If no version remains, a
     * {@code null} value is sent for the change.
     */
    private void publishChange(
            DriveChangeId change,
            Optional<DateTime> optionalChangeDate,
            Set<EntityVersionId> versionIds,
            Map<EntityVersionId, EntityVersion> versionMap,
            @Nullable DateTime driveDocumentMigrationDate) {
        final FluentIterable<EntityVersion> versions = FluentIterable.from(
                FluentIterable.from(versionIds)
                        .transform(versionMap::get)
                        .filter(Objects::nonNull)
                        .toSortedList(EntityVersion.ENTITY_VERSION_ID_ORDERING));
        if (versions.isEmpty()) {
            send(change, null);
        } else {
            send(change, describeVersions(versions, optionalChangeDate, driveDocumentMigrationDate));
        }
    }

    /**
     * Renders the JSON value for a non-empty, sorted sequence of entity versions.
     *
     * @param versions entity versions of one drive change, sorted by {@code ENTITY_VERSION_ID_ORDERING}
     * @param optionalChangeDate date of the drive change, if known
     * @param driveDocumentMigrationDate migration date from the settings; may be {@code null}
     * @return the serialized JSON object
     */
    private static String describeVersions(
            FluentIterable<EntityVersion> versions,
            Optional<DateTime> optionalChangeDate,
            @Nullable DateTime driveDocumentMigrationDate) {
        final JSONObject json = new JSONObject();

        final EntityVersion firstVersion = versions.first().orNull();
        if (firstVersion != null) {
            json.put(FIRST_VERSION, firstVersion.getVersionString());
        }

        final FluentIterable<EntityVersion> publicVersions = versions.filter(v -> v.getStatus().isPublished());
        final EntityVersion firstPublicVersion = publicVersions.first().orNull();
        if (firstPublicVersion != null) {
            json.put(FIRST_PUBLISHED_VERSION, firstPublicVersion.getVersionString());
            json.put(FIRST_PUBLISHED_DATE, DateUtil.formatDateISO(releaseDate(firstPublicVersion)));
        } else if (optionalChangeDate.isPresent() && driveDocumentMigrationDate != null) {
            // Without a published version, the change date is reported as the first published date,
            // but only for changes made before the drive document migration date.
            final DateTime changeDate = optionalChangeDate.get();
            if (changeDate.isBefore(driveDocumentMigrationDate)) {
                json.put(FIRST_PUBLISHED_DATE, DateUtil.formatDateISO(changeDate.toDate()));
            }
        }

        final EntityVersion lastPublicVersion = publicVersions.last().orNull();
        if (lastPublicVersion != null) {
            final DateWithoutTimezone expirationDate = EntityVersionUtil.expirationDate(lastPublicVersion);
            if (expirationDate != null) {
                json.put(EXPIRATION_DATE, DateUtil.formatDateISO(expirationDate));
            }
        }
        return json.toString();
    }

    /**
     * Returns the first non-null value of release date, modification date and creation date (in that
     * order), or a newly constructed {@link DateWithoutTimezone} when all three are {@code null}.
     */
    @Nonnull
    private static DateWithoutTimezone releaseDate(EntityVersion version) {
        return FluentIterable.of(version.getReleaseDate(), version.getModificationDate(), version.getCreationDate())
                .firstMatch(Objects::nonNull)
                .or(() -> new DateWithoutTimezone());
    }

    /**
     * Sends one record keyed by the drive change id; does nothing when no producer is configured.
     *
     * @param change drive change whose id becomes the record key
     * @param payload record value; {@code null} is sent as-is
     *                (NOTE(review): presumably acts as a delete marker on the topic - confirm)
     */
    private void send(DriveChangeId change, @Nullable String payload) {
        if (_producer != null) {
            _producer.send(new ProducerRecord<>(TOPIC, change.getId(), payload));
        }
    }

    /**
     * Publishes all drive changes that are linked to the given entity versions.
     *
     * @param entityVersionIds entity versions whose drive changes are to be published
     * @param versionPredicate only entity versions whose id matches are taken into account
     */
    private void publishEntityVersions(Set<EntityVersionId> entityVersionIds, Predicate<EntityVersionId> versionPredicate) {
        final ImmutableMap<EntityVersionId, DriveChangeId> changesForVersions =
                _entityVersionDriveChangeDAO.getDriveChangesForEntityVersions(entityVersionIds);
        publishChanges(changesForVersions.values(), versionPredicate);
    }

    /** Re-publishes the drive change linked to the updated entity version. */
    @Subscribe
    @AllowConcurrentEvents
    public void onEntityVersionUpdated(EntityVersionUpdateEvent e) {
        publishEntityVersions(ImmutableSet.of(e.getVersion().getId()), Predicates.alwaysTrue());
    }

    /** Publishes the drive changes carried by the event together with their change dates. */
    @Subscribe
    @AllowConcurrentEvents
    public void onChangesConnected(DriveChangesConnectedEvent e) {
        publishChanges(e.getChanges(), Predicates.alwaysTrue());
    }

    /**
     * Re-publishes the drive changes of the given entity versions as if those versions no longer
     * existed: the given ids are excluded when the records are built.
     */
    @Override
    public void deleteEntityVersions(Set<EntityVersionId> entityVersionIds) {
        publishEntityVersions(entityVersionIds, Predicates.not(entityVersionIds::contains));
    }

    /**
     * Publishes every drive change inside a refill of the topic, in chunks of {@link #CHUNK_SIZE}.
     *
     * <p>{@code finishRefill()} is only reached when all chunks were published without an exception;
     * the refill state is closed in either case. Does nothing when no producer is configured.
     */
    @Override
    public void publishAllDriveChangeVersions() {
        if (_producer == null) {
            return;
        }
        try (KafkaCleaningProducerState state = _producer.beginRefill(new String[]{TOPIC})) {
            _entityVersionDriveChangeDAO.forAllDriveChanges(
                    CHUNK_SIZE,
                    changes -> publishChanges(changes, Predicates.alwaysTrue()));
            state.finishRefill();
        }
    }

    @Override
    public JucoVersionUpdateProcess getProcessType() {
        return JucoVersionUpdateProcess.PUBLISH_DRIVE_CHANGE_VERSIONS;
    }

    @Override
    public UpdateBehaviour getUpdateBehaviour() {
        return UpdateBehaviour.onceOnMasterForVersion(this, "10.0.0-60");
    }

    /** Runs the version update process by publishing all drive change versions. */
    @Override
    public void process() {
        publishAllDriveChangeVersions();
    }
}

