import {Option} from 'funfix-core';
import {List} from 'immutable';
import {combineLatest, from, interval, Observable, of, OperatorFunction, Subject, throwError} from 'rxjs';
import {debounce, distinctUntilChanged, flatMap, map, reduce, shareReplay, single, switchMap, take, throttle} from 'rxjs/operators';

export const quickUpdateTimer = interval(50);
export const halfSecondUpdateTimer = interval(500);
export const tenthSecondUpdateTimer = interval(100);
export const secondUpdateTimer = interval(1000);
export const twoSecondUpdateTimer = interval(2000);
export const fiveSecondUpdateTimer = interval(5000);
export const tenSecondUpdateTimer = interval(1000);

// Unsubscribes all observables that have provided untilNever
// Note: Dont touch this, you should never call ubsubAll.next()
const unsubAll = new Subject<any>();
export const untilNever: Observable<any> = unsubAll.asObservable();

export function toObservable<T>(value: T | undefined | null): Observable<T> {
    return toObservableFromOption(Option.of(value));
}

export function toObservableFromOption<T>(value: Option<T>): Observable<T> {
    return value
        .map(v => of(v))
        .getOrElse(of());
}

export function toNonEmptyObservable<T>(o: Option<T>): Observable<T> {
    if (o.isEmpty()) {
        return throwError('Option was empty');
    }
    return toObservableFromOption(o);
}

/**
 * For using observables as a data model.
 * Prevents unnecessary updates, also safely unsubscribe with the "until" observable
 */
export function modelThrottle<T>(
    until: Observable<any> = untilNever,
    timer: (t: T) => Observable<number> = (t: T) => quickUpdateTimer): OperatorFunction<T, T> {
    return (o: Observable<T>) => {
        return o
            .pipe(throttle(timer, { leading: true, trailing: true }))
            .pipe(distinctUntilChanged())
            .pipe(shareReplay(1));
    };
}

/**
 * Identical to modelThrottle, but debounce instead
 */
export function modelDebounce<T>(
    until: Observable<any> = untilNever,
    timer: (t: T) => Observable<number> = (t: T) => quickUpdateTimer): OperatorFunction<T, T> {
    return (o: Observable<T>) => {
        return o
            .pipe(debounce(timer))
            .pipe(distinctUntilChanged())
            // .pipe(takeUntil(until)) // TODO: Re-enable, no idea why this is breaking
            .pipe(shareReplay(1));
    };
}

/**
 * Identical to modelThrottle, but without limiting.
 */
export function modelPassthrough<T>(until: Observable<any> = untilNever): OperatorFunction<T, T> {
    return (o: Observable<T>) => {
        return o
            .pipe(distinctUntilChanged())
            // .pipe(takeUntil(until)) // TODO: Re-enable, no idea why this is breaking
            .pipe(shareReplay(1));
    };
}

/**
 * Manually trigger the effect, thus even if noone subscribes, it is still triggered.
 * ShareReplay just in case someone subscribes so we do not trigger the effect twice
 */
export function triggerSingleEffect<T>(): OperatorFunction<T, T> {
    return (o: Observable<T>) => {
        const result =
            o.pipe(take(1))
                .pipe(single());

        result.subscribe();

        return result.pipe(shareReplay(1));
    };
}

/**
 * Ensure subscribed, forces effects to be run...
 *
 * IMPORTANT: Only used while testing as the subscription will not be cleaned up and will memory leak
 */
export function ensureSubscribed<T>(): OperatorFunction<T, T> {
    return (o: Observable<T>) => {
        o.subscribe();

        return o;
    };
}

export function bulkProcessListWithLimiter<A, B, C>(
    input: List<A>,
    f: (a: A) => Promise<B>,
    g: (b: B) => List<C>,
    limiter: number): Promise<List<C>> {
    return of(...input.toArray())
        .pipe(flatMap(req => from(f(req)), limiter))
        .pipe(map(b => g(b)))
        .pipe(reduce((a, b) => a.concat(b), List()))
        .toPromise();
}

/**
 * CombineLatest/Zip/Forkjoin all will NOT complete if the input array is empty.
 *
 * This is here to remedy that situation
 */
export function fallbackCombineLatestList<T>(o: List<Observable<T>>): Observable<List<T>> {
    if (o.isEmpty()) {
        return of(List());
    }

    return combineLatest(o.toArray())
        .pipe(map(items => List(items)));
}

/**
 * CombineLatest/Zip/Forkjoin all will NOT complete if the input array is empty.
 *
 * This is here to remedy that situation
 */
export function fallbackCombineLatest<T>(...o: Array<Observable<T>>): Observable<ReadonlyArray<T>> {
    if (o.length === 0) {
        return of([]);
    }

    return combineLatest(o);
}

/**
 * Returns an observable that runs when this one completes
 */
export function whenComplete<A, B>(f: () => Observable<B>): OperatorFunction<A, B> {
    return (o: Observable<A>) => {
        const subj = new Subject<boolean>();
        const obs = subj.asObservable()
            .pipe(switchMap(_ => f()));
        o.subscribe(_ => { }, _ => { }, () => {
            subj.next(true);
            subj.complete();
        });
        return obs;
    };
}
