import { DirtyRaw, Q } from "@nozbe/watermelondb"
import {
  SyncDatabaseChangeSet,
  SyncPullArgs,
  SyncPushArgs,
  hasUnsyncedChanges,
  synchronize,
} from "@nozbe/watermelondb/sync"

import { Event } from "@treefort/lib/authenticator"
import { benchmark } from "@treefort/lib/debug/decorators"
import { WatermelonError } from "@treefort/lib/errors"
import { isSuccessStatus } from "@treefort/lib/is-success-status"
import { Semaphore } from "@treefort/lib/semaphore"

import api from "../lib/api"
import authenticator from "../lib/authenticator"
import { debug as appDebug, isAxiosNetworkError } from "../lib/logging"
import { logError } from "../lib/logging"
import { getClient } from "./db"
import { CREATED_LOCALLY_FIELD_NAME } from "./models/source-aware-model"
import { SYNC_IGNORE_PREFIX } from "./stores/types"

// Debug loggers for this module, all namespaced under "sync":
// - syncDebug: general progress messages
// - syncDebugVerbose: full payload dumps (server responses, remapped changes)
// - benchmarkDebug: handed to the @benchmark decorator on SyncManager methods
const syncDebug = appDebug.extend("sync")
const syncDebugVerbose = syncDebug.extend("verbose")
const benchmarkDebug = syncDebug.extend("benchmark")

/**
 * Thrown by the pull/push callbacks when no authenticated user is available.
 * The sync entry point recognises this type and aborts without reporting it
 * as an error.
 */
class UserNotLoggedInError extends WatermelonError {
  constructor() {
    const message = "Attempted sync when user not logged in; aborting"
    super(message)
  }
}

// This in-memory set of locally-created IDs is to cover the following edge case:
// If you create a record, sync (thereby pushing it to server), then delete the
// record before your next sync, then the record pops back in as created. This is because
// the row in the local database (where we are persisting the "created locally" state)
// has been marked deleted, so we can't easily fetch it to exclude it as having
// been created locally. Watermelon sync logic, when it receives the created record
// from the server, resurrects the local soft-deleted record and updates it with
// the newly-received data.
//
// Conversely, if we were to _only_ store the IDs in memory in this
// set, and the app gets killed in that in-between-syncs twilight zone, then the record
// will likewise show up as created again on the subsequent sync.
//
// IDs are added in pushChanges (just before the POST) and removed again the
// first time a pull reports the same ID as "created".
const locallyCreatedIds = new Set<string>()

/**
 * Runs `action` and, if it throws or rejects, runs it exactly one more time.
 *
 * The first failure is discarded (only a debug line is emitted). A failure of
 * the second attempt propagates to the caller.
 */
async function retryOnce<T>(action: () => Promise<T>) {
  let outcome: T

  try {
    outcome = await action()
  } catch {
    syncDebug("Retrying sync after failure")
    outcome = await action()
  }

  return outcome
}

/** Returns true when `fieldName` begins with the sync-ignore prefix. */
function isLocalField(fieldName: string): boolean {
  return fieldName.startsWith(SYNC_IGNORE_PREFIX)
}

/**
 * Returns a shallow copy of `row` without the fields that `isLocalField`
 * identifies as local-only. The input row is not modified.
 */
function omitLocalOnlyFields(row: DirtyRaw) {
  const allFields = Object.entries(row)
  const syncedFields: typeof allFields = []

  for (const field of allFields) {
    const [fieldName] = field
    if (!isLocalField(fieldName)) {
      syncedFields.push(field)
    }
  }

  return Object.fromEntries(syncedFields)
}

/**
 * Builds a row filter that keeps only the rows whose `userId` field is
 * strictly equal to the given `userId`. Rows without a `userId` are dropped.
 */
function omitOtherUserRows(userId: string) {
  const belongsToUser = (row: DirtyRaw) => row["userId"] === userId

  return (rows: DirtyRaw[]) => rows.filter(belongsToUser)
}

