/* eslint-disable max-lines-per-function */
import S3 from 'aws-sdk/clients/s3';
import * as Sentry from '@sentry/angular';
import {
  BehaviorSubject,
  ReplaySubject,
  MonoTypeOperatorFunction,
  Subject,
  Observable,
  forkJoin,
  of,
  throwError,
  race,
  EMPTY,
  from,
} from 'rxjs';
import { UploadDetailsResponse } from './dto/upload.dto';
import {
  switchMap,
  map,
  mergeScan,
  last,
  catchError,
  shareReplay,
  tap,
  filter,
  retryWhen,
  take,
  concatMap,
  first,
  distinctUntilChanged,
  takeUntil,
  finalize,
  bufferWhen,
  exhaustMap,
} from 'rxjs/operators';
import { retryWithDelay, bufferPause, delayEach } from '../../utils';
import { StoreSubject } from '../../classes/store-subject';
import { RxS3, S3Factory } from './rx-s3';
import { DbAdapter } from './db-adapter';

export interface MultiPartUploaderConfig {
  maxRetries?: number;
  retryDelay?: number;
  dbKey?: string;
  simultaneousUploads?: number;
  useDb?: boolean;
  timeout?: number;
}

const LOCAL_STORAGE_PREFIX = 'mpu-upload-key-';
const DB_NAME_PREFIX = 'mpu-recording-chunk-';

const DEFAULT_CONFIG: MultiPartUploaderConfig = {
  maxRetries: 5,
  retryDelay: 3000,
  simultaneousUploads: 2,
  useDb: true,
  timeout: 300000, //5 min
};

const MIN_CHUNK_SIZE = 6 * 1024 * 1024;

export enum MultiPartUploaderStatuses {
  Open = 'OPEN',
  Paused = 'PAUSED',
  Closed = 'CLOSED',
  Complete = 'COMPLETE',
  Aborted = 'ABORTED',
}

export class MultiPartUploaderStatus {
  status: MultiPartUploaderStatuses;
  progress: number;
  loaded: number;
  total: number;
  errors: string[];
}

class MpuStatusSubject extends StoreSubject<MultiPartUploaderStatus> {
  constructor() {
    super({
      status: MultiPartUploaderStatuses.Open,
      progress: 0,
      loaded: 0,
      total: 0,
      errors: [],
    });
  }

  setStatus(status: MultiPartUploaderStatuses) {
    this.modify(status, (state, value) => ({ ...state, status: value }));
  }

  setProgress(progress: number) {
    this.modify(progress, (state, value) => ({ ...state, progress: value }));
  }

  setTotal(total: number) {
    this.modify(total, (state, value) => ({ ...state, total: value }));
  }

  setLoaded(loaded: number) {
    this.modify(loaded, (state, value) => ({ ...state, loaded: value }));
  }

  addError(key: string) {
    this.modify(key, (state, value) => ({ ...state, errors: state.errors.concat([value]) }));
  }

  removeError(key: string) {
    this.modify(key, (state, value) => ({ ...state, errors: state.errors.filter((i) => i !== value) }));
  }

  setOpen() {
    this.setStatus(MultiPartUploaderStatuses.Open);
  }

  setPaused() {
    this.setStatus(MultiPartUploaderStatuses.Paused);
  }

  setClosed() {
    this.setStatus(MultiPartUploaderStatuses.Closed);
  }

  setComplete() {
    this.setStatus(MultiPartUploaderStatuses.Complete);
    this.complete();
  }

  setAborted() {
    this.setStatus(MultiPartUploaderStatuses.Aborted);
    this.complete();
  }
}

interface Part {
  part: Blob;
  partNumber: number;
}

export class MultiPartUploader {
  private s3: RxS3;
  private db$: Observable<DbAdapter<Part>>;
  private dbInitKeys$: Observable<[DbAdapter<Part>, IDBValidKey[]]>;
  private initialized$: Observable<boolean>;
  private partsSource = new Subject<Blob>();
  private partKeys$ = new ReplaySubject<number>();
  private size = 0;
  private uploadedSize = 0;
  private lastPartKey = 0;

