// @flow
import CancelablePromise from "bluebird";
import { sortBy, reduce, filter } from "lodash";
import TinyQueue from "tinyqueue";
import uuid from "uuid";
import { UserCanceledError } from "core/errors";
import window from "core/global/window";
import createLogger from "core/lib/logger";
import { timeout } from "./promise";

type JobParams<T> = {
  name: string,
  priority?: number,
  timeout?: number,
  method: () => CancelablePromise<T> | Promise<T>,
};

type QueueParams = {
  id: string,
  defaults?: JobParams<*>,
};

export class Job<T> {
  static PRIORITY_HIGH = 100;
  static PRIORITY_MEDIUM = 50;
  static PRIORITY_LOW = 25;
  static PRIORITY_BACKGROUND = 1;
  static DEFAULT_TIMEOUT = 60 * 60 * 1000; // 1 hour

  id: string;
  name: string;
  priority: number;
  timeout: number;
  method: () => CancelablePromise<T> | Promise<T>;
  onCancel: (fn?: () => void) => void;
  createdAt: Date;
  startedAt: ?Date;
  completedAt: ?Date;
  canceledAt: ?Date;
  error: ?Error;
  promise: CancelablePromise<T>;
  resolve: (T) => void;
  reject: (Error) => void;

  // Explicit return type because of this flow issue: https://github.com/facebook/flow/issues/6400
  // eslint-disable-next-line no-use-before-define
  constructor(params: JobParams<T>): Job<T> {
    this.id = uuid.v4();
    this.name = params.name;
    this.priority = params.priority || Job.PRIORITY_HIGH;
    this.timeout = params.timeout || Job.DEFAULT_TIMEOUT;
    this.method = params.method;
    this.onCancel = () => {};
    this.createdAt = new Date();

    this.promise = new CancelablePromise((resolve, reject, onCancel) => {
      this.resolve = resolve;
      this.reject = reject;

      // this is for flow as Bluebird is typed as onCancel being optional
      // as it can be disabled in configuration
      if (onCancel) {
        this.onCancel = onCancel;
      }
    });

    if (typeof this.method !== "function") {
      throw new Error("Job must have a method parameter");
    }

    if (this.priority > 100) {
      throw new Error("Priority must be between 1 - 100");
    }

    return this;
  }

  get status(): "idle" | "active" | "complete" | "canceled" {
    if (this.canceledAt) {
      return "canceled";
    }
    if (this.completedAt) {
      return "complete";
    }
    if (this.startedAt) {
      return "active";
    }
    return "idle";
  }

  run() {
    // eslint-disable-next-line no-async-promise-executor
    return new Promise(async (resolve, reject) => {
      if (this.status === "active" || this.status === "canceled") {
        return this.promise;
      }

      this.startedAt = new Date();
      const resultPromise = this.method();

      // cancel the underlying method call if possible
      this.onCancel(() => {
        this.completedAt = new Date();
        this.canceledAt = new Date();
        reject(new UserCanceledError("Job was canceled"));

        // $FlowFixMe
        if (resultPromise.cancel) {
          resultPromise.cancel();
        }
      });

      timeout(resultPromise, this.timeout, "Job timed out")
        .then(this.resolve)
        .catch((err) => {
          this.error = err;
          this.reject(err);

          // If the job timed out then we can go ahead and cancel it the promise
          // which will kill the underlying CLI process.
          if (err.message === "Job timed out") {
            // $FlowFixMe
            if (resultPromise.cancel) {
              resultPromise.cancel();
            }
          }
        })
        .finally(() => {
          this.completedAt = new Date();
        });

      try {
        const result = await this.promise;
        resolve(result);
      } catch (error) {
        reject(error);
      }
    });
  }
}

class SubQueue extends TinyQueue {
  id: string;
  defaults: ?JobParams<*>;
  currentJob: ?Job<*>;
  log: (...args: any[]) => void;

  constructor(params: QueueParams) {
    super([], function comparator(a, b) {
      return b.priority - a.priority;
    });

    if (!params.id) {
      throw new Error("Queue must have an id");
    }

    this.id = params.id;
    this.defaults = params.defaults;
    this.logger = createLogger(`queue:${this.id}`);
    this.logger.log("created");
  }

  get status(): "idle" | "active" {
    if (this.currentJob) {
      return "active";
    }
    return "idle";
  }

  push = (job: *) => {
    this.logger.log(`enqueued ${job.name}`, job.id);
    super.push(job);
  };

  run = async () => {
    if (!this.length) {
      return;
    }

    try {
      // get the next job to be processed
      const job: Job<*> = this.pop();
      const queuedMs = new Date() - job.createdAt;
      this.currentJob = job;
      this.logger.log(
        `starting ${job.name} (queued for ${queuedMs}ms)`,
        job.id
      );
      const response = await job.run();
      this.logger.log(`completed ${job.name}`, job.id);
      return response;
    } catch (err) {
      if (this.currentJob) {
        this.logger.error(`failed ${this.currentJob.name}`, this.currentJob.id);
      }

      throw err;
    } finally {
      this.currentJob = undefined;
    }
  };
}

class Queue {
  static MAX_CONCURRENCY = 6;

  queues: {
    [name: string]: SubQueue,
  } = {};

  find(id: string) {
    return this.queues[id];
  }

  // find or create a queue based on given params
  findOrCreate(params: QueueParams) {
    let queue = this.find(params.id);
    if (queue) {
      return queue;
    }

    queue = new SubQueue(params);
    this.queues[params.id] = queue;
    return queue;
  }

  enqueueJob<T>(queueId: string, params: JobParams<T>): CancelablePromise<T> {
    const queue = this.findOrCreate({ id: queueId });
    const job: Job<T> = new Job(params);
    queue.push(job);
    return job.promise;
  }

  runJob<T>(queueId: string, params: JobParams<T>): CancelablePromise<T> {
    const promise = this.enqueueJob(queueId, params);
    window.requestIdleCallback(
      () => {
        this.run();
      },
      {
        timeout: 500,
      }
    );
    return promise;
  }

  getActiveQueueCount() {
    return filter(this.queues, (queue) => queue.status === "active").length;
  }

  // run the next job based on priority across all subqueues
  run = async () => {
    // find inactive queues. Because queues are serial, anything with
    // an existing job running won't be considered.
    const idle = filter(this.queues, (queue) => queue.status === "idle");

    // find the next job in line for each queue and check there is actually
    // something to run
    const pending = reduce(
      idle,
      (sum, queue) => {
        const nextJob = queue.peek();
        if (!nextJob) {
          return sum;
        }

        return [
          ...sum,
          {
            queue,
            nextJob,
          },
        ];
      },
      []
    );
    if (!pending.length) {
      return;
    }

    // sort jobs across queues by priority and time queued, run the first one
    const orderedPending = sortBy(pending, (item) => [
      100 - item.nextJob.priority,
      item.nextJob.createdAt,
    ]);

    const { queue, nextJob } = orderedPending[0];

    // check we're not already at capacity for the amount of jobs
    // we want to have running at once (High priority jobs are an exception)
    if (
      nextJob.priority !== Job.PRIORITY_HIGH &&
      this.getActiveQueueCount() >= Queue.MAX_CONCURRENCY
    ) {
      return;
    }

    try {
      await queue.run();
    } finally {
      // to make sure we process the queue as fast as possible, when a job is
      // complete immediately try and process another until there are none left
      this.run();
    }
  };
}

export default new Queue();
