/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.onx.push.business;

import de.justsoftware.onx.push.business.PushNotificationQueue;
import de.justsoftware.onx.push.business.PushNotificationWriteDataService;
import de.justsoftware.onx.push.business.model.PushNotificationQueueEntry;
import de.justsoftware.onx.push.business.util.ExponentialBackoff;
import de.justsoftware.onx.push.shared.model.PushNotificationProtocol;
import de.justsoftware.onx.push.shared.model.PushTargetApp;
import de.justsoftware.pushnotification.client.model.PushNotificationResult;
import de.justsoftware.pushnotification.client.model.PushNotificationResults;
import java.time.Clock;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;

/**
 * Base class for services that buffer push notifications in an in-memory
 * {@link PushNotificationQueue} and deliver them in batches.
 *
 * <p>Subclasses provide the actual transport ({@link #sendPushNotifications(List)}) and
 * the accessors needed to correlate a queue entry with its delivery result
 * ({@link #pushNotificationId(PushNotificationQueueEntry)}) and with its receiver
 * token ({@link #token(PushNotificationQueueEntry)}).
 *
 * <p>Handling of a batch after it was sent:
 * <ul>
 *   <li>{@code INVALID_TOKEN}: the receiver is removed via the write data service;</li>
 *   <li>{@code SKIPPED}: the entry is put back into the queue unchanged;</li>
 *   <li>{@code ERROR}: the entry is put back with an increased retry count and a later
 *       next-retry time, unless it is older than the configured maximum lifetime;</li>
 *   <li>any other result: the entry is dropped from the queue.</li>
 * </ul>
 *
 * <p>Only {@link #processQueue()} is {@code synchronized}; {@link #add(List)} and
 * {@link #queueSize()} are not. NOTE(review): whether concurrent use is safe depends on
 * {@code PushNotificationQueue}, which is not visible here - confirm.
 *
 * @param <T> type of the push notification payload held by the queue entries
 */
@ParametersAreNonnullByDefault
public abstract class AbstractPushNotificationQueueService<T> {
    private final PushNotificationQueue<T> _queue = new PushNotificationQueue<>();
    private final PushNotificationWriteDataService _pushNotificationWriteDataService;
    private final Clock _clock;
    private final int _maxLifetimeSeconds;
    private final ExponentialBackoff _backoff;
    private final PushNotificationProtocol _pushNotificationProtocol;
    /** Maximum number of entries per batch; overwritten after every successful send. */
    private long _batchSize;

    /**
     * Creates the service.
     *
     * @param pushNotificationWriteDataService used to remove receivers whose token was reported invalid
     * @param clock source of "now" for entry time, retry scheduling and lifetime checks
     * @param batchSize initial maximum number of entries sent per request
     * @param maxLifetimeSeconds entries older than this are no longer scheduled for retry
     * @param retryInitialIntervalSeconds initial retry interval; multiplied by 1000 before being
     *        handed to {@link ExponentialBackoff}
     * @param retryBackoffMultiplier multiplier handed to {@link ExponentialBackoff}
     * @param pushNotificationProtocol protocol used when building receivers for invalid tokens
     */
    public AbstractPushNotificationQueueService(PushNotificationWriteDataService pushNotificationWriteDataService, Clock clock, long batchSize, int maxLifetimeSeconds, int retryInitialIntervalSeconds, double retryBackoffMultiplier, PushNotificationProtocol pushNotificationProtocol) {
        _pushNotificationWriteDataService = pushNotificationWriteDataService;
        _clock = clock;
        _batchSize = batchSize;
        _maxLifetimeSeconds = maxLifetimeSeconds;
        // widen to long before multiplying so large second values cannot overflow int
        _backoff = new ExponentialBackoff((long) retryInitialIntervalSeconds * 1000L, retryBackoffMultiplier);
        _pushNotificationProtocol = pushNotificationProtocol;
    }