  private statusSource = new MpuStatusSubject();
  status$ = this.statusSource.asObservable();
  progress$ = this.statusSource.select<number>('progress');
  statusText$ = this.statusSource.select<MultiPartUploaderStatuses>('status');
  isOpen$ = this.statusText$.pipe(
    map((s) => s === MultiPartUploaderStatuses.Open || s === MultiPartUploaderStatuses.Paused)
  );
  closed$ = this.statusText$.pipe(filter((s) => s === MultiPartUploaderStatuses.Closed));
  complete$ = this.statusText$.pipe(filter((s) => s === MultiPartUploaderStatuses.Complete));
  aborted$ = this.statusText$.pipe(filter((s) => s === MultiPartUploaderStatuses.Aborted));
  private errors$: Observable<string[]> = this.statusSource.select<string[]>('errors') as Observable<string[]>;
  error$ = this.errors$.pipe(filter((errors) => !!errors.length));
  hasError$ = this.errors$.pipe(
    map((errors) => !!errors.length),
    distinctUntilChanged()
  );
  onError$ = this.hasError$.pipe(filter((e) => e));
  private credentialErrorSource = new Subject();
  credentialError$ = this.credentialErrorSource.pipe(takeUntil(this.status$.pipe(last())));

  private pause$ = new BehaviorSubject(false);
  private retrySource = new Subject();
  private abortSource = new Subject();

  private _completed = false;
  get completed() {
    return this._completed;
  }
  private _aborted = false;
  get aborted() {
    return this._aborted;
  }

  private uploadId = new BehaviorSubject(null);
  uploadId$ = this.uploadId.asObservable();

  constructor(private uploadDetails: UploadDetailsResponse, private config: MultiPartUploaderConfig = {}) {
    // default the config settings
    this.config = { ...DEFAULT_CONFIG, ...{ dbKey: new Date().getTime().toString() }, ...this.config };

    // initialize db and share it
    this.db$ = this.initDb().pipe(shareReplay(1));
    this.dbInitKeys$ = this.db$.pipe(
      switchMap((db) => db.getAllKeys().pipe(map((keys) => [db, keys] as [DbAdapter<Part>, IDBValidKey[]]))),
      shareReplay(1)
    );
    this.initialized$ = this.dbInitKeys$.pipe(
      map(() => true),
      first()
    );
    // check if any completed parts stored in local storage for set dbkey, used to seed mergeScan if so
    const storedParts = this.storedParts;

    // when a part is available, store in indexDb, emit keys to replay subject to buffer keys
    // prevents having file chunks stored in subject buffer
    this.dbInitKeys$
      .pipe(
        switchMap(([db, storedChunks]) =>
          this.partsSource.pipe(
            concatMap((part, idx) => db.put({ part, partNumber: idx + 1 + storedParts.length + storedChunks.length }))
          )
        ),
        catchError((e) => {
          console.error(e, 'error putting chunk to db');
          Sentry.captureException(e, { tags: { detail: 'MPU PUT ERROR, COMPLETING UPLOAD NOW' } });
          this.complete();
          return EMPTY;
        })
      )
      .subscribe(this.partKeys$);

    // main stream. run the function to initialize multipart upload
    forkJoin([this.init$(), this.db$])
      .pipe(
        // switch into partKeys subject (replay subject so won't matter if parts come in before init done)
        switchMap(([uploadKey, db]) =>
          this.partKeys$.pipe(
            bufferPause(this.pause$), // operator to buffer partKeys on pause event and emit them in order on resume
            // mergeScan gets the keys, gets chunk from db, and runs the upload, and accumulates done uploads into completed parts list
            bufferWhen(() =>
              this.partKeys$.pipe(
                exhaustMap((partKey) =>
                  db.getAll().pipe(
                    map((parts) =>
                      parts
                        .filter((part) => part.partNumber > this.lastPartKey)
                        .reduce((acc, { part }) => (acc += part.size), 0)
                    ),
                    filter((size) => size > MIN_CHUNK_SIZE),
                    tap(() => (this.lastPartKey = partKey))
                  )
                )
              )
            ),
            mergeScan(
              (parts, partKeys) =>
                // get the chunk from db
                db.getByKeys(partKeys).pipe(
                  map((parts) => ({
                    partNumber: parts[0].partNumber,
                    partKeys: parts.map((b) => b.partNumber),
                    part: new Blob(
                      parts.map((b) => b.part),
                      { type: parts[0].part.type }
                    ),
                  })),
                  switchMap(({ part, partNumber, partKeys }) =>
                    // actually upload the part
                    // race with abort signal so aborts happen immediately
                    race(this.uploadPart$(part, partNumber, uploadKey), this.abortSource).pipe(
                      switchMap((resp: S3.UploadPartOutput) => {
                        console.log(
                          `uploaded part ${partNumber} with size ${(part.size / 1024 / 1024).toPrecision(2)}MB`
                        );
                        // accumulate parts and store them in localstorage
                        parts.push({ ETag: resp.ETag, PartNumber: partNumber });
                        this.storedParts = parts;

                        // delete from db and return completed parts list
                        return forkJoin([partKeys.map((partKey) => db.delete(partKey))]).pipe(map(() => parts));
                      })
                    )
                  )
                ),
              // seed with any stored parts, set simultaneous inner observables from config
              storedParts.slice(),
              this.config.simultaneousUploads
            ),
            last(null, this.storedParts), // only emit when partKeys$ is marked complete and all inner observables from mergeScan complete
            switchMap(
              (
                parts // when done, run the complete, and delete the db
              ) =>
                race(this.abortSource, this.completeUpload$(parts, uploadKey)).pipe(
                  tap(() => this.statusSource.setComplete())
                )
            ),
            catchError((e) => {
              // abort triggered by manual error trigger on partKeys$ / retry subjects
              if (e !== 'aborted') {
                Sentry.captureException(e, { tags: { detail: 'MPU UNEXPECTED ERROR' } });
              }
              return this.abort$(uploadKey).pipe(tap(() => this.statusSource.setAborted()));
            }),
            // last thing to do in happy path or error is clean up
            switchMap(() => {
              db.close();
              return db.deleteDb();
            }),
            catchError(() => {
              db.close();
              return db.deleteDb();
            })
          )
        )
      )
      .subscribe({
        error: (err) => {
          // can only get here if abort fails
          this.s3 = null;
          console.error(err, 'abort failed, unrecoverable');
          Sentry.captureException(err, { tags: { detail: 'MPU ABORT ERROR' } });
        },
        complete: () => {
          // just clean up
          this.s3 = null;
        },
      });

    // check if any parts left in local storage for the set dbkey and send them through
    this.checkForParts();
    this.complete$.subscribe(() => (this._completed = true));
    this.aborted$.subscribe(() => (this._aborted = true));
  }

