import { inject, injectable } from 'inversify';
import { RxReplicationPullStreamItem } from 'rxdb';
import {
  ReplicationPullHandlerResult,
  RxCollection,
  RxReplicationWriteToMasterRow,
  WithDeleted,
} from 'rxdb/dist/types/types';
import { replicateRxCollection, RxReplicationState } from 'rxdb/plugins/replication';
import { Observable, Subscription } from 'rxjs';

import {
  APP_LOGGER_TYPES,
  GLOBAL_ERROR_TYPES,
  ONLINE_TRACKER_TYPES,
  SYNC_TYPES,
} from '@/ioc/types';

import { CollectionName, Database } from '@/features/system/db';
import { IGlobalErrorRepository } from '@/features/system/globalError';
import { IAppLogger } from '@/features/system/logger';
import { NetworkError, UnauthorizedError } from '@/features/system/network';
import type { IOnlineTrackerRepository } from '@/features/system/OnlineTracker';
import type { IResumeTokenManager } from '@/features/system/sync';

import { ReplicationPushHandlerCreator } from './ReplicationPushHandlerCreator';

/**
 * Lifecycle contract for a replication bound to a single collection.
 *
 * `ReplicationService` in this file is the base implementation; the notes
 * below describe what that implementation does.
 */
export interface IReplicationService {
  /**
   * Builds the replication for this service's collection from `db`.
   * `ReplicationService` needs this to have run before any other member
   * except `getCollectionName()` is used.
   */
  create(db: Database): void;
  /** Starts the replication; `ReplicationService` resolves once it reports being in sync. */
  start(): Promise<void>;
  /** Stops the replication; `ReplicationService` does this by cancelling it. */
  stop(): Promise<unknown>;
  /** Name of the collection this service replicates. */
  getCollectionName(): CollectionName;
  /** Stream of the replication's "active" flag. */
  isActive$(): Observable<boolean>;
  /** Current value of the replication's "active" flag. */
  isActive(): boolean;
}

/**
 * Checkpoint shape used by the replications in this file: a single
 * `resume_token`, which is `null` when no token is available.
 */
export type MasterCheckpointType = { resume_token: string | null };

/**
 * Item type emitted by a pull stream for `Document`, i.e. RxDB's
 * `RxReplicationPullStreamItem` with `MasterCheckpointType` as its checkpoint.
 */
export type ReplicationPullItem<Document> = RxReplicationPullStreamItem<
  Document,
  MasterCheckpointType
>;

/**
 * Push callbacks accepted by `ReplicationService`, one per operation kind.
 * Every entry is optional, so any subset of `create`, `update` and `delete`
 * may be supplied.
 */
type PushHandlersMap<T> = {
  /** Receives the rows to create. */
  create?: (docsToCreate: RxReplicationWriteToMasterRow<T>[]) => Promise<T | T[]>;
  /** Receives the rows to update. */
  update?: (docsToUpdate: RxReplicationWriteToMasterRow<T>[]) => Promise<T | T[]>;
  /** Receives the rows to delete. */
  delete?: (docsToDelete: RxReplicationWriteToMasterRow<T>[]) => Promise<unknown>;
};

/**
 * Base class for a live RxDB replication of a single collection.
 *
 * Subclasses supply the collection name, a factory for the pull stream and,
 * optionally, push handlers. Call `create(db)` first to build the replication,
 * then `start()` / `stop()` to control it.
 */
