/*
 * Decompiled with CFR 0.152.
 */
package com.freiheit.toro.cache.memcached;

import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Executes submitted tasks on a fixed-size thread pool while serializing the tasks that share
 * a key: every key owns a FIFO queue ({@link TaskInfo}) that is drained by at most one
 * {@link QueueWorker} at a time.
 *
 * <p>Life cycle of a key: the first {@link #addTask} for a key registers a {@link TaskInfo} in
 * {@code _taskMap} and schedules a worker. The worker runs a bounded number of queued tasks per
 * pass and reschedules itself until it finds the queue empty while holding the key's lock; it
 * then marks the {@link TaskInfo} as deleted and unregisters it. Producers that obtain a deleted
 * {@link TaskInfo} retry with a fresh one.
 */
class KeyScheduler {
    private static final int NUM_WORKER_THREADS = 5;
    /** Maximum number of queued tasks a worker executes in one pass before giving up its thread. */
    private static final int MAX_TASKS_PER_RUN = 5;
    private static final Logger LOG = LoggerFactory.getLogger(KeyScheduler.class);
    private static final Logger QUEUE_WORKER_LOG = LoggerFactory.getLogger(QueueWorker.class);
    private final ConcurrentHashMap<String, TaskInfo> _taskMap = new ConcurrentHashMap<String, TaskInfo>();
    // volatile: replaced by stopAllTasks() while producers and workers read it from other threads.
    private volatile ThreadPoolExecutor _executor = KeyScheduler.createExecutor();

    KeyScheduler() {
    }

    /** Creates the worker pool; its threads are named {@code KeyScheduler-<n>}. */
    private static ThreadPoolExecutor createExecutor() {
        return (ThreadPoolExecutor)Executors.newFixedThreadPool(NUM_WORKER_THREADS, new ThreadFactoryBuilder().setNameFormat("KeyScheduler-%d").build());
    }

    /**
     * Queues {@code callable} behind all tasks previously added for {@code key} and makes sure a
     * worker is scheduled for that key.
     *
     * <p>Scheduling is retried for as long as the executor rejects the worker. NOTE(review): the
     * retry is meant for the short window in which {@link #stopAllTasks()} swaps the executor;
     * after {@link #shutdown()} the executor is never replaced and this loop would not terminate.
     *
     * @param key the key whose queue receives the task
     * @param callable the work to run
     * @return a future that completes once the task has been executed by a worker
     * @throws NullPointerException if {@code key} or {@code callable} is {@code null}
     */
    public <T> Future<T> addTask(String key, Callable<T> callable) {
        LOG.debug("Adding task for key [{}]", key);
        // Build the task before touching the map: a null callable must fail here, not after a
        // TaskInfo without a worker has been published for the key.
        FutureTask<T> task = new FutureTask<T>(callable);
        TaskInfo myTaskInfo = new TaskInfo();
        TaskInfo taskInfo;
        while (true) {
            taskInfo = this._taskMap.putIfAbsent(key, myTaskInfo);
            if (taskInfo == null) {
                LOG.trace("No tasks for [{}] yet", key);
                taskInfo = myTaskInfo;
            }
            taskInfo.lock();
            if (!taskInfo.isDeleted()) {
                // Leave the loop with the lock held.
                break;
            }
            try {
                LOG.debug("aquired lock for [{}], but TaskInfo is marked as deleted, releasing lock and trying again", key);
                // Drop the dead entry (only if it is still the mapped one) so the retry can
                // register a live TaskInfo instead of finding the same dead one again.
                this._taskMap.remove(key, taskInfo);
                if (taskInfo == myTaskInfo) {
                    // Our own candidate was deleted by stopAllTasks(); it must not be reused.
                    myTaskInfo = new TaskInfo();
                }
            }
            finally {
                taskInfo.unlock();
            }
        }
        LOG.trace("Got lock for [{}]", key);
        try {
            ConcurrentLinkedQueue<FutureTask<?>> queue = taskInfo.getQueue();
            queue.add(task);
            LOG.trace("added task for [{}] to queue {}", key, queue);
        }
        finally {
            LOG.trace("releasing lock on [{}]", key);
            taskInfo.unlock();
        }
        if (taskInfo == myTaskInfo) {
            // We registered the TaskInfo, so we are responsible for starting its worker.
            LOG.trace("was first task for [{}]", key);
            QueueWorker worker = new QueueWorker(key, taskInfo);
            boolean success = false;
            while (!success) {
                try {
                    LOG.trace("Scheduling [{}]", key);
                    this._executor.execute(worker);
                    success = true;
                }
                catch (RejectedExecutionException e) {
                    LOG.warn("Cannot add to executor, retrying", (Throwable)e);
                }
            }
        }
        return task;
    }

    /**
     * Stops the current executor via {@code shutdownNow()}, marks every registered
     * {@link TaskInfo} as deleted, clears the key map and installs a fresh executor.
     *
     * <p>Each {@link TaskInfo} lock is awaited for at most 100 ms; if it cannot be obtained the
     * entry is marked as deleted without the lock. Tasks still sitting in the queues are not
     * executed by this class afterwards. If the calling thread is interrupted, the reset is
     * still completed (without further blocking) and the interrupt status is restored before
     * returning, so the scheduler is never left with a terminated executor.
     */
    public void stopAllTasks() {
        this._executor.shutdownNow();
        boolean interrupted = false;
        for (TaskInfo taskInfo : this._taskMap.values()) {
            boolean holdingLock = false;
            try {
                // Once interrupted, do not block any more; just try opportunistically.
                holdingLock = interrupted ? taskInfo.tryLock() : taskInfo.tryLock(100L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                LOG.warn("Interrupted", (Throwable)e);
                interrupted = true;
            }
            try {
                taskInfo.markDeleted();
            }
            finally {
                if (holdingLock) {
                    taskInfo.unlock();
                }
            }
        }
        this._taskMap.clear();
        this._executor = KeyScheduler.createExecutor();
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts the current executor down through Guava's
     * {@code MoreExecutors.shutdownAndAwaitTermination} with a timeout of one minute.
     */
    public void shutdown() {
        MoreExecutors.shutdownAndAwaitTermination((ExecutorService)this._executor, 1L, TimeUnit.MINUTES);
    }

    /**
     * Drains the queue of a single key. One pass runs at most {@code MAX_TASKS_PER_RUN} tasks and
     * then resubmits the worker to the executor, unless the queue was found empty under the
     * key's lock or the {@link TaskInfo} has been deleted.
     */
    private class QueueWorker
    implements Runnable {
        private final TaskInfo _taskInfo;
        private final String _keyName;
        // Set once this worker must not be rescheduled any more.
        private boolean _finished = false;

        public QueueWorker(String keyName, TaskInfo taskInfo) {
            this._keyName = keyName;
            this._taskInfo = taskInfo;
        }

        @Override
        public void run() {
            try {
                QUEUE_WORKER_LOG.trace("Working on queue for key [{}]", this._keyName);
                if (this._taskInfo.isDeleted()) {
                    QUEUE_WORKER_LOG.debug("TaskInfo for key [{}] marked as deleted. Quitting", this._keyName);
                    // A deleted TaskInfo never becomes live again; without this flag the
                    // finally block below would resubmit this worker forever.
                    this._finished = true;
                    return;
                }
                ConcurrentLinkedQueue<FutureTask<?>> queue = this._taskInfo.getQueue();
                for (int taskNum = 0; taskNum < MAX_TASKS_PER_RUN; ++taskNum) {
                    FutureTask<?> task = queue.poll();
                    QUEUE_WORKER_LOG.trace("Running next task for key [{}] #{}", this._keyName, Integer.valueOf(taskNum));
                    if (task == null) {
                        QUEUE_WORKER_LOG.trace("no more tasks");
                        break;
                    }
                    task.run();
                    QUEUE_WORKER_LOG.trace("#{} finished", Integer.valueOf(taskNum));
                }
                QUEUE_WORKER_LOG.trace("Acquiring lock for [{}]", this._keyName);
                try {
                    this._taskInfo.lockInterruptibly();
                }
                catch (InterruptedException e) {
                    QUEUE_WORKER_LOG.warn("Interrupted", (Throwable)e);
                    Thread.currentThread().interrupt();
                    // Rescheduling happens exactly once, in the finally block below.
                    return;
                }
                QUEUE_WORKER_LOG.trace("Acquired lock for [{}]", this._keyName);
                try {
                    // Checked under the lock so that no producer can enqueue between the
                    // emptiness check and the deletion mark.
                    if (queue.isEmpty()) {
                        QUEUE_WORKER_LOG.trace("Key [{}] with queue {} has no more tasks. Removing from key queue", this._keyName, queue);
                        this._taskInfo.markDeleted();
                        // Conditional remove: never evict a newer TaskInfo that was registered
                        // for the same key after stopAllTasks() cleared the map.
                        KeyScheduler.this._taskMap.remove(this._keyName, this._taskInfo);
                        this._finished = true;
                    }
                }
                finally {
                    this._taskInfo.unlock();
                }
            }
            finally {
                if (!this._finished) {
                    QUEUE_WORKER_LOG.trace("Key [{}] still has tasks. Rescheduling", this._keyName);
                    try {
                        KeyScheduler.this._executor.execute(this);
                    }
                    catch (RejectedExecutionException e) {
                        QUEUE_WORKER_LOG.warn("Cannot add to executor! Assuming I'm in a dead executor. Dying", (Throwable)e);
                    }
                }
            }
        }
    }

    /**
     * Per-key state: the FIFO queue of pending tasks, a deletion mark, and the lock that guards
     * the transition to "deleted". All {@link Lock} methods delegate to an internal
     * {@link ReentrantLock}.
     */
    private static class TaskInfo
    implements Lock {
        private final Lock _lock = new ReentrantLock();
        // volatile: read by workers without the lock and written by stopAllTasks() even when
        // the lock could not be obtained.
        private volatile boolean _deleted = false;
        private final ConcurrentLinkedQueue<FutureTask<?>> _queue = new ConcurrentLinkedQueue<FutureTask<?>>();

        @Override
        public void lock() {
            this._lock.lock();
        }

        @Override
        public void lockInterruptibly() throws InterruptedException {
            this._lock.lockInterruptibly();
        }

        @Override
        public Condition newCondition() {
            return this._lock.newCondition();
        }

        @Override
        public boolean tryLock() {
            return this._lock.tryLock();
        }

        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return this._lock.tryLock(time, unit);
        }

        @Override
        public void unlock() {
            this._lock.unlock();
        }

        /** Returns whether this instance has been retired and must no longer receive tasks. */
        public boolean isDeleted() {
            return this._deleted;
        }

        /** Retires this instance; the mark is never reset. */
        public void markDeleted() {
            this._deleted = true;
        }

        /** Returns the live queue of pending tasks for this key. */
        public ConcurrentLinkedQueue<FutureTask<?>> getQueue() {
            return this._queue;
        }
    }
}