/**
 * Strips rows that do not belong to `userId` from an incoming changeset and
 * reports an error for every table/collection pair where rows were removed.
 *
 * Only the `created` and `updated` collections are compared; `deleted` is
 * passed through by `filterChangeSet` untouched. Mismatches are logged via
 * `logError` but do not abort: the filtered changeset is always returned.
 */
async function filterAndAssertNoMismatchedUserRecords(
  userId: string,
  changes: SyncDatabaseChangeSet,
) {
  const ownRecordsOnly = await filterChangeSet(changes, {
    rowFilters: [omitOtherUserRows(userId)],
    columnFilters: [],
  })

  const checkedCollections = ["created", "updated"] as const

  Object.entries(changes).forEach(([table, incoming]) => {
    checkedCollections.forEach((collection) => {
      const keptCount = ownRecordsOnly[table][collection].length
      const receivedCount = incoming[collection].length

      // Equal counts mean the user filter removed nothing for this collection.
      if (keptCount === receivedCount) {
        return
      }

      const mismatch = new Error("Mismatched user ID on incoming sync record", {
        cause: {
          table,
          incomingChanges: changes,
          authenticatedUserId: userId,
        },
      })

      logError(
        new WatermelonError("App received mismatched user sync records!", {
          cause: mismatch,
        }),
      )
    })
  })

  return ownRecordsOnly
}

/**
 * Produces a new changeset in which, for every table, the `created` and
 * `updated` rows have been passed through the supplied filters.
 *
 * Row filters run first, in order, each receiving the previous filter's
 * output. Column filters are then applied, in order, to every surviving row.
 * `deleted` is carried over as-is.
 *
 * @param changeset The changeset to filter; it is not modified.
 * @param options.rowFilters Functions that receive a row list and return the rows to keep.
 * @param options.columnFilters Functions that receive a row and return its replacement.
 * @returns A promise resolving to the filtered changeset.
 */
async function filterChangeSet(
  changeset: SyncDatabaseChangeSet,
  {
    rowFilters = [],
    columnFilters = [],
  }: {
    rowFilters?: ((rows: DirtyRaw[]) => DirtyRaw[])[]
    columnFilters?: ((row: DirtyRaw) => DirtyRaw)[]
  },
) {
  const keepRows = (rows: DirtyRaw[]) => {
    let remaining = rows
    for (const rowFilter of rowFilters) {
      remaining = rowFilter(remaining)
    }
    return remaining
  }

  const projectColumns = (row: DirtyRaw) => {
    let projected = row
    for (const columnFilter of columnFilters) {
      projected = columnFilter(projected)
    }
    return projected
  }

  const filterRows = (rows: DirtyRaw[]) =>
    keepRows(rows).map((row) => projectColumns(row))

  const filtered: SyncDatabaseChangeSet = {}

  for (const [tableName, tableChanges] of Object.entries(changeset)) {
    filtered[tableName] = {
      created: filterRows(tableChanges.created),
      updated: filterRows(tableChanges.updated),
      deleted: tableChanges.deleted,
    }
  }

  return filtered
}

/**
 * Renames columns on every created/updated row of every collection in a
 * changeset, according to the supplied `[oldName, newName]` pairs.
 *
 * Why this exists: WatermelonDB's model code assumes that a model with a
 * `createdAt` or `updatedAt` field has a matching snake_cased
 * `created_at`/`updated_at` database column (part of its automatic
 * create/update date handling). We want camelCased database fields and we
 * handle those dates ourselves, so the model fields were renamed to
 * `createdAtDate` and `updatedAtDate`. Doing the rename here, on the app
 * side, lets the API keep returning "normal" column names that match what is
 * in the database.
 *
 * Names are not mapped back when sending changes to the server, because the
 * API disregards client-side timestamps anyway.
 *
 * @param changeset The changeset to remap; rows are shallow-copied, not mutated.
 * @param fieldNameMappings Pairs of `[oldFieldName, newFieldName]`, applied in order.
 * @returns A new changeset object with renamed fields; `deleted` lists are passed through.
 */
