import { catchError, map, mergeAll, mergeMap, retry, tap, toArray } from 'rxjs/operators';
import { BehaviorSubject, defer, forkJoin, from, Observable, Subject, throwError } from 'rxjs';
import { HttpClient, HttpHeaders } from '@angular/common/http';
import { fromPromise } from 'rxjs/internal-compatibility';

import {
  CompleteMultipartUploadResponse,
  MultipartUploadService,
  MultipartUploadTarget,
  Part
} from '../services/multipart-upload.service';

interface MultipartUpload {
  bucket: string;
  key: string;
  uploadId: string;
  presignedUrls: string[];
}

export interface PartUploadProgress {
  totalParts: number;
  uploadedParts: number;
}

/**
 * An encapsulation of client-side multi-part upload, which involves both our own backend (for starting the
 * S3 multi-part upload and obtaining the pre-signed upload URLs) as well as S3 (client uploads parts and completes
 * upload directly with S3).
 */
export class MultipartUploader {

  // upload progress, reported as a range from [0..100]
  progress$: Subject<number>;

  // number of parts
  partUploadProgress$: Subject<PartUploadProgress | undefined>;

  // internal MPU state
  private multipartUpload: MultipartUpload;
  private uploadedParts: Part[];
  private numUploadedParts: number;
  private completed = false;

  private readonly NUM_RETRIES_PER_PART = 3;

  // max number of concurrent part uploads
  private readonly MAX_CONCURRENCY = 5;

  // min part size for MPUs to S3
  private readonly MIN_PART_SIZE = 5242880;

  constructor(private service: MultipartUploadService,
              private target: MultipartUploadTarget,
              private http: HttpClient,
              private file: File,
              private partSize: number) {
    if (file.size <= 0 || partSize <= 0) {
      throw new Error('Illegal value encountered for fileSize or partSize');
    }
    if (partSize < this.MIN_PART_SIZE) {
      throw new Error('Part size must be at least ' + partSize);
    }
    this.progress$ = new BehaviorSubject(0);
    this.partUploadProgress$ = new BehaviorSubject(undefined);
  }

  /**
   * Upload file using S3 multi-part upload. Encapsulates all steps, and returns an Observable emitting the key
   * of the uploaded object, if successful.
   */
  upload(): Observable<string> {
    return this.start().pipe(
      mergeMap(() => {
        return this.uploadParts()
      }),
      mergeMap(() => {
        return this.complete()
      }),
      catchError(err => {
        console.error(`Multi-part upload failed: ${err}`);
        if (this.multipartUpload) {
          // an MPU that was started on S3 should be aborted, to avoid consuming resources
          console.log(`Aborting started multi-part upload: ${this.multipartUpload.uploadId}`);
          return this.service.abortMultipartUpload(
            this.multipartUpload.bucket, this.multipartUpload.key, this.multipartUpload.uploadId)
            .pipe(
              mergeMap(() => {
                  return throwError(err);
                }
              )
            )
        } else {
          return throwError(err);
        }
      })
    );
  }

  /**
   * Start a multi-part upload at our backend, obtaining the pre-signed URLs for uploading parts to S3.
   */
  private start(): Observable<MultipartUpload> {
    if (this.multipartUpload !== undefined) {
      return throwError('Multi-part upload was already started');
    }

    this.progress$.next(0);
    return this.service.createMultipartUpload(this.target, this.file.name, undefined, this.file.size, this.partSize)
      .pipe(
        tap(response => {
          console.log(`Successfully started MPU for object ${response.key}: ${response.upload_id}`);
          this.multipartUpload = {
            bucket: response.bucket,
            key: response.key,
            uploadId: response.upload_id,
            presignedUrls: response.presigned_upload_part_urls
          };
        }),
        map(() => {
          return this.multipartUpload;
        }),
        catchError(e => {
          return throwError(`Unable to start multi-part upload at Curator backend: ${e}`);
        })
      );
  }