@injectable()
export abstract class ReplicationService<Document extends { uuid: string }>
  implements IReplicationService
{
  private static readonly REPLICATION_IDENTIFIER =
    'my-rest-replication-to-https://example.com/api/sync';

  @inject(APP_LOGGER_TYPES.AppLogger)
  private logger: IAppLogger;

  @inject(ONLINE_TRACKER_TYPES.OnlineTrackerRepository)
  private onlineTrackerRepository: IOnlineTrackerRepository;

  @inject(SYNC_TYPES.ResumeTokenManager)
  private resumeTokenManager: IResumeTokenManager;

  /** Set by `create()`; every lifecycle method below relies on it. */
  private replication: RxReplicationState<Document, MasterCheckpointType>;

  private readonly collectionName: CollectionName;

  private pullStreamFactory: () => Observable<ReplicationPullItem<Document>>;
  private pushHandlerCreator?: ReplicationPushHandlerCreator<Document>;

  /** Subscription to the meta instance change stream; held so it can be released. */
  private resumeTokenSubscription?: Subscription;

  @inject(GLOBAL_ERROR_TYPES.GlobalErrorRepository)
  protected globalErrorRepository: IGlobalErrorRepository;

  /**
   * @param params.collectionName Collection this service replicates.
   * @param params.pullStreamFactory Invoked when the replication is built to
   *   obtain the pull `stream$`.
   * @param params.pushHandlers Optional push callbacks; when omitted, pushes
   *   resolve with an empty array.
   */
  constructor(params: {
    collectionName: CollectionName;
    pullStreamFactory: () => Observable<ReplicationPullItem<Document>>;
    pushHandlers?: PushHandlersMap<Document>;
  }) {
    this.collectionName = params.collectionName;
    this.pullStreamFactory = params.pullStreamFactory;

    if (params.pushHandlers) {
      this.pushHandlerCreator = new ReplicationPushHandlerCreator<Document>(
        params.pushHandlers,
      );
    }
  }

  /**
   * Whether a push may be attempted right now. The default only checks the
   * online tracker.
   */
  protected canPush(): boolean {
    return this.onlineTrackerRepository.isOnline;
  }

  /** Releases the meta instance subscription, if one is held. */
  private stopResumeTokenVerification(): void {
    this.resumeTokenSubscription?.unsubscribe();
    this.resumeTokenSubscription = undefined;
  }

  /**
   * Watches the replication's meta instance and confirms the resume token of
   * every checkpoint event that carries a non-empty string token.
   */
  private startResumeTokenVerification(): void {
    // Replace any subscription left from an earlier start() so that a token
    // is never confirmed once per previous start.
    this.stopResumeTokenVerification();

    this.resumeTokenSubscription = this.replication.metaInstance
      ?.changeStream()
      .subscribe((data) => {
        data.events.forEach((event) => {
          if (event.documentData.isCheckpoint !== '1') return;

          // Reflect.get throws on non-objects, so skip events without checkpoint data.
          const checkpointData: unknown = event.documentData.checkpointData;
          if (typeof checkpointData !== 'object' || checkpointData === null) return;

          const resumeToken: unknown = Reflect.get(checkpointData, 'resume_token');
          if (typeof resumeToken !== 'string' || resumeToken.length === 0) return;

          // Same policy as the pull handler: a failing confirmation is logged
          // and does not interrupt processing of the remaining events.
          try {
            this.resumeTokenManager.confirm(resumeToken);
          } catch (error) {
            this.logger.error(error);
          }
        });
      });
  }

  /**
   * Builds the (not yet started) live replication for `collection`.
   *
   * The pull handler never returns documents; it only confirms the resume
   * token of the checkpoint it is given and echoes that checkpoint back.
   */
  private buildReplication(
    collection: RxCollection,
  ): RxReplicationState<Document, MasterCheckpointType> {
    return replicateRxCollection({
      collection,
      replicationIdentifier: ReplicationService.REPLICATION_IDENTIFIER,
      live: true,
      autoStart: false,
      waitForLeadership: false,
      push: {
        handler: async (
          docs: RxReplicationWriteToMasterRow<Document>[],
        ): Promise<WithDeleted<Document>[]> => {
          if (!this.canPush()) {
            // NOTE(review): resolving with a non-array is presumably relied on
            // to make RxDB treat this push as failed and retry it later -
            // confirm against the RxDB version in use. Do not change this to
            // `[]` without checking: an empty array would likely be read as
            // "pushed, no conflicts".
            // @ts-ignore
            return;
          }

          try {
            // `await` is required here: without it a rejected push promise
            // would bypass the catch block below and never be reported.
            return await this.pushHandler(docs);
          } catch (error) {
            if (error instanceof NetworkError && !(error instanceof UnauthorizedError)) {
              this.globalErrorRepository.pushProcedureError(error);
            }

            throw error;
          }
        },
      },
      pull: {
        stream$: this.pullStreamFactory(),
        handler: async (
          lastCheckpoint: MasterCheckpointType,
        ): Promise<ReplicationPullHandlerResult<Document, MasterCheckpointType>> => {
          try {
            if (lastCheckpoint?.resume_token) {
              this.resumeTokenManager.confirm(lastCheckpoint.resume_token);
            }
          } catch (error) {
            this.logger.error(error);
          }

          return {
            documents: [],
            checkpoint: lastCheckpoint,
          };
        },
      },
    });
  }

  /**
   * Delegates the rows to the configured push handler creator, or resolves
   * with an empty array when no push handlers were supplied.
   */
  private pushHandler(
    docs: RxReplicationWriteToMasterRow<Document>[],
  ): Promise<WithDeleted<Document>[]> {
    return this.pushHandlerCreator?.create(docs) ?? Promise.resolve([]);
  }

  /** Builds the replication for this service's collection in `db`. */
  public create(db: Database): void {
    this.replication = this.buildReplication(db.collections[this.collectionName]);
  }

  /**
   * Starts the replication, begins confirming resume tokens from the meta
   * instance, and resolves once the replication reports being in sync.
   * Requires `create()` to have been called.
   */
  public async start(): Promise<void> {
    await this.replication.start();
    this.startResumeTokenVerification();
    await this.replication.awaitInSync();
  }

  /**
   * Cancels the replication and releases the meta instance subscription.
   * The subscription is released even if cancelling fails.
   */
  public async stop(): Promise<void> {
    try {
      await this.replication.cancel();
    } finally {
      this.stopResumeTokenVerification();
    }
  }

  /** Name of the collection this service replicates. */
  public getCollectionName(): CollectionName {
    return this.collectionName;
  }

  /** The replication's `active` subject, exposed as an observable. */
  public isActive$(): Observable<boolean> {
    return this.replication.subjects.active;
  }

  /** Current value of the replication's `active` subject. */
  public isActive(): boolean {
    return this.replication.subjects.active.getValue();
  }
}