  uploadPart(file: Blob) {
    this.size += file.size;
    this.updateProgress();
    this.initialized$.subscribe(() => {
      this.partsSource.next(file);
    });
  }

  complete() {
    this.initialized$.subscribe(() => {
      this.resumeUpload();
      this.statusSource.setClosed();
      this.partsSource.complete();
    });
  }

  pauseUpload() {
    this.statusSource.setPaused();
    this.pause$.next(true);
  }

  resumeUpload() {
    this.statusSource.setOpen();
    this.pause$.next(false);
  }

  abortUpload() {
    this.initialized$.subscribe(() => {
      this.abortSource.error('aborted');
      this.partsSource.error('aborted');
      this.retrySource.error('aborted');
    });
  }

  retryUpload() {
    this.retrySource.next(null);
  }

  uploadFile(file: File, chunkSize = MIN_CHUNK_SIZE) {
    this.initialized$.subscribe(() => {
      const total = file.size;
      const chunkCount = Math.ceil(total / chunkSize);
      const chunks = [...Array(chunkCount).keys()];
      from(chunks)
        .pipe(
          take(chunks.length),
          map((chunk) => {
            const offset = chunk * chunkSize;
            return file.slice(offset, offset + chunkSize);
          }),
          delayEach(1000),
          tap((dataChunk) => this.uploadPart(dataChunk)),
          finalize(() => this.complete())
        )
        .subscribe();
    });
  }

  refreshS3(uploadDetails: UploadDetailsResponse) {
    this.s3.refreshCredentials(this.getS3Config(uploadDetails));
  }

  private getS3Config(uploadDetails: UploadDetailsResponse): S3.ClientConfiguration {
    return {
      accessKeyId: uploadDetails.accessKeyId,
      secretAccessKey: uploadDetails.secretAccessKey,
      sessionToken: uploadDetails.sessionToken,
      region: uploadDetails.region,
      useAccelerateEndpoint: uploadDetails.useAccelerateEndpoint,
      logger: console,
      httpOptions: {
        timeout: this.config.timeout,
      },
    };
  }

  private init$() {
    // init S3
    this.s3 = S3Factory.createS3(this.getS3Config(this.uploadDetails));

    // check for a stored uploadKey, return that if it's there
    const uploadKey: S3.CreateMultipartUploadOutput | null = JSON.parse(localStorage.getItem(this.storageKey));
    if (uploadKey) {
      this.uploadId.next(uploadKey.UploadId);
      return of(uploadKey);
    }

    // otherwise create the upload and store the key
    return this.s3
      .createMultipartUpload$({
        Bucket: this.uploadDetails.bucket,
        Key: this.uploadDetails.path,
      })
      .pipe(
        this.retryErrors('init'),
        tap((uploadKey) => {
          this.uploadId.next(uploadKey.UploadId);
          localStorage.setItem(this.storageKey, JSON.stringify(uploadKey));
        })
      );
  }