    /**
     * Wraps each notification in a queue entry stamped with the current time and adds it
     * to the queue. All entries of one call share the same entry time.
     *
     * @param pushNotifications notifications to enqueue, in iteration order
     */
    public void add(List<T> pushNotifications) {
        final Instant now = _clock.instant();
        for (T pushNotification : pushNotifications) {
            _queue.add(new PushNotificationQueueEntry<>(pushNotification, now));
        }
    }

    /**
     * @return the size currently reported by the underlying queue
     */
    public int queueSize() {
        return _queue.size();
    }

    /**
     * Sends queued entries batch by batch until no entry is due any more.
     *
     * <p>Each batch is removed from the queue before it is sent. If sending or result
     * processing throws a {@link RuntimeException}, every entry of the batch that is
     * still within its lifetime is re-queued for a later retry.
     */
    public synchronized void processQueue() {
        List<PushNotificationQueueEntry<T>> batch = pollQueue();
        while (!batch.isEmpty()) {
            _queue.remove(batch);
            try {
                final PushNotificationResults results = sendQueueEntries(batch);
                processResults(results, batch);
            } catch (RuntimeException e) {
                // Deliberate best effort: a failing request must not lose the batch, so the
                // entries are scheduled for retry instead of propagating the exception.
                // NOTE(review): the exception itself is not reported anywhere - consider logging it.
                addNotificationsForRetry(batch.stream());
            }
            batch = pollQueue();
        }
    }

    /**
     * Collects up to {@code _batchSize} entries that are due now. The entries are not
     * removed from the queue by this method.
     */
    @Nonnull
    private List<PushNotificationQueueEntry<T>> pollQueue() {
        // NOTE(review): Stream.limit rejects negative values; _batchSize is taken from the
        // send results without validation - confirm the server never reports a negative
        // (or zero, which would stall the queue) value.
        return _queue.stream()
            .filter(this::shouldBeProcessedNow)
            .limit(_batchSize)
            .collect(Collectors.toList());
    }

    /**
     * An entry is due if it was never retried or if its next-retry time lies strictly
     * before the current time.
     */
    private boolean shouldBeProcessedNow(PushNotificationQueueEntry<T> queueEntry) {
        return queueEntry.getRetryCount() == 0 || queueEntry.getNextRetry().isBefore(_clock.instant());
    }

    /**
     * Extracts the payloads of the batch and hands them to the subclass for sending.
     */
    @Nonnull
    private PushNotificationResults sendQueueEntries(List<PushNotificationQueueEntry<T>> batch) {
        final List<T> pushNotifications = batch.stream()
            .map(PushNotificationQueueEntry::getPushNotification)
            .collect(Collectors.toList());
        return sendPushNotifications(pushNotifications);
    }

    /**
     * Sends the given notifications to the push provider.
     *
     * <p>A {@link RuntimeException} thrown from here causes the whole batch to be
     * scheduled for retry by {@link #processQueue()}.
     *
     * @param pushNotifications payloads of one batch
     * @return the per-notification results, keyed by the id returned from
     *         {@link #pushNotificationId(PushNotificationQueueEntry)}, plus the maximum
     *         notification count per request that is used as the next batch size
     */
    @Nonnull
    protected abstract PushNotificationResults sendPushNotifications(List<T> pushNotifications);

    /**
     * Applies the results of one sent batch: adopts the reported maximum notification
     * count as new batch size, removes invalid receivers and re-queues skipped and
     * failed entries.
     */
    private void processResults(PushNotificationResults results, List<PushNotificationQueueEntry<T>> batch) {
        _batchSize = results.getMaxNotificationCountPerRequest();
        final Map<UUID, PushNotificationResult> notificationResults = results.getNotificationResults();
        deleteInvalidTokens(notificationResults, batch);
        addSkippedNotifications(notificationResults, batch);
        addFailedNotifications(notificationResults, batch);
    }