  /**
   * Upload all parts.
   */
  private uploadParts(): Observable<any> {

    this.numUploadedParts = 0;

    // prepare data structure holding parts
    const numParts = Math.trunc(this.file.size / this.partSize) + (this.file.size % this.partSize === 0 ? 0 : 1);
    this.uploadedParts = new Array<Part>(numParts);
    this.reportProgress();
    console.log(`Starting to upload ${numParts} parts...`);

    // concurrently upload parts (note: no need to throttle, browser does this anyway)
    let uploads: Observable<any>[] = [];
    for (let i = 0; i < numParts; i++) {
      const startOffset = i * this.partSize;
      let endOffset;
      if (i === numParts - 1) {
        endOffset = startOffset + (this.file.size % this.partSize === 0 ? this.partSize : this.file.size % this.partSize);
      } else {
        endOffset = startOffset + this.partSize;
      }

      const partNumber = i + 1; // AWS part number starts at 1
      const filePart = this.file.slice(startOffset, endOffset);

      // deferred upload observable: created when subscribed
      const uploadObservable = defer(() => this.http.put(this.multipartUpload.presignedUrls[i], filePart, {
          observe: 'response',
          responseType: 'text',
          headers: new HttpHeaders().delete('Content-Type') // for PUT method, this apparently works to suppress Content-Type
        }
      ).pipe(

        // retry on failure
        retry(this.NUM_RETRIES_PER_PART),

        // extract ETag on success and record successfully uploaded part
        tap((response) => {
          const etag = response.headers.get('ETag');
          console.log(`Part ${partNumber} was uploaded successfully: ETag: ${etag}`);
          const uploadedPart: Part = {
            etag: etag,
            part_number: partNumber
          };
          this.uploadedParts[i] = uploadedPart;
          this.numUploadedParts++;
          this.reportProgress();
        }),
        catchError(err => {
          console.error(`Upload of part ${partNumber} failed after ${this.NUM_RETRIES_PER_PART} retries: ${err}`);
          return throwError(`Upload of part ${partNumber} failed: ${err}`)
        })
      ));
      uploads.push(uploadObservable);
    }

    // limit concurrency to 5 simultaneous subscriptions (= requests)
    // uploads = uploads.map(o => defer(() => o));
    return from(uploads).pipe(
      mergeMap(o => o, this.MAX_CONCURRENCY),
      toArray()
    );
  }

  /**
   * Complete the multi-part upload.
   */
  private complete(): Observable<string> {
    console.log(`Completing ${this.uploadedParts.length} parts at Curator backend...`);
    return this.service.completeMultipartUpload(
      this.multipartUpload.bucket, this.multipartUpload.key, this.multipartUpload.uploadId, this.uploadedParts)
      .pipe(
        mergeMap(response => {
          return fromPromise(this.completeMultipartUploadS3(response));
        }),
        tap(() => {
          this.completed = true;
          this.reportProgress();
        }),
        map(key => {
          console.log(`Successfully completed MPU at S3, object available at: ${key}`);
          return key;
        }),
        catchError(err => {
          console.error(`Completing MPU failed: ${err}`);
          return throwError(`Completing of MPU failed: ${err}`);
        })
      );
  }

  /**
   * Complete the multi-part upload (Client -> S3), this is done using a plain XmlHttpRequest as it must be sent
   * without Content-Type header and that is apparently not possible using Angular Http Client (sigh).
   */
  private completeMultipartUploadS3(response: CompleteMultipartUploadResponse): Promise<string> {
    console.log(`Completing MPU at S3 via URL: ${response.presigned_url}`);
    return new Promise<string>((resolve, reject) => {
      const body = atob(response.request_body_base64);
      const xhr = new XMLHttpRequest();
      xhr.open('POST', response.presigned_url, true);
      xhr.onreadystatechange = () => {
        if (xhr.readyState === 4) { // readyState 4 = DONE
          if (xhr.status === 200) {
            resolve(response.key)
          } else {
            console.error(`Received non-200 status in S3 CompleteMultipartUpload response: ${xhr.status}`);
            reject(xhr.response)
          }
        }
      }
      const blob = new Blob([body]);
      xhr.send(blob);
    });
  }

  /**
   * Report progress to client. Progress proceeds along the three stages of an MPU
   *
   * 1) start (10%)
   * 2) uploading parts (80%)
   * 3) complete (10%)
   *
   * This is not related to actual runtimes of the individual operations, but just provides a consistent UX.
   */
  private reportProgress() {

    this.partUploadProgress$.next({ totalParts: this.uploadedParts.length, uploadedParts: this.numUploadedParts });

    // start() is the first 10%
    if (!this.uploadedParts) {
      this.progress$.next(10);
    }

    // parts are 80%
    if (!this.completed) {
      const partsUploaded = this.uploadedParts.filter(p => p !== undefined).length;
      const progress = partsUploaded / this.uploadedParts.length;
      this.progress$.next(10 + progress * 80);
    } else {

      // completion is the remaining 10%
      this.progress$.next(100);
    }
  }
}
