/*
 * Decompiled with CFR 0.152.
 */
package de.justsoftware.onx.migration.drive.impl;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.freiheit.toro.cache.ehcache.EhcacheClient;
import com.freiheit.toro.cache.memcached.MemcachedClientCache;
import com.google.common.annotations.VisibleForTesting;
import de.justsoftware.onx.migration.drive.DriveDocumentMigrationService;
import de.justsoftware.onx.migration.drive.MigratedObject;
import de.justsoftware.onx.migration.integration.persistence.MigratedItemDAO;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.IntSupplier;
import javax.annotation.CheckForNull;
import javax.annotation.Nullable;
import javax.annotation.ParametersAreNonnullByDefault;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.util.StopWatch;

/**
 * {@link DriveDocumentMigrationService} implementation that publishes migrated
 * objects as JSON to a Kafka topic and runs the DAO-backed migration of
 * comments, likes, subscriptions and microblog attachments, flushing the
 * memcached and ehcache clients afterwards.
 *
 * <p>The Kafka producer is optional: when none is injected, {@link #isReady()}
 * returns {@code false} and {@link #publishMigratedObject(MigratedObject)}
 * fails with an {@link IllegalStateException}.
 */
@Service
@ParametersAreNonnullByDefault
public class DriveDocumentMigrationServiceImpl
implements DriveDocumentMigrationService {
    /** Kafka topic that serialized migrated objects are sent to. */
    @VisibleForTesting
    static final String MIGRATION_TOPIC = "just.migration.documents";
    private static final Logger LOG = LoggerFactory.getLogger(DriveDocumentMigrationServiceImpl.class);
    private final ObjectMapper _mapper = new ObjectMapper();
    /** May be {@code null} when no producer was injected. */
    @CheckForNull
    private final Producer<String, String> _producer;
    private final MigratedItemDAO _migratedItemDAO;
    private final MemcachedClientCache _memcachedClient;
    private final EhcacheClient _ehcacheClient;

    /**
     * Creates the service.
     *
     * @param producer producer qualified as {@code KAFKA_STRING_PRODUCER}; may be {@code null}
     * @param migratedItemDAO DAO that performs the individual migration steps
     * @param memcachedClient cache flushed after the migration
     * @param ehcacheClient cache flushed after the migration
     */
    @Autowired
    public DriveDocumentMigrationServiceImpl(@Nullable @Qualifier(value="KAFKA_STRING_PRODUCER") Producer<String, String> producer, MigratedItemDAO migratedItemDAO, MemcachedClientCache memcachedClient, EhcacheClient ehcacheClient) {
        this._producer = producer;
        this._migratedItemDAO = migratedItemDAO;
        this._memcachedClient = memcachedClient;
        this._ehcacheClient = ehcacheClient;
    }

    /**
     * Serializes {@code migratedObject} to JSON, sends it to
     * {@link #MIGRATION_TOPIC} keyed by its origin id, and logs the JSON at
     * info level.
     *
     * <p>The value returned by {@code send} is not inspected here, so this
     * method does not itself observe the delivery outcome.
     *
     * @param migratedObject the object to publish
     * @throws IllegalStateException if no Kafka producer is available
     * @throws UncheckedIOException if the object cannot be serialized to JSON
     */
    @Override
    public void publishMigratedObject(MigratedObject migratedObject) {
        // Fail fast: without a producer there is no point in serializing the payload.
        final Producer<String, String> producer = _producer;
        if (producer == null) {
            throw new IllegalStateException("Kafka producer for sending migration data is null");
        }
        final String json;
        try {
            json = _mapper.writeValueAsString(migratedObject);
        }
        catch (JsonProcessingException e) {
            throw new UncheckedIOException(e);
        }
        final ProducerRecord<String, String> record =
                new ProducerRecord<>(MIGRATION_TOPIC, migratedObject.getOriginId(), json);
        producer.send(record);
        LOG.info(json);
    }

    /**
     * Runs the comment, like, subscription and microblog attachment migrations
     * in that order, timing each as a separate task on {@code stopWatch}, then
     * flushes both caches and logs the stop watch summary.
     *
     * @param stopWatch stop watch used to time each phase; must not be running
     *        a task when this method is called
     */
    @Override
    public void migrateDriveDocuments(StopWatch stopWatch) {
        runPhase(stopWatch, "comments", _migratedItemDAO::migrateComments);
        runPhase(stopWatch, "likes", _migratedItemDAO::migrateLikes);
        runPhase(stopWatch, "subscriptions", _migratedItemDAO::migrateSubscriptions);
        runPhase(stopWatch, "attachments", _migratedItemDAO::migrateMicroblogAttachments);
        LOG.info("flusing caches after migration");
        _memcachedClient.flush();
        _ehcacheClient.flush();
        LOG.info(stopWatch.prettyPrint());
    }

    /**
     * Reports whether a Kafka producer is available for publishing.
     *
     * @return {@code true} if a producer was injected, {@code false} otherwise
     */
    @Override
    public boolean isReady() {
        return _producer != null;
    }

    /**
     * Times a single migration phase as the task {@code "migrating <what>"} and
     * logs how many items it reported as migrated.
     */
    private static void runPhase(StopWatch stopWatch, String what, IntSupplier migration) {
        stopWatch.start("migrating " + what);
        final int migrated = migration.getAsInt();
        LOG.info("migrated {} {}", migrated, what);
        stopWatch.stop();
    }
}