function remapColumnNames(
  changeset: SyncDatabaseChangeSet,
  fieldNameMappings: [string, string][],
) {
  const renameFields = (row: DirtyRaw) => {
    const renamed = { ...row }

    fieldNameMappings.forEach(([from, to]) => {
      if (from in renamed) {
        renamed[to] = renamed[from]
        delete renamed[from]
      }
    })

    return renamed
  }

  return Object.fromEntries(
    Object.entries(changeset).map(
      ([collectionName, { created, updated, deleted }]) => [
        collectionName,
        {
          created: created.map((row) => renameFields(row)),
          updated: updated.map((row) => renameFields(row)),
          deleted,
        },
      ],
    ),
  )
}

// Semaphore (constructed with a count of 1) shared by syncOfflineDatabase,
// which gives up immediately via tryEnter() when it is already held, and
// SyncManager.resetDatabase, which waits for it via enter().
const syncSemaphore = new Semaphore(1)

/**
 * Runs one WatermelonDB `synchronize` pass against the `/watermelon/sync`
 * API endpoint, retrying the whole pass once on failure.
 *
 * - Returns immediately if another sync currently holds `syncSemaphore`.
 * - Pulled changes are restricted to the authenticated user's rows, stripped
 *   of rows this device created itself, and have their timestamp columns
 *   renamed before being handed to WatermelonDB.
 * - Pushed changes are restricted to the authenticated user's rows and
 *   stripped of local-only columns.
 * - A missing user and axios network errors end the sync quietly. Any other
 *   failure is logged, and a "Cannot read properties of null" failure
 *   additionally triggers a local database reset when that is safe (or when
 *   safety cannot be determined).
 *
 * The semaphore lock is always released before the returned promise settles.
 */
