import { isDelegate, plugins } from '@dabble/app';
import { Change } from '@dabble/data/collective/changes';
import { ChangeProjectEvent, Collective, CommittedChangesEvent } from '@dabble/data/collective/collective';
import PromiseQueue from '@dabble/data/collective/promise-queue';
import { BATCH_SIZE, ConnectionStore, Wheres } from '@dabble/data/connection';
import { LoadedProjectsStore } from '@dabble/data/stores/loaded-projects';
import { ProjectMetasStore } from '@dabble/data/stores/project-metas';
import { Readable, Unsubscriber, readable } from '@dabble/data/stores/store';
import { UserProjectsStore } from '@dabble/data/stores/user-projects';
import { DabbleDatabase, ProjectSnapshot } from '@dabble/data/types';
import logging from '@dabble/util/log';
import { isEqual } from 'typewriter-editor';
import { SyncingStore } from '../stores/syncing';
import { ErrorReporter, SyncStep } from '../types';
import syncLastModifiedStep from './last-modified';

const log = logging.tagColor('Sync', '#36B1BF');
const SNAPSHOT_INTERVAL = 200;

export interface ProjectSyncStep extends SyncStep {
  stop(): void;
}

const projectSaveFunctions = new Map<string, () => Promise<void>>();
plugins.register({ projectSaveFunctions: readable(projectSaveFunctions) });

interface ProjectStepData {
  db: DabbleDatabase;
  collective: Collective;
  connection: ConnectionStore;
  fromRemote: Set<any>;
  syncing: SyncingStore;
  userProjects: UserProjectsStore;
  projectMetas: ProjectMetasStore;
  loadedProjects: LoadedProjectsStore;
  currentUser: Readable<any>;
}