  private initDb() {
    return DbAdapter.initDb<Part>(this.dbKey, this.config.useDb);
  }

  private uploadPart$(file: Blob, partNumber: number, uploadKey: S3.CreateMultipartUploadOutput) {
    // does the upload and updates the progress
    return this.s3
      .uploadPart$({
        UploadId: uploadKey.UploadId,
        Bucket: this.uploadDetails.bucket,
        Key: this.uploadDetails.path,
        PartNumber: partNumber,
        Body: file,
      })
      .pipe(
        this.retryErrors(`part-${partNumber}`),
        tap(() => this.partDone(file))
      );
  }

  private completeUpload$(parts: S3.CompletedPartList, uploadKey: S3.CreateMultipartUploadOutput) {
    // completes the upload and cleans up local storage
    parts = parts.sort((partA, partB) => partA.PartNumber - partB.PartNumber);
    return this.s3
      .completeMultipartUpload$({
        UploadId: uploadKey.UploadId,
        Bucket: this.uploadDetails.bucket,
        Key: this.uploadDetails.path,
        MultipartUpload: {
          Parts: parts,
        },
      })
      .pipe(
        this.retryErrors('complete'),
        tap(() => this.cleanUpStorage())
      );
  }

  private abort$(uploadKey: S3.CreateMultipartUploadOutput) {
    // aborts the upload and cleans up local storage
    return this.s3
      .abortMultipartUpload$({
        UploadId: uploadKey.UploadId,
        Bucket: this.uploadDetails.bucket,
        Key: this.uploadDetails.path,
      })
      .pipe(
        this.retryErrors('abort'),
        tap(() => this.cleanUpStorage())
      );
  }

  private partDone(file: Blob) {
    this.uploadedSize += file.size;
    this.updateProgress();
  }

  private updateProgress() {
    if (this.size) {
      this.statusSource.setTotal(this.size);
      this.statusSource.setLoaded(this.uploadedSize);
      this.statusSource.setProgress(Math.min(this.uploadedSize / this.size, 1));
    }
  }

  private retryErrors<T>(errKey: string): MonoTypeOperatorFunction<T> {
    return (obs) =>
      obs.pipe(
        catchError((e) => {
          console.log(e);
          console.error('MPU ERROR', e, errKey);
          if (e.code && e.code === 'ExpiredToken') {
            this.credentialErrorSource.next(null);
          }
          return throwError(e);
        }),
        retryWithDelay(this.config.retryDelay, this.config.maxRetries),
        // retry uploads on signal
        retryWhen((errors$) =>
          errors$.pipe(
            tap(() => this.statusSource.addError(errKey)),
            switchMap(() =>
              this.retrySource.pipe(
                take(1),
                tap(() => this.statusSource.removeError(errKey))
              )
            )
          )
        )
      );
  }

  private checkForParts() {
    this.dbInitKeys$.subscribe(([, chunks]) => {
      // rough estimate of size of stored and uploaded chunks
      const storedParts = this.storedParts;
      this.size += (chunks.length + storedParts.length) * MIN_CHUNK_SIZE;
      this.uploadedSize += storedParts.length * MIN_CHUNK_SIZE;
      this.updateProgress();
      chunks.forEach((chunk) => this.partKeys$.next(+chunk));
    });
  }

  private get storageKey() {
    return LOCAL_STORAGE_PREFIX + this.config.dbKey;
  }

  get storedPartsKey() {
    return LOCAL_STORAGE_PREFIX + 'parts-' + this.config.dbKey;
  }

  private get dbKey() {
    return DB_NAME_PREFIX + this.config.dbKey;
  }

  private cleanUpStorage() {
    localStorage.removeItem(this.storageKey);
    localStorage.removeItem(this.storedPartsKey);
    this.lastPartKey = 0;
  }

  private get storedParts(): S3.CompletedPartList {
    return JSON.parse(localStorage.getItem(this.storedPartsKey)) || [];
  }
  private set storedParts(parts: S3.CompletedPartList) {
    localStorage.setItem(this.storedPartsKey, JSON.stringify(parts));
  }
}
