/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.onx.events.business.impl;

import com.google.common.base.Function;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Maps;
import de.justsoftware.onx.container.shared.model.ItemId;
import de.justsoftware.onx.events.integration.persistence.EventQueueDAO;
import de.justsoftware.onx.events.shared.server.model.DBUpdateEvent;
import de.justsoftware.onx.events.shared.server.model.UpdateEventType;
import de.justsoftware.onx.monitoring.business.QueueStatisticsService;
import de.justsoftware.onx.monitoring.business.model.QueueName;
import de.justsoftware.toolbox.clock.Clock;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.ParametersAreNonnullByDefault;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drains an event queue in batches.
 *
 * <p>Each batch is fetched through the {@link EventQueueDAO}, reported to the
 * {@link QueueStatisticsService}, de-duplicated, grouped by event type and handed to an
 * {@link EventQueueProcessingCallback}. The batch is deleted from the queue afterwards,
 * whether or not the callback succeeded.
 */
@ParametersAreNonnullByDefault
public class EventQueueProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(EventQueueProcessor.class);
    private final Clock _clock;
    private final EventQueueDAO _dao;
    private final QueueStatisticsService _queueStatisticsService;
    private final QueueName _queueName;

    /**
     * @param dao                    source of queued events; also used to delete processed batches
     * @param clock                  time source for batch timing and the "loaded at" timestamp
     * @param queueStatisticsService receives the creation timestamps of every dequeued event
     * @param queueName              queue identifier used in log messages and statistics
     */
    public EventQueueProcessor(EventQueueDAO dao, Clock clock, QueueStatisticsService queueStatisticsService, QueueName queueName) {
        _clock = clock;
        _dao = dao;
        _queueStatisticsService = queueStatisticsService;
        _queueName = queueName;
    }

    /**
     * Repeatedly fetches up to {@code fetchSize} events and processes them until the DAO
     * returns an empty batch.
     *
     * <p>A {@link RuntimeException} thrown while processing a batch is logged and swallowed;
     * the events of that batch are skipped. The batch is deleted in a {@code finally} block,
     * so it is removed regardless of how processing ended. Exceptions thrown by the DAO or
     * the statistics service are not caught here.
     *
     * @param fetchSize batch size passed to {@link EventQueueDAO#getEvents}
     * @param callback  invoked once per event type with the distinct item ids of the batch
     */
    public void processEvents(int fetchSize, EventQueueProcessingCallback callback) {
        while (true) {
            final long startBatch = _clock.nowMillis();
            final List<DBUpdateEvent> entries = _dao.getEvents(fetchSize);
            // Captured right after loading; handed to deleteEntriesOlderThan below.
            final DateTime timeAfterLoadingBatch = _clock.now();
            if (entries.isEmpty()) {
                // Queue drained: nothing to report, process or delete.
                return;
            }

            reportStatistics(entries);
            final ImmutableSet<DBUpdateEvent> distinctEntries = ImmutableSet.copyOf(entries);
            LOG.info("Queue {}: Fetched {} entries ({} distinct) to process (took {} ms).", new Object[]{_queueName.name(), entries.size(), distinctEntries.size(), _clock.nowMillis() - startBatch});
            try {
                processEvents(distinctEntries, callback);
            } catch (RuntimeException e) {
                // Deliberate best effort: a failing batch must not block the queue.
                LOG.error(String.format("Queue %s: Failed to process events. We will skip them!", _queueName.name()), e);
            } finally {
                _dao.delete(entries);
                // NOTE(review): presumably removes older duplicates of the events just handled;
                // confirm against EventQueueDAO.
                _dao.deleteEntriesOlderThan(distinctEntries, timeAfterLoadingBatch);
            }
        }
    }

    /**
     * Groups the given events by type and invokes the callback once per type with the set
     * of item ids belonging to that type.
     */
    private void processEvents(ImmutableSet<DBUpdateEvent> events, EventQueueProcessingCallback callback) {
        final ImmutableSetMultimap<UpdateEventType, ItemId> eventsByType = events.stream()
                .collect(ImmutableSetMultimap.toImmutableSetMultimap(DBUpdateEvent::getEventType, DBUpdateEvent::getItemId));
        eventsByType.asMap().forEach(
                (eventType, itemIds) -> callback.processEventsOfType(eventType, ImmutableSet.copyOf(itemIds)));
    }

    /**
     * Reports the creation timestamp of every fetched entry (duplicates included) to the
     * statistics service as dequeued from this queue.
     */
    private void reportStatistics(List<DBUpdateEvent> entries) {
        // The Object cast keeps the DateTime(Object) constructor selected, as before.
        final ImmutableList<DateTime> timestamps = FluentIterable.from(entries)
                .transform(entry -> new DateTime((Object) entry.getCreatedAt()))
                .toList();
        _queueStatisticsService.itemsDequeued(_queueName, timestamps);
    }

    /**
     * Receives the events of one batch, one call per event type.
     */
    @ParametersAreNonnullByDefault
    @FunctionalInterface
    public interface EventQueueProcessingCallback {
        /**
         * @param eventType type shared by all events in this call
         * @param itemIds   distinct ids of the items that have an event of {@code eventType} in the batch
         */
        void processEventsOfType(UpdateEventType eventType, Set<ItemId> itemIds);
    }
}

