/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.browserpushservice.controller;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import de.justsoftware.browserpushclient.model.Message;
import de.justsoftware.browserpushservice.IncomingMessageProcessor;
import de.justsoftware.browserpushservice.SSEMessageSerializer;
import de.justsoftware.browserpushservice.model.ProfileId;
import de.justsoftware.browserpushservice.model.SSEMessage;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.annotation.PreDestroy;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.CacheControl;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/*
 * Exception performing whole class analysis ignored.
 */
/**
 * REST controller that exposes the incoming message stream to browsers as Server-Sent Events.
 *
 * <p>Each subscriber of {@code /messages} receives only the messages whose receiver set contains
 * its profile id, serialized through {@link SSEMessageSerializer}. A ping event is sent first and
 * further pings are merged into the stream from the configured ping supplier. The number of
 * currently open subscriptions is published as the gauge {@code sse.message.subscriptions}.
 */
@ParametersAreNonnullByDefault
@RestController
public class SSEController {
    static final Message PING_MESSAGE = new Message("system-control", ImmutableSet.of(), "ping", null);
    static final String PING_STRING = SSEController.toSSEMessageString(PING_MESSAGE);
    private static final Logger LOG = LoggerFactory.getLogger(SSEController.class);

    private final IncomingMessageProcessor _incomingMessageProcessor;
    private final AtomicLong _subscriptions;
    private final Supplier<Flux<String>> _pingSupplier;
    private final Scheduler _sseConnectionScheduler;

    /**
     * Creates the controller from configuration values.
     *
     * @param incomingMessageProcessor source of the message stream that is fanned out to subscribers
     * @param meterRegistry registry the subscription gauge is registered with
     * @param pingInterval seconds before the first periodic ping and between subsequent ones
     *        ({@code browser-push.pingInterval}, default 60); must be positive
     * @param threadPoolSize size of the scheduler used for publishing to connections
     *        ({@code browser-push.sse.threadPoolSize}); values {@code <= 0} select
     *        {@link Schedulers#DEFAULT_POOL_SIZE}
     * @throws IllegalArgumentException if {@code pingInterval} is not positive
     */
    @Autowired
    public SSEController(IncomingMessageProcessor incomingMessageProcessor, MeterRegistry meterRegistry, @Value(value="${browser-push.pingInterval:60}") long pingInterval, @Value(value="${browser-push.sse.threadPoolSize:0}") int threadPoolSize) throws IllegalArgumentException {
        this(incomingMessageProcessor, meterRegistry, SSEController.intervalPings(pingInterval), threadPoolSize > 0 ? threadPoolSize : Schedulers.DEFAULT_POOL_SIZE);
    }

    /**
     * Creates the controller with an explicit ping source, so tests can control ping timing.
     *
     * @param incomingMessageProcessor source of the message stream
     * @param meterRegistry registry the subscription gauge is registered with
     * @param pingSupplier invoked once per subscription to obtain the stream of ping strings
     * @param threadPoolSize parallelism of the {@code sse-connection} scheduler
     */
    @VisibleForTesting
    SSEController(IncomingMessageProcessor incomingMessageProcessor, MeterRegistry meterRegistry, Supplier<Flux<String>> pingSupplier, int threadPoolSize) {
        this._incomingMessageProcessor = incomingMessageProcessor;
        this._subscriptions = new AtomicLong(0L);
        this._pingSupplier = pingSupplier;
        this._sseConnectionScheduler = Schedulers.newParallel("sse-connection", threadPoolSize);
        Gauge.builder("sse.message.subscriptions", this._subscriptions, AtomicLong::get)
            .description("Count of subscriptions on message streams")
            .baseUnit("Subscription")
            .register(meterRegistry);
    }