    /**
     * Removes the push target apps of all entries whose result is {@code INVALID_TOKEN}.
     * The write data service is not called when there is nothing to remove.
     */
    private void deleteInvalidTokens(Map<UUID, PushNotificationResult> notificationResults, List<PushNotificationQueueEntry<T>> batch) {
        final List<PushTargetApp.Receiver> invalidReceivers = batch.stream()
            .filter(queueEntry -> hasResult(queueEntry, notificationResults, PushNotificationResult.INVALID_TOKEN))
            .map(queueEntry -> new PushTargetApp.Receiver(token(queueEntry), _pushNotificationProtocol))
            .collect(Collectors.toList());
        if (!invalidReceivers.isEmpty()) {
            _pushNotificationWriteDataService.removePushTargetAppsByReceivers(invalidReceivers);
        }
    }

    /**
     * @param queueEntry entry whose receiver token is needed
     * @return the token used to build the {@link PushTargetApp.Receiver} of the entry
     */
    @Nonnull
    protected abstract String token(PushNotificationQueueEntry<T> queueEntry);

    /**
     * Tells whether the result recorded for the entry's notification id is exactly
     * {@code result}. Entries without a recorded result never match.
     */
    private boolean hasResult(PushNotificationQueueEntry<T> queueEntry, Map<UUID, PushNotificationResult> notificationResults, PushNotificationResult result) {
        return notificationResults.get(pushNotificationId(queueEntry)) == result;
    }

    /**
     * @param queueEntry entry whose notification id is needed
     * @return the id under which the result of this entry is looked up in the result map
     */
    @Nonnull
    protected abstract UUID pushNotificationId(PushNotificationQueueEntry<T> queueEntry);

    /**
     * Puts entries with result {@code SKIPPED} back into the queue unchanged, i.e.
     * without touching their retry count or next-retry time.
     */
    private void addSkippedNotifications(Map<UUID, PushNotificationResult> notificationResults, List<PushNotificationQueueEntry<T>> batch) {
        batch.stream()
            .filter(queueEntry -> hasResult(queueEntry, notificationResults, PushNotificationResult.SKIPPED))
            .forEach(_queue::add);
    }

    /**
     * Schedules entries with result {@code ERROR} for a retry.
     */
    private void addFailedNotifications(Map<UUID, PushNotificationResult> notificationResults, List<PushNotificationQueueEntry<T>> batch) {
        final Stream<PushNotificationQueueEntry<T>> failedQueueEntries = batch.stream()
            .filter(queueEntry -> hasResult(queueEntry, notificationResults, PushNotificationResult.ERROR));
        addNotificationsForRetry(failedQueueEntries);
    }

    /**
     * Re-queues the given entries with updated retry information; entries that exceeded
     * the maximum lifetime are dropped.
     */
    private void addNotificationsForRetry(Stream<PushNotificationQueueEntry<T>> queueEntryStream) {
        queueEntryStream
            .filter(this::shouldBeProcessedAgain)
            .map(this::updateRetryInformation)
            .forEach(_queue::add);
    }

    /**
     * Builds a copy of the entry with the retry count increased and the next-retry time
     * moved by the backoff interval for the entry's current (pre-increase) retry count.
     */
    @Nonnull
    private PushNotificationQueueEntry<T> updateRetryInformation(PushNotificationQueueEntry<T> queueEntry) {
        final int currentRetryCount = queueEntry.getRetryCount();
        return queueEntry.buildUpon()
            .increaseRetryCount()
            .addToNextRetry(_backoff.nextInterval(currentRetryCount))
            .build();
    }

    /**
     * An entry may be retried while its entry time lies strictly after
     * {@code now - maxLifetimeSeconds}.
     */
    private boolean shouldBeProcessedAgain(PushNotificationQueueEntry<T> queueEntry) {
        final Instant oldestTimeToLive = _clock.instant().minusSeconds(_maxLifetimeSeconds);
        return queueEntry.getEntryTime().isAfter(oldestTimeToLive);
    }
}