export default function projectStep(projectId: string, data: ProjectStepData): ProjectSyncStep {
  const { collective, connection, syncing, projectMetas, loadedProjects } = data;
  let stopFn: Unsubscriber;
  let hasPermissions = false;
  const userDocsStep = syncLastModifiedStep('user_docs', data, { filters: [['projectId', '==', projectId]] });
  // Ensure this is only being called once at a time
  const commitChangesQueue = new PromiseQueue(1, 2);

  return { sync, start, stop, name: `projectStep:${projectId}` };

  async function sync() {
    //*** 1. Grab remote project, creating it if it does not exist yet

    hasPermissions = !!projectMetas.get()[projectId];

    if (!hasPermissions) return;

    //*** 2. Sync down user docs for this project, don't wait for this while the rest continue

    let userDocsSyncing: Promise<void>;
    if (!isDelegate || !loadedProjects.contains(projectId)) {
      userDocsSyncing = userDocsStep.sync();
    }

    //*** 3. Get all remote changes and commit them locally.

    await retrieveRemoteChanges();

    //*** 4. Get all uncommitted local changes and save them remotely.

    await commitChangesQueue.add(commitLocalChanges);

    //*** 5. Wait for user docs to finish syncing

    await userDocsSyncing;

    loadedProjects.mark(projectId, true);
  }

  function start(report: ErrorReporter) {
    if (!hasPermissions) return () => {};

    projectSaveFunctions.set(projectId, save);
    const remoteCancel = connection.listen(async ({ collection, data }) => {
      if (collection !== 'project_changes' || data.projectId !== projectId) return;
      const change = data as Change;
      try {
        await receiveRemoteChanges([change]);
      } catch (err) {
        report(err);
      }
    });

    // When a local change gets committed that matches a snapshot, upload the snapshot to the server
    async function onCommittedChanges({ changes }: CommittedChangesEvent) {
      if (!changes.length || changes[0].projectId !== projectId) return;
      const snapshotChanges = changes.filter(change => change.version % SNAPSHOT_INTERVAL === 0);

      if (snapshotChanges.length) {
        syncing.up(true);
        try {
          await Promise.all(
            snapshotChanges.map(async change => {
              const snapshot = await collective.store.getSnapshot(change.projectId, change.version);
              await connection.putDoc('project_snapshots', snapshot);
            })
          );
        } catch (err) {
          report(err);
        }
        syncing.up(false);
      }
    }

    function save() {
      return commitChangesQueue.add(() => commitLocalChanges().catch(report));
    }

    function onChangeProject({ change }: ChangeProjectEvent) {
      if (change.projectId !== projectId) return;
      save();
    }

    let userDocsUnsubscribe: Unsubscriber;
    if (!isDelegate) {
      userDocsUnsubscribe = userDocsStep.start(report);
    }
    collective.on('committedChanges', onCommittedChanges);
    collective.on('changeProject', onChangeProject);

    return (stopFn = () => {
      projectSaveFunctions.delete(projectId);
      remoteCancel();
      if (userDocsUnsubscribe) userDocsUnsubscribe();
      collective.off('committedChanges', onCommittedChanges);
      collective.off('changeProject', onChangeProject);
    });
  }

  function stop() {
    if (stopFn) {
      stopFn();
      stopFn = null;
    }
  }

  /**
   * Grab uncommitted changes from the server and commit them.
   */
  async function retrieveRemoteChanges(changesSince?: number) {
    if (changesSince === undefined) changesSince = await getLastCommittedVersion();
    if (!changesSince && changesSince !== 0) changesSince = -1;

    // First sync should pull grab the latest snapshot instead of filling in from the beginning
    if (changesSince === -1) {
      const snapshot = await getLastRemoteSnapshot();
      if (snapshot) {
        changesSince = snapshot.version;
        await collective.store.putSnapshot(snapshot);
      }
    }

    const remoteChanges = await getRemoteChanges(changesSince);

    // Get the changes since last the committed were received
    await receiveRemoteChanges(remoteChanges, true);
  }

  /**
   * Check for remote changes and receive them into our data.
   */
  async function receiveRemoteChanges(remoteChanges: Change[], afterRetrieve?: boolean) {
    if (remoteChanges.length) {
      syncing.down(true);
      try {
        await collective.receiveChanges(projectId, remoteChanges);
      } catch (err) {
        if (!afterRetrieve) {
          return retrieveRemoteChanges();
        }
        syncing.down(false);
        console.log('Error syncing project:', projectId);
        syncing.error(err);
        throw err;
      }
      syncing.down(false);
    }
  }

  /**
   * Get the last committed change version.
   */
  function getLastCommittedVersion() {
    return getLastCommittedChange().then(change => change && change.version);
  }

  /**
   * Get the last committed change.
   */
  function getLastCommittedChange() {
    return collective.store.getChange(projectId, { committed: true, reverse: true });
  }

  /**
   * Get all uncommitted local changes for the project.
   */
  function getUncommittedChanges() {
    return collective.store.getChanges(projectId, { committed: false });
  }

  /**
   * Get the last snapshot.
   */
  async function getLastRemoteSnapshot() {
    const wheres: Wheres = [
      ['id', '==', projectId],
      ['orderBy', 'version', 'desc'],
      ['limit', 1],
    ];
    return (await connection.getDocs('project_snapshots', wheres)).pop() as ProjectSnapshot;
  }

  /**
   * Gets all remote changes after the given version number.
   */
  async function getRemoteChanges(since: number) {
    const wheres: Wheres = [
      ['projectId', '==', projectId],
      ['version', '>', since],
      ['orderBy', 'version'],
    ];
    return connection.getDocs('project_changes', wheres) as Promise<Change[]>;
  }

  /**
   * Commit local changes to remote.
   */
  async function commitLocalChanges(lastChanges?: Change[]) {
    const changes = await getUncommittedChanges();

    if (!changes.length) return;

    if (lastChanges && isEqual(lastChanges, changes)) {
      const err = new Error('Retrieving remote changes did not help sync');
      console.log('Error syncing project:', projectId);
      syncing.error(err);
      throw err;
    }

    syncing.up(true);

    log('Saving local changes to remote:', changes);

    // const changesRef = connection.getDocs('project_changes');

    for (let i = 0; i < changes.length; i += BATCH_SIZE) {
      const batch = changes.slice(i, i + BATCH_SIZE);
      try {
        const committed = await connection.putDocs('project_changes', batch);
        batch.forEach(change => (change.committed = committed));
        await collective.receiveChanges(projectId, batch);
        log('Saved local changes');
      } catch (err) {
        syncing.up(false);

        if (
          err.message === 'VERSION_EXISTS' ||
          err.message === 'PREV_ID_INCORRECT' ||
          err.message === 'INVALID_VERSION'
        ) {
          log('Insufficient permissions, will try again.');
          // Stop processing now and retry after grabbing the latest remote changes
          retrieveRemoteChanges(batch[0].version - 1).then(() => commitLocalChanges(changes));
          return;
        } else {
          console.log('Error syncing project:', projectId);
          syncing.error(err);
          throw err;
        }
      }
    }

    syncing.up(false);
  }
}
