import {
  filter,
  map,
  merge,
  mergeMap,
  Observable,
  OperatorFunction,
  pipe,
  Subject,
  UnaryFunction,
} from "rxjs";
import initSqlJs, { Database, QueryExecResult } from "sql.js";
import { AppDb } from "./app-db";
import { AppFileSystem } from "./app-file-system";
import { ConflictFreeDbV2 } from "./conflict-free-db/conflict-free-db-v2";
import {
  Change,
  ConflictFreeDb,
  defaultIdName,
  Operation,
  PhysicalDb,
} from "./conflict-free-db/core";
import { MemoryVdb, VdbRegisters } from "./conflict-free-db/memory-vdb";
import { SqlDb } from "./conflict-free-db/sql-db";
import {
  dbPersistence,
  isSqlFile,
  parseEventChangeFile,
  sqlFileMimeType,
} from "./db-persistence";
import { FileChange, FileStore, ObservableFileStore } from "./file-store";
import { IdGeneratorByType } from "./id-generator";
import { Logger, makeTimer } from "./logging";
import { defaultSourceId, SourceStore } from "./source-store";
import { ChangeSetInfo } from "./sql-state";
import { DbSqlStateStore } from "./sql-state-store";
import { shortId } from "./uuid";

export function nextTimestamp(current?: number): number {
  const now = new Date().getTime();
  const currentTimestamp = current ?? 0;
  const timestamp = Math.max(now, currentTimestamp + 1);
  return timestamp;
}

export interface ChangeSet {
  fileId: string;
  md5Hash: string;
  changes: () => Change[];
}

class ChangeSetTracker {
  private fileIdToInfo: Record<string, ChangeSetInfo>;
  constructor(info: ChangeSetInfo[] = []) {
    this.fileIdToInfo = {};
    for (const record of info) {
      this.fileIdToInfo[record.fileId] = record;
    }
  }
  add(info: ChangeSetInfo) {
    this.fileIdToInfo[info.fileId] = {
      fileId: info.fileId,
      md5Hash: info.md5Hash,
    };
  }
  has(info: ChangeSetInfo) {
    const record = this.fileIdToInfo[info.fileId];
    return record?.md5Hash === info.md5Hash;
  }
  dump() {
    return Object.values(this.fileIdToInfo);
  }
}

export class AppSqlApi {
  private sharedDb: ConflictFreeDb;
  private privateDb: PhysicalDb;
  readonly onSnapshot: (logger: Logger) => Promise<void>;
  private localChangesSubject: Subject<Change> = new Subject();
  private remoteChangesSubject: Subject<Change> = new Subject();
  private changeSets: ChangeSetTracker;
  constructor(
    sharedDb: ConflictFreeDb,
    localDb: PhysicalDb,
    onSnapshot?: (logger: Logger) => Promise<void>,
    changeSets: ChangeSetTracker = new ChangeSetTracker()
  ) {
    this.privateDb = localDb;
    this.sharedDb = sharedDb;
    this.onSnapshot =
      onSnapshot ??
      (() => {
        return Promise.resolve();
      });
    this.changeSets = changeSets;
  }
  execPrivate(
    sql: string,
    params?: initSqlJs.BindParams | undefined
  ): QueryExecResult[] {
    return this.privateDb.exec(sql, params);
  }
  applyPrivateOp(operation: Operation) {
    operation.apply(this.privateDb);
  }
  queryShared(
    sql: string,
    params?: initSqlJs.BindParams | undefined
  ): QueryExecResult[] {
    return this.sharedDb.query(sql, params);
  }
  applyRemoteChangeSet(changeSet: ChangeSet) {
    if (this.changeSets.has(changeSet)) {
      return;
    }
    for (const change of changeSet.changes()) {
      this.applyRemoteChange(change);
    }
    this.changeSets.add(changeSet);
  }
  applyRemoteChange(change: Change) {
    if (this.sharedDb.apply(change)) {
      this.remoteChangesSubject.next(change);
    }
  }
  applyLocalOp(operation: Operation) {
    const maxTimestamp = this.sharedDb.virtualDb.getMaxTimestamp();
    const timestamp = nextTimestamp(maxTimestamp);
    const id = shortId();
    const change = { id, timestamp, operation };
    if (this.sharedDb.apply(change)) {
      this.localChangesSubject.next(change);
      return true;
    } else {
      return false;
    }
  }
  localChangeEvents() {
    return this.localChangesSubject.asObservable();
  }
  remoteChangeEvents() {
    return this.remoteChangesSubject.asObservable();
  }
  allChangeEvents() {
    return merge(this.localChangeEvents(), this.remoteChangeEvents());
  }
}

export function observeSqlChanges(
  sourceId: string,
  fileStore: FileStore,
  fileChanges: Observable<FileChange>
): Observable<ChangeSet> {
  return fileChanges.pipe(
    filter((change) => change.contentChanged),
    mergeMap(async (fileChange) => fileStore.get(fileChange.id)),
    filterUndefined(),
    filter((file) => file.source === sourceId),
    filter(isSqlFile),
    map((file) => {
      const data = file.data;
      if (!data) {
        throw new Error("Expected file data");
      }
      return {
        fileId: file.id,
        md5Hash: data.metadata.md5Checksum,
        changes: () => parseEventChangeFile(data.content),
      };
    })
  );
}

function filterUndefined<T>(): UnaryFunction<
  Observable<T | undefined>,
  Observable<T>