async function syncOfflineDatabase() {
  const lock = syncSemaphore.tryEnter()
  if (!lock) {
    syncDebug("Sync already in progress! Aborting.")
    return
  }

  const database = getClient()

  try {
    // Removes from each collection's `created` list the rows that this
    // device created itself, so a pull does not re-create them locally.
    const omitLocallyCreatedRows = async (changeset: SyncDatabaseChangeSet) => {
      const result = await Promise.all(
        Object.keys(changeset).map(async (collectionName) => {
          syncDebug(
            "Checking for local %s changes",
            collectionName,
            changeset[collectionName],
          )

          const collection = database.get(collectionName)

          if (!collection || changeset[collectionName].created.length === 0) {
            syncDebug(
              "No locally-created records detected; returning unmodified changeset.",
            )
            return { [collectionName]: changeset[collectionName] }
          }

          syncDebug("Fetching locally-created %s record ids", collectionName)

          // Check for records that exist in the local database whose syncIgnore_createdLocally field is marked true
          // That means those records were created on this device and we can omit them from the response
          // NOTE: This assumes all collections have the syncIgnore_createdLocally column
          const locallyCreatedRecordIds = new Set(
            await collection
              .query(
                Q.where(
                  "id",
                  Q.oneOf(
                    changeset[collectionName].created.map(
                      (row: DirtyRaw) => row.id,
                    ),
                  ),
                ),
                Q.where(CREATED_LOCALLY_FIELD_NAME, true),
              )
              .fetchIds(),
          )

          syncDebug("Fetched locally-created ids", locallyCreatedRecordIds)

          return {
            [collectionName]: {
              created: changeset[collectionName].created.filter((row) => {
                // Check both the in-memory store and the field in the local DB
                if (locallyCreatedIds.has(row.id)) {
                  // The in-memory entry has served its purpose once the
                  // server has echoed the record back; forget it.
                  locallyCreatedIds.delete(row.id)
                  return false
                }

                return !locallyCreatedRecordIds.has(row.id)
              }),
              updated: changeset[collectionName].updated,
              deleted: changeset[collectionName].deleted,
            },
          }
        }),
      )

      return result.reduce((agg, coll) => Object.assign(agg, coll), {})
    }

    // Watermelon docs recommend a sync retry on any sync failure,
    // which gives the client an opportunity to resolve conflicts that
    // might have occurred on the initial sync.
    await retryOnce(() =>
      synchronize({
        database,
        pullChanges: async ({
          lastPulledAt,
          schemaVersion,
          migration,
        }: SyncPullArgs) => {
          syncDebug("Pulling changes", {
            lastPulledAt,
            schemaVersion,
            migration,
          })

          const userId = authenticator.getUser()?.id

          if (!userId) {
            throw new UserNotLoggedInError()
          }

          const params = {
            lastPulledAt,
            schemaVersion,
            migration: migration
              ? encodeURIComponent(JSON.stringify(migration))
              : undefined,
          }

          const response = await api.get(`/watermelon/sync`, { params })

          syncDebugVerbose("Received server response:", response)

          if (!isSuccessStatus(response.status)) {
            throw new WatermelonError(
              `${response.status} ${response.statusText}`,
            )
          }

          const { timestamp } = response.data

          const changes = await filterAndAssertNoMismatchedUserRecords(
            userId,
            response.data.changes,
          )

          const remoteChanges = remapColumnNames(
            await omitLocallyCreatedRows(changes),
            [
              ["createdAt", "createdAtDate"],
              ["updatedAt", "updatedAtDate"],
            ],
          )

          syncDebugVerbose(
            "Returning remapped remote changes after omitting locally-created rows",
            remoteChanges,
          )
          syncDebug(
            `Previous sync timestamp was ${lastPulledAt}. New sync timestamp is ${timestamp}.`,
          )
          return { changes: remoteChanges, timestamp }
        },
        pushChanges: async ({ changes, lastPulledAt }: SyncPushArgs) => {
          syncDebug("Pushing changes", { changes, lastPulledAt })
          const currentUserId = authenticator.getUser()?.id

          if (!currentUserId) {
            throw new UserNotLoggedInError()
          }

          const syncChanges = await filterChangeSet(changes, {
            // IMPORTANT NOTE: We do not support multiple users' records being in the same watermelon database.
            // This guard to filter out other users' records is present in the event that some exceptional case
            // occurs, to ensure we do not attempt to save them under the current user ID, but after this sync
            // those records will be marked as "synced" when in reality they have not been saved on the server.
            // Thus, if we ever want to support multiple users simultaneously saving records in the offline
            // storage, we should move to one watermelon database per user.
            rowFilters: [omitOtherUserRows(currentUserId)],
            columnFilters: [omitLocalOnlyFields],
          })

          // Stash all outbound created IDs in our local in-memory set to avoid re-creating them on pull
          for (const changeset of Object.values(syncChanges)) {
            changeset.created.forEach((value) => {
              locallyCreatedIds.add(value.id)
            })
          }

          const response = await api.post(`/watermelon/sync`, {
            changes: syncChanges,
            lastPulledAt,
          })

          if (!isSuccessStatus(response.status)) {
            throw new WatermelonError(
              `${response.status} ${response.statusText}`,
            )
          }
        },
        migrationsEnabledAtVersion: 1,
      }),
    )
  } catch (cause: unknown) {
    // If the user is not logged in, there's no need to log an error.
    if (cause instanceof UserNotLoggedInError) {
      syncDebug(cause.message, { error: cause })
      return
    }

    const error = cause as Error

    // Anything can be thrown in JavaScript, so read `message` defensively:
    // applying the `in` operator to a primitive or null would itself throw
    // and escape this handler.
    const errorMessageIncludes = (value: unknown, text: string) => {
      const message = (value as { message?: unknown } | null | undefined)
        ?.message
      return typeof message === "string" && message.includes(text)
    }

    // Bail silently if we failed due to a network error: nothing we can do
    // about that and it happens all the time.
    if (isAxiosNetworkError(error)) {
      return
    }

    logError(new WatermelonError("Failed to sync after retry.", { cause }))

    if (errorMessageIncludes(cause, "Cannot read properties of null")) {
      syncDebug("Error indicates a structural database problem")
      // This likely points to the local database structure being out of sync with the
      // remote database structure in a way that migrations cannot (or did not) fix.
      // This means every subsequent sync will fail. The only real solution here is to
      // reset the local database and re-sync.
      //
      // If the user has no local unsynced changes, this is an easy decision as no data will be lost.
      // If there _are_ unsynced changes, then...?

      try {
        syncDebug("Attempting to check for unsynced changes")
        const isDangerousToReset = await hasUnsyncedChanges({ database })

        if (!isDangerousToReset) {
          syncDebug(
            "Attempting to reset database; no unsynced local changes found",
          )
          await database.write(() => database.unsafeResetDatabase())

          // IMPORTANT: Do not await this call, as we are currently holding the
          // semaphore lock which requestSync will need to acquire. Awaiting here will
          // prevent the lock being released below, thereby creating a deadlock.
          syncManager.requestSync({ syncType: "immediate" })
          return
        }
      } catch {
        syncDebug(
          "Failed to check for local unsynced changes; resetting database as a last resort.",
        )
        // If we couldn't even check for unsynced changes, then there's nothing else
        // we can do. Reset the database.
        await database.write(() => database.unsafeResetDatabase())

        return
      }

      // Unsynced local changes exist. We are in a pickle.
      logError(
        new WatermelonError(
          "Fatal: Database structure compromised, but we are unwilling to reset database because local unsynced changes exist",
          { cause },
        ),
      )
    }
  } finally {
    lock.release()
  }
}

