/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.browserpushservice;

import de.justsoftware.browserpushclient.MessageSerde;
import de.justsoftware.browserpushclient.model.Message;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.UUID;
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.util.concurrent.Queues;
import reactor.util.retry.Retry;

@Component
@ParametersAreNonnullByDefault
public class IncomingMessageProcessor
implements SmartLifecycle {
    private static final Logger LOG = LoggerFactory.getLogger(IncomingMessageProcessor.class);
    private final Flux<Message> _broadcastStream;
    private final Sinks.ManyWithUpstream<Message> _messageStream = Sinks.unsafe().manyWithUpstream().multicastOnBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
    private Disposable _messageStreamDisposable;
    private boolean _started = false;
    private boolean _healthy = false;

    public IncomingMessageProcessor(@Value(value="${kafka.bootstrapServers}") String bootstrapServers) {
        this._broadcastStream = Flux.defer(() -> KafkaReceiver.create((ReceiverOptions)this.getReceiverOptions(bootstrapServers)).receive()).doOnError(error -> LOG.info("Got error in message streams - will retry: ", error)).retryWhen((Retry)Retry.backoff((long)Integer.MAX_VALUE, (Duration)Duration.ofSeconds(1L))).doOnCancel(() -> {
            LOG.error("Kafka stream was canceled. Marking service as unhealthy. Manual restart required.");
            this.markAsUnhealthy();
        }).doOnNext(record -> {
            record.receiverOffset().acknowledge();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Kafka record with offset {} acknowledge.", (Object)record.offset());
            }
        }).filter(record -> record.value() != null).map(ConsumerRecord::value);
    }

    @Nonnull
    private ReceiverOptions<String, Message> getReceiverOptions(@Value(value="${kafka.bootstrapServers}") String bootstrapServers) {
        Properties consumerConfiguration = new Properties();
        consumerConfiguration.put("bootstrap.servers", bootstrapServers);
        consumerConfiguration.put("group.id", "browser-push-service-consumer-group-" + UUID.randomUUID());
        consumerConfiguration.put("key.deserializer", StringDeserializer.class);
        consumerConfiguration.put("value.deserializer", MessageSerde.class);
        consumerConfiguration.put("auto.offset.reset", "latest");
        consumerConfiguration.put("max.poll.records", "100");
        ReceiverOptions receiverOptions = ReceiverOptions.create((Properties)consumerConfiguration);
        return receiverOptions.subscription(Collections.singleton("just.browserpush.dispatch"));
    }

    @Nonnull
    public Flux<Message> getMessageStream() {
        return this._messageStream.asFlux();
    }

    public synchronized void start() {
        if (!this._started) {
            LOG.info("Subscribing to Broadcast Stream.");
            this._started = true;
            this.markAsHealthy();
            this._messageStreamDisposable = this._messageStream.subscribeTo((Publisher)this._broadcastStream);
        }
    }

    public synchronized boolean isRunning() {
        return this._started;
    }

    public synchronized boolean isHealthy() {
        return this._healthy;
    }

    private synchronized void markAsHealthy() {
        this._healthy = true;
    }

    private synchronized void markAsUnhealthy() {
        this._healthy = false;
    }

    public synchronized void stop() {
        if (this._started) {
            this._started = false;
            this.markAsUnhealthy();
            if (this._messageStreamDisposable != null) {
                this._messageStreamDisposable.dispose();
            }
            LOG.info("Disposed Message Stream.");
        }
    }
}

