import {
  defer,
  Observable,
  OperatorFunction,
  ObservedValueOf,
  ObservableInput,
} from 'rxjs';
import { repeatWhen, delay, takeUntil, map } from 'rxjs/operators';

/**
 * Default pause between polling rounds. Used as the default value of the
 * `ms` parameter of `pollingOnResolved`, which passes it to `delay`.
 */
const DEFAULT_REQUEST_DELAY = 1000; // ms

/**
 * A response paired with a polling counter. `pollingOnResolved` wraps every
 * value emitted by the request in this shape before handing it to the
 * caller-supplied response operator.
 */
export interface PollingData<T> {
  /**
   * Counter value attached by `pollingOnResolved`; it starts at 0 and is
   * incremented by one for each response that is emitted.
   */
  pollingCount: number;
  /** The value emitted by the request observable, unchanged. */
  response: T;
}

/**
 * Polls `httpRequest$` repeatedly: every time the request observable
 * completes, it is subscribed to again after `ms` milliseconds, until
 * `pollingEnds$` emits.
 *
 * Each value emitted by the request is wrapped in a {@link PollingData}
 * carrying a zero-based counter before it reaches
 * `responseOperatorFunction$`. The counter lives inside `defer`, so it is
 * created anew for every subscription to the returned observable; a second
 * subscriber or a re-subscription starts counting from 0 again instead of
 * continuing a counter shared with earlier subscriptions.
 *
 * Operator order matters:
 * 1. `map` attaches the counter,
 * 2. `responseOperatorFunction$` runs on every counted response,
 * 3. `repeatWhen` re-subscribes to everything above after the delay,
 * 4. `errorOperatorFunction$` sits below the repeat logic, so it sees
 *    whatever the repeating stream emits or errors with,
 * 5. `takeUntil` is last so that `pollingEnds$` tears down the whole chain.
 *
 * @param httpRequest$ Observable that is subscribed to once per polling round.
 * @param responseOperatorFunction$ Operator applied to each counted response.
 * @param errorOperatorFunction$ Operator applied after the repeat logic
 *   (typically error handling such as `catchError` or `retry`).
 * @param pollingEnds$ Notifier whose first emission completes the polling.
 * @param ms Delay in milliseconds between a completion of the request and
 *   the next subscription to it. Defaults to `DEFAULT_REQUEST_DELAY`.
 * @returns An observable of the values produced by the operator chain.
 */
export function pollingOnResolved<T, O extends ObservableInput<any>>(
  httpRequest$: Observable<T>,
  responseOperatorFunction$: OperatorFunction<
    PollingData<T>,
    ObservedValueOf<O>
  >,
  errorOperatorFunction$: OperatorFunction<T, T | ObservedValueOf<O>>,
  pollingEnds$: Observable<any>,
  ms = DEFAULT_REQUEST_DELAY
): Observable<T | ObservedValueOf<O>> {
  return defer(() => {
    // Per-subscription state: must be declared inside the factory so that
    // independent subscriptions do not share (and corrupt) one counter.
    let pollingCount = 0;

    return httpRequest$.pipe(
      map((response) => ({ pollingCount: pollingCount++, response })),
      responseOperatorFunction$,
      // Re-subscribe to the request `ms` milliseconds after it completes.
      repeatWhen((complete$) => complete$.pipe(delay(ms))),
      errorOperatorFunction$,
      takeUntil(pollingEnds$)
    );
  });
}