// The kinds of sync request. Each kind has its own throttle interval in
// SyncManager.throttleIntervals; "immediate" has an interval of 0.
type SyncType = "periodic" | "user-initiated" | "immediate"

// Options accepted by SyncManager.requestSync.
type SyncRequestArgs = {
  syncType: SyncType
}

/**
 * Decides when `syncOfflineDatabase` runs.
 *
 * Requests are throttled per sync type relative to the start of the last
 * sync. At most one future sync is kept scheduled at a time, and it is
 * pulled earlier when a request with a shorter throttle interval arrives.
 */
class SyncManager {
  /** When the most recent sync was started (set before the sync itself runs). */
  lastSync: Date | undefined

  /** Minimum time, in milliseconds, between the last sync and the next one, per request type. */
  throttleIntervals: Record<SyncType, number> = {
    periodic: 60000, // 1 minute
    "user-initiated": 10000, // 10 seconds
    immediate: 0,
  }

  /** Timer handle of the currently scheduled sync, if any. */
  nextSyncTimeout: NodeJS.Timeout | undefined
  /** When the currently scheduled sync is due, if any. */
  nextSyncDate: Date | undefined

  /**
   * Runs a sync now. Records the start time first, then clears any scheduled
   * sync once the run completes without throwing.
   */
  @benchmark(benchmarkDebug)
  private async performSync() {
    this.lastSync = new Date()
    const result = await syncOfflineDatabase()
    this.cancelNextSync()

    return result
  }

  /** Clears the scheduled sync timer (if any) and forgets its due date. */
  @benchmark(benchmarkDebug)
  private cancelNextSync() {
    if (this.nextSyncTimeout) {
      clearTimeout(this.nextSyncTimeout)
    }

    this.nextSyncTimeout = undefined
    this.nextSyncDate = undefined
  }