    /**
     * Builds the supplier of periodic ping streams used by the configuration-driven constructor.
     *
     * <p>The interval is validated here, once, so that a misconfiguration is reported at startup
     * instead of when a client connects and the interval stream is created.
     *
     * @param pingIntervalSeconds initial delay and period of the pings, in seconds
     * @return supplier creating a new interval-based ping stream on every call
     * @throws IllegalArgumentException if {@code pingIntervalSeconds} is not positive
     */
    @Nonnull
    private static Supplier<Flux<String>> intervalPings(long pingIntervalSeconds) {
        if (pingIntervalSeconds <= 0L) {
            throw new IllegalArgumentException("browser-push.pingInterval must be positive but was " + pingIntervalSeconds);
        }
        Duration period = Duration.ofSeconds(pingIntervalSeconds);
        return () -> Flux.interval(period, period).map(tick -> PING_STRING);
    }

    /**
     * Streams the messages addressed to the given profile as {@code text/event-stream}.
     *
     * @param profileId profile whose messages are delivered
     * @return a non-cacheable ({@code no-store}) OK response whose body is the serialized event stream
     */
    @GetMapping(value={"/messages"}, produces={"text/event-stream"})
    @ResponseBody
    @Nonnull
    public ResponseEntity<Flux<String>> messages(ProfileId profileId) {
        Flux<String> events = this._incomingMessageProcessor.getMessageStream()
            // Filtering and serialization run on the dedicated connection scheduler.
            .publishOn(this._sseConnectionScheduler)
            .filter(SSEController.forReceiver(profileId))
            .map(SSEController::toSSEMessageString)
            // Deferred so that the ping supplier is asked for a new stream per subscription.
            .transformDeferred(this::withPing)
            .doOnSubscribe(subscription -> this.incrementSubscriptions())
            .doFinally(signal -> this.decrementSubscriptions());
        return ResponseEntity.ok().cacheControl(CacheControl.noStore()).body(events);
    }

    /**
     * Adds pings to a stream of serialized messages.
     *
     * <p>The result starts with {@link #PING_STRING}. Every element (including that first ping) is
     * followed by the ping stream until the next element arrives; {@code switchMap} then replaces
     * the previous inner stream, so the ping stream is subscribed anew after each message.
     *
     * @param upstream serialized messages for one subscriber
     * @return the messages interleaved with pings
     */
    @Nonnull
    private Flux<String> withPing(Flux<String> upstream) {
        Flux<String> pings = this._pingSupplier.get();
        return upstream.startWith(PING_STRING).switchMap(message -> Flux.just(message).mergeWith(pings));
    }

    /**
     * Creates a predicate accepting the messages whose receivers contain the given profile.
     *
     * @param profileId profile to match; its string form is resolved once, up front
     * @return predicate testing {@code msg.getReceivers()} for the profile id string
     */
    @Nonnull
    private static Predicate<Message> forReceiver(ProfileId profileId) {
        String profileIdString = profileId.asString();
        return msg -> msg.getReceivers().contains(profileIdString);
    }

    /**
     * Serializes a message into its SSE wire representation.
     *
     * @param msg message providing push topic, type and payload
     * @return the string produced by {@link SSEMessageSerializer}
     */
    @Nonnull
    private static String toSSEMessageString(Message msg) {
        return SSEMessageSerializer.INSTANCE.serialize(new SSEMessage(msg.getPushTopic(), msg.getType(), msg.getPayload()));
    }

    /** Counts a new subscription and logs the resulting total. */
    private void incrementSubscriptions() {
        // Log the value returned by the atomic update; a separate get() could observe other threads' changes.
        long total = this._subscriptions.incrementAndGet();
        LOG.debug("New Subscriber added (total subscriber = {})", total);
    }

    /** Counts a finished subscription and logs the resulting total. */
    private void decrementSubscriptions() {
        long total = this._subscriptions.decrementAndGet();
        LOG.debug("Subscriber removed (total subscriber = {})", total);
    }

    /** Disposes the connection scheduler when the bean is destroyed. */
    @PreDestroy
    public void onShutdown() {
        LOG.info("Dispose sse connection scheduler on shutdown");
        this._sseConnectionScheduler.dispose();
    }
}