> {
  return pipe(
    filter((x) => x !== undefined) as OperatorFunction<T | undefined, T>
  );
}

export class DbManager {
  private dbs: { [sourceId: string]: AppSqlApi } = {};
  private fs: AppFileSystem;
  private sql: initSqlJs.SqlJsStatic;
  private logger: Logger;
  private sqlStateStore: DbSqlStateStore;
  constructor(
    fs: AppFileSystem,
    sql: initSqlJs.SqlJsStatic,
    sqlStateStore: DbSqlStateStore,
    logger: Logger
  ) {
    this.fs = fs;
    this.logger = logger;
    this.sql = sql;
    this.sqlStateStore = sqlStateStore;
  }
  getDb(sourceId: string) {
    return this.dbs[sourceId];
  }
  async createDb(sourceId: string) {
    if (this.dbs[sourceId]) {
      throw new Error("db already exists");
    }
    const db = await createDb(
      this.sql,
      sourceId,
      this.fs.files,
      this.fs.sources,
      this.sqlStateStore,
      this.fs.idGenerator
    );
    this.dbs[sourceId] = db;
    return db;
  }
  async deleteDb(sourceId: string) {
    if (!this.dbs[sourceId]) {
      throw new Error("store not found");
    }
    delete this.dbs[sourceId];
  }
}

async function initializeAppSqlApi(
  sourceId: string,
  sql: initSqlJs.SqlJsStatic,
  sqlStateStore: DbSqlStateStore
) {
  const sqlState = await sqlStateStore.get(sourceId);
  let registers: VdbRegisters;
  let physicalSqlDb: Database;
  let changeSetInfos: ChangeSetInfo[];
  if (sqlState) {
    registers = new VdbRegisters(sqlState.vdbRegisters);
    physicalSqlDb = new sql.Database(sqlState.sqlData);
    changeSetInfos = sqlState.changeSetInfos ?? [];
  } else {
    registers = new VdbRegisters();
    physicalSqlDb = new sql.Database();
    changeSetInfos = [];
  }
  const virtualDb = new MemoryVdb(registers);
  const physicalDb = new SqlDb(physicalSqlDb, defaultIdName);
  physicalSqlDb.exec(
    `PRAGMA page_size = 8192; PRAGMA journal_mode = OFF; PRAGMA synchronous = 0;
    PRAGMA cache_size = 1000000; PRAGMA locking_mode = EXCLUSIVE;
    PRAGMA temp_store = MEMORY;`
  );
  const cfdb = new ConflictFreeDbV2(virtualDb, physicalDb);
  const changeSets = new ChangeSetTracker(changeSetInfos);
  async function snapshot(logger: Logger) {
    const timestamp = registers.getMaxTimestamp();
    const currentTimestamp = await sqlStateStore.getTimestamp(sourceId);
    if (currentTimestamp === timestamp) {
      return;
    }
    const vdbRegisters = registers.dump();
    const sqlData = physicalSqlDb.export();
    logger(`Persisting sql changes to indexed db`);
    sqlStateStore.set(
      sourceId,
      { vdbRegisters, sqlData, changeSetInfos: changeSets.dump() },
      timestamp
    );
  }
  const db = new AppSqlApi(cfdb, physicalDb, snapshot, changeSets);
  return db;
}

export async function createDb(
  sql: initSqlJs.SqlJsStatic,
  sourceId: string,
  fileStore: ObservableFileStore,
  sourceStore: SourceStore,
  sqlStateStore: DbSqlStateStore,
  idGenerator: IdGeneratorByType
) {
  const db = await initializeAppSqlApi(sourceId, sql, sqlStateStore);
  await subscribeDb(sourceId, db, fileStore);
  const persist = dbPersistence(sourceId, sourceStore, fileStore, idGenerator);
  db.localChangeEvents().subscribe(serializeCalls(persist));
  return db;
}

function serializeCalls<T extends (...args: any[]) => Promise<any>>(f: T) {
  let queue: Promise<any> = Promise.resolve();
  return function (...args: Parameters<T>) {
    const res = queue.then(() => f(...args));
    queue = res.catch();
    return res;
  };
}

async function subscribeDb(
  sourceId: string,
  db: AppSqlApi,
  fileStore: ObservableFileStore
) {
  const existingFiles = new Subject<FileChange>();
  const fileChanges = merge(fileStore.observable, existingFiles);
  const dbChanges = observeSqlChanges(sourceId, fileStore, fileChanges);
  dbChanges.subscribe((changeSet) => db.applyRemoteChangeSet(changeSet));
  for (const id of await fileStore.listContentType(sqlFileMimeType)) {
    const file = await fileStore.get(id);
    if (file?.source === sourceId) {
      existingFiles.next({ id, contentChanged: true });
    }
  }
}

export async function initializeSqlDbs(
  db: AppDb,
  fs: AppFileSystem,
  logger: Logger
) {
  const sqlStateStore = new DbSqlStateStore(db);
  const timer = makeTimer(logger);
  const sql = await timer("initialize sql-wasm", async () =>
    initSqlJs({ locateFile: () => "/sql-wasm.wasm" })
  );
  const manager = new DbManager(fs, sql, sqlStateStore, logger);
  for (const sourceId of await fs.sources.list()) {
    if (sourceId === defaultSourceId) {
      continue;
    }
    await timer(`create db for source ${sourceId}`, () =>
      manager.createDb(sourceId)
    );
  }
  return manager;
}