  /**
   * Either performs a sync right away or schedules one, depending on
   * `syncType` and on how recently the last sync started.
   *
   * The returned promise resolves when an immediate sync has finished, or as
   * soon as a deferred sync has been scheduled (or deemed unnecessary).
   */
  @benchmark(benchmarkDebug)
  private async enqueueSync(syncType: SyncType): Promise<void> {
    syncDebug(
      `sync of type ${syncType} requested at ${new Date().toISOString()}.`,
    )

    // If we have never yet synced, just do it.
    if (!this.lastSync || syncType === "immediate") {
      if (this.lastSync) {
        // If the last sync was performed within the last 2 seconds,
        // enqueue this as a user-requested sync instead. This works
        // around immediate sync requests in response to events such
        // as audio player pause, as the player sometimes fires
        // bursts of these events in rapid succession when foregrounding
        // the app.
        const msToLastSync = new Date().getTime() - this.lastSync.getTime()
        if (msToLastSync < 2000) {
          syncDebug(
            `An immediate sync was requested, but the previous sync was only ${msToLastSync}ms ago. Reclassifying this sync as user-initiated.`,
          )

          return this.enqueueSync("user-initiated")
        }
      }

      syncDebug(
        `performing an immediate sync when ${syncType} was requested because it's immediate or we have never synced before. It's ${new Date().toISOString()}.`,
      )
      return await this.performSync()
    }

    const throttleInterval = this.throttleIntervals[syncType]
    // Earliest moment this request type is allowed to sync again.
    const earliestAllowedSync = new Date(
      this.lastSync.getTime() + throttleInterval,
    )

    if (this.nextSyncDate && this.nextSyncDate > new Date()) {
      syncDebug(`future sync date exists: ${this.nextSyncDate.toISOString()}.`)
      // If there is already a sync scheduled for earlier than lastSync + throttle interval, do nothing.
      if (this.nextSyncDate <= earliestAllowedSync) {
        syncDebug(
          `already a suitable sync scheduled for ${this.nextSyncDate.toISOString()} (${this.nextSyncDate.getTime() - new Date().getTime()}ms from now); returning.`,
        )
        return
      }

      syncDebug("cancelling next sync.")
      // Otherwise, there must be a sync scheduled for a time later than now + throttle interval,
      // so bump up that scheduled sync to the current throttle interval.
      this.cancelNextSync()
    }

    // A timer can still be pending here even though its due date is already
    // in the past (e.g. two requests arriving in the same tick after the
    // throttle window has elapsed). Clear it so that replacing the handle
    // below never leaves a second, untracked timer behind.
    if (this.nextSyncTimeout) {
      this.cancelNextSync()
    }

    this.nextSyncDate = earliestAllowedSync

    // Guarantee we don't inadvertently try to schedule a sync in the past
    const nextSyncMsFromNow = Math.max(
      1,
      this.nextSyncDate.getTime() - new Date().getTime(),
    )

    syncDebug(
      `scheduling a new sync for ${this.nextSyncDate.toISOString()} (${nextSyncMsFromNow}ms from now)`,
    )

    // Arriving here means there is no sync scheduled or it's scheduled later than
    // the requested throttle interval. Queue up a sync for the requested throttle interval
    // from the time of last sync.
    this.nextSyncTimeout = setTimeout(async () => {
      try {
        await this.performSync()
      } catch (syncError) {
        // We must not throw from here because otherwise it will be unobserved/an unhandled promise rejection.
        logError(syncError)
      }
    }, nextSyncMsFromNow)
  }

  /**
   * Public entry point for asking for a sync. Ignored when
   * `authenticator.getUser()` is `null`.
   *
   * @param options.syncType Kind of request; defaults to "periodic".
   */
  @benchmark(benchmarkDebug)
  async requestSync(options?: SyncRequestArgs) {
    if (authenticator.getUser() === null) {
      syncDebug("User is not authenticated; ignoring sync request.")
      // Only authenticated users can sync.
      return
    }

    return this.enqueueSync(options?.syncType ?? "periodic")
  }

  /**
   * Wipes the local database. Waits for `syncSemaphore` first, so the reset
   * does not run while a sync holds it, and releases it afterwards even if
   * the reset fails.
   */
  @benchmark(benchmarkDebug)
  async resetDatabase() {
    const lock = await syncSemaphore.enter()

    try {
      const db = getClient()
      await db.write(() => db.unsafeResetDatabase())
    } finally {
      lock.release()
    }
  }
}

const syncManager = new SyncManager()

// TODO: #1644: Investigate using user-specific watermelon databases
// React to authenticated-user changes: clear the local database when the user
// becomes null (logout), otherwise request an immediate sync.
authenticator.on(Event.User, async (user) => {
  if (user === null) {
    try {
      await syncManager.resetDatabase()
    } catch (error) {
      logError(error, {
        extra: { details: "Failed to reset local database on user logout" },
      })
    }

    return
  }

  // An immediate sync can still reject (for example when a last-resort
  // database reset fails). Report it here rather than letting it surface as
  // an unhandled promise rejection from this event handler.
  try {
    await syncManager.requestSync({ syncType: "immediate" })
  } catch (error) {
    logError(error, {
      extra: { details: "Failed to sync local database after user change" },
    })
  }
})

export { syncManager }
